changeset 511:3b735fdf320b

monitoring of stable patients/studies/series
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 16 Aug 2013 15:39:53 +0200
parents e7841864c97c
children 935e8c7e0b18
files NEWS OrthancServer/ServerEnumerations.cpp OrthancServer/ServerEnumerations.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h Resources/Configuration.json UnitTests/MemoryCache.cpp
diffstat 7 files changed, 186 insertions(+), 133 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Fri Aug 16 14:23:54 2013 +0200
+++ b/NEWS	Fri Aug 16 15:39:53 2013 +0200
@@ -1,6 +1,7 @@
 Pending changes in the mainline
 ===============================
 
+* Detection of stable patients/studies/series
 * C-Find SCU at the instance level
 
 
--- a/OrthancServer/ServerEnumerations.cpp	Fri Aug 16 14:23:54 2013 +0200
+++ b/OrthancServer/ServerEnumerations.cpp	Fri Aug 16 15:39:53 2013 +0200
@@ -205,6 +205,15 @@
       case ChangeType_ModifiedPatient:
         return "ModifiedPatient";
 
+      case ChangeType_StablePatient:
+        return "StablePatient";
+
+      case ChangeType_StableStudy:
+        return "StableStudy";
+
+      case ChangeType_StableSeries:
+        return "StableSeries";
+
       default:
         throw OrthancException(ErrorCode_ParameterOutOfRange);
     }
--- a/OrthancServer/ServerEnumerations.h	Fri Aug 16 14:23:54 2013 +0200
+++ b/OrthancServer/ServerEnumerations.h	Fri Aug 16 15:39:53 2013 +0200
@@ -100,7 +100,10 @@
     ChangeType_ModifiedStudy = 8,
     ChangeType_ModifiedSeries = 9,
     ChangeType_AnonymizedPatient = 10,
-    ChangeType_ModifiedPatient = 11
+    ChangeType_ModifiedPatient = 11,
+    ChangeType_StablePatient = 12,
+    ChangeType_StableStudy = 13,
+    ChangeType_StableSeries = 14
   };
 
   void InitializeServerEnumerations();
--- a/OrthancServer/ServerIndex.cpp	Fri Aug 16 14:23:54 2013 +0200
+++ b/OrthancServer/ServerIndex.cpp	Fri Aug 16 15:39:53 2013 +0200
@@ -37,6 +37,7 @@
 #endif
 
 #include "EmbeddedResources.h"
+#include "OrthancInitialization.h"
 #include "../Core/Toolbox.h"
 #include "../Core/Uuid.h"
 #include "../Core/DicomFormat/DicomArray.h"
@@ -187,6 +188,27 @@
   };
 
 
+  struct ServerIndex::UnstableResourcePayload
+  {
+    Orthanc::ResourceType type_;
+    boost::posix_time::ptime time_;
+
+    UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance)
+    {
+    }
+
+    UnstableResourcePayload(Orthanc::ResourceType type) : type_(type)
+    {
+      time_ = boost::posix_time::second_clock::local_time();
+    }
+
+    unsigned int GetAge() const
+    {
+      return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
+    }
+  };
+
+
   bool ServerIndex::DeleteResource(Json::Value& target,
                                    const std::string& uuid,
                                    ResourceType expectedType)
@@ -227,18 +249,40 @@
   }
 
 
-  static void FlushThread(DatabaseWrapper* db,
-                          boost::mutex* mutex,
-                          unsigned int sleep)
+  void ServerIndex::FlushThread(ServerIndex* that)
   {
+    unsigned int sleep;
+
+    try
+    {
+      std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep);
+      sleep = boost::lexical_cast<unsigned int>(sleepString);
+    }
+    catch (boost::bad_lexical_cast&)
+    {
+      // By default, wait for 10 seconds before flushing
+      sleep = 10;
+    }
+
     LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")";
 
-    while (1)
+    unsigned int count = 0;
+
+    while (!that->done_)
     {
-      boost::this_thread::sleep(boost::posix_time::seconds(sleep));
-      boost::mutex::scoped_lock lock(*mutex);
-      db->FlushToDisk();
+      boost::this_thread::sleep(boost::posix_time::seconds(1));
+      count++;
+      if (count < sleep)
+      {
+        continue;
+      }
+
+      boost::mutex::scoped_lock lock(that->mutex_);
+      that->db_->FlushToDisk();
+      count = 0;
     }
+
+    LOG(INFO) << "Stopping the database flushing thread";
   }
 
 
@@ -284,6 +328,7 @@
 
   ServerIndex::ServerIndex(ServerContext& context,
                            const std::string& dbPath) : 
