changeset 732:06e580b0da0e

refactored wado-rs loader threads to use the BlockingSharedMessageQueue
author Alain Mazy <am@orthanc.team>
date Tue, 02 Dec 2025 16:28:34 +0100
parents ff400f3cfc99
children 6baed50da311
files NEWS Plugin/WadoRs.cpp
diffstat 2 files changed, 9 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Mon Nov 24 20:03:42 2025 +0100
+++ b/NEWS	Tue Dec 02 16:28:34 2025 +0100
@@ -1,6 +1,8 @@
 Pending changes in the mainline
 ===============================
 
+TODO before release: update to framework 1.12.10+ (with BlockingSharedMessageQueue)
+
 * Fixed a possible deadlock when using "WadoRsLoaderThreadsCount" > 1 when the HTTP
   client disconnects while downloading the response.
 * Fixed "Success: Success" errors when trying to send resources synchronously to a remote DICOMweb
--- a/Plugin/WadoRs.cpp	Mon Nov 24 20:03:42 2025 +0100
+++ b/Plugin/WadoRs.cpp	Tue Dec 02 16:28:34 2025 +0100
@@ -32,7 +32,7 @@
 #include <Toolbox.h>
 #include <SerializationToolbox.h>
 #include <MultiThreading/SharedMessageQueue.h>
-#include <MultiThreading/Semaphore.h>
+#include <MultiThreading/BlockingSharedMessageQueue.h>
 #include <Compression/GzipCompressor.h>
 
 #include <memory>
@@ -449,9 +449,7 @@
 
   Orthanc::SharedMessageQueue         instancesToPreload_;
 
-  Orthanc::SharedMessageQueue         loadedInstances_;
-
-  Orthanc::Semaphore                  bufferSemaphore_;
+  Orthanc::BlockingSharedMessageQueue loadedInstances_;
 
   bool                                loadersShouldStop_;
 
@@ -459,8 +457,7 @@
   ThreadedInstanceLoader(size_t threadCount, bool transcode, Orthanc::DicomTransferSyntax transferSyntax)
   : InstanceLoader(transcode, transferSyntax),
     instancesToPreload_(0),
-    loadedInstances_(0),
-    bufferSemaphore_(3*threadCount), // to limit the number of loaded instances in memory
+    loadedInstances_(3*threadCount), // to limit the number of loaded instances in memory
     loadersShouldStop_(false)
   {
     for (size_t i = 0; i < threadCount; i++)
@@ -488,12 +485,6 @@
         instancesToPreload_.Enqueue(NULL);
       }
 
-      // If the consumer stops e.g. because the HttpClient disconnected, we must make sure the loader threads are not blocked waiting for room in the bufferSemaphore_.
-      // If the loader threads have completed their jobs, this is harmless to release the bufferSemaphore_ since they won't be used anymore.
-      for (size_t i = 0; i < threads_.size(); i++)
-      {
-        bufferSemaphore_.Release();
-      }
 
       for (size_t i = 0; i < threads_.size(); i++)
       {
@@ -526,9 +517,6 @@
         return;
       }
       
-      // wait for the consumers, no need to accumulate instances in memory if loaders are faster than writers
-      that->bufferSemaphore_.Acquire();
-
       try
       {
         std::unique_ptr<OrthancPlugins::DicomInstance> dicom(that->GetAndTranscodeDicom(instancesToPreload.get()));
@@ -556,16 +544,13 @@
   {
     std::unique_ptr<LoadedInstance> loadedInstance(dynamic_cast<LoadedInstance*>(loadedInstances_.Dequeue(0)));
     
-    // unlock preloader threads to buffer the following instances
-    bufferSemaphore_.Release();
-
     if (loadedInstance.get() != NULL)
     {
       return loadedInstance->ReleaseInstance();
     }
     else
     {
-      return NULL;
+      return NULL; // happens when the loader threads are exiting
     }
   }
 };
@@ -1707,6 +1692,9 @@
 
 void InstanceWorkerThread(InstanceWorkerData* data)
 {
+  static uint16_t threadCounter = 0;
+  Orthanc::Logging::SetCurrentThreadName(std::string("DW-META-") + boost::lexical_cast<std::string>(threadCounter++));
+
   while (true)
   {
     try