changeset 715:d8114860cba4 default tip

merge
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 13 Jun 2025 15:30:50 +0200 (3 weeks ago)
parents e87b198aa6f1 (current diff) 9c62639ccb35 (diff)
children
files
diffstat 6 files changed, 353 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Fri Jun 13 15:30:36 2025 +0200
+++ b/NEWS	Fri Jun 13 15:30:50 2025 +0200
@@ -1,3 +1,12 @@
+Pending changes in the mainline
+===============================
+
+* New configuration "WadoRsLoaderThreadsCount" to configure how many threads are loading
+  files from the storage when answering to a WADO-RS query.  A value > 1 is meaningful 
+  only if the storage is a distributed network storage (e.g object storage plugin).
+  A value of 0 means reading and writing are performed in sequence (default behaviour).
+
+
 Version 1.20 (2025-05-12)
 =========================
 
--- a/Plugin/Configuration.cpp	Fri Jun 13 15:30:36 2025 +0200
+++ b/Plugin/Configuration.cpp	Fri Jun 13 15:30:50 2025 +0200
@@ -691,6 +691,11 @@
       return globalConfiguration_->GetBooleanValue("ReadOnly", false);
     }
     
+    unsigned int GetWadoRsLoaderThreadsCount()
+    {
+      return GetUnsignedIntegerValue("WadoRsLoaderThreadsCount", 0);
+    }
+
     MetadataMode GetMetadataMode(Orthanc::ResourceType level)
     {
       static const std::string FULL = "Full";
--- a/Plugin/Configuration.h	Fri Jun 13 15:30:36 2025 +0200
+++ b/Plugin/Configuration.h	Fri Jun 13 15:30:50 2025 +0200
@@ -141,6 +141,8 @@
 
     unsigned int GetMetadataWorkerThreadsCount();
 
+    unsigned int GetWadoRsLoaderThreadsCount();
+
     bool IsMetadataCacheEnabled();
 
     bool IsReadOnly();
--- a/Plugin/Plugin.cpp	Fri Jun 13 15:30:36 2025 +0200
+++ b/Plugin/Plugin.cpp	Fri Jun 13 15:30:50 2025 +0200
@@ -746,6 +746,8 @@
 
         std::string publicUrlRoot = OrthancPlugins::Configuration::GetPublicRoot();
         LOG(WARNING) << "DICOMweb public root: " << publicUrlRoot;
+
+        LOG(WARNING) << "The DICOMWeb plugin will use " << (OrthancPlugins::Configuration::GetWadoRsLoaderThreadsCount() == 0 ? 1 : OrthancPlugins::Configuration::GetWadoRsLoaderThreadsCount()) << " threads to load DICOM files for WADO-RS queries";
       }
       else
       {
--- a/Plugin/WadoRs.cpp	Fri Jun 13 15:30:36 2025 +0200
+++ b/Plugin/WadoRs.cpp	Fri Jun 13 15:30:50 2025 +0200
@@ -32,6 +32,7 @@
 #include <Toolbox.h>
 #include <SerializationToolbox.h>
 #include <MultiThreading/SharedMessageQueue.h>
+#include <MultiThreading/Semaphore.h>
 #include <Compression/GzipCompressor.h>
 
 #include <memory>
@@ -359,6 +360,234 @@
   }
 }
 
