changeset 568:2c494f872f70 am-experimental

wip: timing + one large query/vs individual queries to retrieve instance tags
author Alain Mazy <am@osimis.io>
date Tue, 20 Jun 2023 09:42:00 +0200
parents de4a56ecd2bc
children
files Plugin/DicomWebFormatter.cpp Plugin/WadoRs.cpp TODO
diffstat 3 files changed, 367 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/Plugin/DicomWebFormatter.cpp	Wed Jun 14 14:47:13 2023 +0200
+++ b/Plugin/DicomWebFormatter.cpp	Tue Jun 20 09:42:00 2023 +0200
@@ -26,7 +26,7 @@
 #if !defined(NDEBUG)  // In debug mode, check that the value is actually a JSON string
 #  include <Toolbox.h>
 #endif
-
+#include "Logging.h"
 
 namespace OrthancPlugins
 {
@@ -166,8 +166,18 @@
 
     std::string item;
 
+    static size_t instanceCounter = 0;
+    static size_t totalTime1 = 0;
+    static size_t totalTime2 = 0;
+
+    instanceCounter++;
+    boost::posix_time::ptime start2 = boost::posix_time::microsec_clock::universal_time();
+
     DicomWebFormatter::Apply(item, context_, dicom, size, isXml_, mode, bulkRoot, injectEmptyPixelData);
-   
+
+    boost::posix_time::ptime stop2 = boost::posix_time::microsec_clock::universal_time();
+    totalTime1 += (stop2-start2).total_microseconds();
+
     if (isXml_)
     {
       OrthancPluginSendMultipartItem(context_, output_, item.c_str(), item.size());
@@ -176,6 +186,14 @@
     {
       jsonBuffer_.AddChunk(item);
     }
+
+    stop2 = boost::posix_time::microsec_clock::universal_time();
+    totalTime2 += (stop2-start2).total_microseconds();
+
+    if (instanceCounter % 100 == 0)
+    {
+      LOG(WARNING) << "AddInternal i: " << instanceCounter << " " << totalTime1/instanceCounter << " " << totalTime2/instanceCounter;
+    }
   }
 
 
@@ -291,13 +309,13 @@
       throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
     }
 
-#if !defined(NDEBUG)  // In debug mode, check that the value is actually a JSON string
-    Json::Value json;
-    if (!OrthancPlugins::ReadJson(json, data, size))
-    {
-      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat);
-    }
-#endif
+// #if !defined(NDEBUG)  // In debug mode, check that the value is actually a JSON string
+//     Json::Value json;
+//     if (!OrthancPlugins::ReadJson(json, data, size))
+//     {
+//       throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat);
+//     }
+// #endif
     
     if (first_)
     {
--- a/Plugin/WadoRs.cpp	Wed Jun 14 14:47:13 2023 +0200
+++ b/Plugin/WadoRs.cpp	Tue Jun 20 09:42:00 2023 +0200
@@ -27,9 +27,11 @@
 #include <ChunkedBuffer.h>
 #include <Logging.h>
 #include <Toolbox.h>
+#include <MultiThreading/SharedMessageQueue.h>
 
 #include <memory>
 #include <boost/thread/mutex.hpp>
+#include <boost/thread.hpp>
 
 static const char* const MAIN_DICOM_TAGS = "MainDicomTags";
 static const char* const REQUESTED_TAGS = "RequestedTags";
@@ -913,13 +915,32 @@
 
       // On a SSD drive, this version is twice slower than if using
       // cache (see below)
-    
+
+      static size_t instanceCounter = 0;
+      static size_t totalTime1 = 0;
+      static size_t totalTime2 = 0;
+
+      instanceCounter++;
+      boost::posix_time::ptime start2 = boost::posix_time::microsec_clock::universal_time();
+
       OrthancPlugins::MemoryBuffer dicomFile;
       if (dicomFile.RestApiGet("/instances/" + orthancId + "/file-until-pixel-data", false))
       {
+        boost::posix_time::ptime stop2 = boost::posix_time::microsec_clock::universal_time();
+        totalTime1 += (stop2-start2).total_microseconds();
+
         writer.AddDicom(dicomFile.GetData(), dicomFile.GetSize(), bulkRoot, true);
+
+        stop2 = boost::posix_time::microsec_clock::universal_time();
+        totalTime2 += (stop2-start2).total_microseconds();
       }
 
+      if (instanceCounter % 100 == 0)
+      {
+        LOG(WARNING) << " i: " << instanceCounter << " " << totalTime1/instanceCounter << " " << totalTime2/instanceCounter;
+      }
+
+
       break;
     }
 
