diff OrthancServer/Plugins/Samples/Housekeeper/Plugin.cpp @ 4988:8fba26292a9f

Housekeeper plugin: finalizing + integration tests ok
author Alain Mazy <am@osimis.io>
date Sat, 30 Apr 2022 19:39:40 +0200
parents 40fd2a485a84
children 3cdda1cec537
line wrap: on
line diff
--- a/OrthancServer/Plugins/Samples/Housekeeper/Plugin.cpp	Tue Apr 26 16:14:49 2022 +0200
+++ b/OrthancServer/Plugins/Samples/Housekeeper/Plugin.cpp	Sat Apr 30 19:39:40 2022 +0200
@@ -25,6 +25,7 @@
 
 #include <boost/thread.hpp>
 #include <boost/algorithm/string.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 #include <json/value.h>
 #include <json/writer.h>
 #include <string.h>
@@ -42,6 +43,7 @@
 static bool triggerOnStorageCompressionChange_ = true;
 static bool triggerOnMainDicomTagsChange_ = true;
 static bool triggerOnUnnecessaryDicomAsJsonFiles_ = true;
+static bool triggerOnIngestTranscodingChange_ = true;
 
 
 struct RunningPeriod
@@ -112,6 +114,7 @@
   }
 };
 
+
 struct RunningPeriods
 {
   std::list<RunningPeriod> runningPeriods_;
@@ -153,6 +156,7 @@
 
 RunningPeriods runningPeriods_;
 
+
 struct DbConfiguration
 {
   std::string orthancVersion;
@@ -160,6 +164,7 @@
   std::string studiesMainDicomTagsSignature;
   std::string seriesMainDicomTagsSignature;
   std::string instancesMainDicomTagsSignature;
+  std::string ingestTranscoding;
   bool storageCompressionEnabled;
 
   DbConfiguration()
@@ -179,6 +184,7 @@
     studiesMainDicomTagsSignature.clear();
     seriesMainDicomTagsSignature.clear();
     instancesMainDicomTagsSignature.clear();
+    ingestTranscoding.clear();
   }
 
   void ToJson(Json::Value& target)
@@ -202,6 +208,7 @@
       target["MainDicomTagsSignature"] = signatures;
       target["OrthancVersion"] = orthancVersion;
       target["StorageCompressionEnabled"] = storageCompressionEnabled;
+      target["IngestTranscoding"] = ingestTranscoding;
     }
   }
 
@@ -218,15 +225,18 @@
       instancesMainDicomTagsSignature = signatures["Instance"].asString();
 
       storageCompressionEnabled = source["StorageCompressionEnabled"].asBool();
+      ingestTranscoding = source["IngestTranscoding"].asString();
     }
   }
 };
 
+
 struct PluginStatus
 {
   int statusVersion;
   int64_t lastProcessedChange;
   int64_t lastChangeToProcess;
+  boost::posix_time::ptime lastTimeStarted;
 
   DbConfiguration currentlyProcessingConfiguration; // last configuration being processed (has not reached last change yet)
   DbConfiguration lastProcessedConfiguration;       // last configuration that has been fully processed (till last change)
@@ -234,7 +244,8 @@
   PluginStatus()
   : statusVersion(1),
     lastProcessedChange(-1),
-    lastChangeToProcess(-1)
+    lastChangeToProcess(-1),
+    lastTimeStarted(boost::date_time::special_values::not_a_date_time)
   {
   }
 
@@ -245,6 +256,15 @@
     target["Version"] = statusVersion;
     target["LastProcessedChange"] = Json::Value::Int64(lastProcessedChange);
     target["LastChangeToProcess"] = Json::Value::Int64(lastChangeToProcess);
+    
+    if (lastTimeStarted == boost::posix_time::special_values::not_a_date_time)
+    {
+      target["LastTimeStarted"] = Json::Value::null;  
+    }
+    else
+    {
+      target["LastTimeStarted"] = boost::posix_time::to_iso_string(lastTimeStarted);
+    }
 
     currentlyProcessingConfiguration.ToJson(target["CurrentlyProcessingConfiguration"]);
     lastProcessedConfiguration.ToJson(target["LastProcessedConfiguration"]);
@@ -255,6 +275,14 @@
     statusVersion = source["Version"].asInt();
     lastProcessedChange = source["LastProcessedChange"].asInt64();
     lastChangeToProcess = source["LastChangeToProcess"].asInt64();
+    if (source["LastTimeStarted"].isNull())
+    {
+      lastTimeStarted = boost::posix_time::special_values::not_a_date_time;
+    }
+    else
+    {
+      lastTimeStarted = boost::posix_time::from_iso_string(source["LastTimeStarted"].asString());
+    }
 
     Json::Value& current = source["CurrentlyProcessingConfiguration"];
     Json::Value& last = source["LastProcessedConfiguration"];
@@ -264,9 +292,13 @@
   }
 };
 
