diff OrthancServer/ServerIndex.cpp @ 511:3b735fdf320b

monitoring of stable patients/studies/series
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 16 Aug 2013 15:39:53 +0200
parents 23e5b35e3c5c
children 935e8c7e0b18
line wrap: on
line diff
--- a/OrthancServer/ServerIndex.cpp	Fri Aug 16 14:23:54 2013 +0200
+++ b/OrthancServer/ServerIndex.cpp	Fri Aug 16 15:39:53 2013 +0200
@@ -37,6 +37,7 @@
 #endif
 
 #include "EmbeddedResources.h"
+#include "OrthancInitialization.h"
 #include "../Core/Toolbox.h"
 #include "../Core/Uuid.h"
 #include "../Core/DicomFormat/DicomArray.h"
@@ -187,6 +188,27 @@
   };
 
 
+  struct ServerIndex::UnstableResourcePayload
+  {
+    Orthanc::ResourceType type_;
+    boost::posix_time::ptime time_;
+
+    UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance)
+    {
+    }
+
+    UnstableResourcePayload(Orthanc::ResourceType type) : type_(type)
+    {
+      time_ = boost::posix_time::second_clock::local_time();
+    }
+
+    unsigned int GetAge() const
+    {
+      return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
+    }
+  };
+
+
   bool ServerIndex::DeleteResource(Json::Value& target,
                                    const std::string& uuid,
                                    ResourceType expectedType)
@@ -227,18 +249,40 @@
   }
 
 
-  static void FlushThread(DatabaseWrapper* db,
-                          boost::mutex* mutex,
-                          unsigned int sleep)
+  void ServerIndex::FlushThread(ServerIndex* that)
   {
+    unsigned int sleep;
+
+    try
+    {
+      std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep);
+      sleep = boost::lexical_cast<unsigned int>(sleepString);
+    }
+    catch (boost::bad_lexical_cast&)
+    {
+      // By default, wait for 10 seconds before flushing
+      sleep = 10;
+    }
+
     LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")";
 
-    while (1)
+    unsigned int count = 0;
+
+    while (!that->done_)
     {
-      boost::this_thread::sleep(boost::posix_time::seconds(sleep));
-      boost::mutex::scoped_lock lock(*mutex);
-      db->FlushToDisk();
+      boost::this_thread::sleep(boost::posix_time::seconds(1));
+      count++;
+      if (count < sleep)
+      {
+        continue;
+      }
+
+      boost::mutex::scoped_lock lock(that->mutex_);
+      that->db_->FlushToDisk();
+      count = 0;
     }
+
+    LOG(INFO) << "Stopping the database flushing thread";
   }
 
 
@@ -284,6 +328,7 @@
 
   ServerIndex::ServerIndex(ServerContext& context,
                            const std::string& dbPath) : 
+    done_(false),
     maximumStorageSize_(0),
     maximumPatients_(0)
   {
@@ -314,25 +359,24 @@
     // execution of Orthanc
     StandaloneRecycling();
 
-    unsigned int sleep;
-    try
-    {
-      std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep);
-      sleep = boost::lexical_cast<unsigned int>(sleepString);
-    }
-    catch (boost::bad_lexical_cast&)
-    {
-      // By default, wait for 10 seconds before flushing
-      sleep = 10;
-    }
-
-    flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep);
+    flushThread_ = boost::thread(FlushThread, this);
+    unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this);
   }
 
 
   ServerIndex::~ServerIndex()
   {
-    LOG(INFO) << "Stopping the database flushing thread";
+    done_ = true;
+
+    if (flushThread_.joinable())
+    {
+      flushThread_.join();
+    }
+
+    if (unstableResourcesMonitorThread_.joinable())
+    {
+      unstableResourcesMonitorThread_.join();
+    }
   }
 
 
@@ -479,7 +523,6 @@
       db_->SetMetadata(series, MetadataType_LastUpdate, now);
       db_->SetMetadata(study, MetadataType_LastUpdate, now);
       db_->SetMetadata(patient, MetadataType_LastUpdate, now);
-
       db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet);
 
       const DicomValue* value;
@@ -501,6 +544,11 @@
         db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series);
       }
 
+      // Mark the parent resources of this instance as unstable
+      MarkAsUnstable(patient, ResourceType_Patient);
+      MarkAsUnstable(study, ResourceType_Study);
+      MarkAsUnstable(series, ResourceType_Series);
+
       t.Commit(instanceSize);
 
       return StoreStatus_Success;
@@ -756,6 +804,13 @@
     if (tmp.size() != 0)
       result["ModifiedFrom"] = tmp;
 
+    if (type == ResourceType_Patient ||
+        type == ResourceType_Study ||
+        type == ResourceType_Series)
+    {
+      result["IsStable"] = !unstableResources_.Contains(id);
+    }
+
     return true;
   }
 
@@ -1343,4 +1398,69 @@
         break;
     }
   }
+
+
+  void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that)
+  {
+    int stableAge = GetGlobalIntegerParameter("StableAge", 60);
+    if (stableAge <= 0)
+    {
+      stableAge = 60;
+    }
+
+    LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";
+
+    while (!that->done_)
+    {
+      // Check for stable resources each second
+      boost::this_thread::sleep(boost::posix_time::seconds(1));
+
+      boost::mutex::scoped_lock lock(that->mutex_);
+      while (!that->unstableResources_.IsEmpty() &&
+             that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
+      {
+        // This DICOM resource has not received any new instance for
+        // some time. It can be considered as stable.
+          
+        UnstableResourcePayload payload;
+        int64_t id = that->unstableResources_.RemoveOldest(payload);
+
+        switch (payload.type_)
+        {
+          case Orthanc::ResourceType_Patient:
+            that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient);
+            break;
+
+          case Orthanc::ResourceType_Study:
+            that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study);
+            break;
+
+          case Orthanc::ResourceType_Series:
+            that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series);
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+
+        //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id;
+      }
+    }
+
+    LOG(INFO) << "Closing the monitor thread for stable resources";
+  }
+  
+
+  void ServerIndex::MarkAsUnstable(int64_t id,
+                                   Orthanc::ResourceType type)
+  {
+    // WARNING: Before calling this method, "mutex_" must be locked.
+
+    assert(type == Orthanc::ResourceType_Patient ||
+           type == Orthanc::ResourceType_Study ||
+           type == Orthanc::ResourceType_Series);
+
+    unstableResources_.AddOrMakeMostRecent(id, type);
+    //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
+  }
 }