@@ -1148,13 +1169,62 @@
 }
 
 
+/**
+ * Lists the Orthanc identifiers of the children of a study or of a
+ * series, and reads the DICOM UID of that parent resource.
+ *
+ * @param target            Cleared first, then filled with the strings found
+ *                          in the "Series" (study level) or "Instances"
+ *                          (series level) field of the parent resource.
+ * @param resourceDicomUid  Receives the "StudyInstanceUID" or the
+ *                          "SeriesInstanceUID" read from "MainDicomTags".
+ *                          Left untouched if the REST call fails.
+ * @param level             Level of the parent: study or series. Any other
+ *                          value raises ErrorCode_ParameterOutOfRange.
+ * @param orthancId         Orthanc identifier of the parent resource.
+ *
+ * Throws ErrorCode_NetworkProtocol if the answer of the REST API does
+ * not contain the expected fields.
+ **/
+static void GetChildrenIdentifiers(std::list<std::string>& target,
+                                   std::string& resourceDicomUid,
+                                   Orthanc::ResourceType level,
+                                   const std::string& orthancId)
+{
+  target.clear();
+
+  std::string uri;
+  const char* childrenTag = NULL;
+  const char* dicomUidTag = NULL;
+
+  if (level == Orthanc::ResourceType_Study)
+  {
+    uri = "/studies/" + orthancId;
+    childrenTag = "Series";
+    dicomUidTag = "StudyInstanceUID";
+  }
+  else if (level == Orthanc::ResourceType_Series)
+  {
+    uri = "/series/" + orthancId;
+    childrenTag = "Instances";
+    dicomUidTag = "SeriesInstanceUID";
+  }
+  else
+  {
+    throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
+  }
+
+  assert(childrenTag != NULL && dicomUidTag != NULL);
+
+  Json::Value resource;
+  if (!OrthancPlugins::RestApiGet(resource, uri, false))
+  {
+    // The REST call failed: "target" stays empty and the UID is not modified
+    return;
+  }
+
+  // The checks are evaluated left to right, so that a field is only
+  // accessed once its parent is known to exist
+  const bool hasExpectedFields = (resource.type() == Json::objectValue &&
+                                  resource.isMember(childrenTag) &&
+                                  resource.isMember(MAIN_DICOM_TAGS) &&
+                                  resource[MAIN_DICOM_TAGS].isMember(dicomUidTag));
+
+  if (!hasExpectedFields)
+  {
+    throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol);
+  }
+
+  const Json::Value& children = resource[childrenTag];
+  resourceDicomUid = resource[MAIN_DICOM_TAGS][dicomUidTag].asString();
+
+  const Json::Value::ArrayIndex count = children.size();
+  for (Json::Value::ArrayIndex index = 0; index < count; index++)
+  {
+    target.push_back(children[index].asString());
+  }
+}
 
 typedef std::map<std::string, boost::shared_ptr<Orthanc::DicomMap> >  DicomMaps;
 
