Mercurial > hg > orthanc
diff OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4819:70d2a97ca8cb openssl-3.x
integration mainline->openssl-3.x
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 25 Nov 2021 13:12:32 +0100 |
parents | f0038043fb97 58637d39ce88 |
children | 2e71a08eea15 |
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Mon Aug 30 22:21:24 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Nov 25 13:12:32 2021 +0100 @@ -28,6 +28,7 @@ #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" #include "../../../OrthancFramework/Sources/Logging.h" #include "../../../OrthancFramework/Sources/OrthancException.h" +#include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h" #include "../OrthancConfiguration.h" #include "../ServerContext.h" @@ -76,6 +77,181 @@ } + class ArchiveJob::InstanceLoader : public boost::noncopyable + { + protected: + ServerContext& context_; + public: + explicit InstanceLoader(ServerContext& context) + : context_(context) + { + } + + virtual ~InstanceLoader() + { + } + + virtual void PrepareDicom(const std::string& instanceId) + { + + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0; + + virtual void Clear() + { + } + }; + + class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader + { + public: + explicit SynchronousInstanceLoader(ServerContext& context) + : InstanceLoader(context) + { + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE + { + context_.ReadDicom(dicom, instanceId); + } + }; + + class InstanceId : public Orthanc::IDynamicObject + { + private: + std::string id_; + + public: + explicit InstanceId(const std::string& id) : id_(id) + { + } + + virtual ~InstanceId() ORTHANC_OVERRIDE + { + } + + std::string GetId() const {return id_;}; + }; + + class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader + { + Semaphore availableInstancesSemaphore_; + std::map<std::string, boost::shared_ptr<std::string> > availableInstances_; + boost::mutex availableInstancesMutex_; + SharedMessageQueue instancesToPreload_; + std::vector<boost::thread*> threads_; + + + public: + ThreadedInstanceLoader(ServerContext& context, size_t threadCount) + : InstanceLoader(context), + availableInstancesSemaphore_(0) + { + for (size_t i = 0; i < threadCount; i++) + { + threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); + } + } + + virtual ~ThreadedInstanceLoader() + { + Clear(); + } + + virtual void Clear() ORTHANC_OVERRIDE + { + for (size_t i = 0; i < threads_.size(); i++) + { + instancesToPreload_.Enqueue(NULL); + } + + for (size_t i = 0; i < threads_.size(); i++) + { + if (threads_[i]->joinable()) + { + threads_[i]->join(); + } + delete threads_[i]; + } + + threads_.clear(); + availableInstances_.clear(); + } + + static void PreloaderWorkerThread(ThreadedInstanceLoader* that) + { + while (true) + { + std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0))); + if (instanceId.get() == NULL) // that's the signal to exit the thread + { + return; + } + + try + { + boost::shared_ptr<std::string> dicomContent(new std::string()); + that->context_.ReadDicom(*dicomContent, instanceId->GetId()); + { + boost::mutex::scoped_lock lock(that->availableInstancesMutex_); + that->availableInstances_[instanceId->GetId()] = dicomContent; + } + + that->availableInstancesSemaphore_.Release(); + } + catch (OrthancException& e) + { + boost::mutex::scoped_lock lock(that->availableInstancesMutex_); + // store a NULL result to notify that we could not read the instance + that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>(); + that->availableInstancesSemaphore_.Release(); + } + } + } + + virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE + { + instancesToPreload_.Enqueue(new InstanceId(instanceId)); + } + + virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE + { + while (true) + { + // wait for an instance to be available but this might not be the one we are waiting for ! + availableInstancesSemaphore_.Acquire(); + + boost::shared_ptr<std::string> dicomContent; + { + if (availableInstances_.find(instanceId) != availableInstances_.end()) + { + // this is the instance we were waiting for + dicomContent = availableInstances_[instanceId]; + availableInstances_.erase(instanceId); + + if (dicomContent.get() == NULL) // there has been an error while reading the file + { + throw OrthancException(ErrorCode_InexistentItem); + } + dicom.swap(*dicomContent); + + if (availableInstances_.size() > 0) + { + // we have just read the instance we were waiting for but there are still other instances available -> + // make sure the next GetDicom call does not wait ! + availableInstancesSemaphore_.Release(); + } + return; + } + // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when + // a new instance is available + } + } + } + }; + + class ArchiveJob::ResourceIdentifiers : public boost::noncopyable { private: @@ -390,6 +566,7 @@ void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, DicomDirWriter* dicomDir, const std::string& dicomDirFolder, bool transcode, @@ -411,7 +588,7 @@ try { - context.ReadDicom(content, instanceId_); + instanceLoader.GetDicom(content, instanceId_); } catch (OrthancException& e) { @@ -482,10 +659,12 @@ std::deque<Command*> commands_; uint64_t uncompressedSize_; unsigned int instancesCount_; + InstanceLoader& instanceLoader_; void ApplyInternal(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, DicomDirWriter* dicomDir, const std::string& dicomDirFolder, @@ -497,13 +676,14 @@ throw OrthancException(ErrorCode_ParameterOutOfRange); } - commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); + commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax); } public: - ZipCommands() : + explicit ZipCommands(InstanceLoader& instanceLoader) : uncompressedSize_(0), - instancesCount_(0) + instancesCount_(0), + instanceLoader_(instanceLoader) { } @@ -535,23 +715,25 @@ // "media" flavor (with DICOMDIR) void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, DicomDirWriter& dicomDir, const std::string& dicomDirFolder, bool transcode, DicomTransferSyntax transferSyntax) const { - ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); + ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); } // "archive" flavor (without DICOMDIR) void Apply(HierarchicalZipWriter& writer, ServerContext& context, + InstanceLoader& instanceLoader, size_t index, bool transcode, DicomTransferSyntax transferSyntax) const { - ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); + ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax); } void AddOpenDirectory(const std::string& filename) @@ -568,6 +750,7 @@ const std::string& instanceId, uint64_t uncompressedSize) { + instanceLoader_.PrepareDicom(instanceId); commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); instancesCount_ ++; uncompressedSize_ += uncompressedSize; @@ -735,6 +918,7 @@ { private: ServerContext& context_; + InstanceLoader& instanceLoader_; ZipCommands commands_; std::unique_ptr<HierarchicalZipWriter> zip_; std::unique_ptr<DicomDirWriter> dicomDir_; @@ -743,10 +927,13 @@ public: ZipWriterIterator(ServerContext& context, + InstanceLoader& instanceLoader, ArchiveIndex& archive, bool isMedia, bool enableExtendedSopClass) : context_(context), + instanceLoader_(instanceLoader), + commands_(instanceLoader), isMedia_(isMedia), isStream_(false) { @@ -870,13 +1057,13 @@ if (isMedia_) { assert(dicomDir_.get() != NULL); - commands_.Apply(*zip_, context_, index, *dicomDir_, + commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_, MEDIA_IMAGES_FOLDER, transcode, transferSyntax); } else { assert(dicomDir_.get() == NULL); - commands_.Apply(*zip_, context_, index, transcode, transferSyntax); + commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax); } } } @@ -905,7 +1092,8 @@ uncompressedSize_(0), archiveSize_(0), transcode_(false), - transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) + transferSyntax_(DicomTransferSyntax_LittleEndianImplicit), + loaderThreads_(0) { } @@ -981,6 +1169,19 @@ } + void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads) + { + if (writer_.get() != NULL) // Already started + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + loaderThreads_ = loaderThreads; + } + } + + void ArchiveJob::Reset() { throw OrthancException(ErrorCode_BadSequenceOfCalls, @@ -990,6 +1191,16 @@ void ArchiveJob::Start() { + if (loaderThreads_ == 0) + { + // default behaviour before loaderThreads was introducted in 1.9.8 + instanceLoader_.reset(new SynchronousInstanceLoader(context_)); + } + else + { + instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_)); + } + if (writer_.get() != NULL) { throw OrthancException(ErrorCode_BadSequenceOfCalls); @@ -1011,7 +1222,7 @@ assert(asynchronousTarget_.get() != NULL); asynchronousTarget_->Touch(); // Make sure we can write to the temporary file - writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); writer_->SetOutputFile(asynchronousTarget_->GetPath()); } } @@ -1019,7 +1230,7 @@ { assert(synchronousTarget_.get() != NULL); - writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); writer_->AcquireOutputStream(synchronousTarget_.release()); } @@ -1064,6 +1275,11 @@ writer_.reset(); } + if (instanceLoader_.get() != NULL) + { + instanceLoader_->Clear(); + } + if (asynchronousTarget_.get() != NULL) { // Asynchronous behavior: Move the resulting file into the media archive @@ -1184,6 +1400,7 @@ bool ArchiveJob::GetOutput(std::string& output, MimeType& mime, + std::string& filename, const std::string& key) { if (key == "archive" && @@ -1196,6 +1413,7 @@ const DynamicTemporaryFile& f = dynamic_cast<DynamicTemporaryFile&>(accessor.GetItem()); f.GetFile().Read(output); mime = MimeType_Zip; + filename = "archive.zip"; return true; } else