+class InstanceToPreload : public Orthanc::IDynamicObject
+{
+private:
+  std::string instanceId_;
+  bool transcode_;
+
+public:
+  explicit InstanceToPreload(const std::string& instanceId, bool transcode) : 
+    instanceId_(instanceId),
+    transcode_(transcode)
+  {
+  }
+
+  virtual ~InstanceToPreload() ORTHANC_OVERRIDE
+  {
+  }
+
+  const std::string& GetInstanceId() const {return instanceId_;}
+
+  bool NeedsTranscoding() const {return transcode_;}
+};
+
+
+class LoadedInstance : public Orthanc::IDynamicObject
+{
+private:
+  std::unique_ptr<OrthancPlugins::DicomInstance> dicom_;
+
+public:
+  explicit LoadedInstance(OrthancPlugins::DicomInstance* dicom) : 
+    dicom_(dicom)
+  {
+  }
+
+  virtual ~LoadedInstance() ORTHANC_OVERRIDE
+  {
+  }
+
+  OrthancPlugins::DicomInstance* ReleaseInstance() 
+  {
+    return dicom_.release();
+  }
+};
+
+
+class InstanceLoader : public boost::noncopyable
+{
+protected:
+  bool                                  transcode_;
+  Orthanc::DicomTransferSyntax          targetTransferSyntax_;
+
+  OrthancPlugins::DicomInstance* GetAndTranscodeDicom(InstanceToPreload* instanceToLoad)
+  {
+    std::unique_ptr<OrthancPlugins::DicomInstance> dicom(OrthancPlugins::DicomInstance::Load(instanceToLoad->GetInstanceId(),
+                                                                                             OrthancPluginLoadDicomInstanceMode_WholeDicom));
+    if (transcode_ && instanceToLoad->NeedsTranscoding())
+    {
+      dicom.reset(dicom->Transcode(dicom->GetBuffer(),
+                                    dicom->GetSize(),
+                                    Orthanc::GetTransferSyntaxUid(targetTransferSyntax_)));
+    }
+
+    return dicom.release();
+  }
+
+public:
+  explicit InstanceLoader(bool transcode, Orthanc::DicomTransferSyntax targetTransferSyntax)
+  : transcode_(transcode),
+    targetTransferSyntax_(targetTransferSyntax)
+  {
+  }
+
+  virtual ~InstanceLoader()
+  {
+  }
+
+  virtual void PrepareDicom(const std::string& instanceId, bool transcode) = 0;
+
+  virtual OrthancPlugins::DicomInstance* GetNextDicom() = 0;
+
+};
+
+
+class ThreadedInstanceLoader : public InstanceLoader
+{
+  std::vector<boost::thread*>         threads_;
+
+  Orthanc::SharedMessageQueue         instancesToPreload_;
+
+  Orthanc::SharedMessageQueue         loadedInstances_;
+
+  Orthanc::Semaphore                  bufferSemaphore_;
+
+public:
+  ThreadedInstanceLoader(size_t threadCount, bool transcode, Orthanc::DicomTransferSyntax transferSyntax)
+  : InstanceLoader(transcode, transferSyntax),
+    instancesToPreload_(0),
+    loadedInstances_(0),
+    bufferSemaphore_(3*threadCount) // to limit the number of loaded instances in memory
+  {
+    for (size_t i = 0; i < threadCount; i++)
+    {
+      threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
+    }
+  }
+
+  virtual ~ThreadedInstanceLoader()
+  {
+    Clear();
+  }
+
+  void Clear()
+  {
+    for (size_t i = 0; i < threads_.size(); i++)
+    {
+      instancesToPreload_.Enqueue(NULL);
+    }
+
+    for (size_t i = 0; i < threads_.size(); i++)
+    {
+      if (threads_[i]->joinable())
+      {
+        threads_[i]->join();
+      }
+      delete threads_[i];
+    }
+
+    threads_.clear();
+  }
+
+  static void PreloaderWorkerThread(ThreadedInstanceLoader* that)
+  {
+    static uint16_t threadCounter = 0;
+    Orthanc::Logging::SetCurrentThreadName(std::string("WADO-LOAD-") + boost::lexical_cast<std::string>(threadCounter++));
+
+    while (true)
+    {
+      std::unique_ptr<InstanceToPreload> instanceToPreload(dynamic_cast<InstanceToPreload*>(that->instancesToPreload_.Dequeue(0)));
+      if (instanceToPreload.get() == NULL)  // that's the signal to exit the thread
+      {
+        return;
+      }
+      
+      // wait for the consumers, no need to accumulate instances in memory if loaders are faster than writers
+      that->bufferSemaphore_.Acquire();
+
+      try
+      {
+        std::unique_ptr<OrthancPlugins::DicomInstance> dicom(that->GetAndTranscodeDicom(instanceToPreload.get()));
+        that->loadedInstances_.Enqueue(new LoadedInstance(dicom.release()));
+      }
+      catch (Orthanc::OrthancException& e)
+      {
+        LOG(ERROR) << "Error while loading instances " << e.GetDetails();
+        that->loadedInstances_.Enqueue(NULL);
+      }
+      catch (...)
+      {
+        LOG(ERROR) << "Unknown error while loading instances ";
+        that->loadedInstances_.Enqueue(NULL);
+      }
+
+    }
+  }
+
+  virtual void PrepareDicom(const std::string& instanceId, bool transcode) ORTHANC_OVERRIDE
+  {
+    instancesToPreload_.Enqueue(new InstanceToPreload(instanceId, transcode));
+  }
+
+  virtual OrthancPlugins::DicomInstance* GetNextDicom() ORTHANC_OVERRIDE
+  {
+    std::unique_ptr<LoadedInstance> loadedInstance(dynamic_cast<LoadedInstance*>(loadedInstances_.Dequeue(0)));
+    
+    // unlock preloader threads to buffer the following instances
+    bufferSemaphore_.Release();
+
+    if (loadedInstance.get() != NULL)
+    {
+      return loadedInstance->ReleaseInstance();
+    }
+    else
+    {
+      return NULL;
+    }
+  }
+};
+
+
+class SynchronousInstanceLoader : public InstanceLoader
+{
+  std::list<boost::shared_ptr<InstanceToPreload> >         instancesToLoad_;
+
+public:
+  SynchronousInstanceLoader(bool transcode, Orthanc::DicomTransferSyntax transferSyntax)
+  : InstanceLoader(transcode, transferSyntax)
+  {
+  }
+
+  virtual ~SynchronousInstanceLoader()
+  {
+    Clear();
+  }
+
+  void Clear()
+  {
+    instancesToLoad_.clear();
+  }
+
+  virtual void PrepareDicom(const std::string& instanceId, bool transcode) ORTHANC_OVERRIDE
+  {
+    instancesToLoad_.push_back(boost::shared_ptr<InstanceToPreload>(new InstanceToPreload(instanceId, transcode)));
+  }
+
+  virtual OrthancPlugins::DicomInstance* GetNextDicom() ORTHANC_OVERRIDE
+  {
+    boost::shared_ptr<InstanceToPreload> instanceToLoad(instancesToLoad_.front());
+    instancesToLoad_.pop_front();
+
+    if (instanceToLoad.get() == NULL)
+    {
+      return NULL;
+    }
+
+    return GetAndTranscodeDicom(instanceToLoad.get());
+  }
+};
+
 
 static void AnswerListOfDicomInstances(OrthancPluginRestOutput* output,
                                        Orthanc::ResourceType level,
@@ -381,17 +610,76 @@
   {
     Json::Value tmp = Json::objectValue;
     tmp["ID"] = publicId;
-    
+
+    if (transcode)
+    {
+      std::string sourceTransferSyntax;
+      if (OrthancPlugins::RestApiGetString(sourceTransferSyntax, "/instances/" + publicId + "/metadata/TransferSyntax", false))
+      {
+        tmp["Metadata"] = Json::objectValue;
+        tmp["Metadata"]["TransferSyntax"] = sourceTransferSyntax;
+      }
+    }
+
     instances = Json::arrayValue;
     instances.append(tmp);
   }
   else
   {
-    if (!OrthancPlugins::RestApiGet(instances, GetResourceUri(level, publicId) + "/instances", false))
+    if (CanUseExtendedFind())
     {
-      // Internal error
-      OrthancPluginSendHttpStatusCode(context, output, 400);
-      return;
+      Json::Value toolsFindPayload;
+
+      toolsFindPayload["Query"] = Json::objectValue;
+      toolsFindPayload["ResponseContent"] = Json::arrayValue;
+      toolsFindPayload["Level"] = "Instance";
+      
+      if (transcode)
+      {
+        toolsFindPayload["ResponseContent"].append("Metadata");
+      }
+      
+      if (level == Orthanc::ResourceType_Patient)
+      {
+        toolsFindPayload["ParentPatient"] = publicId;
+      } 
+      else if (level == Orthanc::ResourceType_Study)
+      {
+        toolsFindPayload["ParentStudy"] = publicId;
+      } 
+      else if (level == Orthanc::ResourceType_Series)
+      {
+        toolsFindPayload["ParentSeries"] = publicId;
+      }
+
+      if (!OrthancPlugins::RestApiPost(instances, "/tools/find", toolsFindPayload, false))
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource, "Unable to list instances");
+      }
+    }
+    else
+    {
+      if (!OrthancPlugins::RestApiGet(instances, GetResourceUri(level, publicId) + "/instances", false))
+      {
+        // Internal error
+        OrthancPluginSendHttpStatusCode(context, output, 400);
+        return;
+      }
+
+      if (transcode)
+      {
+        for (Json::Value::ArrayIndex i = 0; i < instances.size(); i++)
+        {
+          const std::string uri = "/instances/" + instances[i]["ID"].asString();
+
+          std::string sourceTransferSyntax;
+          if (OrthancPlugins::RestApiGetString(sourceTransferSyntax, uri + "/metadata/TransferSyntax", false))
+          {
+            instances[i]["Metadata"] = Json::objectValue;
+            instances[i]["Metadata"]["TransferSyntax"] = sourceTransferSyntax;
+          }
+        }
+      }
     }
   }
 