-static void GetChildrenIdentifiers(DicomMaps& childrenDicomMaps,
-                                   std::string& resourceDicomUid,
-                                   Orthanc::ResourceType level,
-                                   const std::string& orthancId)
+static void GetChildrenDicomMaps(DicomMaps& childrenDicomMaps,
+                                 std::string& resourceDicomUid,
+                                 Orthanc::ResourceType level,
+                                 const std::string& orthancId)
 {
   childrenDicomMaps.clear();
 
@@ -1236,13 +1306,13 @@
 
       DicomMaps seriesDicomMaps;
       std::string studyDicomUid;
-      GetChildrenIdentifiers(seriesDicomMaps, studyDicomUid, Orthanc::ResourceType_Study, studyOrthancId);
+      GetChildrenDicomMaps(seriesDicomMaps, studyDicomUid, Orthanc::ResourceType_Study, studyOrthancId);
 
       for (DicomMaps::const_iterator s = seriesDicomMaps.begin(); s != seriesDicomMaps.end(); ++s)
       {
         DicomMaps instancesDicomMaps;
         std::string seriesDicomUid;
-        GetChildrenIdentifiers(instancesDicomMaps, seriesDicomUid, Orthanc::ResourceType_Series, s->first);
+        GetChildrenDicomMaps(instancesDicomMaps, seriesDicomUid, Orthanc::ResourceType_Series, s->first);
 
         // TODO: parallelize (share code with RetrieveSeriesMetadata)
         for (DicomMaps::const_iterator i = instancesDicomMaps.begin(); i != instancesDicomMaps.end(); ++i)
@@ -1258,12 +1328,135 @@
 }
 
 
+// Sentinel value: a worker thread returns as soon as it dequeues a
+// message whose "orthancId" is equal to this string
+static const char* EXIT_WORKER_MESSAGE = "exit";
+
+/**
+ * Message posted on the queue of the worker threads: it describes one
+ * instance whose metadata must be formatted and appended to "writer".
+ *
+ * "writerMutex" and "writer" are stored as references: the referenced
+ * objects are owned by the caller and must outlive this message.
+ * "bulkRoot" may be empty when the message is created; the worker
+ * thread then fills it in before formatting the instance.
+ **/
+class InstanceToLoad : public Orthanc::IDynamicObject
+{
+public:
+  std::string                  orthancId;
+  std::string                  studyInstanceUid;
+  std::string                  seriesInstanceUid;
+  std::string                  bulkRoot;
+  boost::mutex&                writerMutex;
+  OrthancPlugins::DicomWebFormatter::HttpWriter& writer;
+  bool                         isXml;
+  OrthancPluginDicomWebBinaryMode mode;
+
+  // "bulkRoot" is taken by const reference (it used to be passed by
+  // value, which copied the string once more for every queued instance)
+  explicit InstanceToLoad(const std::string& orthancId,
+                          const std::string& bulkRoot,
+                          boost::mutex& writerMutex,
+                          OrthancPlugins::DicomWebFormatter::HttpWriter& writer,
+                          bool isXml,
+                          OrthancPluginDicomWebBinaryMode mode,
+                          const std::string& studyInstanceUid,
+                          const std::string& seriesInstanceUid)
+  : orthancId(orthancId),
+    studyInstanceUid(studyInstanceUid),
+    seriesInstanceUid(seriesInstanceUid),
+    bulkRoot(bulkRoot),
+    writerMutex(writerMutex),
+    writer(writer),
+    isXml(isXml),
+    mode(mode)
+  {}
+};
+
+/**
+ * Parameters handed over to one worker thread: a label used in its
+ * log line, the queue from which it dequeues its messages (stored as
+ * a raw, non-owning pointer), and the base URL used to build the
+ * "bulkRoot" of the instances.
+ **/
+class InstanceWorkerData
+{
+public:
+  std::string id;
+  Orthanc::SharedMessageQueue* instancesQueue;
+  std::string wadoBase;
+
+  InstanceWorkerData(const std::string& workerId,
+                     Orthanc::SharedMessageQueue* queue,
+                     const std::string& baseUrl) :
+    id(workerId),
+    instancesQueue(queue),
+    wadoBase(baseUrl)
+  {
+  }
+};
+
+/**
+ * Entry point of a worker thread. The thread dequeues "InstanceToLoad"
+ * messages from "data->instancesQueue" until it receives a message
+ * whose "orthancId" equals EXIT_WORKER_MESSAGE.
+ *
+ * For each instance, the worker:
+ *  1. if "bulkRoot" is empty, reads "/instances/{id}" to get the
+ *     SOPInstanceUID and builds "bulkRoot" from "data->wadoBase",
+ *  2. reads "/instances/{id}/file-until-pixel-data",
+ *  3. formats it with DicomWebFormatter::Apply(), without any lock,
+ *  4. appends the result to the shared writer, under "writerMutex".
+ *
+ * Errors are logged and the faulty instance is skipped, but the thread
+ * keeps running: it must consume its exit message so that the caller
+ * can join it.
+ *
+ * On exit, the thread logs the number of instances it has handled,
+ * followed by the average durations in microseconds, each measured
+ * from the moment the instance was dequeued: after the read, after the
+ * formatting, and after the write.
+ **/
+void InstanceWorkerThread(InstanceWorkerData* data)
+{
+  size_t instanceCounter = 0;
+  size_t totalTime1 = 0;
+  size_t totalTime2 = 0;
+  size_t totalTime3 = 0;
+  std::string threadId = data->id;
+
+  while (true)
+  {
+    try
+    {
+      // Take ownership of the message before downcasting it, so that it
+      // is released even if it does not have the expected type
+      std::unique_ptr<Orthanc::IDynamicObject> message(data->instancesQueue->Dequeue(0));
+      InstanceToLoad* instanceToLoad = dynamic_cast<InstanceToLoad*>(message.get());
+
+      if (instanceToLoad == NULL)
+      {
+        LOG(ERROR) << threadId << " unexpected message in the queue of instances, skipping it";
+        continue;
+      }
+
+      if (instanceToLoad->orthancId == EXIT_WORKER_MESSAGE)
+      {
+        // A worker can be asked to exit before it has handled any
+        // instance: never divide by zero when computing the averages
+        const size_t divisor = (instanceCounter == 0 ? 1 : instanceCounter);
+        LOG(WARNING) << threadId << " i: " << instanceCounter << " " << totalTime1/divisor << " " << totalTime2/divisor << " " << totalTime3/divisor;
+        return;
+      }
+
+      instanceCounter++;
+      const boost::posix_time::ptime start2 = boost::posix_time::microsec_clock::universal_time();
+
+      if (instanceToLoad->bulkRoot == "") // we are not in oneLargeQuery mode -> we must load the instance tags to get the SOPInstanceUID
+      {
+        Json::Value instanceResource;
+
+        if (OrthancPlugins::RestApiGet(instanceResource, "/instances/" + instanceToLoad->orthancId, false))
+        {
+          instanceToLoad->bulkRoot = (data->wadoBase +
+                              "studies/" + instanceToLoad->studyInstanceUid +
+                              "/series/" + instanceToLoad->seriesInstanceUid + 
+                              "/instances/" + instanceResource["MainDicomTags"]["SOPInstanceUID"].asString() + "/bulk");
+        }
+      }
+
+      OrthancPlugins::MemoryBuffer dicomFile;
+      if (dicomFile.RestApiGet("/instances/" + instanceToLoad->orthancId + "/file-until-pixel-data", false))
+      {
+        boost::posix_time::ptime stop2 = boost::posix_time::microsec_clock::universal_time();
+        totalTime1 += (stop2-start2).total_microseconds();
+
+        // The formatting is done outside of the mutex, so that several
+        // workers can format their instances at the same time
+        std::string content;
+        OrthancPlugins::DicomWebFormatter::Apply(content, 
+                                 OrthancPlugins::GetGlobalContext(), 
+                                 dicomFile.GetData(), dicomFile.GetSize(),
+                                 instanceToLoad->isXml, 
+                                 instanceToLoad->mode, 
+                                 instanceToLoad->bulkRoot, 
+                                 true /*injectEmptyPixelData*/);
+
+        stop2 = boost::posix_time::microsec_clock::universal_time();
+        totalTime2 += (stop2-start2).total_microseconds();
+
+        {
+          // Only the access to the shared writer is serialized
+          boost::mutex::scoped_lock lock(instanceToLoad->writerMutex);
+          instanceToLoad->writer.AddDicomWebSerializedJson(content.data(), content.size());
+        }
+
+        stop2 = boost::posix_time::microsec_clock::universal_time();
+        totalTime3 += (stop2-start2).total_microseconds();
+      }
+    }
+    catch (Orthanc::OrthancException& e)
+    {
+      // report the error but don't exit the thread to make sure all workers end correctly
+      LOG(ERROR) << threadId << " error while loading one instance: " << e.What();
+    }
+    catch (...)
+    {
+      // report the error but don't exit the thread to make sure all workers end correctly
+      LOG(ERROR) << threadId << " unknown error while loading one instance";
+    }
+  }
+}
+
+#include <boost/date_time/posix_time/posix_time.hpp>
+
 void RetrieveSeriesMetadata(OrthancPluginRestOutput* output,
                             const char* url,
                             const OrthancPluginHttpRequest* request)
 {
   OrthancPluginContext* context = OrthancPlugins::GetGlobalContext();
-  
+
+  boost::posix_time::ptime start = boost::posix_time::microsec_clock::universal_time();
+  boost::posix_time::ptime stop = boost::posix_time::microsec_clock::universal_time();
+  // stop = boost::posix_time::microsec_clock::universal_time(); LOG(WARNING) << "start " << (stop-start).total_milliseconds();
+
   bool isXml;
   if (!AcceptMetadata(request, isXml))
   {
@@ -1294,18 +1487,107 @@
       else
       {
         DicomMaps instancesDicomMaps;
+        std::list<std::string> instancesIds;
         std::string seriesDicomUid;
-        GetChildrenIdentifiers(instancesDicomMaps, seriesDicomUid, Orthanc::ResourceType_Series, seriesOrthancId);
+
+        size_t threadCount = 4;
+        bool oneLargeQuery = false;
+
+        if (oneLargeQuery)
+        {
+          GetChildrenDicomMaps(instancesDicomMaps, seriesDicomUid, Orthanc::ResourceType_Series, seriesOrthancId);
+        }
+        else
+        {
+          GetChildrenIdentifiers(instancesIds, seriesDicomUid, Orthanc::ResourceType_Series, seriesOrthancId);
+        }
+
+        stop = boost::posix_time::microsec_clock::universal_time(); LOG(WARNING) << "Get instances tags " << (stop-start).total_milliseconds();
+
+        
+        if (true || threadCount > 1)
+        {
+          // spawn a few workers to get the tags from the core and serialize them
+          Orthanc::SharedMessageQueue instancesQueue;
+          std::vector<boost::shared_ptr<boost::thread> > instancesWorkers;
+          boost::mutex writerMutex;
+          std::vector<boost::shared_ptr<InstanceWorkerData> > instancesWorkersData;
+          std::string wadoBase = OrthancPlugins::Configuration::GetBasePublicUrl(request);
+
+          for (size_t t; t < threadCount; t++)
+          {
+            InstanceWorkerData* threadData = new InstanceWorkerData(boost::lexical_cast<std::string>(t), &instancesQueue, wadoBase);
+            instancesWorkersData.push_back(boost::shared_ptr<InstanceWorkerData>(threadData));
+            instancesWorkers.push_back(boost::shared_ptr<boost::thread>(new boost::thread(InstanceWorkerThread, threadData)));
+          }
+
+          if (oneLargeQuery)
+          {
+            for (DicomMaps::const_iterator i = instancesDicomMaps.begin(); i != instancesDicomMaps.end(); ++i)
+            {
+              std::string bulkRoot = (wadoBase +
+                                  "studies/" + studyInstanceUid +
+                                  "/series/" + seriesInstanceUid + 
+                                  "/instances/" + i->second->GetStringValue(Orthanc::DICOM_TAG_SOP_INSTANCE_UID, "", false) + "/bulk");
 
-        // TODO: parallelize
-        for (DicomMaps::const_iterator i = instancesDicomMaps.begin(); i != instancesDicomMaps.end(); ++i)
+              instancesQueue.Enqueue(new InstanceToLoad(i->first, bulkRoot, writerMutex, writer, isXml, OrthancPluginDicomWebBinaryMode_BulkDataUri, studyInstanceUid, seriesInstanceUid));
+            }
+          }
+          else
+          {
+            for (std::list<std::string>::const_iterator i = instancesIds.begin(); i != instancesIds.end(); ++i)
+            {
+              instancesQueue.Enqueue(new InstanceToLoad(*i, "", writerMutex, writer, isXml, OrthancPluginDicomWebBinaryMode_BulkDataUri, studyInstanceUid, seriesInstanceUid));
+            }
+          }
+
+          stop = boost::posix_time::microsec_clock::universal_time(); LOG(WARNING) << "enqueued " << (stop-start).total_milliseconds();
+
+          // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
+          for (size_t i = 0; i < instancesWorkers.size(); i++)
+          {
+            instancesQueue.Enqueue(new InstanceToLoad(EXIT_WORKER_MESSAGE, "", writerMutex, writer, isXml, OrthancPluginDicomWebBinaryMode_BulkDataUri, studyInstanceUid, seriesInstanceUid));
+          }
+
+          stop = boost::posix_time::microsec_clock::universal_time(); LOG(WARNING) << "waiting " << (stop-start).total_milliseconds();
+
+          for (size_t i = 0; i < instancesWorkers.size(); i++)
+          {
+            if (instancesWorkers[i]->joinable())
+            {
+              instancesWorkers[i]->join();
+            }
+          }
+
+          instancesWorkers.clear();
+        }
+        else
         {
-          WriteInstanceMetadata(writer, mode, cache, i->first, i->second.get(), studyInstanceUid, seriesDicomUid,
-                                OrthancPlugins::Configuration::GetBasePublicUrl(request));
+          size_t instanceCounter = 0;
+          size_t totalTime1 = 0;
+
+          for (DicomMaps::const_iterator i = instancesDicomMaps.begin(); i != instancesDicomMaps.end(); ++i)
+          {
+            boost::posix_time::ptime start2 = boost::posix_time::microsec_clock::universal_time();
+
+            WriteInstanceMetadata(writer, mode, cache, i->first, i->second.get(), studyInstanceUid, seriesDicomUid,
+                                  OrthancPlugins::Configuration::GetBasePublicUrl(request));
+
+            boost::posix_time::ptime stop2 = boost::posix_time::microsec_clock::universal_time();
+            totalTime1 += (stop2-start2).total_microseconds();
+            instanceCounter++;
+          }
+          LOG(WARNING) << " i: " << instanceCounter << " " << totalTime1/instanceCounter;
+
         }
       }
 
+      stop = boost::posix_time::microsec_clock::universal_time(); LOG(WARNING) << "before send " << (stop-start).total_milliseconds();
+
       writer.Send();
+
+      stop = boost::posix_time::microsec_clock::universal_time(); LOG(WARNING) << "after send " << (stop-start).total_milliseconds();
+
     }
   }
 }