+static PluginStatus pluginStatus_;
+static boost::recursive_mutex pluginStatusMutex_;
 
-static void ReadStatusFromDb(PluginStatus& pluginStatus)
+static void ReadStatusFromDb()
 {
+  boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+
   OrthancPlugins::OrthancString globalPropertyContent;
 
   globalPropertyContent.Assign(OrthancPluginGetGlobalProperty(OrthancPlugins::GetGlobalContext(),
@@ -277,29 +309,32 @@
   {
     Json::Value jsonStatus;
     globalPropertyContent.ToJson(jsonStatus);
-    pluginStatus.FromJson(jsonStatus);
+    pluginStatus_.FromJson(jsonStatus);
   }
   else
   {
     // default config
-    pluginStatus.statusVersion = 1;
-    pluginStatus.lastProcessedChange = -1;
-    pluginStatus.lastChangeToProcess = -1;
+    pluginStatus_.statusVersion = 1;
+    pluginStatus_.lastProcessedChange = -1;
+    pluginStatus_.lastChangeToProcess = -1;
+    pluginStatus_.lastTimeStarted = boost::date_time::special_values::not_a_date_time;
     
-    pluginStatus.currentlyProcessingConfiguration.orthancVersion = "1.9.0"; // when we don't know, we assume some files were stored with Orthanc 1.9.0 (last version saving the dicom-as-json files)
+    pluginStatus_.lastProcessedConfiguration.orthancVersion = "1.9.0"; // when we don't know, we assume some files were stored with Orthanc 1.9.0 (last version saving the dicom-as-json files)
 
     // default main dicom tags signature are the one from Orthanc 1.4.2 (last time the list was changed):
-    pluginStatus.currentlyProcessingConfiguration.patientsMainDicomTagsSignature = "0010,0010;0010,0020;0010,0030;0010,0040;0010,1000";
-    pluginStatus.currentlyProcessingConfiguration.studiesMainDicomTagsSignature = "0008,0020;0008,0030;0008,0050;0008,0080;0008,0090;0008,1030;0020,000d;0020,0010;0032,1032;0032,1060";
-    pluginStatus.currentlyProcessingConfiguration.seriesMainDicomTagsSignature = "0008,0021;0008,0031;0008,0060;0008,0070;0008,1010;0008,103e;0008,1070;0018,0010;0018,0015;0018,0024;0018,1030;0018,1090;0018,1400;0020,000e;0020,0011;0020,0037;0020,0105;0020,1002;0040,0254;0054,0081;0054,0101;0054,1000";
-    pluginStatus.currentlyProcessingConfiguration.instancesMainDicomTagsSignature = "0008,0012;0008,0013;0008,0018;0020,0012;0020,0013;0020,0032;0020,0037;0020,0100;0020,4000;0028,0008;0054,1330"; 
+    pluginStatus_.lastProcessedConfiguration.patientsMainDicomTagsSignature = "0010,0010;0010,0020;0010,0030;0010,0040;0010,1000";
+    pluginStatus_.lastProcessedConfiguration.studiesMainDicomTagsSignature = "0008,0020;0008,0030;0008,0050;0008,0080;0008,0090;0008,1030;0020,000d;0020,0010;0032,1032;0032,1060";
+    pluginStatus_.lastProcessedConfiguration.seriesMainDicomTagsSignature = "0008,0021;0008,0031;0008,0060;0008,0070;0008,1010;0008,103e;0008,1070;0018,0010;0018,0015;0018,0024;0018,1030;0018,1090;0018,1400;0020,000e;0020,0011;0020,0037;0020,0105;0020,1002;0040,0254;0054,0081;0054,0101;0054,1000";
+    pluginStatus_.lastProcessedConfiguration.instancesMainDicomTagsSignature = "0008,0012;0008,0013;0008,0018;0020,0012;0020,0013;0020,0032;0020,0037;0020,0100;0020,4000;0028,0008;0054,1330"; 
   }
 }
 
-static void SaveStatusInDb(PluginStatus& pluginStatus)
+static void SaveStatusInDb()
 {
+  boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+
   Json::Value jsonStatus;
-  pluginStatus.ToJson(jsonStatus);
+  pluginStatus_.ToJson(jsonStatus);
 
   Json::StreamWriterBuilder builder;
   builder.settings_["indentation"] = "   ";
@@ -321,26 +356,29 @@
   configuration.seriesMainDicomTagsSignature = systemInfo["MainDicomTags"]["Series"].asString();
   configuration.instancesMainDicomTagsSignature = systemInfo["MainDicomTags"]["Instance"].asString();
   configuration.storageCompressionEnabled = systemInfo["StorageCompression"].asBool();
+  configuration.ingestTranscoding = systemInfo["IngestTranscoding"].asString();
 
   configuration.orthancVersion = OrthancPlugins::GetGlobalContext()->orthancVersion;
 }
 
-static bool NeedsProcessing(const DbConfiguration& current, const DbConfiguration& last)
+static void CheckNeedsProcessing(bool& needsReconstruct, bool& needsReingest, const DbConfiguration& current, const DbConfiguration& last)
 {
+  needsReconstruct = false;
+  needsReingest = false;
+
   if (!last.IsDefined())
   {
-    return true;
+    return;
   }
 
   const char* lastVersion = last.orthancVersion.c_str();
-  bool needsProcessing = false;
 
   if (!OrthancPlugins::CheckMinimalVersion(lastVersion, 1, 9, 1))
   {
     if (triggerOnUnnecessaryDicomAsJsonFiles_)
     {
       OrthancPlugins::LogWarning("Housekeeper: your storage might still contain some dicom-as-json files -> will perform housekeeping");
-      needsProcessing = true;
+      needsReconstruct = true;  // the default reconstruct removes the dicom-as-json
     }
     else
     {
@@ -353,7 +391,7 @@
     if (triggerOnMainDicomTagsChange_)
     {
       OrthancPlugins::LogWarning("Housekeeper: Patient main dicom tags have changed, -> will perform housekeeping");
-      needsProcessing = true;
+      needsReconstruct = true;
     }
     else
     {
@@ -366,7 +404,7 @@
     if (triggerOnMainDicomTagsChange_)
     {
       OrthancPlugins::LogWarning("Housekeeper: Study main dicom tags have changed, -> will perform housekeeping");
-      needsProcessing = true;
+      needsReconstruct = true;
     }
     else
     {
@@ -379,7 +417,7 @@
     if (triggerOnMainDicomTagsChange_)
     {
       OrthancPlugins::LogWarning("Housekeeper: Series main dicom tags have changed, -> will perform housekeeping");
-      needsProcessing = true;
+      needsReconstruct = true;
     }
     else
     {
@@ -392,7 +430,7 @@
     if (triggerOnMainDicomTagsChange_)
     {
       OrthancPlugins::LogWarning("Housekeeper: Instance main dicom tags have changed, -> will perform housekeeping");
-      needsProcessing = true;
+      needsReconstruct = true;
     }
     else
     {
@@ -413,7 +451,7 @@
         OrthancPlugins::LogWarning("Housekeeper: storage compression is now disabled -> will perform housekeeping");
       }
       
-      needsProcessing = true;
+      needsReingest = true;
     }
     else
     {
@@ -421,16 +459,33 @@
     }
   }
 
-  return needsProcessing;
+  if (current.ingestTranscoding != last.ingestTranscoding)
+  {
+    if (triggerOnIngestTranscodingChange_)
+    {
+      OrthancPlugins::LogWarning("Housekeeper: ingest transcoding has changed -> will perform housekeeping");
+      
+      needsReingest = true;
+    }
+    else
+    {
+      OrthancPlugins::LogWarning("Housekeeper: ingest transcoding has changed but the trigger is disabled");
+    }
+  }
+
 }
 
-static bool ProcessChanges(PluginStatus& pluginStatus, const DbConfiguration& currentDbConfiguration)
+static bool ProcessChanges(bool needsReconstruct, bool needsReingest, const DbConfiguration& currentDbConfiguration)
 {
   Json::Value changes;
 
-  pluginStatus.currentlyProcessingConfiguration = currentDbConfiguration;
+  {
+    boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
 
-  OrthancPlugins::RestApiGet(changes, "/changes?since=" + boost::lexical_cast<std::string>(pluginStatus.lastProcessedChange) + "&limit=100", false);
+    pluginStatus_.currentlyProcessingConfiguration = currentDbConfiguration;
+
+    OrthancPlugins::RestApiGet(changes, "/changes?since=" + boost::lexical_cast<std::string>(pluginStatus_.lastProcessedChange) + "&limit=100", false);
+  }
 
   for (Json::ArrayIndex i = 0; i < changes["Changes"].size(); i++)
   {
@@ -440,16 +495,29 @@
     if (change["ChangeType"] == "NewStudy") // some StableStudy might be missing if orthanc was shutdown during a StableAge -> consider only the NewStudy events that can not be missed
     {
       Json::Value result;
-      OrthancPlugins::RestApiPost(result, "/studies/" + change["ID"].asString() + "/reconstruct", std::string(""), false);
-      boost::this_thread::sleep(boost::posix_time::milliseconds(throttleDelay_ * 1000));
+      Json::Value request;
+      if (needsReingest)
+      {
+        request["ReconstructFiles"] = true;
+      }
+      OrthancPlugins::RestApiPost(result, "/studies/" + change["ID"].asString() + "/reconstruct", request, false);
     }
 
-    if (seq >= pluginStatus.lastChangeToProcess)  // we are done !
     {
-      return true;
+      boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+
+      pluginStatus_.lastProcessedChange = seq;
+
+      if (seq >= pluginStatus_.lastChangeToProcess)  // we are done !
+      {
+        return true;
+      }
     }
 
-    pluginStatus.lastProcessedChange = seq;
+    if (change["ChangeType"] == "NewStudy")
+    {
+      boost::this_thread::sleep(boost::posix_time::milliseconds(throttleDelay_ * 1000));
+    }
   }
 
   return false;
@@ -458,21 +526,31 @@
 
 static void WorkerThread()
 {
-  PluginStatus pluginStatus;
   DbConfiguration currentDbConfiguration;
 
   OrthancPluginLogWarning(OrthancPlugins::GetGlobalContext(), "Starting Housekeeper worker thread");
 
-  ReadStatusFromDb(pluginStatus);
+  ReadStatusFromDb();
+
   GetCurrentDbConfiguration(currentDbConfiguration);
 
-  if (!NeedsProcessing(currentDbConfiguration, pluginStatus.lastProcessedConfiguration))
+  bool needsReconstruct = false;
+  bool needsReingest = false;
+
+  {
+    boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+    CheckNeedsProcessing(needsReconstruct, needsReingest, currentDbConfiguration, pluginStatus_.lastProcessedConfiguration);
+  }
+
+  bool needsProcessing = needsReconstruct || needsReingest;
+
+  if (!needsProcessing)
   {
     OrthancPlugins::LogWarning("Housekeeper: everything has been processed already !");
     return;
   }
 
-  if (force_ || NeedsProcessing(currentDbConfiguration, pluginStatus.currentlyProcessingConfiguration))
+  if (force_ || needsProcessing)
   {
     if (force_)
     {
@@ -486,29 +564,41 @@
     Json::Value changes;
     OrthancPlugins::RestApiGet(changes, "/changes?last", false);
 
-    pluginStatus.lastProcessedChange = 0;
-    pluginStatus.lastChangeToProcess = changes["Last"].asInt64();  // the last change is the last change at the time we start.  We assume that every new ingested file will be constructed correctly
+    {
+      boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+
+      pluginStatus_.lastProcessedChange = 0;
+      pluginStatus_.lastChangeToProcess = changes["Last"].asInt64();  // the last change is the last change at the time we start.  We assume that every new ingested file will be constructed correctly
+      pluginStatus_.lastTimeStarted = boost::posix_time::microsec_clock::universal_time();
+    }
   }
   else
   {
     OrthancPlugins::LogWarning("Housekeeper: the DB configuration has not changed since last run, will continue processing changes");
   }
 
-  bool completed = pluginStatus.lastChangeToProcess == 0;  // if the DB is empty at start, no need to process anyting
+  bool completed = false;
+  {
+    boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+    completed = pluginStatus_.lastChangeToProcess == 0;  // if the DB is empty at start, no need to process anyting
+  }
+
   bool loggedNotRightPeriodChangeMessage = false;
 
   while (!workerThreadShouldStop_ && !completed)
   {
     if (runningPeriods_.isInPeriod())
     {
-      completed = ProcessChanges(pluginStatus, currentDbConfiguration);
-      SaveStatusInDb(pluginStatus);
+      completed = ProcessChanges(needsReconstruct, needsReingest, currentDbConfiguration);
+      SaveStatusInDb();
       
       if (!completed)
       {
+        boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+    
         OrthancPlugins::LogInfo("Housekeeper: processed changes " + 
-                                boost::lexical_cast<std::string>(pluginStatus.lastProcessedChange) + 
-                                " / " + boost::lexical_cast<std::string>(pluginStatus.lastChangeToProcess));
+                                boost::lexical_cast<std::string>(pluginStatus_.lastProcessedChange) + 
+                                " / " + boost::lexical_cast<std::string>(pluginStatus_.lastChangeToProcess));
         
         boost::this_thread::sleep(boost::posix_time::milliseconds(throttleDelay_ * 100));  // wait 1/10 of the delay between changes
       }
@@ -527,13 +617,15 @@
 
   if (completed)
   {
-    pluginStatus.lastProcessedConfiguration = currentDbConfiguration;
-    pluginStatus.currentlyProcessingConfiguration.Clear();
+    boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+
+    pluginStatus_.lastProcessedConfiguration = currentDbConfiguration;
+    pluginStatus_.currentlyProcessingConfiguration.Clear();
 
-    pluginStatus.lastProcessedChange = -1;
-    pluginStatus.lastChangeToProcess = -1;
+    pluginStatus_.lastProcessedChange = -1;
+    pluginStatus_.lastChangeToProcess = -1;
     
-    SaveStatusInDb(pluginStatus);
+    SaveStatusInDb();
 
     OrthancPluginLogWarning(OrthancPlugins::GetGlobalContext(), "Housekeeper: finished processing all changes");
   }
@@ -541,6 +633,28 @@
 
 extern "C"
 {
+  OrthancPluginErrorCode GetPluginStatus(OrthancPluginRestOutput* output,
+                                         const char* url,
+                                         const OrthancPluginHttpRequest* request)
+  {
+    if (request->method != OrthancPluginHttpMethod_Get)
+    {
+      OrthancPlugins::AnswerMethodNotAllowed(output, "GET");
+    }
+    else
+    {
+      boost::recursive_mutex::scoped_lock lock(pluginStatusMutex_);
+
+      Json::Value status;
+      pluginStatus_.ToJson(status);
+
+      OrthancPlugins::AnswerJson(status, output);
+    }
+
+    return OrthancPluginErrorCode_Success;
+  }
+
+
   OrthancPluginErrorCode OnChangeCallback(OrthancPluginChangeType changeType,
                                           OrthancPluginResourceType resourceType,
                                           const char* resourceId)
@@ -581,10 +695,10 @@
     OrthancPlugins::LogWarning("Housekeeper plugin is initializing");
     OrthancPluginSetDescription(c, "Optimizes your DB and storage.");
 
-    OrthancPlugins::OrthancConfiguration configuration;
+    OrthancPlugins::OrthancConfiguration orthancConfiguration;
 
     OrthancPlugins::OrthancConfiguration housekeeper;
-    configuration.GetSection(housekeeper, "Housekeeper");
+    orthancConfiguration.GetSection(housekeeper, "Housekeeper");
 
     bool enabled = housekeeper.GetBooleanValue("Enable", false);
     if (enabled)
@@ -644,8 +758,10 @@
       if (housekeeper.GetJson().isMember("Triggers"))
       {
         triggerOnStorageCompressionChange_ = housekeeper.GetBooleanValue("StorageCompressionChange", true);
+
         triggerOnMainDicomTagsChange_ = housekeeper.GetBooleanValue("MainDicomTagsChange", true);
         triggerOnUnnecessaryDicomAsJsonFiles_ = housekeeper.GetBooleanValue("UnnecessaryDicomAsJsonFiles", true);
+        triggerOnIngestTranscodingChange_ = housekeeper.GetBooleanValue("IngestTranscodingChange", true);
       }
 
       if (housekeeper.GetJson().isMember("Schedule"))
@@ -654,6 +770,7 @@
       }
 
       OrthancPluginRegisterOnChangeCallback(c, OnChangeCallback);
+      OrthancPluginRegisterRestCallback(c, "/housekeeper/status", GetPluginStatus);
     }
     else
     {