+    done_(false),
     maximumStorageSize_(0),
     maximumPatients_(0)
   {
@@ -314,25 +359,24 @@
     // execution of Orthanc
     StandaloneRecycling();
 
-    unsigned int sleep;
-    try
-    {
-      std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep);
-      sleep = boost::lexical_cast<unsigned int>(sleepString);
-    }
-    catch (boost::bad_lexical_cast&)
-    {
-      // By default, wait for 10 seconds before flushing
-      sleep = 10;
-    }
-
-    flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep);
+    flushThread_ = boost::thread(FlushThread, this);
+    unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this);
   }
 
 
   ServerIndex::~ServerIndex()
   {
-    LOG(INFO) << "Stopping the database flushing thread";
+    done_ = true;
+
+    if (flushThread_.joinable())
+    {
+      flushThread_.join();
+    }
+
+    if (unstableResourcesMonitorThread_.joinable())
+    {
+      unstableResourcesMonitorThread_.join();
+    }
   }
 
 
@@ -479,7 +523,6 @@
       db_->SetMetadata(series, MetadataType_LastUpdate, now);
       db_->SetMetadata(study, MetadataType_LastUpdate, now);
       db_->SetMetadata(patient, MetadataType_LastUpdate, now);
-
       db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet);
 
       const DicomValue* value;
@@ -501,6 +544,11 @@
         db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series);
       }
 
+      // Mark the parent resources of this instance as unstable
+      MarkAsUnstable(patient, ResourceType_Patient);
+      MarkAsUnstable(study, ResourceType_Study);
+      MarkAsUnstable(series, ResourceType_Series);
+
       t.Commit(instanceSize);
 
       return StoreStatus_Success;
@@ -756,6 +804,13 @@
     if (tmp.size() != 0)
       result["ModifiedFrom"] = tmp;
 
+    if (type == ResourceType_Patient ||
+        type == ResourceType_Study ||
+        type == ResourceType_Series)
+    {
+      result["IsStable"] = !unstableResources_.Contains(id);
+    }
+
     return true;
   }
 
@@ -1343,4 +1398,69 @@
         break;
     }
   }
+
+
+  void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that)
+  {
+    int stableAge = GetGlobalIntegerParameter("StableAge", 60);
+    if (stableAge <= 0)
+    {
+      stableAge = 60;
+    }
+
+    LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";
+
+    while (!that->done_)
+    {
+      // Check for stable resources each second
+      boost::this_thread::sleep(boost::posix_time::seconds(1));
+
+      boost::mutex::scoped_lock lock(that->mutex_);
+      while (!that->unstableResources_.IsEmpty() &&
+             that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
+      {
+        // This DICOM resource has not received any new instance for
+        // some time. It can be considered as stable.
+          
+        UnstableResourcePayload payload;
+        int64_t id = that->unstableResources_.RemoveOldest(payload);
+
+        switch (payload.type_)
+        {
+          case Orthanc::ResourceType_Patient:
+            that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient);
+            break;
+
+          case Orthanc::ResourceType_Study:
+            that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study);
+            break;
+
+          case Orthanc::ResourceType_Series:
+            that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series);
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+
+        //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id;
+      }
+    }
+
+    LOG(INFO) << "Closing the monitor thread for stable resources";
+  }
+  
+
+  void ServerIndex::MarkAsUnstable(int64_t id,
+                                   Orthanc::ResourceType type)
+  {
+    // WARNING: Before calling this method, "mutex_" must be locked.
+
+    assert(type == Orthanc::ResourceType_Patient ||
+           type == Orthanc::ResourceType_Study ||
+           type == Orthanc::ResourceType_Series);
+
+    unstableResources_.AddOrMakeMostRecent(id, type);
+    //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
+  }
 }
--- a/OrthancServer/ServerIndex.h	Fri Aug 16 14:23:54 2013 +0200
+++ b/OrthancServer/ServerIndex.h	Fri Aug 16 15:39:53 2013 +0200
@@ -34,6 +34,7 @@
 
 #include <boost/thread.hpp>
 #include <boost/noncopyable.hpp>
+#include "../Core/Cache/LeastRecentlyUsedIndex.h"
 #include "../Core/SQLite/Connection.h"
 #include "../Core/DicomFormat/DicomMap.h"
 #include "../Core/DicomFormat/DicomInstanceHasher.h"