--- a/TODO	Wed Jun 14 14:47:13 2023 +0200
+++ b/TODO	Tue Jun 20 09:42:00 2023 +0200
@@ -34,7 +34,51 @@
     There is still a high cost for serialization (which is now simply copying and merging json nodes)
 
     // The "Full" mode optimized with reading only "file-until-pixel-data" and with a single call to get all instances tags is now
-    -> 0.750 s
+    -> 0.450 s
+    // after reading 4 files together (still only one DicomWebToJson serializer)
+    -> 0.450 s
+    // after reading 4 files together, with 4 DicomWebToJson serializers
+    -> 0.230 s
+
+4 threads (after removing the ReadJson in debug !):
+W0614 15:49:38.925856 PluginsManager.cpp:157] Get instances tags 90
+W0614 15:49:39.251882 PluginsManager.cpp:157] before send 416
+W0614 15:49:39.255049 PluginsManager.cpp:157] after send 420
+
+single thread:
+W0614 15:52:04.881710 PluginsManager.cpp:157] Get instances tags 92
+W0614 15:52:05.228800 PluginsManager.cpp:157] before send 439
+W0614 15:52:05.231285 PluginsManager.cpp:157] after send 441
+
+old single-threaded code (no logs during the measurements):
+- total time: ~553us
+  - get file-until-pixel-data from plugin SDK: ~100us
+  - writer.AddDicom: ~450us
+    - format DicomWeb JSON: ~450us
+    - AddChunk: 2us
+
+after removing the ReadJson in debug !
+multi-threaded code with a single thread (measured with no logs !):
+- get file-until-pixel-data from plugin SDK: ~91us
+- format DicomWeb JSON: ~420us
+- add to writer: ~4us
+- TOTAL TIME: 413ms (85ms to get the instances tags)
+multi-threaded code with 2 threads (measured with no logs !):
+- get file-until-pixel-data from plugin SDK: ~110us
+- format DicomWeb JSON: ~450us
+- add to writer: ~4us
+- TOTAL TIME: 275ms (85ms to get the instances tags)
+multi-threaded code with 4 threads (measured with no logs !):
+- get file-until-pixel-data from plugin SDK: ~120us
+- format DicomWeb JSON: ~490us
+- add to writer: ~4us  
+- TOTAL TIME: 194ms (85ms to get the instances tags)
+multi-threaded code with 4 threads (measured with no logs !) + !oneLargeQuery mode -> get the instance SOPInstanceUID in the thread:
+- get file-until-pixel-data + SOP InstanceUID from plugin SDK: ~420us
+- format DicomWeb JSON: ~490us
+- add to writer: ~4us  
+- TOTAL TIME: 158ms
+
 
 
   - note that all measurements have been performed on a DB with a single series !  We should repeat