Mercurial > hg > orthanc
diff OrthancServer/ServerIndex.cpp @ 511:3b735fdf320b
monitoring of stable patients/studies/series
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 16 Aug 2013 15:39:53 +0200 |
parents | 23e5b35e3c5c |
children | 935e8c7e0b18 |
line wrap: on
line diff
--- a/OrthancServer/ServerIndex.cpp Fri Aug 16 14:23:54 2013 +0200 +++ b/OrthancServer/ServerIndex.cpp Fri Aug 16 15:39:53 2013 +0200 @@ -37,6 +37,7 @@ #endif #include "EmbeddedResources.h" +#include "OrthancInitialization.h" #include "../Core/Toolbox.h" #include "../Core/Uuid.h" #include "../Core/DicomFormat/DicomArray.h" @@ -187,6 +188,27 @@ }; + struct ServerIndex::UnstableResourcePayload + { + Orthanc::ResourceType type_; + boost::posix_time::ptime time_; + + UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance) + { + } + + UnstableResourcePayload(Orthanc::ResourceType type) : type_(type) + { + time_ = boost::posix_time::second_clock::local_time(); + } + + unsigned int GetAge() const + { + return (boost::posix_time::second_clock::local_time() - time_).total_seconds(); + } + }; + + bool ServerIndex::DeleteResource(Json::Value& target, const std::string& uuid, ResourceType expectedType) @@ -227,18 +249,40 @@ } - static void FlushThread(DatabaseWrapper* db, - boost::mutex* mutex, - unsigned int sleep) + void ServerIndex::FlushThread(ServerIndex* that) { + unsigned int sleep; + + try + { + std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep); + sleep = boost::lexical_cast<unsigned int>(sleepString); + } + catch (boost::bad_lexical_cast&) + { + // By default, wait for 10 seconds before flushing + sleep = 10; + } + LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")"; - while (1) + unsigned int count = 0; + + while (!that->done_) { - boost::this_thread::sleep(boost::posix_time::seconds(sleep)); - boost::mutex::scoped_lock lock(*mutex); - db->FlushToDisk(); + boost::this_thread::sleep(boost::posix_time::seconds(1)); + count++; + if (count < sleep) + { + continue; + } + + boost::mutex::scoped_lock lock(that->mutex_); + that->db_->FlushToDisk(); + count = 0; } + + LOG(INFO) << "Stopping the database flushing thread"; } @@ -284,6 +328,7 @@ ServerIndex::ServerIndex(ServerContext& context, const std::string& dbPath) : + done_(false), maximumStorageSize_(0), maximumPatients_(0) { @@ -314,25 +359,24 @@ // execution of Orthanc StandaloneRecycling(); - unsigned int sleep; - try - { - std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep); - sleep = boost::lexical_cast<unsigned int>(sleepString); - } - catch (boost::bad_lexical_cast&) - { - // By default, wait for 10 seconds before flushing - sleep = 10; - } - - flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep); + flushThread_ = boost::thread(FlushThread, this); + unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this); } ServerIndex::~ServerIndex() { - LOG(INFO) << "Stopping the database flushing thread"; + done_ = true; + + if (flushThread_.joinable()) + { + flushThread_.join(); + } + + if (unstableResourcesMonitorThread_.joinable()) + { + unstableResourcesMonitorThread_.join(); + } } @@ -479,7 +523,6 @@ db_->SetMetadata(series, MetadataType_LastUpdate, now); db_->SetMetadata(study, MetadataType_LastUpdate, now); db_->SetMetadata(patient, MetadataType_LastUpdate, now); - db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet); const DicomValue* value; @@ -501,6 +544,11 @@ db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series); } + // Mark the parent resources of this instance as unstable + MarkAsUnstable(patient, ResourceType_Patient); + MarkAsUnstable(study, ResourceType_Study); + MarkAsUnstable(series, ResourceType_Series); + t.Commit(instanceSize); return StoreStatus_Success; @@ -756,6 +804,13 @@ if (tmp.size() != 0) result["ModifiedFrom"] = tmp; + if (type == ResourceType_Patient || + type == ResourceType_Study || + type == ResourceType_Series) + { + result["IsStable"] = !unstableResources_.Contains(id); + } + return true; } @@ -1343,4 +1398,69 @@ break; } } + + + void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that) + { + int stableAge = GetGlobalIntegerParameter("StableAge", 60); + if (stableAge <= 0) + { + stableAge = 60; + } + + LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")"; + + while (!that->done_) + { + // Check for stable resources each second + boost::this_thread::sleep(boost::posix_time::seconds(1)); + + boost::mutex::scoped_lock lock(that->mutex_); + while (!that->unstableResources_.IsEmpty() && + that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge)) + { + // This DICOM resource has not received any new instance for + // some time. It can be considered as stable. + + UnstableResourcePayload payload; + int64_t id = that->unstableResources_.RemoveOldest(payload); + + switch (payload.type_) + { + case Orthanc::ResourceType_Patient: + that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient); + break; + + case Orthanc::ResourceType_Study: + that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study); + break; + + case Orthanc::ResourceType_Series: + that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id; + } + } + + LOG(INFO) << "Closing the monitor thread for stable resources"; + } + + + void ServerIndex::MarkAsUnstable(int64_t id, + Orthanc::ResourceType type) + { + // WARNING: Before calling this method, "mutex_" must be locked. + + assert(type == Orthanc::ResourceType_Patient || + type == Orthanc::ResourceType_Study || + type == Orthanc::ResourceType_Series); + + unstableResources_.AddOrMakeMostRecent(id, type); + //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id; + } }