Mercurial > hg > orthanc
diff OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4797:4e765c18ace7 storage-cache
enable using multiple threads to load instances when generating zip archive/media
author | Alain Mazy <am@osimis.io> |
---|---|
date | Thu, 07 Oct 2021 13:31:36 +0200 |
parents | cdab941fe17d |
children | 7afbb54bd028 |
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Sep 30 12:14:19 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Oct 07 13:31:36 2021 +0200 @@ -40,6 +40,7 @@ #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" #include "../../../OrthancFramework/Sources/Logging.h" #include "../../../OrthancFramework/Sources/OrthancException.h" +#include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h" #include "../OrthancConfiguration.h" #include "../ServerContext.h" @@ -88,6 +89,181 @@ } + class ArchiveJob::InstanceLoader : public boost::noncopyable + { + protected: + ServerContext& context_; + public: + InstanceLoader(ServerContext& context) + : context_(context) + { + } + + virtual ~InstanceLoader() + { + } + + virtual void PrepareDicom(const std::string& instanceId) + { + + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0; + + virtual void Clear() + { + } + }; + + class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader + { + public: + SynchronousInstanceLoader(ServerContext& context) + : InstanceLoader(context) + { + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE + { + context_.ReadDicom(dicom, instanceId); + } + }; + + class InstanceId : public Orthanc::IDynamicObject + { + private: + std::string id_; + + public: + InstanceId(const std::string& id) : id_(id) + { + } + + virtual ~InstanceId() ORTHANC_OVERRIDE + { + } + + std::string GetId() const {return id_;}; + }; + + class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader + { + Semaphore availableInstancesSemaphore_; + std::map<std::string, boost::shared_ptr<std::string>> availableInstances_; + boost::mutex availableInstancesMutex_; + SharedMessageQueue instancesToPreload_; + std::vector<boost::thread*> threads_; + + + public: + ThreadedInstanceLoader(ServerContext& context, size_t threadCount) + : InstanceLoader(context), + availableInstancesSemaphore_(0) + { + for (size_t i = 0; i < threadCount; i++) + { + threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); + } + } + + virtual ~ThreadedInstanceLoader() + { + Clear(); + } + + virtual void Clear() ORTHANC_OVERRIDE + { + for (size_t i = 0; i < threads_.size(); i++) + { + instancesToPreload_.Enqueue(NULL); + } + + for (size_t i = 0; i < threads_.size(); i++) + { + if (threads_[i]->joinable()) + { + threads_[i]->join(); + } + delete threads_[i]; + } + + threads_.clear(); + availableInstances_.clear(); + } + + static void PreloaderWorkerThread(ThreadedInstanceLoader* that) + { + while (true) + { + std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0))); + if (instanceId.get() == NULL) // that's the signal to exit the thread + { + return; + } + + try + { + boost::shared_ptr<std::string> dicomContent(new std::string()); + that->context_.ReadDicom(*dicomContent, instanceId->GetId()); + { + boost::mutex::scoped_lock lock(that->availableInstancesMutex_); + that->availableInstances_[instanceId->GetId()] = dicomContent; + } + + that->availableInstancesSemaphore_.Release(); + } + catch (OrthancException& e) + { + boost::mutex::scoped_lock lock(that->availableInstancesMutex_); + // store a NULL result to notify that we could not read the instance + that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>(); + that->availableInstancesSemaphore_.Release(); + } + } + } + + virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE + { + instancesToPreload_.Enqueue(new InstanceId(instanceId)); + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE + { + while (true) + { + // wait for an instance to be available but this might not be the one we are waiting for ! + availableInstancesSemaphore_.Acquire(); + + boost::shared_ptr<std::string> dicomContent; + { + if (availableInstances_.find(instanceId) != availableInstances_.end()) + { + // this is the instance we were waiting for + dicomContent = availableInstances_[instanceId]; + availableInstances_.erase(instanceId); + + if (dicomContent.get() == NULL) // there has been an error while reading the file + { + throw OrthancException(ErrorCode_InexistentItem); + } + dicom.swap(*dicomContent); + + if (availableInstances_.size() > 0) + { + // we have just read the instance we were waiting for but there are still other instances available -> + // make sure the next GetDicom call does not wait ! + availableInstancesSemaphore_.Release(); + } + return; + } + // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when + // a new instance is available + } + } + } + }; + + class ArchiveJob::ResourceIdentifiers : public boost::noncopyable { private: @@ -402,6 +578,7 @@ void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, DicomDirWriter* dicomDir, const std::string& dicomDirFolder, bool transcode, @@ -423,7 +600,7 @@ try { - context.ReadDicom(content, instanceId_); + instanceLoader.GetDicom(content, instanceId_); } catch (OrthancException& e) { @@ -494,10 +671,12 @@ std::deque<Command*> commands_; uint64_t uncompressedSize_; unsigned int instancesCount_; + InstanceLoader& instanceLoader_; void ApplyInternal(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, DicomDirWriter* dicomDir, const std::string& dicomDirFolder, @@ -509,13 +688,14 @@ throw OrthancException(ErrorCode_ParameterOutOfRange); } - commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); + commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax); } public: - ZipCommands() : + ZipCommands(InstanceLoader& instanceLoader) : uncompressedSize_(0), - instancesCount_(0) + instancesCount_(0), + instanceLoader_(instanceLoader) { } @@ -547,23 +727,25 @@ // "media" flavor (with DICOMDIR) void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, DicomDirWriter& dicomDir, const std::string& dicomDirFolder, bool transcode, DicomTransferSyntax transferSyntax) const { - ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); + ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); } // "archive" flavor (without DICOMDIR) void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, bool transcode, DicomTransferSyntax transferSyntax) const { - ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); + ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax); } void AddOpenDirectory(const std::string& filename) @@ -580,6 +762,7 @@ const std::string& instanceId, uint64_t uncompressedSize) { + instanceLoader_.PrepareDicom(instanceId); commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); instancesCount_ ++; uncompressedSize_ += uncompressedSize; @@ -747,6 +930,7 @@ { private: ServerContext& context_; + InstanceLoader& instanceLoader_; ZipCommands commands_; std::unique_ptr<HierarchicalZipWriter> zip_; std::unique_ptr<DicomDirWriter> dicomDir_; @@ -755,10 +939,13 @@ public: ZipWriterIterator(ServerContext& context, + InstanceLoader& instanceLoader, ArchiveIndex& archive, bool isMedia, bool enableExtendedSopClass) : context_(context), + instanceLoader_(instanceLoader), + commands_(instanceLoader), isMedia_(isMedia), isStream_(false) { @@ -882,13 +1069,13 @@ if (isMedia_) { assert(dicomDir_.get() != NULL); - commands_.Apply(*zip_, context_, index, *dicomDir_, + commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_, MEDIA_IMAGES_FOLDER, transcode, transferSyntax); } else { assert(dicomDir_.get() == NULL); - commands_.Apply(*zip_, context_, index, transcode, transferSyntax); + commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax); } } } @@ -917,7 +1104,8 @@ uncompressedSize_(0), archiveSize_(0), transcode_(false), - transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) + transferSyntax_(DicomTransferSyntax_LittleEndianImplicit), + loaderThreads_(0) { } @@ -993,6 +1181,19 @@ } + void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads) + { + if (writer_.get() != NULL) // Already started + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + loaderThreads_ = loaderThreads; + } + } + + void ArchiveJob::Reset() { throw OrthancException(ErrorCode_BadSequenceOfCalls, @@ -1002,6 +1203,16 @@ void ArchiveJob::Start() { + if (loaderThreads_ == 0) + { + // default behaviour before loaderThreads was introducted in 1.9.8 + instanceLoader_.reset(new SynchronousInstanceLoader(context_)); + } + else + { + instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_)); + } + if (writer_.get() != NULL) { throw OrthancException(ErrorCode_BadSequenceOfCalls); @@ -1023,7 +1234,7 @@ assert(asynchronousTarget_.get() != NULL); asynchronousTarget_->Touch(); // Make sure we can write to the temporary file - writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); writer_->SetOutputFile(asynchronousTarget_->GetPath()); } } @@ -1031,7 +1242,7 @@ { assert(synchronousTarget_.get() != NULL); - writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); writer_->AcquireOutputStream(synchronousTarget_.release()); } @@ -1076,6 +1287,11 @@ writer_.reset(); } + if (instanceLoader_.get() != NULL) + { + instanceLoader_->Clear(); + } + if (asynchronousTarget_.get() != NULL) { // Asynchronous behavior: Move the resulting file into the media archive