@@ -400,59 +688,57 @@
     throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol);
   }
 
+  // single threaded or multi threaded loading ?
+  std::unique_ptr<InstanceLoader> loader;
+  
+  const unsigned int workersCount = OrthancPlugins::Configuration::GetWadoRsLoaderThreadsCount();
+
+  if (workersCount > 0 && level != Orthanc::ResourceType_Instance)
+  {
+    loader.reset(new ThreadedInstanceLoader(workersCount, transcode, targetSyntax));
+  }
+  else
+  {
+    loader.reset(new SynchronousInstanceLoader(transcode, targetSyntax));
+  }
+  
   for (Json::Value::ArrayIndex i = 0; i < instances.size(); i++)
   {
-    const std::string uri = "/instances/" + instances[i]["ID"].asString();
+    bool transcodeThisInstance;
+    Orthanc::DicomTransferSyntax currentSyntax;
 
-    bool transcodeThisInstance;
-    
-    std::string sourceTransferSyntax;
-    if (!transcode)
+    if (!transcode || !instances[i].isMember("Metadata") || !instances[i]["Metadata"].isMember("TransferSyntax"))
     {
       transcodeThisInstance = false;      
     }
-    else if (OrthancPlugins::RestApiGetString(sourceTransferSyntax, uri + "/metadata/TransferSyntax", false))
+    else if (Orthanc::LookupTransferSyntax(currentSyntax, instances[i]["Metadata"]["TransferSyntax"].asString()))
+    {
+      transcodeThisInstance = (currentSyntax != targetSyntax);
+    }
+    else
     {
-      // Avoid transcoding if the source file already uses the expected transfer syntax
-      Orthanc::DicomTransferSyntax syntax;
-      if (Orthanc::LookupTransferSyntax(syntax, sourceTransferSyntax))
+      transcodeThisInstance = true;
+    }
+
+    loader->PrepareDicom(instances[i]["ID"].asString(), transcodeThisInstance);
+  }
+
+  for (Json::Value::ArrayIndex i = 0; i < instances.size(); i++)
+  {
+    std::unique_ptr<OrthancPlugins::DicomInstance> dicom(loader->GetNextDicom());
+
+    if (dicom.get() != NULL)
+    {
+      if (OrthancPluginSendMultipartItem(
+          context, output, reinterpret_cast<const char*>(dicom->GetBuffer()),
+          dicom->GetSize()) != 0)
       {
-        transcodeThisInstance = (syntax != targetSyntax);
-      }
-      else
-      {
-        transcodeThisInstance = true;
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
       }
     }
     else
     {
-      // The transfer syntax of the source file is unknown, transcode it to be sure
-      transcodeThisInstance = true;
-    }
-    
-    OrthancPlugins::MemoryBuffer dicom;
-    if (dicom.RestApiGet(uri + "/file", false))
-    {
-      if (transcodeThisInstance)
-      {
-        std::unique_ptr<OrthancPlugins::DicomInstance> transcoded(
-          OrthancPlugins::DicomInstance::Transcode(
-            dicom.GetData(), dicom.GetSize(), Orthanc::GetTransferSyntaxUid(targetSyntax)));
-
-        if (OrthancPluginSendMultipartItem(
-              context, output, reinterpret_cast<const char*>(transcoded->GetBuffer()),
-              transcoded->GetSize()) != 0)
-        {
-          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
-        }
-      }
-      else
-      {
-        if (OrthancPluginSendMultipartItem(context, output, dicom.GetData(), dicom.GetSize()) != 0)
-        {
-          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
-        }
-      }
+      LOG(WARNING) << "Failed to load an instance";
     }
   }
 }
--- a/TODO	Fri Jun 13 15:30:36 2025 +0200
+++ b/TODO	Fri Jun 13 15:30:50 2025 +0200
@@ -1,3 +1,6 @@
+* Force usage of StudyInstanceUID & SeriesInstanceUID in WADO-URI for single instances:
+  https://discourse.orthanc-server.org/t/dicomweb-wado-uri-does-not-work-if-duplicated-instances/5863
+
 
 
 * https://orthanc.uclouvain.be/book/plugins/dicomweb.html#retrieving-dicom-resources-from-a-wado-rs-server