@@ -55,17 +56,25 @@
   {
   private:
     class Transaction;
+    struct UnstableResourcePayload;
 
+    bool done_;
     boost::mutex mutex_;
     boost::thread flushThread_;
+    boost::thread unstableResourcesMonitorThread_;
 
     std::auto_ptr<Internals::ServerIndexListener> listener_;
     std::auto_ptr<DatabaseWrapper> db_;
+    LeastRecentlyUsedIndex<int64_t, UnstableResourcePayload>  unstableResources_;
 
     uint64_t currentStorageSize_;
     uint64_t maximumStorageSize_;
     unsigned int maximumPatients_;
 
+    static void FlushThread(ServerIndex* that);
+
+    static void UnstableResourcesMonitorThread(ServerIndex* that);
+
     void MainDicomTagsToJson(Json::Value& result,
                              int64_t resourceId);
 
@@ -78,6 +87,9 @@
 
     void StandaloneRecycling();
 
+    void MarkAsUnstable(int64_t id,
+                        Orthanc::ResourceType type);
+
   public:
     typedef std::list<FileInfo> Attachments;
 
--- a/Resources/Configuration.json	Fri Aug 16 14:23:54 2013 +0200
+++ b/Resources/Configuration.json	Fri Aug 16 15:39:53 2013 +0200
@@ -33,13 +33,6 @@
   "LuaScripts" : [
   ],
 
-  // Dictionary of symbolic names for the user-defined metadata. Each
-  // entry must map a number between 1024 and 65535 to an unique
-  // string.
-  "UserMetadata" : {
-    // "Sample" : 1024
-  },
-
 
 
   /**
@@ -114,5 +107,22 @@
      **/
     // "peer"  : [ "http://localhost:8043/", "alice", "alicePassword" ]
     // "peer2" : [ "http://localhost:8044/" ]
-  }
+  },
+
+
+
+  /**
+   * Advanced options
+   **/
+
+  // Dictionary of symbolic names for the user-defined metadata. Each
+  // entry must map a number between 1024 and 65535 to an unique
+  // string.
+  "UserMetadata" : {
+    // "Sample" : 1024
+  },
+
+  // Number of seconds without receiving any instance before a
+  // patient, a study or a series is considered as stable.
+  "StableAge" : 60
 }
--- a/UnitTests/MemoryCache.cpp	Fri Aug 16 14:23:54 2013 +0200
+++ b/UnitTests/MemoryCache.cpp	Fri Aug 16 15:39:53 2013 +0200
@@ -195,105 +195,3 @@
 
   ASSERT_EQ("45 42 43 47 44 42 ", provider.log_);
 }
-
-
-#include "../OrthancServer/ServerEnumerations.h"
-
-namespace
-{
-  struct Payload
-  {
-    Orthanc::ResourceType type_;
-    boost::posix_time::ptime time_;
-
-    Payload() : type_(Orthanc::ResourceType_Instance)
-    {
-    }
-
-    Payload(Orthanc::ResourceType type) : type_(type)
-    {
-      time_ = boost::posix_time::second_clock::local_time();
-    }
-
-    unsigned int GetAge() const
-    {
-      return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
-    }
-  };
-
-
-  class StableResourcesMonitor
-  {
-  private:
-    bool done_;
-    boost::mutex mutex_;
-    unsigned int stableTimeout_;
-    Orthanc::LeastRecentlyUsedIndex<std::string, Payload>  unstableResources_;
-    boost::thread thread_;
-
-    static void Thread(StableResourcesMonitor* that)
-    {
-      static const unsigned int SLEEP = 1;  // Check for stable resources each second
-
-      while (!that->done_)
-      {
-        boost::this_thread::sleep(boost::posix_time::seconds(SLEEP));
-
-        boost::mutex::scoped_lock lock(that->mutex_);
-        while (!that->unstableResources_.IsEmpty() &&
-               that->unstableResources_.GetOldestPayload().GetAge() > that->stableTimeout_)
-        {
-          // This DICOM resource has not received any new instance for
-          // some time. It can be considered as stable.
-          
-          Payload payload;
-          std::string id = that->unstableResources_.RemoveOldest(payload);
-
-          LOG(INFO) << "Stable resource: " << id << " (type " << payload.type_ << ")";
-        }
-      }
-
-      LOG(INFO) << "Closing the monitor for stable resources";
-    }
-
-  public:
-    StableResourcesMonitor(unsigned int stableTimeout)
-    {
-      done_ = false;
-      stableTimeout_ = stableTimeout;
-      thread_ = boost::thread(Thread, this);
-    }
-
-    ~StableResourcesMonitor()
-    {
-      done_ = true;
-
-      if (thread_.joinable())
-      {
-        thread_.join();
-      }
-    }
-
-    void ResourceUpdated(const std::string& id,
-                         Orthanc::ResourceType type)
-    {
-      assert(type == Orthanc::ResourceType_Patient ||
-             type == Orthanc::ResourceType_Study ||
-             type == Orthanc::ResourceType_Series);
-
-      boost::mutex::scoped_lock lock(mutex_);
-      unstableResources_.AddOrMakeMostRecent(id, type);
-    }
-  };
-}
-
-TEST(LRU, Hello)
-{
-  StableResourcesMonitor m(5);
-  boost::this_thread::sleep(boost::posix_time::seconds(1));
-  m.ResourceUpdated("Hello", Orthanc::ResourceType_Study);
-  m.ResourceUpdated("World", Orthanc::ResourceType_Series);
-  boost::this_thread::sleep(boost::posix_time::seconds(2));
-  m.ResourceUpdated("Hello", Orthanc::ResourceType_Study);
-  boost::this_thread::sleep(boost::posix_time::seconds(10));
-}