Mercurial > hg > orthanc
changeset 511:3b735fdf320b
monitoring of stable patients/studies/series
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 16 Aug 2013 15:39:53 +0200 |
parents | e7841864c97c |
children | 935e8c7e0b18 |
files | NEWS OrthancServer/ServerEnumerations.cpp OrthancServer/ServerEnumerations.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h Resources/Configuration.json UnitTests/MemoryCache.cpp |
diffstat | 7 files changed, 186 insertions(+), 133 deletions(-) [+] |
line wrap: on
line diff
--- a/NEWS Fri Aug 16 14:23:54 2013 +0200 +++ b/NEWS Fri Aug 16 15:39:53 2013 +0200 @@ -1,6 +1,7 @@ Pending changes in the mainline =============================== +* Detection of stable patients/studies/series * C-Find SCU at the instance level
--- a/OrthancServer/ServerEnumerations.cpp Fri Aug 16 14:23:54 2013 +0200 +++ b/OrthancServer/ServerEnumerations.cpp Fri Aug 16 15:39:53 2013 +0200 @@ -205,6 +205,15 @@ case ChangeType_ModifiedPatient: return "ModifiedPatient"; + case ChangeType_StablePatient: + return "StablePatient"; + + case ChangeType_StableStudy: + return "StableStudy"; + + case ChangeType_StableSeries: + return "StableSeries"; + default: throw OrthancException(ErrorCode_ParameterOutOfRange); }
--- a/OrthancServer/ServerEnumerations.h Fri Aug 16 14:23:54 2013 +0200 +++ b/OrthancServer/ServerEnumerations.h Fri Aug 16 15:39:53 2013 +0200 @@ -100,7 +100,10 @@ ChangeType_ModifiedStudy = 8, ChangeType_ModifiedSeries = 9, ChangeType_AnonymizedPatient = 10, - ChangeType_ModifiedPatient = 11 + ChangeType_ModifiedPatient = 11, + ChangeType_StablePatient = 12, + ChangeType_StableStudy = 13, + ChangeType_StableSeries = 14 }; void InitializeServerEnumerations();
--- a/OrthancServer/ServerIndex.cpp Fri Aug 16 14:23:54 2013 +0200 +++ b/OrthancServer/ServerIndex.cpp Fri Aug 16 15:39:53 2013 +0200 @@ -37,6 +37,7 @@ #endif #include "EmbeddedResources.h" +#include "OrthancInitialization.h" #include "../Core/Toolbox.h" #include "../Core/Uuid.h" #include "../Core/DicomFormat/DicomArray.h" @@ -187,6 +188,27 @@ }; + struct ServerIndex::UnstableResourcePayload + { + Orthanc::ResourceType type_; + boost::posix_time::ptime time_; + + UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance) + { + } + + UnstableResourcePayload(Orthanc::ResourceType type) : type_(type) + { + time_ = boost::posix_time::second_clock::local_time(); + } + + unsigned int GetAge() const + { + return (boost::posix_time::second_clock::local_time() - time_).total_seconds(); + } + }; + + bool ServerIndex::DeleteResource(Json::Value& target, const std::string& uuid, ResourceType expectedType) @@ -227,18 +249,40 @@ } - static void FlushThread(DatabaseWrapper* db, - boost::mutex* mutex, - unsigned int sleep) + void ServerIndex::FlushThread(ServerIndex* that) { + unsigned int sleep; + + try + { + std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep); + sleep = boost::lexical_cast<unsigned int>(sleepString); + } + catch (boost::bad_lexical_cast&) + { + // By default, wait for 10 seconds before flushing + sleep = 10; + } + LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")"; - while (1) + unsigned int count = 0; + + while (!that->done_) { - boost::this_thread::sleep(boost::posix_time::seconds(sleep)); - boost::mutex::scoped_lock lock(*mutex); - db->FlushToDisk(); + boost::this_thread::sleep(boost::posix_time::seconds(1)); + count++; + if (count < sleep) + { + continue; + } + + boost::mutex::scoped_lock lock(that->mutex_); + that->db_->FlushToDisk(); + count = 0; } + + LOG(INFO) << "Stopping the database flushing thread"; } @@ -284,6 +328,7 @@ ServerIndex::ServerIndex(ServerContext& context, const std::string& dbPath) : + done_(false), maximumStorageSize_(0), maximumPatients_(0) { @@ -314,25 +359,24 @@ // execution of Orthanc StandaloneRecycling(); - unsigned int sleep; - try - { - std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep); - sleep = boost::lexical_cast<unsigned int>(sleepString); - } - catch (boost::bad_lexical_cast&) - { - // By default, wait for 10 seconds before flushing - sleep = 10; - } - - flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep); + flushThread_ = boost::thread(FlushThread, this); + unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this); } ServerIndex::~ServerIndex() { - LOG(INFO) << "Stopping the database flushing thread"; + done_ = true; + + if (flushThread_.joinable()) + { + flushThread_.join(); + } + + if (unstableResourcesMonitorThread_.joinable()) + { + unstableResourcesMonitorThread_.join(); + } } @@ -479,7 +523,6 @@ db_->SetMetadata(series, MetadataType_LastUpdate, now); db_->SetMetadata(study, MetadataType_LastUpdate, now); db_->SetMetadata(patient, MetadataType_LastUpdate, now); - db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet); const DicomValue* value; @@ -501,6 +544,11 @@ db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series); } + // Mark the parent resources of this instance as unstable + MarkAsUnstable(patient, ResourceType_Patient); + MarkAsUnstable(study, ResourceType_Study); + MarkAsUnstable(series, ResourceType_Series); + t.Commit(instanceSize); return StoreStatus_Success; @@ -756,6 +804,13 @@ if (tmp.size() != 0) result["ModifiedFrom"] = tmp; + if (type == ResourceType_Patient || + type == ResourceType_Study || + type == ResourceType_Series) + { + result["IsStable"] = !unstableResources_.Contains(id); + } + return true; } @@ -1343,4 +1398,69 @@ break; } } + + + void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that) + { + int stableAge = GetGlobalIntegerParameter("StableAge", 60); + if (stableAge <= 0) + { + stableAge = 60; + } + + LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")"; + + while (!that->done_) + { + // Check for stable resources each second + boost::this_thread::sleep(boost::posix_time::seconds(1)); + + boost::mutex::scoped_lock lock(that->mutex_); + while (!that->unstableResources_.IsEmpty() && + that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge)) + { + // This DICOM resource has not received any new instance for + // some time. It can be considered as stable. + + UnstableResourcePayload payload; + int64_t id = that->unstableResources_.RemoveOldest(payload); + + switch (payload.type_) + { + case Orthanc::ResourceType_Patient: + that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient); + break; + + case Orthanc::ResourceType_Study: + that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study); + break; + + case Orthanc::ResourceType_Series: + that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id; + } + } + + LOG(INFO) << "Closing the monitor thread for stable resources"; + } + + + void ServerIndex::MarkAsUnstable(int64_t id, + Orthanc::ResourceType type) + { + // WARNING: Before calling this method, "mutex_" must be locked. + + assert(type == Orthanc::ResourceType_Patient || + type == Orthanc::ResourceType_Study || + type == Orthanc::ResourceType_Series); + + unstableResources_.AddOrMakeMostRecent(id, type); + //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id; + } }
--- a/OrthancServer/ServerIndex.h Fri Aug 16 14:23:54 2013 +0200 +++ b/OrthancServer/ServerIndex.h Fri Aug 16 15:39:53 2013 +0200 @@ -34,6 +34,7 @@ #include <boost/thread.hpp> #include <boost/noncopyable.hpp> +#include "../Core/Cache/LeastRecentlyUsedIndex.h" #include "../Core/SQLite/Connection.h" #include "../Core/DicomFormat/DicomMap.h" #include "../Core/DicomFormat/DicomInstanceHasher.h" @@ -55,17 +56,25 @@ { private: class Transaction; + struct UnstableResourcePayload; + bool done_; boost::mutex mutex_; boost::thread flushThread_; + boost::thread unstableResourcesMonitorThread_; std::auto_ptr<Internals::ServerIndexListener> listener_; std::auto_ptr<DatabaseWrapper> db_; + LeastRecentlyUsedIndex<int64_t, UnstableResourcePayload> unstableResources_; uint64_t currentStorageSize_; uint64_t maximumStorageSize_; unsigned int maximumPatients_; + static void FlushThread(ServerIndex* that); + + static void UnstableResourcesMonitorThread(ServerIndex* that); + void MainDicomTagsToJson(Json::Value& result, int64_t resourceId); @@ -78,6 +87,9 @@ void StandaloneRecycling(); + void MarkAsUnstable(int64_t id, + Orthanc::ResourceType type); + public: typedef std::list<FileInfo> Attachments;
--- a/Resources/Configuration.json Fri Aug 16 14:23:54 2013 +0200 +++ b/Resources/Configuration.json Fri Aug 16 15:39:53 2013 +0200 @@ -33,13 +33,6 @@ "LuaScripts" : [ ], - // Dictionary of symbolic names for the user-defined metadata. Each - // entry must map a number between 1024 and 65535 to an unique - // string. - "UserMetadata" : { - // "Sample" : 1024 - }, - /** @@ -114,5 +107,22 @@ **/ // "peer" : [ "http://localhost:8043/", "alice", "alicePassword" ] // "peer2" : [ "http://localhost:8044/" ] - } + }, + + + + /** + * Advanced options + **/ + + // Dictionary of symbolic names for the user-defined metadata. Each + // entry must map a number between 1024 and 65535 to an unique + // string. + "UserMetadata" : { + // "Sample" : 1024 + }, + + // Number of seconds without receiving any instance before a + // patient, a study or a series is considered as stable. + "StableAge" : 60 }
--- a/UnitTests/MemoryCache.cpp Fri Aug 16 14:23:54 2013 +0200 +++ b/UnitTests/MemoryCache.cpp Fri Aug 16 15:39:53 2013 +0200 @@ -195,105 +195,3 @@ ASSERT_EQ("45 42 43 47 44 42 ", provider.log_); } - - -#include "../OrthancServer/ServerEnumerations.h" - -namespace -{ - struct Payload - { - Orthanc::ResourceType type_; - boost::posix_time::ptime time_; - - Payload() : type_(Orthanc::ResourceType_Instance) - { - } - - Payload(Orthanc::ResourceType type) : type_(type) - { - time_ = boost::posix_time::second_clock::local_time(); - } - - unsigned int GetAge() const - { - return (boost::posix_time::second_clock::local_time() - time_).total_seconds(); - } - }; - - - class StableResourcesMonitor - { - private: - bool done_; - boost::mutex mutex_; - unsigned int stableTimeout_; - Orthanc::LeastRecentlyUsedIndex<std::string, Payload> unstableResources_; - boost::thread thread_; - - static void Thread(StableResourcesMonitor* that) - { - static const unsigned int SLEEP = 1; // Check for stable resources each second - - while (!that->done_) - { - boost::this_thread::sleep(boost::posix_time::seconds(SLEEP)); - - boost::mutex::scoped_lock lock(that->mutex_); - while (!that->unstableResources_.IsEmpty() && - that->unstableResources_.GetOldestPayload().GetAge() > that->stableTimeout_) - { - // This DICOM resource has not received any new instance for - // some time. It can be considered as stable. - - Payload payload; - std::string id = that->unstableResources_.RemoveOldest(payload); - - LOG(INFO) << "Stable resource: " << id << " (type " << payload.type_ << ")"; - } - } - - LOG(INFO) << "Closing the monitor for stable resources"; - } - - public: - StableResourcesMonitor(unsigned int stableTimeout) - { - done_ = false; - stableTimeout_ = stableTimeout; - thread_ = boost::thread(Thread, this); - } - - ~StableResourcesMonitor() - { - done_ = true; - - if (thread_.joinable()) - { - thread_.join(); - } - } - - void ResourceUpdated(const std::string& id, - Orthanc::ResourceType type) - { - assert(type == Orthanc::ResourceType_Patient || - type == Orthanc::ResourceType_Study || - type == Orthanc::ResourceType_Series); - - boost::mutex::scoped_lock lock(mutex_); - unstableResources_.AddOrMakeMostRecent(id, type); - } - }; -} - -TEST(LRU, Hello) -{ - StableResourcesMonitor m(5); - boost::this_thread::sleep(boost::posix_time::seconds(1)); - m.ResourceUpdated("Hello", Orthanc::ResourceType_Study); - m.ResourceUpdated("World", Orthanc::ResourceType_Series); - boost::this_thread::sleep(boost::posix_time::seconds(2)); - m.ResourceUpdated("Hello", Orthanc::ResourceType_Study); - boost::this_thread::sleep(boost::posix_time::seconds(10)); -}