Mercurial > hg > orthanc
diff OrthancServer/Plugins/Samples/Housekeeper/Plugin.cpp @ 4988:8fba26292a9f
Housekeeper plugin: finalizing + integration tests ok
author | Alain Mazy <am@osimis.io> |
---|---|
date | Sat, 30 Apr 2022 19:39:40 +0200 |
parents | 40fd2a485a84 |
children | 3cdda1cec537 |
line wrap: on
line diff
--- a/OrthancServer/Plugins/Samples/Housekeeper/Plugin.cpp Tue Apr 26 16:14:49 2022 +0200 +++ b/OrthancServer/Plugins/Samples/Housekeeper/Plugin.cpp Sat Apr 30 19:39:40 2022 +0200 @@ -25,6 +25,7 @@ #include <boost/thread.hpp> #include <boost/algorithm/string.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> #include <json/value.h> #include <json/writer.h> #include <string.h> @@ -42,6 +43,7 @@ static bool triggerOnStorageCompressionChange_ = true; static bool triggerOnMainDicomTagsChange_ = true; static bool triggerOnUnnecessaryDicomAsJsonFiles_ = true; +static bool triggerOnIngestTranscodingChange_ = true; struct RunningPeriod @@ -112,6 +114,7 @@ } }; + struct RunningPeriods { std::list<RunningPeriod> runningPeriods_; @@ -153,6 +156,7 @@ RunningPeriods runningPeriods_; + struct DbConfiguration { std::string orthancVersion; @@ -160,6 +164,7 @@ std::string studiesMainDicomTagsSignature; std::string seriesMainDicomTagsSignature; std::string instancesMainDicomTagsSignature; + std::string ingestTranscoding; bool storageCompressionEnabled; DbConfiguration() @@ -179,6 +184,7 @@ studiesMainDicomTagsSignature.clear(); seriesMainDicomTagsSignature.clear(); instancesMainDicomTagsSignature.clear(); + ingestTranscoding.clear(); } void ToJson(Json::Value& target) @@ -202,6 +208,7 @@ target["MainDicomTagsSignature"] = signatures; target["OrthancVersion"] = orthancVersion; target["StorageCompressionEnabled"] = storageCompressionEnabled; + target["IngestTranscoding"] = ingestTranscoding; } } @@ -218,15 +225,18 @@ instancesMainDicomTagsSignature = signatures["Instance"].asString(); storageCompressionEnabled = source["StorageCompressionEnabled"].asBool(); + ingestTranscoding = source["IngestTranscoding"].asString(); } } }; + struct PluginStatus { int statusVersion; int64_t lastProcessedChange; int64_t lastChangeToProcess; + boost::posix_time::ptime lastTimeStarted; DbConfiguration currentlyProcessingConfiguration; // last configuration being processed (has not reached last change yet) DbConfiguration lastProcessedConfiguration; // last configuration that has been fully processed (till last change) @@ -234,7 +244,8 @@ PluginStatus() : statusVersion(1), lastProcessedChange(-1), - lastChangeToProcess(-1) + lastChangeToProcess(-1), + lastTimeStarted(boost::date_time::special_values::not_a_date_time) { } @@ -245,6 +256,15 @@ target["Version"] = statusVersion; target["LastProcessedChange"] = Json::Value::Int64(lastProcessedChange); target["LastChangeToProcess"] = Json::Value::Int64(lastChangeToProcess); + + if (lastTimeStarted == boost::posix_time::special_values::not_a_date_time) + { + target["LastTimeStarted"] = Json::Value::null; + } + else + { + target["LastTimeStarted"] = boost::posix_time::to_iso_string(lastTimeStarted); + } currentlyProcessingConfiguration.ToJson(target["CurrentlyProcessingConfiguration"]); lastProcessedConfiguration.ToJson(target["LastProcessedConfiguration"]); @@ -255,6 +275,14 @@ statusVersion = source["Version"].asInt(); lastProcessedChange = source["LastProcessedChange"].asInt64(); lastChangeToProcess = source["LastChangeToProcess"].asInt64(); + if (source["LastTimeStarted"].isNull()) + { + lastTimeStarted = boost::posix_time::special_values::not_a_date_time; + } + else + { + lastTimeStarted = boost::posix_time::from_iso_string(source["LastTimeStarted"].asString()); + } Json::Value& current = source["CurrentlyProcessingConfiguration"]; Json::Value& last = source["LastProcessedConfiguration"]; @@ -264,9 +292,13 @@ } }; +static PluginStatus pluginStatus_; +static boost::recursive_mutex pluginStatusMutex_; -static void ReadStatusFromDb(PluginStatus& pluginStatus) +static void ReadStatusFromDb() { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + OrthancPlugins::OrthancString globalPropertyContent; globalPropertyContent.Assign(OrthancPluginGetGlobalProperty(OrthancPlugins::GetGlobalContext(), @@ -277,29 +309,32 @@ { Json::Value jsonStatus; globalPropertyContent.ToJson(jsonStatus); - pluginStatus.FromJson(jsonStatus); + pluginStatus_.FromJson(jsonStatus); } else { // default config - pluginStatus.statusVersion = 1; - pluginStatus.lastProcessedChange = -1; - pluginStatus.lastChangeToProcess = -1; + pluginStatus_.statusVersion = 1; + pluginStatus_.lastProcessedChange = -1; + pluginStatus_.lastChangeToProcess = -1; + pluginStatus_.lastTimeStarted = boost::date_time::special_values::not_a_date_time; - pluginStatus.currentlyProcessingConfiguration.orthancVersion = "1.9.0"; // when we don't know, we assume some files were stored with Orthanc 1.9.0 (last version saving the dicom-as-json files) + pluginStatus_.lastProcessedConfiguration.orthancVersion = "1.9.0"; // when we don't know, we assume some files were stored with Orthanc 1.9.0 (last version saving the dicom-as-json files) // default main dicom tags signature are the one from Orthanc 1.4.2 (last time the list was changed): - pluginStatus.currentlyProcessingConfiguration.patientsMainDicomTagsSignature = "0010,0010;0010,0020;0010,0030;0010,0040;0010,1000"; - pluginStatus.currentlyProcessingConfiguration.studiesMainDicomTagsSignature = "0008,0020;0008,0030;0008,0050;0008,0080;0008,0090;0008,1030;0020,000d;0020,0010;0032,1032;0032,1060"; - pluginStatus.currentlyProcessingConfiguration.seriesMainDicomTagsSignature = "0008,0021;0008,0031;0008,0060;0008,0070;0008,1010;0008,103e;0008,1070;0018,0010;0018,0015;0018,0024;0018,1030;0018,1090;0018,1400;0020,000e;0020,0011;0020,0037;0020,0105;0020,1002;0040,0254;0054,0081;0054,0101;0054,1000"; - pluginStatus.currentlyProcessingConfiguration.instancesMainDicomTagsSignature = "0008,0012;0008,0013;0008,0018;0020,0012;0020,0013;0020,0032;0020,0037;0020,0100;0020,4000;0028,0008;0054,1330"; + pluginStatus_.lastProcessedConfiguration.patientsMainDicomTagsSignature = "0010,0010;0010,0020;0010,0030;0010,0040;0010,1000"; + pluginStatus_.lastProcessedConfiguration.studiesMainDicomTagsSignature = "0008,0020;0008,0030;0008,0050;0008,0080;0008,0090;0008,1030;0020,000d;0020,0010;0032,1032;0032,1060"; + pluginStatus_.lastProcessedConfiguration.seriesMainDicomTagsSignature = "0008,0021;0008,0031;0008,0060;0008,0070;0008,1010;0008,103e;0008,1070;0018,0010;0018,0015;0018,0024;0018,1030;0018,1090;0018,1400;0020,000e;0020,0011;0020,0037;0020,0105;0020,1002;0040,0254;0054,0081;0054,0101;0054,1000"; + pluginStatus_.lastProcessedConfiguration.instancesMainDicomTagsSignature = "0008,0012;0008,0013;0008,0018;0020,0012;0020,0013;0020,0032;0020,0037;0020,0100;0020,4000;0028,0008;0054,1330"; } } -static void SaveStatusInDb(PluginStatus& pluginStatus) +static void SaveStatusInDb() { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + Json::Value jsonStatus; - pluginStatus.ToJson(jsonStatus); + pluginStatus_.ToJson(jsonStatus); Json::StreamWriterBuilder builder; builder.settings_["indentation"] = " "; @@ -321,26 +356,29 @@ configuration.seriesMainDicomTagsSignature = systemInfo["MainDicomTags"]["Series"].asString(); configuration.instancesMainDicomTagsSignature = systemInfo["MainDicomTags"]["Instance"].asString(); configuration.storageCompressionEnabled = systemInfo["StorageCompression"].asBool(); + configuration.ingestTranscoding = systemInfo["IngestTranscoding"].asString(); configuration.orthancVersion = OrthancPlugins::GetGlobalContext()->orthancVersion; } -static bool NeedsProcessing(const DbConfiguration& current, const DbConfiguration& last) +static void CheckNeedsProcessing(bool& needsReconstruct, bool& needsReingest, const DbConfiguration& current, const DbConfiguration& last) { + needsReconstruct = false; + needsReingest = false; + if (!last.IsDefined()) { - return true; + return; } const char* lastVersion = last.orthancVersion.c_str(); - bool needsProcessing = false; if (!OrthancPlugins::CheckMinimalVersion(lastVersion, 1, 9, 1)) { if (triggerOnUnnecessaryDicomAsJsonFiles_) { OrthancPlugins::LogWarning("Housekeeper: your storage might still contain some dicom-as-json files -> will perform housekeeping"); - needsProcessing = true; + needsReconstruct = true; // the default reconstruct removes the dicom-as-json } else { @@ -353,7 +391,7 @@ if (triggerOnMainDicomTagsChange_) { OrthancPlugins::LogWarning("Housekeeper: Patient main dicom tags have changed, -> will perform housekeeping"); - needsProcessing = true; + needsReconstruct = true; } else { @@ -366,7 +404,7 @@ if (triggerOnMainDicomTagsChange_) { OrthancPlugins::LogWarning("Housekeeper: Study main dicom tags have changed, -> will perform housekeeping"); - needsProcessing = true; + needsReconstruct = true; } else { @@ -379,7 +417,7 @@ if (triggerOnMainDicomTagsChange_) { OrthancPlugins::LogWarning("Housekeeper: Series main dicom tags have changed, -> will perform housekeeping"); - needsProcessing = true; + needsReconstruct = true; } else { @@ -392,7 +430,7 @@ if (triggerOnMainDicomTagsChange_) { OrthancPlugins::LogWarning("Housekeeper: Instance main dicom tags have changed, -> will perform housekeeping"); - needsProcessing = true; + needsReconstruct = true; } else { @@ -413,7 +451,7 @@ OrthancPlugins::LogWarning("Housekeeper: storage compression is now disabled -> will perform housekeeping"); } - needsProcessing = true; + needsReingest = true; } else { @@ -421,16 +459,33 @@ } } - return needsProcessing; + if (current.ingestTranscoding != last.ingestTranscoding) + { + if (triggerOnIngestTranscodingChange_) + { + OrthancPlugins::LogWarning("Housekeeper: ingest transcoding has changed -> will perform housekeeping"); + + needsReingest = true; + } + else + { + OrthancPlugins::LogWarning("Housekeeper: ingest transcoding has changed but the trigger is disabled"); + } + } + } -static bool ProcessChanges(PluginStatus& pluginStatus, const DbConfiguration& currentDbConfiguration) +static bool ProcessChanges(bool needsReconstruct, bool needsReingest, const DbConfiguration& currentDbConfiguration) { Json::Value changes; - pluginStatus.currentlyProcessingConfiguration = currentDbConfiguration; + { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); - OrthancPlugins::RestApiGet(changes, "/changes?since=" + boost::lexical_cast<std::string>(pluginStatus.lastProcessedChange) + "&limit=100", false); + pluginStatus_.currentlyProcessingConfiguration = currentDbConfiguration; + + OrthancPlugins::RestApiGet(changes, "/changes?since=" + boost::lexical_cast<std::string>(pluginStatus_.lastProcessedChange) + "&limit=100", false); + } for (Json::ArrayIndex i = 0; i < changes["Changes"].size(); i++) { @@ -440,16 +495,29 @@ if (change["ChangeType"] == "NewStudy") // some StableStudy might be missing if orthanc was shutdown during a StableAge -> consider only the NewStudy events that can not be missed { Json::Value result; - OrthancPlugins::RestApiPost(result, "/studies/" + change["ID"].asString() + "/reconstruct", std::string(""), false); - boost::this_thread::sleep(boost::posix_time::milliseconds(throttleDelay_ * 1000)); + Json::Value request; + if (needsReingest) + { + request["ReconstructFiles"] = true; + } + OrthancPlugins::RestApiPost(result, "/studies/" + change["ID"].asString() + "/reconstruct", request, false); } - if (seq >= pluginStatus.lastChangeToProcess) // we are done ! { - return true; + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + + pluginStatus_.lastProcessedChange = seq; + + if (seq >= pluginStatus_.lastChangeToProcess) // we are done ! + { + return true; + } } - pluginStatus.lastProcessedChange = seq; + if (change["ChangeType"] == "NewStudy") + { + boost::this_thread::sleep(boost::posix_time::milliseconds(throttleDelay_ * 1000)); + } } return false; @@ -458,21 +526,31 @@ static void WorkerThread() { - PluginStatus pluginStatus; DbConfiguration currentDbConfiguration; OrthancPluginLogWarning(OrthancPlugins::GetGlobalContext(), "Starting Housekeeper worker thread"); - ReadStatusFromDb(pluginStatus); + ReadStatusFromDb(); + GetCurrentDbConfiguration(currentDbConfiguration); - if (!NeedsProcessing(currentDbConfiguration, pluginStatus.lastProcessedConfiguration)) + bool needsReconstruct = false; + bool needsReingest = false; + + { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + CheckNeedsProcessing(needsReconstruct, needsReingest, currentDbConfiguration, pluginStatus_.lastProcessedConfiguration); + } + + bool needsProcessing = needsReconstruct || needsReingest; + + if (!needsProcessing) { OrthancPlugins::LogWarning("Housekeeper: everything has been processed already !"); return; } - if (force_ || NeedsProcessing(currentDbConfiguration, pluginStatus.currentlyProcessingConfiguration)) + if (force_ || needsProcessing) { if (force_) { @@ -486,29 +564,41 @@ Json::Value changes; OrthancPlugins::RestApiGet(changes, "/changes?last", false); - pluginStatus.lastProcessedChange = 0; - pluginStatus.lastChangeToProcess = changes["Last"].asInt64(); // the last change is the last change at the time we start. We assume that every new ingested file will be constructed correctly + { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + + pluginStatus_.lastProcessedChange = 0; + pluginStatus_.lastChangeToProcess = changes["Last"].asInt64(); // the last change is the last change at the time we start. We assume that every new ingested file will be constructed correctly + pluginStatus_.lastTimeStarted = boost::posix_time::microsec_clock::universal_time(); + } } else { OrthancPlugins::LogWarning("Housekeeper: the DB configuration has not changed since last run, will continue processing changes"); } - bool completed = pluginStatus.lastChangeToProcess == 0; // if the DB is empty at start, no need to process anyting + bool completed = false; + { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + completed = pluginStatus_.lastChangeToProcess == 0; // if the DB is empty at start, no need to process anyting + } + bool loggedNotRightPeriodChangeMessage = false; while (!workerThreadShouldStop_ && !completed) { if (runningPeriods_.isInPeriod()) { - completed = ProcessChanges(pluginStatus, currentDbConfiguration); - SaveStatusInDb(pluginStatus); + completed = ProcessChanges(needsReconstruct, needsReingest, currentDbConfiguration); + SaveStatusInDb(); if (!completed) { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + OrthancPlugins::LogInfo("Housekeeper: processed changes " + - boost::lexical_cast<std::string>(pluginStatus.lastProcessedChange) + - " / " + boost::lexical_cast<std::string>(pluginStatus.lastChangeToProcess)); + boost::lexical_cast<std::string>(pluginStatus_.lastProcessedChange) + + " / " + boost::lexical_cast<std::string>(pluginStatus_.lastChangeToProcess)); boost::this_thread::sleep(boost::posix_time::milliseconds(throttleDelay_ * 100)); // wait 1/10 of the delay between changes } @@ -527,13 +617,15 @@ if (completed) { - pluginStatus.lastProcessedConfiguration = currentDbConfiguration; - pluginStatus.currentlyProcessingConfiguration.Clear(); + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + + pluginStatus_.lastProcessedConfiguration = currentDbConfiguration; + pluginStatus_.currentlyProcessingConfiguration.Clear(); - pluginStatus.lastProcessedChange = -1; - pluginStatus.lastChangeToProcess = -1; + pluginStatus_.lastProcessedChange = -1; + pluginStatus_.lastChangeToProcess = -1; - SaveStatusInDb(pluginStatus); + SaveStatusInDb(); OrthancPluginLogWarning(OrthancPlugins::GetGlobalContext(), "Housekeeper: finished processing all changes"); } @@ -541,6 +633,28 @@ extern "C" { + OrthancPluginErrorCode GetPluginStatus(OrthancPluginRestOutput* output, + const char* url, + const OrthancPluginHttpRequest* request) + { + if (request->method != OrthancPluginHttpMethod_Get) + { + OrthancPlugins::AnswerMethodNotAllowed(output, "GET"); + } + else + { + boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_); + + Json::Value status; + pluginStatus_.ToJson(status); + + OrthancPlugins::AnswerJson(status, output); + } + + return OrthancPluginErrorCode_Success; + } + + OrthancPluginErrorCode OnChangeCallback(OrthancPluginChangeType changeType, OrthancPluginResourceType resourceType, const char* resourceId) @@ -581,10 +695,10 @@ OrthancPlugins::LogWarning("Housekeeper plugin is initializing"); OrthancPluginSetDescription(c, "Optimizes your DB and storage."); - OrthancPlugins::OrthancConfiguration configuration; + OrthancPlugins::OrthancConfiguration orthancConfiguration; OrthancPlugins::OrthancConfiguration housekeeper; - configuration.GetSection(housekeeper, "Housekeeper"); + orthancConfiguration.GetSection(housekeeper, "Housekeeper"); bool enabled = housekeeper.GetBooleanValue("Enable", false); if (enabled) @@ -644,8 +758,10 @@ if (housekeeper.GetJson().isMember("Triggers")) { triggerOnStorageCompressionChange_ = housekeeper.GetBooleanValue("StorageCompressionChange", true); + triggerOnMainDicomTagsChange_ = housekeeper.GetBooleanValue("MainDicomTagsChange", true); triggerOnUnnecessaryDicomAsJsonFiles_ = housekeeper.GetBooleanValue("UnnecessaryDicomAsJsonFiles", true); + triggerOnIngestTranscodingChange_ = housekeeper.GetBooleanValue("IngestTranscodingChange", true); } if (housekeeper.GetJson().isMember("Schedule")) @@ -654,6 +770,7 @@ } OrthancPluginRegisterOnChangeCallback(c, OnChangeCallback); + OrthancPluginRegisterRestCallback(c, "/housekeeper/status", GetPluginStatus); } else {