changeset 95:b83139953899

CommitThreadsCount
author Alain Mazy <am@orthanc.team>
date Tue, 09 Dec 2025 16:17:40 +0100
parents 5389d5df3dd1
children 3e73a429d8fa
files Framework/DownloadArea.cpp Framework/DownloadArea.h NEWS Plugin/Plugin.cpp Plugin/PluginContext.cpp Plugin/PluginContext.h
diffstat 6 files changed, 185 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- a/Framework/DownloadArea.cpp	Tue Dec 09 09:05:15 2025 +0100
+++ b/Framework/DownloadArea.cpp	Tue Dec 09 16:17:40 2025 +0100
@@ -32,6 +32,42 @@
 
 namespace OrthancPlugins
 {
+  static uint32_t commitWorkerThreadsCount = 1;
+  static boost::mutex commitThreadsCounterMutex;
+  static uint32_t commitThreadsCounter = 0;
+
+  void DownloadArea::SetCommitWorkerThreadsCount(uint32_t workersCount)
+  {
+    commitWorkerThreadsCount = workersCount;
+  }
+
+  class DownloadArea::InstanceToCommit : public Orthanc::IDynamicObject
+  {
+    DownloadArea::Instance* instance_;
+    bool simulate_;
+  
+  public:
+    InstanceToCommit(DownloadArea::Instance* instance /* transfer ownership */, bool simulate) :
+      instance_(instance),
+      simulate_(simulate)
+    {}
+    
+    virtual ~InstanceToCommit()
+    {
+      delete instance_;
+    }
+
+    DownloadArea::Instance* GetInstance()
+    {
+      return instance_;
+    }
+
+    bool IsSimulate()
+    {
+      return simulate_;
+    }
+  };
+
   class DownloadArea::Instance::Writer : public boost::noncopyable
   {
   private:
@@ -141,6 +177,8 @@
 
   void DownloadArea::Clear()
   {
+    boost::mutex::scoped_lock lock(instancesMutex_);
+
     for (Instances::iterator it = instances_.begin(); 
          it != instances_.end(); ++it)
     {
@@ -157,6 +195,8 @@
 
   DownloadArea::Instance& DownloadArea::LookupInstance(const std::string& id)
   {
+    boost::mutex::scoped_lock lock(instancesMutex_);
+
     Instances::iterator it = instances_.find(id);
 
     if (it == instances_.end())
@@ -217,8 +257,10 @@
 
   void DownloadArea::Setup(const std::vector<DicomInstanceInfo>& instances)
   {
+    boost::mutex::scoped_lock lock(instancesMutex_);
+
     totalSize_ = 0;
-      
+    
     for (size_t i = 0; i < instances.size(); i++)
     {
       const std::string& id = instances[i].GetId();
@@ -229,30 +271,90 @@
       totalSize_ += instances[i].GetSize();
     }
   }
-    
+
+  
+  void DownloadArea::CommitWorker(DownloadArea* that)
+  {
+    {
+      boost::mutex::scoped_lock lock(commitThreadsCounterMutex);
+      Orthanc::Logging::SetCurrentThreadName(std::string("TF-COMMIT-") + boost::lexical_cast<std::string>(commitThreadsCounter++));
+      commitThreadsCounter %= 1000000;
+    }
+
+    while (true)
+    {
+      std::unique_ptr<DownloadArea::InstanceToCommit> instanceToCommit(dynamic_cast<DownloadArea::InstanceToCommit*>(that->instancesToCommit_.Dequeue(0)));
+      if (instanceToCommit.get() == NULL || that->workersShouldStop_)  // that's the signal to exit the thread
+      {
+        LOG(INFO) << "Commit thread has completed";
+        return;
+      }
+
+      instanceToCommit->GetInstance()->Commit(instanceToCommit->IsSimulate());
+    }
+
+  }
 
   void DownloadArea::CommitInternal(bool simulate)
   {
-    boost::mutex::scoped_lock lock(mutex_);
+    commitThreads_.reserve(commitWorkerThreadsCount);
+
+    for (uint32_t i = 0; i < commitWorkerThreadsCount; ++i)
+    {
+      commitThreads_.push_back(boost::shared_ptr<boost::thread>(new boost::thread(CommitWorker, this)));
+    }
+
+    {
+      boost::mutex::scoped_lock lock(instancesMutex_);
       
-    for (Instances::iterator it = instances_.begin(); 
-         it != instances_.end(); ++it)
-    {
-      if (it->second != NULL)
+      for (Instances::iterator it = instances_.begin(); 
+          it != instances_.end(); ++it)
       {
-        it->second->Commit(simulate);
-        delete it->second;
-        it->second = NULL;
-      }
-      else
-      {
-        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+        if (it->second != NULL)
+        {
+          instancesToCommit_.Enqueue(new DownloadArea::InstanceToCommit(it->second, simulate)); // transfers the ownership of the Instance to the queue
+          it->second = NULL;
+        }
+        else
+        {
+          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+        }
       }
     }
+
+    ClearThreads();
+  }
+
+  void DownloadArea::ClearThreads()
+  {
+    for (uint32_t i = 0; i < commitWorkerThreadsCount; ++i)
+    {
+      instancesToCommit_.Enqueue(NULL); // exit message
+    }
+
+    instancesToCommit_.WaitEmpty(0);
+
+    for (uint32_t i = 0; i < commitWorkerThreadsCount; ++i)
+    {
+      if (commitThreads_[i]->joinable())
+      {
+        commitThreads_[i]->join();
+      }
+    }
+
+  }
+
+  DownloadArea::DownloadArea(const std::vector<DicomInstanceInfo>& instances)
+  : instancesToCommit_(0),
+    workersShouldStop_(false)
+  {
+    Setup(instances);
   }
 
 
   DownloadArea::DownloadArea(const TransferScheduler& scheduler)
+  : instancesToCommit_(0),
+    workersShouldStop_(false)
   {
     std::vector<DicomInstanceInfo> instances;
     scheduler.ListInstances(instances);
@@ -265,8 +367,6 @@
                                  size_t size,
                                  BucketCompression compression)
   {
-    boost::mutex::scoped_lock lock(mutex_);
-      
     switch (compression)
     {
       case BucketCompression_None:
@@ -296,7 +396,7 @@
     Orthanc::Toolbox::ComputeMD5(md5, data, size);
       
     {
-      boost::mutex::scoped_lock lock(mutex_);
+      boost::mutex::scoped_lock lock(instancesMutex_);
 
       Instances::const_iterator it = instances_.find(instanceId);
       if (it == instances_.end() ||
--- a/Framework/DownloadArea.h	Tue Dec 09 09:05:15 2025 +0100
+++ b/Framework/DownloadArea.h	Tue Dec 09 16:17:40 2025 +0100
@@ -24,12 +24,16 @@
 #include "TransferScheduler.h"
 
 #include <TemporaryFile.h>
+#include <boost/thread/thread.hpp>
+#include <MultiThreading/SharedMessageQueue.h>
 
 namespace OrthancPlugins
 {
   class DownloadArea : public boost::noncopyable
   {
   private:
+    class InstanceToCommit;
+
     class Instance : public boost::noncopyable
     {
     private:
@@ -40,6 +44,9 @@
 
     public:
       explicit Instance(const DicomInstanceInfo& info);
+      
+      virtual ~Instance()
+      {}
 
       const DicomInstanceInfo& GetInfo() const
       {
@@ -56,13 +63,17 @@
 
     typedef std::map<std::string, Instance*>   Instances;
 
-    boost::mutex  mutex_;
+    boost::mutex  instancesMutex_;
     Instances     instances_;
     size_t        totalSize_;
-
+    std::vector<boost::shared_ptr<boost::thread> > commitThreads_;
+    Orthanc::SharedMessageQueue instancesToCommit_;
+    bool          workersShouldStop_;
 
     void Clear();
 
+    void ClearThreads();
+
     Instance& LookupInstance(const std::string& id);
 
     void WriteUncompressedBucket(const TransferBucket& bucket,
@@ -73,13 +84,11 @@
     
     void CommitInternal(bool simulate);
 
+    static void CommitWorker(DownloadArea* that);
   public:
     explicit DownloadArea(const TransferScheduler& scheduler);
 
-    explicit DownloadArea(const std::vector<DicomInstanceInfo>& instances)
-    {
-      Setup(instances);
-    }
+    explicit DownloadArea(const std::vector<DicomInstanceInfo>& instances);
 
     ~DownloadArea()
     {
@@ -103,5 +112,7 @@
     void CheckMD5();
 
     void Commit();
+
+    static void SetCommitWorkerThreadsCount(uint32_t workersCount);
   };
 }
--- a/NEWS	Tue Dec 09 09:05:15 2025 +0100
+++ b/NEWS	Tue Dec 09 16:17:40 2025 +0100
@@ -1,3 +1,12 @@
+Pending changes in the mainline
+===============================
+
+* new "CommitThreadsCount" configuration to configure the number of threads used to
+  perform the "commit" phase of a Push/Pull transfer.  A value > 1 is meaningful 
+  only if the storage is a distributed network storage (e.g object storage plugin).
+  A value of 1 means reading and writing are performed in sequence (default behaviour).
+
+
 Version 1.6 (2025-10-07)
 ========================
 
--- a/Plugin/Plugin.cpp	Tue Dec 09 09:05:15 2025 +0100
+++ b/Plugin/Plugin.cpp	Tue Dec 09 16:17:40 2025 +0100
@@ -661,6 +661,7 @@
       unsigned int maxHttpRetries = 0;
       unsigned int peerConnectivityTimeout = 2;
       unsigned int peerCommitTimeout = 600;
+      unsigned int commitThreadsCount = 1;
     
       {
         OrthancPlugins::OrthancConfiguration config;
@@ -677,11 +678,30 @@
           maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries);
           peerConnectivityTimeout = plugin.GetUnsignedIntegerValue("PeerConnectivityTimeout", peerConnectivityTimeout);
           peerCommitTimeout = plugin.GetUnsignedIntegerValue("PeerCommitTimeout", peerCommitTimeout);
+          commitThreadsCount = plugin.GetUnsignedIntegerValue("CommitThreadsCount", commitThreadsCount);
+
+          if (commitThreadsCount == 0)
+          {
+            LOG(ERROR) << "Invalid value for configuration \"Transfers.CommitThreadsCount\": " << commitThreadsCount;
+            return -1;
+          }
+
+          if (targetBucketSize == 0)
+          {
+            LOG(ERROR) << "Invalid value for configuration \"Transfers.BucketSize\": " << targetBucketSize;
+            return -1;
+          }
+
+          if (maxPushTransactions == 0)
+          {
+            LOG(ERROR) << "Invalid value for configuration \"Transfers.MaxPushTransactions\": " << maxPushTransactions;
+            return -1;
+          }
         }
       }
 
       OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB, maxPushTransactions,
-                                                memoryCacheSize * MB, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout);
+                                                memoryCacheSize * MB, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout, commitThreadsCount);
     
       OrthancPlugins::RegisterRestCallback<ServeChunks>
         (std::string(URI_CHUNKS) + "/([.0-9a-f-]+)", true);
--- a/Plugin/PluginContext.cpp	Tue Dec 09 09:05:15 2025 +0100
+++ b/Plugin/PluginContext.cpp	Tue Dec 09 16:17:40 2025 +0100
@@ -23,7 +23,7 @@
 
 #include <Compatibility.h>  // For std::unique_ptr
 #include <Logging.h>
-
+#include "../Framework/DownloadArea.h"
 
 namespace OrthancPlugins
 {
@@ -33,7 +33,8 @@
                                size_t memoryCacheSize,
                                unsigned int maxHttpRetries,
                                unsigned int peerConnectivityTimeout,
-                               unsigned int peerCommitTimeout) :
+                               unsigned int peerCommitTimeout,
+                               unsigned int commitThreadsCount) :
     pushTransactions_(maxPushTransactions),
     semaphore_(threadsCount),
     pluginUuid_(Orthanc::Toolbox::GenerateUuid()),
@@ -41,9 +42,11 @@
     targetBucketSize_(targetBucketSize),
     maxHttpRetries_(maxHttpRetries),
     peerConnectivityTimeout_(peerConnectivityTimeout),
-    peerCommitTimeout_(peerCommitTimeout)
+    peerCommitTimeout_(peerCommitTimeout),
+    commitThreadsCount_(commitThreadsCount)
   {
     cache_.SetMaxMemorySize(memoryCacheSize);
+    DownloadArea::SetCommitWorkerThreadsCount(commitThreadsCount_);
 
     LOG(INFO) << "Transfers accelerator will use " << threadsCount_ << " thread(s) to run HTTP queries";
     LOG(INFO) << "Transfers accelerator will keep local DICOM files in a memory cache of size: "
@@ -58,6 +61,8 @@
               << peerConnectivityTimeout_ << " seconds as a timeout when checking peers connectivity";
     LOG(INFO) << "Transfers accelerator will use "
               << peerCommitTimeout_ << " seconds as a timeout when committing push transfer";
+    LOG(INFO) << "Transfers accelerator will use "
+              << commitThreadsCount_ << " thread(s) to perform commit (on receiver's side)";
   }
 
 
@@ -74,10 +79,11 @@
                                  size_t memoryCacheSize,
                                  unsigned int maxHttpRetries,
                                  unsigned int peerConnectivityTimeout,
-                                 unsigned int peerCommitTimeout)
+                                 unsigned int peerCommitTimeout,
+                                 unsigned int commitThreadsCount)
   {
     GetSingleton().reset(new PluginContext(threadsCount, targetBucketSize,
-                                           maxPushTransactions, memoryCacheSize, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout));
+                                           maxPushTransactions, memoryCacheSize, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout, commitThreadsCount));
   }
 
   
--- a/Plugin/PluginContext.h	Tue Dec 09 09:05:15 2025 +0100
+++ b/Plugin/PluginContext.h	Tue Dec 09 16:17:40 2025 +0100
@@ -46,6 +46,7 @@
     unsigned int             maxHttpRetries_;
     unsigned int             peerConnectivityTimeout_;
     unsigned int             peerCommitTimeout_;
+    unsigned int             commitThreadsCount_;
   
     PluginContext(size_t threadsCount,
                   size_t targetBucketSize,
@@ -53,7 +54,8 @@
                   size_t memoryCacheSize,
                   unsigned int maxHttpRetries,
                   unsigned int peerConnectivityTimeout,
-                  unsigned int peerCommitTimeout);
+                  unsigned int peerCommitTimeout,
+                  unsigned int commitThreadsCount);
 
     static std::unique_ptr<PluginContext>& GetSingleton();
   
@@ -83,6 +85,11 @@
       return threadsCount_;
     }
 
+    size_t GetCommitThreadsCount() const
+    {
+      return commitThreadsCount_;
+    }
+
     size_t GetTargetBucketSize() const
     {
       return targetBucketSize_;
@@ -109,7 +116,8 @@
                            size_t memoryCacheSize,
                            unsigned int maxHttpRetries,
                            unsigned int peerConnectivityTimeout,
-                           unsigned int peerCommitTimeout);
+                           unsigned int peerCommitTimeout,
+                           unsigned int commitThreadsCount);
   
     static PluginContext& GetInstance();