# HG changeset patch # User Alain Mazy # Date 1633606296 -7200 # Node ID 4e765c18ace729b5cff569c8dbd29b1548b7c619 # Parent 434843934307f45934ec825a245f9e9bff362ad2 enable using multiple threads to load instances when generating zip archive/media diff -r 434843934307 -r 4e765c18ace7 NEWS --- a/NEWS Thu Sep 30 12:14:19 2021 +0200 +++ b/NEWS Thu Oct 07 13:31:36 2021 +0200 @@ -9,6 +9,8 @@ retrieval of individual frames of multiframe instances. * New configuration option "MaximumStorageCacheSize" to configure the size of the new storage cache. +* New configuration option "ZipLoaderThreads" to configure the number of threads used + to read instances from storage when createing a Zip archive/media. Maintenance diff -r 434843934307 -r 4e765c18ace7 OrthancFramework/Sources/MultiThreading/Semaphore.cpp --- a/OrthancFramework/Sources/MultiThreading/Semaphore.cpp Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancFramework/Sources/MultiThreading/Semaphore.cpp Thu Oct 07 13:31:36 2021 +0200 @@ -31,10 +31,6 @@ Semaphore::Semaphore(unsigned int availableResources) : availableResources_(availableResources) { - if (availableResources_ == 0) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } } unsigned int Semaphore::GetAvailableResourcesCount() const diff -r 434843934307 -r 4e765c18ace7 OrthancFramework/Sources/MultiThreading/Semaphore.h --- a/OrthancFramework/Sources/MultiThreading/Semaphore.h Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancFramework/Sources/MultiThreading/Semaphore.h Thu Oct 07 13:31:36 2021 +0200 @@ -36,16 +36,16 @@ boost::mutex mutex_; boost::condition_variable condition_; + public: + explicit Semaphore(unsigned int availableResources); + + unsigned int GetAvailableResourcesCount() const; + void Release(unsigned int resourceCount = 1); void Acquire(unsigned int resourceCount = 1); bool TryAcquire(unsigned int resourceCount = 1); - public: - explicit Semaphore(unsigned int availableResources); - - unsigned int GetAvailableResourcesCount() const; - class Locker : public boost::noncopyable { diff -r 434843934307 -r 4e765c18ace7 OrthancServer/Resources/Configuration.json --- a/OrthancServer/Resources/Configuration.json Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancServer/Resources/Configuration.json Thu Oct 07 13:31:36 2021 +0200 @@ -834,5 +834,12 @@ // disk space and might lead to HTTP timeouts on large archives). If // set to "true", the chunks of the ZIP file are progressively sent // as soon as one DICOM file gets compressed (new in Orthanc 1.9.4) - "SynchronousZipStream" : true + "SynchronousZipStream" : true, + + // Default number of loader threads when generating Zip archive/media. + // A value of 0 means reading and writing are performed in sequence + // (default behaviour). A value > 1 is meaningfull only if the storage + // is a distributed network storage (e.g object storage plugin). + // (new in Orthanc 1.9.8) + "ZipLoaderThreads": 0 } diff -r 434843934307 -r 4e765c18ace7 OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp --- a/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp Thu Oct 07 13:31:36 2021 +0200 @@ -51,7 +51,9 @@ static const char* const KEY_RESOURCES = "Resources"; static const char* const KEY_EXTENDED = "Extended"; static const char* const KEY_TRANSCODE = "Transcode"; - + + static const char* const CONFIG_LOADER_THREADS = "ZipLoaderThreads"; + static void AddResourcesOfInterestFromArray(ArchiveJob& job, const Json::Value& resources) { @@ -123,6 +125,7 @@ bool& transcode, /* out */ DicomTransferSyntax& syntax, /* out */ int& priority, /* out */ + unsigned int& loaderThreads, /* out */ const Json::Value& body, /* in */ const bool defaultExtended /* in */) { @@ -151,6 +154,12 @@ { transcode = false; } + + { + OrthancConfiguration::ReaderLock lock; + loaderThreads = lock.GetConfiguration().GetUnsignedIntegerParameter(CONFIG_LOADER_THREADS, 0); // New in Orthanc 1.9.8 + } + } @@ -554,8 +563,9 @@ bool synchronous, extended, transcode; DicomTransferSyntax transferSyntax; int priority; + unsigned int loaderThreads; GetJobParameters(synchronous, extended, transcode, transferSyntax, - priority, body, DEFAULT_IS_EXTENDED); + priority, loaderThreads, body, DEFAULT_IS_EXTENDED); std::unique_ptr job(new ArchiveJob(context, IS_MEDIA, extended)); AddResourcesOfInterest(*job, body); @@ -565,6 +575,8 @@ job->SetTranscode(transferSyntax); } + job->SetLoaderThreads(loaderThreads); + SubmitJob(call.GetOutput(), context, job, priority, synchronous, "Archive.zip"); } else @@ -578,6 +590,8 @@ template static void CreateSingleGet(RestApiGetCall& call) { + static const char* const TRANSCODE = "transcode"; + if (call.IsDocumentation()) { ResourceType t = StringToResourceType(call.GetFullUri()[0].c_str()); @@ -591,7 +605,7 @@ "which might *not* be desirable to archive large amount of data, as it might " "lead to network timeouts. Prefer the asynchronous version using `POST` method.") .SetUriArgument("id", "Orthanc identifier of the " + r + " of interest") - .SetHttpGetArgument("transcode", RestApiCallDocumentation::Type_String, + .SetHttpGetArgument(TRANSCODE, RestApiCallDocumentation::Type_String, "If present, the DICOM files in the archive will be transcoded to the provided " "transfer syntax: https://book.orthanc-server.com/faq/transcoding.html", false) .AddAnswerType(MimeType_Zip, "ZIP file containing the archive"); @@ -621,12 +635,17 @@ std::unique_ptr job(new ArchiveJob(context, IS_MEDIA, extended)); job->AddResource(id); - static const char* const TRANSCODE = "transcode"; if (call.HasArgument(TRANSCODE)) { job->SetTranscode(GetTransferSyntax(call.GetArgument(TRANSCODE, ""))); } + { + OrthancConfiguration::ReaderLock lock; + unsigned int loaderThreads = lock.GetConfiguration().GetUnsignedIntegerParameter(CONFIG_LOADER_THREADS, 0); // New in Orthanc 1.9.8 + job->SetLoaderThreads(loaderThreads); + } + SubmitJob(call.GetOutput(), context, job, 0 /* priority */, true /* synchronous */, id + ".zip"); } @@ -660,8 +679,9 @@ bool synchronous, extended, transcode; DicomTransferSyntax transferSyntax; int priority; + unsigned int loaderThreads; GetJobParameters(synchronous, extended, transcode, transferSyntax, - priority, body, false /* by default, not extented */); + priority, loaderThreads, body, false /* by default, not extented */); std::unique_ptr job(new ArchiveJob(context, IS_MEDIA, extended)); job->AddResource(id); @@ -671,6 +691,8 @@ job->SetTranscode(transferSyntax); } + job->SetLoaderThreads(loaderThreads); + SubmitJob(call.GetOutput(), context, job, priority, synchronous, id + ".zip"); } else diff -r 434843934307 -r 4e765c18ace7 OrthancServer/Sources/ServerJobs/ArchiveJob.cpp --- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Oct 07 13:31:36 2021 +0200 @@ -40,6 +40,7 @@ #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" #include "../../../OrthancFramework/Sources/Logging.h" #include "../../../OrthancFramework/Sources/OrthancException.h" +#include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h" #include "../OrthancConfiguration.h" #include "../ServerContext.h" @@ -88,6 +89,181 @@ } + class ArchiveJob::InstanceLoader : public boost::noncopyable + { + protected: + ServerContext& context_; + public: + InstanceLoader(ServerContext& context) + : context_(context) + { + } + + virtual ~InstanceLoader() + { + } + + virtual void PrepareDicom(const std::string& instanceId) + { + + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0; + + virtual void Clear() + { + } + }; + + class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader + { + public: + SynchronousInstanceLoader(ServerContext& context) + : InstanceLoader(context) + { + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE + { + context_.ReadDicom(dicom, instanceId); + } + }; + + class InstanceId : public Orthanc::IDynamicObject + { + private: + std::string id_; + + public: + InstanceId(const std::string& id) : id_(id) + { + } + + virtual ~InstanceId() ORTHANC_OVERRIDE + { + } + + std::string GetId() const {return id_;}; + }; + + class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader + { + Semaphore availableInstancesSemaphore_; + std::map> availableInstances_; + boost::mutex availableInstancesMutex_; + SharedMessageQueue instancesToPreload_; + std::vector threads_; + + + public: + ThreadedInstanceLoader(ServerContext& context, size_t threadCount) + : InstanceLoader(context), + availableInstancesSemaphore_(0) + { + for (size_t i = 0; i < threadCount; i++) + { + threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); + } + } + + virtual ~ThreadedInstanceLoader() + { + Clear(); + } + + virtual void Clear() ORTHANC_OVERRIDE + { + for (size_t i = 0; i < threads_.size(); i++) + { + instancesToPreload_.Enqueue(NULL); + } + + for (size_t i = 0; i < threads_.size(); i++) + { + if (threads_[i]->joinable()) + { + threads_[i]->join(); + } + delete threads_[i]; + } + + threads_.clear(); + availableInstances_.clear(); + } + + static void PreloaderWorkerThread(ThreadedInstanceLoader* that) + { + while (true) + { + std::unique_ptr instanceId(dynamic_cast(that->instancesToPreload_.Dequeue(0))); + if (instanceId.get() == NULL) // that's the signal to exit the thread + { + return; + } + + try + { + boost::shared_ptr dicomContent(new std::string()); + that->context_.ReadDicom(*dicomContent, instanceId->GetId()); + { + boost::mutex::scoped_lock lock(that->availableInstancesMutex_); + that->availableInstances_[instanceId->GetId()] = dicomContent; + } + + that->availableInstancesSemaphore_.Release(); + } + catch (OrthancException& e) + { + boost::mutex::scoped_lock lock(that->availableInstancesMutex_); + // store a NULL result to notify that we could not read the instance + that->availableInstances_[instanceId->GetId()] = boost::shared_ptr(); + that->availableInstancesSemaphore_.Release(); + } + } + } + + virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE + { + instancesToPreload_.Enqueue(new InstanceId(instanceId)); + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE + { + while (true) + { + // wait for an instance to be available but this might not be the one we are waiting for ! + availableInstancesSemaphore_.Acquire(); + + boost::shared_ptr dicomContent; + { + if (availableInstances_.find(instanceId) != availableInstances_.end()) + { + // this is the instance we were waiting for + dicomContent = availableInstances_[instanceId]; + availableInstances_.erase(instanceId); + + if (dicomContent.get() == NULL) // there has been an error while reading the file + { + throw OrthancException(ErrorCode_InexistentItem); + } + dicom.swap(*dicomContent); + + if (availableInstances_.size() > 0) + { + // we have just read the instance we were waiting for but there are still other instances available -> + // make sure the next GetDicom call does not wait ! + availableInstancesSemaphore_.Release(); + } + return; + } + // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when + // a new instance is available + } + } + } + }; + + class ArchiveJob::ResourceIdentifiers : public boost::noncopyable { private: @@ -402,6 +578,7 @@ void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, DicomDirWriter* dicomDir, const std::string& dicomDirFolder, bool transcode, @@ -423,7 +600,7 @@ try { - context.ReadDicom(content, instanceId_); + instanceLoader.GetDicom(content, instanceId_); } catch (OrthancException& e) { @@ -494,10 +671,12 @@ std::deque commands_; uint64_t uncompressedSize_; unsigned int instancesCount_; + InstanceLoader& instanceLoader_; void ApplyInternal(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, DicomDirWriter* dicomDir, const std::string& dicomDirFolder, @@ -509,13 +688,14 @@ throw OrthancException(ErrorCode_ParameterOutOfRange); } - commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); + commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax); } public: - ZipCommands() : + ZipCommands(InstanceLoader& instanceLoader) : uncompressedSize_(0), - instancesCount_(0) + instancesCount_(0), + instanceLoader_(instanceLoader) { } @@ -547,23 +727,25 @@ // "media" flavor (with DICOMDIR) void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, DicomDirWriter& dicomDir, const std::string& dicomDirFolder, bool transcode, DicomTransferSyntax transferSyntax) const { - ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); + ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); } // "archive" flavor (without DICOMDIR) void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, bool transcode, DicomTransferSyntax transferSyntax) const { - ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); + ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax); } void AddOpenDirectory(const std::string& filename) @@ -580,6 +762,7 @@ const std::string& instanceId, uint64_t uncompressedSize) { + instanceLoader_.PrepareDicom(instanceId); commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); instancesCount_ ++; uncompressedSize_ += uncompressedSize; @@ -747,6 +930,7 @@ { private: ServerContext& context_; + InstanceLoader& instanceLoader_; ZipCommands commands_; std::unique_ptr zip_; std::unique_ptr dicomDir_; @@ -755,10 +939,13 @@ public: ZipWriterIterator(ServerContext& context, + InstanceLoader& instanceLoader, ArchiveIndex& archive, bool isMedia, bool enableExtendedSopClass) : context_(context), + instanceLoader_(instanceLoader), + commands_(instanceLoader), isMedia_(isMedia), isStream_(false) { @@ -882,13 +1069,13 @@ if (isMedia_) { assert(dicomDir_.get() != NULL); - commands_.Apply(*zip_, context_, index, *dicomDir_, + commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_, MEDIA_IMAGES_FOLDER, transcode, transferSyntax); } else { assert(dicomDir_.get() == NULL); - commands_.Apply(*zip_, context_, index, transcode, transferSyntax); + commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax); } } } @@ -917,7 +1104,8 @@ uncompressedSize_(0), archiveSize_(0), transcode_(false), - transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) + transferSyntax_(DicomTransferSyntax_LittleEndianImplicit), + loaderThreads_(0) { } @@ -993,6 +1181,19 @@ } + void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads) + { + if (writer_.get() != NULL) // Already started + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + loaderThreads_ = loaderThreads; + } + } + + void ArchiveJob::Reset() { throw OrthancException(ErrorCode_BadSequenceOfCalls, @@ -1002,6 +1203,16 @@ void ArchiveJob::Start() { + if (loaderThreads_ == 0) + { + // default behaviour before loaderThreads was introducted in 1.9.8 + instanceLoader_.reset(new SynchronousInstanceLoader(context_)); + } + else + { + instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_)); + } + if (writer_.get() != NULL) { throw OrthancException(ErrorCode_BadSequenceOfCalls); @@ -1023,7 +1234,7 @@ assert(asynchronousTarget_.get() != NULL); asynchronousTarget_->Touch(); // Make sure we can write to the temporary file - writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); writer_->SetOutputFile(asynchronousTarget_->GetPath()); } } @@ -1031,7 +1242,7 @@ { assert(synchronousTarget_.get() != NULL); - writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); writer_->AcquireOutputStream(synchronousTarget_.release()); } @@ -1076,6 +1287,11 @@ writer_.reset(); } + if (instanceLoader_.get() != NULL) + { + instanceLoader_->Clear(); + } + if (asynchronousTarget_.get() != NULL) { // Asynchronous behavior: Move the resulting file into the media archive diff -r 434843934307 -r 4e765c18ace7 OrthancServer/Sources/ServerJobs/ArchiveJob.h --- a/OrthancServer/Sources/ServerJobs/ArchiveJob.h Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.h Thu Oct 07 13:31:36 2021 +0200 @@ -55,10 +55,14 @@ class ResourceIdentifiers; class ZipCommands; class ZipWriterIterator; - + class InstanceLoader; + class SynchronousInstanceLoader; + class ThreadedInstanceLoader; + std::unique_ptr synchronousTarget_; // Only valid before "Start()" std::unique_ptr asynchronousTarget_; ServerContext& context_; + std::unique_ptr instanceLoader_; boost::shared_ptr archive_; bool isMedia_; bool enableExtendedSopClass_; @@ -75,6 +79,9 @@ bool transcode_; DicomTransferSyntax transferSyntax_; + // New in Orthanc 1.9.8 + unsigned int loaderThreads_; + void FinalizeTarget(); public: @@ -97,6 +104,8 @@ void SetTranscode(DicomTransferSyntax transferSyntax); + void SetLoaderThreads(unsigned int loaderThreads); + virtual void Reset() ORTHANC_OVERRIDE; virtual void Start() ORTHANC_OVERRIDE;