Mercurial > hg > orthanc-transfers
changeset 95:b83139953899
CommitThreadsCount
| author | Alain Mazy <am@orthanc.team> |
|---|---|
| date | Tue, 09 Dec 2025 16:17:40 +0100 |
| parents | 5389d5df3dd1 |
| children | 3e73a429d8fa |
| files | Framework/DownloadArea.cpp Framework/DownloadArea.h NEWS Plugin/Plugin.cpp Plugin/PluginContext.cpp Plugin/PluginContext.h |
| diffstat | 6 files changed, 185 insertions(+), 31 deletions(-) [+] |
line wrap: on
line diff
--- a/Framework/DownloadArea.cpp Tue Dec 09 09:05:15 2025 +0100 +++ b/Framework/DownloadArea.cpp Tue Dec 09 16:17:40 2025 +0100 @@ -32,6 +32,42 @@ namespace OrthancPlugins { + static uint32_t commitWorkerThreadsCount = 1; + static boost::mutex commitThreadsCounterMutex; + static uint32_t commitThreadsCounter = 0; + + void DownloadArea::SetCommitWorkerThreadsCount(uint32_t workersCount) + { + commitWorkerThreadsCount = workersCount; + } + + class DownloadArea::InstanceToCommit : public Orthanc::IDynamicObject + { + DownloadArea::Instance* instance_; + bool simulate_; + + public: + InstanceToCommit(DownloadArea::Instance* instance /* transfer ownership */, bool simulate) : + instance_(instance), + simulate_(simulate) + {} + + virtual ~InstanceToCommit() + { + delete instance_; + } + + DownloadArea::Instance* GetInstance() + { + return instance_; + } + + bool IsSimulate() + { + return simulate_; + } + }; + class DownloadArea::Instance::Writer : public boost::noncopyable { private: @@ -141,6 +177,8 @@ void DownloadArea::Clear() { + boost::mutex::scoped_lock lock(instancesMutex_); + for (Instances::iterator it = instances_.begin(); it != instances_.end(); ++it) { @@ -157,6 +195,8 @@ DownloadArea::Instance& DownloadArea::LookupInstance(const std::string& id) { + boost::mutex::scoped_lock lock(instancesMutex_); + Instances::iterator it = instances_.find(id); if (it == instances_.end()) @@ -217,8 +257,10 @@ void DownloadArea::Setup(const std::vector<DicomInstanceInfo>& instances) { + boost::mutex::scoped_lock lock(instancesMutex_); + totalSize_ = 0; - + for (size_t i = 0; i < instances.size(); i++) { const std::string& id = instances[i].GetId(); @@ -229,30 +271,90 @@ totalSize_ += instances[i].GetSize(); } } - + + + void DownloadArea::CommitWorker(DownloadArea* that) + { + { + boost::mutex::scoped_lock lock(commitThreadsCounterMutex); + Orthanc::Logging::SetCurrentThreadName(std::string("TF-COMMIT-") + boost::lexical_cast<std::string>(commitThreadsCounter++)); + commitThreadsCounter %= 1000000; + } + + while (true) + { + std::unique_ptr<DownloadArea::InstanceToCommit> instanceToCommit(dynamic_cast<DownloadArea::InstanceToCommit*>(that->instancesToCommit_.Dequeue(0))); + if (instanceToCommit.get() == NULL || that->workersShouldStop_) // that's the signal to exit the thread + { + LOG(INFO) << "Commit thread has completed"; + return; + } + + instanceToCommit->GetInstance()->Commit(instanceToCommit->IsSimulate()); + } + + } void DownloadArea::CommitInternal(bool simulate) { - boost::mutex::scoped_lock lock(mutex_); + commitThreads_.reserve(commitWorkerThreadsCount); + + for (uint32_t i = 0; i < commitWorkerThreadsCount; ++i) + { + commitThreads_.push_back(boost::shared_ptr<boost::thread>(new boost::thread(CommitWorker, this))); + } + + { + boost::mutex::scoped_lock lock(instancesMutex_); - for (Instances::iterator it = instances_.begin(); - it != instances_.end(); ++it) - { - if (it->second != NULL) + for (Instances::iterator it = instances_.begin(); + it != instances_.end(); ++it) { - it->second->Commit(simulate); - delete it->second; - it->second = NULL; - } - else - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + if (it->second != NULL) + { + instancesToCommit_.Enqueue(new DownloadArea::InstanceToCommit(it->second, simulate)); // transfers the ownership of the Instance to the queue + it->second = NULL; + } + else + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } } } + + ClearThreads(); + } + + void DownloadArea::ClearThreads() + { + for (uint32_t i = 0; i < commitWorkerThreadsCount; ++i) + { + instancesToCommit_.Enqueue(NULL); // exit message + } + + instancesToCommit_.WaitEmpty(0); + + for (uint32_t i = 0; i < commitWorkerThreadsCount; ++i) + { + if (commitThreads_[i]->joinable()) + { + commitThreads_[i]->join(); + } + } + + } + + DownloadArea::DownloadArea(const std::vector<DicomInstanceInfo>& instances) + : instancesToCommit_(0), + workersShouldStop_(false) + { + Setup(instances); } DownloadArea::DownloadArea(const TransferScheduler& scheduler) + : instancesToCommit_(0), + workersShouldStop_(false) { std::vector<DicomInstanceInfo> instances; scheduler.ListInstances(instances); @@ -265,8 +367,6 @@ size_t size, BucketCompression compression) { - boost::mutex::scoped_lock lock(mutex_); - switch (compression) { case BucketCompression_None: @@ -296,7 +396,7 @@ Orthanc::Toolbox::ComputeMD5(md5, data, size); { - boost::mutex::scoped_lock lock(mutex_); + boost::mutex::scoped_lock lock(instancesMutex_); Instances::const_iterator it = instances_.find(instanceId); if (it == instances_.end() ||
--- a/Framework/DownloadArea.h Tue Dec 09 09:05:15 2025 +0100 +++ b/Framework/DownloadArea.h Tue Dec 09 16:17:40 2025 +0100 @@ -24,12 +24,16 @@ #include "TransferScheduler.h" #include <TemporaryFile.h> +#include <boost/thread/thread.hpp> +#include <MultiThreading/SharedMessageQueue.h> namespace OrthancPlugins { class DownloadArea : public boost::noncopyable { private: + class InstanceToCommit; + class Instance : public boost::noncopyable { private: @@ -40,6 +44,9 @@ public: explicit Instance(const DicomInstanceInfo& info); + + virtual ~Instance() + {} const DicomInstanceInfo& GetInfo() const { @@ -56,13 +63,17 @@ typedef std::map<std::string, Instance*> Instances; - boost::mutex mutex_; + boost::mutex instancesMutex_; Instances instances_; size_t totalSize_; - + std::vector<boost::shared_ptr<boost::thread> > commitThreads_; + Orthanc::SharedMessageQueue instancesToCommit_; + bool workersShouldStop_; void Clear(); + void ClearThreads(); + Instance& LookupInstance(const std::string& id); void WriteUncompressedBucket(const TransferBucket& bucket, @@ -73,13 +84,11 @@ void CommitInternal(bool simulate); + static void CommitWorker(DownloadArea* that); public: explicit DownloadArea(const TransferScheduler& scheduler); - explicit DownloadArea(const std::vector<DicomInstanceInfo>& instances) - { - Setup(instances); - } + explicit DownloadArea(const std::vector<DicomInstanceInfo>& instances); ~DownloadArea() { @@ -103,5 +112,7 @@ void CheckMD5(); void Commit(); + + static void SetCommitWorkerThreadsCount(uint32_t workersCount); }; }
--- a/NEWS Tue Dec 09 09:05:15 2025 +0100 +++ b/NEWS Tue Dec 09 16:17:40 2025 +0100 @@ -1,3 +1,12 @@ +Pending changes in the mainline +=============================== + +* new "CommitThreadsCount" configuration to configure the number of threads used to + perform the "commit" phase of a Push/Pull transfer. A value > 1 is meaningful + only if the storage is a distributed network storage (e.g object storage plugin). + A value of 1 means reading and writing are performed in sequence (default behaviour). + + Version 1.6 (2025-10-07) ========================
--- a/Plugin/Plugin.cpp Tue Dec 09 09:05:15 2025 +0100 +++ b/Plugin/Plugin.cpp Tue Dec 09 16:17:40 2025 +0100 @@ -661,6 +661,7 @@ unsigned int maxHttpRetries = 0; unsigned int peerConnectivityTimeout = 2; unsigned int peerCommitTimeout = 600; + unsigned int commitThreadsCount = 1; { OrthancPlugins::OrthancConfiguration config; @@ -677,11 +678,30 @@ maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries); peerConnectivityTimeout = plugin.GetUnsignedIntegerValue("PeerConnectivityTimeout", peerConnectivityTimeout); peerCommitTimeout = plugin.GetUnsignedIntegerValue("PeerCommitTimeout", peerCommitTimeout); + commitThreadsCount = plugin.GetUnsignedIntegerValue("CommitThreadsCount", commitThreadsCount); + + if (commitThreadsCount == 0) + { + LOG(ERROR) << "Invalid value for configuration \"Transfers.CommitThreadsCount\": " << commitThreadsCount; + return -1; + } + + if (targetBucketSize == 0) + { + LOG(ERROR) << "Invalid value for configuration \"Transfers.BucketSize\": " << targetBucketSize; + return -1; + } + + if (maxPushTransactions == 0) + { + LOG(ERROR) << "Invalid value for configuration \"Transfers.MaxPushTransactions\": " << maxPushTransactions; + return -1; + } } } OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB, maxPushTransactions, - memoryCacheSize * MB, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout); + memoryCacheSize * MB, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout, commitThreadsCount); OrthancPlugins::RegisterRestCallback<ServeChunks> (std::string(URI_CHUNKS) + "/([.0-9a-f-]+)", true);
--- a/Plugin/PluginContext.cpp Tue Dec 09 09:05:15 2025 +0100 +++ b/Plugin/PluginContext.cpp Tue Dec 09 16:17:40 2025 +0100 @@ -23,7 +23,7 @@ #include <Compatibility.h> // For std::unique_ptr #include <Logging.h> - +#include "../Framework/DownloadArea.h" namespace OrthancPlugins { @@ -33,7 +33,8 @@ size_t memoryCacheSize, unsigned int maxHttpRetries, unsigned int peerConnectivityTimeout, - unsigned int peerCommitTimeout) : + unsigned int peerCommitTimeout, + unsigned int commitThreadsCount) : pushTransactions_(maxPushTransactions), semaphore_(threadsCount), pluginUuid_(Orthanc::Toolbox::GenerateUuid()), @@ -41,9 +42,11 @@ targetBucketSize_(targetBucketSize), maxHttpRetries_(maxHttpRetries), peerConnectivityTimeout_(peerConnectivityTimeout), - peerCommitTimeout_(peerCommitTimeout) + peerCommitTimeout_(peerCommitTimeout), + commitThreadsCount_(commitThreadsCount) { cache_.SetMaxMemorySize(memoryCacheSize); + DownloadArea::SetCommitWorkerThreadsCount(commitThreadsCount_); LOG(INFO) << "Transfers accelerator will use " << threadsCount_ << " thread(s) to run HTTP queries"; LOG(INFO) << "Transfers accelerator will keep local DICOM files in a memory cache of size: " @@ -58,6 +61,8 @@ << peerConnectivityTimeout_ << " seconds as a timeout when checking peers connectivity"; LOG(INFO) << "Transfers accelerator will use " << peerCommitTimeout_ << " seconds as a timeout when committing push transfer"; + LOG(INFO) << "Transfers accelerator will use " + << commitThreadsCount_ << " thread(s) to perform commit (on receiver's side)"; } @@ -74,10 +79,11 @@ size_t memoryCacheSize, unsigned int maxHttpRetries, unsigned int peerConnectivityTimeout, - unsigned int peerCommitTimeout) + unsigned int peerCommitTimeout, + unsigned int commitThreadsCount) { GetSingleton().reset(new PluginContext(threadsCount, targetBucketSize, - maxPushTransactions, memoryCacheSize, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout)); + maxPushTransactions, memoryCacheSize, maxHttpRetries, peerConnectivityTimeout, peerCommitTimeout, commitThreadsCount)); }
--- a/Plugin/PluginContext.h Tue Dec 09 09:05:15 2025 +0100 +++ b/Plugin/PluginContext.h Tue Dec 09 16:17:40 2025 +0100 @@ -46,6 +46,7 @@ unsigned int maxHttpRetries_; unsigned int peerConnectivityTimeout_; unsigned int peerCommitTimeout_; + unsigned int commitThreadsCount_; PluginContext(size_t threadsCount, size_t targetBucketSize, @@ -53,7 +54,8 @@ size_t memoryCacheSize, unsigned int maxHttpRetries, unsigned int peerConnectivityTimeout, - unsigned int peerCommitTimeout); + unsigned int peerCommitTimeout, + unsigned int commitThreadsCount); static std::unique_ptr<PluginContext>& GetSingleton(); @@ -83,6 +85,11 @@ return threadsCount_; } + size_t GetCommitThreadsCount() const + { + return commitThreadsCount_; + } + size_t GetTargetBucketSize() const { return targetBucketSize_; @@ -109,7 +116,8 @@ size_t memoryCacheSize, unsigned int maxHttpRetries, unsigned int peerConnectivityTimeout, - unsigned int peerCommitTimeout); + unsigned int peerCommitTimeout, + unsigned int commitThreadsCount); static PluginContext& GetInstance();
