# HG changeset patch # User Alain Mazy # Date 1672935883 -3600 # Node ID f2dcdbe05884853314ac9012021343e2009c11cc # Parent ede035d48b8e29e9ab52554a5da20e88f558453e ResourceModification jobs can now use multiple threads diff -r ede035d48b8e -r f2dcdbe05884 NEWS --- a/NEWS Mon Dec 19 20:00:21 2022 +0100 +++ b/NEWS Thu Jan 05 17:24:43 2023 +0100 @@ -10,6 +10,8 @@ * Made the HTTP Client errors more verbose by including the url in the logs. * Optimization: now using multiple threads to transcode files for asynchronous download of studies archive. * New configuration "KeepAliveTimeout" with a default value of 1 second. +* ResourceModification jobs (/modify + /anonymize) can now use multiple threads to speed up processing + - New configuration "JobsEngineThreadsCount.ResourceModification" to configure the number of threads. REST API -------- diff -r ede035d48b8e -r f2dcdbe05884 OrthancFramework/Sources/DicomParsing/DicomModification.cpp --- a/OrthancFramework/Sources/DicomParsing/DicomModification.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancFramework/Sources/DicomParsing/DicomModification.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -439,6 +439,8 @@ const std::string& mapped, ResourceType level) { + boost::recursive_mutex::scoped_lock lock(uidMapMutex_); + UidMap::const_iterator previous = uidMap_.find(std::make_pair(level, original)); if (previous == uidMap_.end()) @@ -450,6 +452,8 @@ std::string DicomModification::MapDicomIdentifier(const std::string& original, ResourceType level) { + boost::recursive_mutex::scoped_lock lock(uidMapMutex_); + const std::string stripped = Toolbox::StripSpaces(original); std::string mapped; @@ -684,6 +688,8 @@ void DicomModification::SetLevel(ResourceType level) { + boost::recursive_mutex::scoped_lock lock(uidMapMutex_); + uidMap_.clear(); level_ = level; @@ -839,6 +845,8 @@ void DicomModification::SetupAnonymization(DicomVersion version) { + boost::recursive_mutex::scoped_lock lock(uidMapMutex_); + isAnonymization_ = true; removals_.clear(); @@ -1423,6 +1431,8 @@ void DicomModification::Serialize(Json::Value& value) const { + boost::recursive_mutex::scoped_lock lock(uidMapMutex_); + if (identifierGenerator_ != NULL) { throw OrthancException(ErrorCode_InternalError, @@ -1548,6 +1558,8 @@ const Json::Value& serialized, const char* field) { + boost::recursive_mutex::scoped_lock lock(uidMapMutex_); + if (!serialized.isMember(field) || serialized[field].type() != Json::objectValue) { diff -r ede035d48b8e -r f2dcdbe05884 OrthancFramework/Sources/DicomParsing/DicomModification.h --- a/OrthancFramework/Sources/DicomParsing/DicomModification.h Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancFramework/Sources/DicomParsing/DicomModification.h Thu Jan 05 17:24:43 2023 +0100 @@ -24,6 +24,7 @@ #pragma once #include "ParsedDicomFile.h" +#include #include @@ -126,6 +127,7 @@ typedef std::map< std::pair, std::string> UidMap; + mutable boost::recursive_mutex uidMapMutex_; SetOfTags removals_; SetOfTags clearings_; Replacements replacements_; diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/CMakeLists.txt --- a/OrthancServer/CMakeLists.txt Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/CMakeLists.txt Thu Jan 05 17:24:43 2023 +0100 @@ -136,6 +136,7 @@ ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/ResourceModificationJob.cpp ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/SplitStudyJob.cpp ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/StorageCommitmentScpJob.cpp + ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp ${CMAKE_SOURCE_DIR}/Sources/ServerToolbox.cpp ${CMAKE_SOURCE_DIR}/Sources/SliceOrdering.cpp ${CMAKE_SOURCE_DIR}/Sources/StorageCommitmentReports.cpp diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Resources/Configuration.json --- a/OrthancServer/Resources/Configuration.json Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Resources/Configuration.json Thu Jan 05 17:24:43 2023 +0100 @@ -65,7 +65,7 @@ // The period (in seconds) between 2 calls of the "OnHeartBeat" // lua callback. O means the heart beat is disabled. // TODO: text below for Orthanc book: - // Note: that the period is not actually the delay between + // Note: that the period is actually not the delay between // the end of an execution and the triggering of the next one. // Since there is only one lua context, if other lua code is being // executed, the heart beat might be delayed even more. @@ -86,6 +86,13 @@ // this value to "1". "ConcurrentJobs" : 2, + // Defines the number of threads that are used to execute each type of + // jobs (for the jobs that can be parallelized). + // A value of "0" indicates to use all the available CPU logical cores. + // (new in Orthanc 1.11.3) + "JobsEngineThreadsCount" : { + "ResourceModification": 1 // for /anonymize, /modify + }, /** * Configuration of the HTTP server @@ -884,7 +891,8 @@ "ZipLoaderThreads": 0, // Extra Main Dicom tags that are stored in DB together with all default - // Main Dicom tags that are already stored (TODO: see book new page). + // Main Dicom tags that are already stored. + // see https://book.orthanc-server.com/faq/main-dicom-tags.html // (new in Orthanc 1.11.0) // Sequences tags are not supported. /** @@ -909,7 +917,7 @@ // Enables/disables warnings in the logs. // "true" enables a warning. All warnings are enabled by default - // TODO: see book new page + // see https://book.orthanc-server.com/faq/main-dicom-tags.html#warnings // (new in Orthanc 1.11.0) "Warnings" : { // A "RequestedTags" has been read from storage which is slower than @@ -922,7 +930,7 @@ // saved with another "ExtraMainDicomTags" configuration which means that // your response might be incomplete/inconsistent. // You should call patients|studies|series|instances/../reconstruct to rebuild - // the DB. TODO: also check for "rebuild DB" plugin + // the DB. You may also check for the "Housekeeper" plugin "W002_InconsistentDicomTagsInDb": true } diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/OrthancConfiguration.cpp --- a/OrthancServer/Sources/OrthancConfiguration.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/OrthancConfiguration.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -44,6 +44,7 @@ static const char* const TEMPORARY_DIRECTORY = "TemporaryDirectory"; static const char* const DATABASE_SERVER_IDENTIFIER = "DatabaseServerIdentifier"; static const char* const WARNINGS = "Warnings"; +static const char* const JOBS_ENGINE_THREADS_COUNT = "JobsEngineThreadsCount"; namespace Orthanc { @@ -273,6 +274,55 @@ } } + void OrthancConfiguration::LoadJobsEngineThreadsCount() + { + // default values + jobsEngineThreadsCount_["ResourceModification"] = 1; + + if (json_.isMember(JOBS_ENGINE_THREADS_COUNT)) + { + const Json::Value& source = json_[JOBS_ENGINE_THREADS_COUNT]; + if (source.type() != Json::objectValue) + { + throw OrthancException(ErrorCode_BadFileFormat, + "Bad format of the \"" + std::string(JOBS_ENGINE_THREADS_COUNT) + + "\" configuration section"); + } + + Json::Value::Members members = source.getMemberNames(); + + for (size_t i = 0; i < members.size(); i++) + { + const std::string& name = members[i]; + if (!source[name].isUInt()) + { + throw OrthancException(ErrorCode_BadFileFormat, + "Bad format for \"" + std::string(JOBS_ENGINE_THREADS_COUNT) + "." + name + + "\". It should be an unsigned integer"); + } + jobsEngineThreadsCount_[name] = source[name].asUInt(); + } + } + } + + unsigned int OrthancConfiguration::GetJobsEngineWorkersThread(const std::string& jobType) const + { + unsigned int workersThread = 1; + + const JobsEngineThreadsCount::const_iterator it = jobsEngineThreadsCount_.find(jobType); + if (it != jobsEngineThreadsCount_.end()) + { + workersThread = it->second; + } + + if (workersThread == 0) + { + workersThread = SystemToolbox::GetHardwareConcurrency(); + } + + return workersThread; + } + void OrthancConfiguration::LoadPeers() { if (GetBooleanParameter(ORTHANC_PEERS_IN_DB, false)) diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/OrthancConfiguration.h --- a/OrthancServer/Sources/OrthancConfiguration.h Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/OrthancConfiguration.h Thu Jan 05 17:24:43 2023 +0100 @@ -51,6 +51,7 @@ private: typedef std::map Modalities; typedef std::map Peers; + typedef std::map JobsEngineThreadsCount; boost::shared_mutex mutex_; Json::Value json_; @@ -60,6 +61,7 @@ const char* configurationFileArg_; Modalities modalities_; Peers peers_; + JobsEngineThreadsCount jobsEngineThreadsCount_; ServerIndex* serverIndex_; std::set disabledWarnings_; @@ -160,6 +162,10 @@ void LoadWarnings(); + void LoadJobsEngineThreadsCount(); + + unsigned int GetJobsEngineWorkersThread(const std::string& jobType) const; + void RegisterFont(ServerResources::FileResourceId resource); bool LookupStringParameter(std::string& target, diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/OrthancInitialization.cpp --- a/OrthancServer/Sources/OrthancInitialization.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/OrthancInitialization.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -354,6 +354,7 @@ LoadCustomDictionary(lock.GetJson()); lock.GetConfiguration().LoadWarnings(); + lock.GetConfiguration().LoadJobsEngineThreadsCount(); LoadMainDicomTags(lock.GetJson()); // New in Orthanc 1.11.0 diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/OrthancRestApi/OrthancRestAnonymizeModify.cpp --- a/OrthancServer/Sources/OrthancRestApi/OrthancRestAnonymizeModify.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestAnonymizeModify.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -333,6 +333,15 @@ } } + static void SetKeepSource(ThreadedSetOfInstancesJob& job, + const Json::Value& body) + { + if (body.isMember(KEEP_SOURCE)) + { + job.SetKeepSource(SerializationToolbox::ReadBoolean(body, KEEP_SOURCE)); + } + } + static void SubmitModificationJob(std::unique_ptr& modification, bool isAnonymization, @@ -343,8 +352,14 @@ const std::set& resources) { ServerContext& context = OrthancRestApi::GetContext(call); + unsigned int workersCount = 0; - std::unique_ptr job(new ResourceModificationJob(context)); + { + OrthancConfiguration::ReaderLock lock; + workersCount = lock.GetConfiguration().GetJobsEngineWorkersThread("ResourceModification"); + } + + std::unique_ptr job(new ResourceModificationJob(context, workersCount)); if (isSingleResource) // This notably configures the output format { @@ -366,12 +381,14 @@ for (std::set::const_iterator it = resources.begin(); it != resources.end(); ++it) { - context.AddChildInstances(*job, *it); + std::list instances; + context.GetIndex().GetChildInstances(instances, *it); + job->AddInstances(instances); + + job->AddParentResource(*it); } - job->AddTrailingStep(); - - OrthancRestApi::GetApi(call).SubmitCommandsJob + OrthancRestApi::GetApi(call).SubmitThreadedInstancesJob (call, job.release(), true /* synchronous by default */, body); } diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/OrthancRestApi/OrthancRestApi.cpp --- a/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -434,7 +434,32 @@ SubmitGenericJob(call, raii.release(), isDefaultSynchronous, body); } - + void OrthancRestApi::SubmitThreadedInstancesJob(RestApiPostCall& call, + ThreadedSetOfInstancesJob* job, + bool isDefaultSynchronous, + const Json::Value& body) const + { + std::unique_ptr raii(job); + + if (body.type() != Json::objectValue) + { + throw OrthancException(ErrorCode_BadFileFormat); + } + + job->SetDescription("REST API"); + + if (body.isMember(KEY_PERMISSIVE)) + { + job->SetPermissive(SerializationToolbox::ReadBoolean(body, KEY_PERMISSIVE)); + } + else + { + job->SetPermissive(false); + } + + SubmitGenericJob(call, raii.release(), isDefaultSynchronous, body); + } + void OrthancRestApi::DocumentSubmitGenericJob(RestApiPostCall& call) { call.GetDocumentation() diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/OrthancRestApi/OrthancRestApi.h --- a/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.h Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.h Thu Jan 05 17:24:43 2023 +0100 @@ -26,6 +26,7 @@ #include "../../../OrthancFramework/Sources/JobsEngine/SetOfCommandsJob.h" #include "../../../OrthancFramework/Sources/MetricsRegistry.h" #include "../../../OrthancFramework/Sources/RestApi/RestApi.h" +#include "../ServerJobs/ThreadedSetOfInstancesJob.h" #include "../ServerEnumerations.h" #include @@ -130,6 +131,12 @@ bool isDefaultSynchronous, const Json::Value& body) const; + void SubmitThreadedInstancesJob(RestApiPostCall& call, + ThreadedSetOfInstancesJob* job, + bool isDefaultSynchronous, + const Json::Value& body) const; + + static void DocumentSubmitGenericJob(RestApiPostCall& call); static void DocumentSubmitCommandsJob(RestApiPostCall& call); diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp --- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -293,14 +293,18 @@ **/ // assert(modifiedInstance == modifiedHasher.HashInstance()); - output_->Update(modifiedHasher); + { + boost::recursive_mutex::scoped_lock lock(outputMutex_); + + output_->Update(modifiedHasher); + } return true; } - ResourceModificationJob::ResourceModificationJob(ServerContext& context) : - CleaningInstancesJob(context, true /* by default, keep source */), + ResourceModificationJob::ResourceModificationJob(ServerContext& context, unsigned int workersCount) : + ThreadedSetOfInstancesJob(context, false /* no post processing step */, true /* by default, keep source */, workersCount), isAnonymization_(false), transcode_(false), transferSyntax_(DicomTransferSyntax_LittleEndianExplicit) // dummy initialization @@ -445,6 +449,8 @@ } else { + boost::recursive_mutex::scoped_lock lock(outputMutex_); + assert(output_.get() != NULL); return output_->IsSingleResource(); } @@ -453,6 +459,8 @@ ResourceType ResourceModificationJob::GetOutputLevel() const { + boost::recursive_mutex::scoped_lock lock(outputMutex_); + if (IsSingleResourceModification()) { assert(modification_.get() != NULL && @@ -469,7 +477,9 @@ void ResourceModificationJob::GetPublicContent(Json::Value& value) { - CleaningInstancesJob::GetPublicContent(value); + boost::recursive_mutex::scoped_lock lock(outputMutex_); + + ThreadedSetOfInstancesJob::GetPublicContent(value); value["IsAnonymization"] = isAnonymization_; @@ -495,7 +505,7 @@ ResourceModificationJob::ResourceModificationJob(ServerContext& context, const Json::Value& serialized) : - CleaningInstancesJob(context, serialized, true /* by default, keep source */), + ThreadedSetOfInstancesJob(context, serialized, false /* no post processing step */, true /* by default, keep source */), transferSyntax_(DicomTransferSyntax_LittleEndianExplicit) // dummy initialization { assert(serialized.type() == Json::objectValue); @@ -565,7 +575,7 @@ { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - else if (!CleaningInstancesJob::Serialize(value)) + else if (!ThreadedSetOfInstancesJob::Serialize(value)) { return false; } diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/ServerJobs/ResourceModificationJob.h --- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h Thu Jan 05 17:24:43 2023 +0100 @@ -23,14 +23,16 @@ #pragma once #include "../../../OrthancFramework/Sources/DicomParsing/DicomModification.h" +#include "../../../OrthancFramework/Sources/MultiThreading/RunnableWorkersPool.h" #include "../DicomInstanceOrigin.h" -#include "CleaningInstancesJob.h" +#include "ThreadedSetOfInstancesJob.h" +#include namespace Orthanc { class ServerContext; - class ResourceModificationJob : public CleaningInstancesJob + class ResourceModificationJob : public ThreadedSetOfInstancesJob { private: class IOutput : public boost::noncopyable @@ -49,6 +51,8 @@ class SingleOutput; class MultipleOutputs; + + mutable boost::recursive_mutex outputMutex_; std::unique_ptr modification_; boost::shared_ptr output_; @@ -58,10 +62,10 @@ DicomTransferSyntax transferSyntax_; protected: - virtual bool HandleInstance(const std::string& instance) ORTHANC_OVERRIDE; + virtual bool HandleInstance(const std::string& instance) ORTHANC_OVERRIDE; // from ThreadedSetOfInstancesJob public: - explicit ResourceModificationJob(ServerContext& context); + explicit ResourceModificationJob(ServerContext& context, unsigned int workersCount); ResourceModificationJob(ServerContext& context, const Json::Value& serialized); @@ -109,10 +113,6 @@ // Only possible if "IsSingleResourceModification()" ResourceType GetOutputLevel() const; - virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE - { - } - virtual void GetJobType(std::string& target) ORTHANC_OVERRIDE { target = "ResourceModification"; diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -0,0 +1,529 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2022 Osimis S.A., Belgium + * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * . + **/ + + +#include "ThreadedSetOfInstancesJob.h" + +#include "../../../OrthancFramework/Sources/Logging.h" +#include "../../../OrthancFramework/Sources/OrthancException.h" +#include "../../../OrthancFramework/Sources/SerializationToolbox.h" +#include "../ServerContext.h" + +#include +#include + +namespace Orthanc +{ + static const char* EXIT_WORKER_MESSAGE = "exit"; + + ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, + bool hasPostProcessing, + bool keepSource, + size_t workersCount) : + processedInstancesCount_(0), + hasPostProcessing_(hasPostProcessing), + started_(false), + stopRequested_(false), + permissive_(false), + currentStep_(ThreadedJobStep_BeforeStart), + workersCount_(workersCount), + context_(context), + keepSource_(keepSource) + { + } + + + ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob() + { + // no need to lock mutex here since we access variables used only by the "master" thread + + StopWorkers(); + } + + + void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount) + { + // no need to lock mutex here since we access variables used only by the "master" thread + + for (size_t i = 0; i < workersCount; i++) + { + instancesWorkers_.push_back(boost::shared_ptr(new boost::thread(InstanceWorkerThread, this))); + } + } + + + void ThreadedSetOfInstancesJob::WaitWorkersComplete() + { + // no need to lock mutex here since we access variables used only by the "master" thread + + // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue + for (size_t i = 0; i < instancesWorkers_.size(); i++) + { + instancesToProcess_.Enqueue(new SingleValueObject(EXIT_WORKER_MESSAGE)); + } + + for (size_t i = 0; i < instancesWorkers_.size(); i++) + { + if (instancesWorkers_[i]->joinable()) + { + instancesWorkers_[i]->join(); + } + } + + instancesWorkers_.clear(); + } + + + void ThreadedSetOfInstancesJob::StopWorkers() + { + // no need to lock mutex here since we access variables used or set only by the "master" thread + + stopRequested_ = true; + WaitWorkersComplete(); + } + + + void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) + { + // no need to lock mutex here since we access variables used or set only by the "master" thread + + if (reason == JobStopReason_Canceled || + reason == JobStopReason_Failure || + reason == JobStopReason_Retry) + { + // deallocate resources + StopWorkers(); + } + else if (reason == JobStopReason_Paused) + { + // keep resources allocated. + // note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state + } + } + + + JobStepResult ThreadedSetOfInstancesJob::Step(const std::string& jobId) + { + // no need to lock mutex here since we access variables used or set only by the "master" thread + + if (!started_) + { + throw OrthancException(ErrorCode_InternalError); + } + + if (GetInstancesCount() == 0) + { + // No instances to handle: We're done + return JobStepResult::Success(); + } + + try + { + if (currentStep_ == ThreadedJobStep_BeforeStart) + { + currentStep_ = ThreadedJobStep_ProcessingInstances; + } + + if (currentStep_ == ThreadedJobStep_ProcessingInstances) + { + // wait until all instances are processed by the workers + WaitWorkersComplete(); + + currentStep_ = ThreadedJobStep_PostProcessingInstances; + return JobStepResult::Continue(); + } + else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) + { + if (HasPostProcessingStep()) + { + PostProcessInstances(); + } + + currentStep_ = ThreadedJobStep_Cleanup; + return JobStepResult::Continue(); + } + else if (currentStep_ == ThreadedJobStep_Cleanup) + { + // clean after the post processing step + if (HasCleanupStep()) + { + for (std::set::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + { + Json::Value tmp; + context_.DeleteResource(tmp, *it, ResourceType_Instance); + } + } + + currentStep_ = ThreadedJobStep_Done; + return JobStepResult::Success(); + } + } + catch (OrthancException& e) + { + if (permissive_) + { + LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What(); + } + else + { + return JobStepResult::Failure(e); + } + } + + return JobStepResult::Continue(); + } + + + bool ThreadedSetOfInstancesJob::HasPostProcessingStep() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return hasPostProcessing_; + } + + + bool ThreadedSetOfInstancesJob::HasCleanupStep() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return !keepSource_; + } + + + void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that) + { + while (true) + { + std::unique_ptr > instanceId(dynamic_cast*>(that->instancesToProcess_.Dequeue(0))); + if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean + || instanceId->GetValue() == EXIT_WORKER_MESSAGE) + { + return; + } + + bool processed = that->HandleInstance(instanceId->GetValue()); + + { + boost::recursive_mutex::scoped_lock lock(that->mutex_); + + that->processedInstancesCount_++; + if (!processed) + { + that->failedInstances_.insert(instanceId->GetValue()); + } + } + } + } + + + bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, + MimeType &mime, + std::string& filename, + const std::string &key) + { + return false; + } + + + size_t ThreadedSetOfInstancesJob::GetInstancesCount() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return instances_.size(); + } + + + void ThreadedSetOfInstancesJob::GetFailedInstances(std::set& target) const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + target = failedInstances_; + } + + + void ThreadedSetOfInstancesJob::GetInstances(std::set& target) const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + target = instances_; + } + + + bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return failedInstances_.find(instance) != failedInstances_.end(); + } + + + void ThreadedSetOfInstancesJob::Start() + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + started_ = true; + + // create the workers and enqueue all instances + for (std::set::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + { + instancesToProcess_.Enqueue(new SingleValueObject(*it)); + } + + InitWorkers(workersCount_); + } + + + void ThreadedSetOfInstancesJob::Reset() + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + if (started_) + { + currentStep_ = ThreadedJobStep_BeforeStart; + stopRequested_ = false; + processedInstancesCount_ = 0; + failedInstances_.clear(); + instancesToProcess_.Clear(); + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + + static const char* KEY_FAILED_INSTANCES = "FailedInstances"; + static const char* KEY_PARENT_RESOURCES = "ParentResources"; + static const char* KEY_DESCRIPTION = "Description"; + static const char* KEY_PERMISSIVE = "Permissive"; + static const char* KEY_CURRENT_STEP = "CurrentStep"; + static const char* KEY_TYPE = "Type"; + static const char* KEY_INSTANCES = "Instances"; + static const char* KEY_INSTANCES_COUNT = "InstancesCount"; + static const char* KEY_FAILED_INSTANCES_COUNT = "FailedInstancesCount"; + static const char* KEY_KEEP_SOURCE = "KeepSource"; + static const char* KEY_WORKERS_COUNT = "WorkersCount"; + + + void ThreadedSetOfInstancesJob::GetPublicContent(Json::Value& target) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + target[KEY_DESCRIPTION] = GetDescription(); + target[KEY_INSTANCES_COUNT] = static_cast(GetInstancesCount()); + target[KEY_FAILED_INSTANCES_COUNT] = static_cast(failedInstances_.size()); + + if (!parentResources_.empty()) + { + SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); + } + } + + + bool ThreadedSetOfInstancesJob::Serialize(Json::Value& target) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + target = Json::objectValue; + + std::string type; + GetJobType(type); + target[KEY_TYPE] = type; + + target[KEY_PERMISSIVE] = permissive_; + target[KEY_CURRENT_STEP] = static_cast(currentStep_); + target[KEY_DESCRIPTION] = description_; + target[KEY_KEEP_SOURCE] = keepSource_; + target[KEY_WORKERS_COUNT] = static_cast(workersCount_); + + SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES); + SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); + SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); + + return true; + } + + + ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, + const Json::Value& source, + bool hasPostProcessing, + bool defaultKeepSource) : + processedInstancesCount_(0), + hasPostProcessing_(hasPostProcessing), + started_(false), + stopRequested_(false), + permissive_(false), + currentStep_(ThreadedJobStep_BeforeStart), + workersCount_(1), + context_(context), + keepSource_(defaultKeepSource) + { + SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES); + + if (source.isMember(KEY_PARENT_RESOURCES)) + { + // Backward compatibility with Orthanc <= 1.5.6 + SerializationToolbox::ReadSetOfStrings(parentResources_, source, KEY_PARENT_RESOURCES); + } + + if (source.isMember(KEY_KEEP_SOURCE)) + { + keepSource_ = SerializationToolbox::ReadBoolean(source, KEY_KEEP_SOURCE); + } + + if (source.isMember(KEY_WORKERS_COUNT)) + { + workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT); + } + + if (source.isMember(KEY_INSTANCES)) + { + SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); + } + } + + + void ThreadedSetOfInstancesJob::SetKeepSource(bool keep) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + keepSource_ = keep; + } + + + float ThreadedSetOfInstancesJob::GetProgress() + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + if (GetInstancesCount() == 0) + { + return 1; + } + else + { + size_t totalProgress = GetInstancesCount(); + size_t currentProgress = processedInstancesCount_; + + if (HasPostProcessingStep()) + { + ++totalProgress; + if (currentStep_ > ThreadedJobStep_PostProcessingInstances) + { + ++currentProgress; + } + } + + if (HasCleanupStep()) + { + ++totalProgress; + if (currentStep_ > ThreadedJobStep_Cleanup) + { + ++currentProgress; + } + } + + return (static_cast(currentProgress) / + static_cast(totalProgress)); + } + } + + + ThreadedSetOfInstancesJob::ThreadedJobStep ThreadedSetOfInstancesJob::GetCurrentStep() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return currentStep_; + } + + + void ThreadedSetOfInstancesJob::SetDescription(const std::string &description) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + description_ = description; + } + + + const std::string& ThreadedSetOfInstancesJob::GetDescription() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return description_; + } + + + bool ThreadedSetOfInstancesJob::IsPermissive() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return permissive_; + } + + + void ThreadedSetOfInstancesJob::SetPermissive(bool permissive) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + if (started_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + permissive_ = permissive; + } + } + + + bool ThreadedSetOfInstancesJob::IsStarted() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return started_; + } + + + void ThreadedSetOfInstancesJob::AddInstances(const std::list& instances) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + for (std::list::const_iterator + it = instances.begin(); it != instances.end(); ++it) + { + instances_.insert(*it); + } + } + + + void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + parentResources_.insert(resource); + } + +} \ No newline at end of file diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h Thu Jan 05 17:24:43 2023 +0100 @@ -0,0 +1,158 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2022 Osimis S.A., Belgium + * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * . + **/ + + +#pragma once + +#include "../../../OrthancFramework/Sources/Compatibility.h" // For ORTHANC_OVERRIDE +#include "../../../OrthancFramework/Sources/JobsEngine/IJob.h" +#include "../../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h" + +#include +#include +#include + +namespace Orthanc +{ + class ServerContext; + + // This class is a threaded version of SetOfInstancesJob merged with CleaningInstancesJob + class ORTHANC_PUBLIC ThreadedSetOfInstancesJob : public IJob + { + public: + enum ThreadedJobStep // cannot use "Step" since there is a method with this name ! + { + ThreadedJobStep_BeforeStart, + ThreadedJobStep_ProcessingInstances, + ThreadedJobStep_PostProcessingInstances, + ThreadedJobStep_Cleanup, + ThreadedJobStep_Done + }; + + private: + std::set failedInstances_; + std::set parentResources_; + + size_t processedInstancesCount_; + SharedMessageQueue instancesToProcess_; + std::vector > instancesWorkers_; + std::set instances_; + + bool hasPostProcessing_; // final step before "KeepSource" cleanup + bool started_; + bool stopRequested_; + bool permissive_; + ThreadedJobStep currentStep_; + std::string description_; + size_t workersCount_; + + ServerContext& context_; + bool keepSource_; + protected: + mutable boost::recursive_mutex mutex_; + + public: + ThreadedSetOfInstancesJob(ServerContext& context, + bool hasTrailingStep, + bool keepSource, + size_t workersCount); + + explicit ThreadedSetOfInstancesJob(ServerContext& context, + const Json::Value& source, + bool hasTrailingStep, + bool defaultKeepSource); + + virtual ~ThreadedSetOfInstancesJob(); + + protected: + virtual bool HandleInstance(const std::string& instance) = 0; + + virtual void PostProcessInstances() {} + + void InitWorkers(size_t workersCount); + + void StopWorkers(); + + void WaitWorkersComplete(); + + static void InstanceWorkerThread(ThreadedSetOfInstancesJob* that); + + const std::string& GetInstance(size_t index) const; + + bool HasPostProcessingStep() const; + + bool HasCleanupStep() const; + + public: + + ThreadedJobStep GetCurrentStep() const; + + void SetDescription(const std::string& description); + + const std::string& GetDescription() const; + + void SetKeepSource(bool keep); + + void GetInstances(std::set& target) const; + + void GetFailedInstances(std::set& target) const; + + size_t GetInstancesCount() const; + + void AddInstances(const std::list& instances); + + void AddParentResource(const std::string &resource); + + bool IsPermissive() const; + + void SetPermissive(bool permissive); + + virtual void Reset() ORTHANC_OVERRIDE; + + virtual void Start() ORTHANC_OVERRIDE; + + virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE; + + virtual float GetProgress() ORTHANC_OVERRIDE; + + bool IsStarted() const; + + virtual JobStepResult Step(const std::string& jobId) ORTHANC_OVERRIDE; + + virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE; + + virtual bool Serialize(Json::Value& target) ORTHANC_OVERRIDE; + + virtual bool GetOutput(std::string& output, + MimeType& mime, + std::string& filename, + const std::string& key) ORTHANC_OVERRIDE; + + bool IsFailedInstance(const std::string& instance) const; + + ServerContext& GetContext() const + { + return context_; + } + + }; +} diff -r ede035d48b8e -r f2dcdbe05884 OrthancServer/UnitTestsSources/ServerJobsTests.cpp --- a/OrthancServer/UnitTestsSources/ServerJobsTests.cpp Mon Dec 19 20:00:21 2022 +0100 +++ b/OrthancServer/UnitTestsSources/ServerJobsTests.cpp Thu Jan 05 17:24:43 2023 +0100 @@ -325,6 +325,34 @@ } +static bool CheckIdempotentSetOfInstances(IJobUnserializer& unserializer, + ThreadedSetOfInstancesJob& job) +{ + Json::Value a = 42; + + if (!job.Serialize(a)) + { + return false; + } + else + { + std::unique_ptr unserialized + (dynamic_cast(unserializer.UnserializeJob(a))); + + Json::Value b = 43; + if (unserialized->Serialize(b)) + { + return (CheckSameJson(a, b) && + job.GetCurrentStep() == unserialized->GetCurrentStep() && + job.GetInstancesCount() == unserialized->GetInstancesCount() ); + } + else + { + return false; + } + } +} + static bool CheckIdempotentSerialization(IJobUnserializer& unserializer, IJobOperation& operation) { @@ -813,7 +841,7 @@ modification->SetupAnonymization(DicomVersion_2008); modification->SetLevel(ResourceType_Series); - ResourceModificationJob job(GetContext()); + ResourceModificationJob job(GetContext(), 1); ASSERT_THROW(job.IsSingleResourceModification(), OrthancException); job.SetSingleResourceModification(modification.release(), ResourceType_Patient, true); job.SetOrigin(DicomInstanceOrigin::FromLua()); @@ -821,7 +849,6 @@ ASSERT_TRUE(job.IsSingleResourceModification()); ASSERT_EQ(ResourceType_Patient, job.GetOutputLevel()); - job.AddTrailingStep(); // Necessary since 1.7.0 ASSERT_TRUE(CheckIdempotentSetOfInstances(unserializer, job)); ASSERT_TRUE(job.Serialize(s)); } @@ -858,12 +885,11 @@ } { - ResourceModificationJob job(GetContext()); + ResourceModificationJob job(GetContext(), 2); ASSERT_THROW(job.SetTranscode("nope"), OrthancException); job.SetTranscode(DicomTransferSyntax_JPEGProcess1); job.SetSingleResourceModification(new DicomModification, ResourceType_Study, false); - job.AddTrailingStep(); // Necessary since 1.7.0 ASSERT_TRUE(CheckIdempotentSetOfInstances(unserializer, job)); ASSERT_TRUE(job.Serialize(s)); } @@ -883,11 +909,12 @@ } { - ResourceModificationJob job(GetContext()); + ResourceModificationJob job(GetContext(), 2); job.SetMultipleResourcesModification(new DicomModification, true); - job.AddInstance("toto"); - job.AddInstance("tutu"); - job.AddTrailingStep(); // Necessary since 1.7.0 + std::list instances; + instances.push_back("toto"); + instances.push_back("tutu"); + job.AddInstances(instances); ASSERT_TRUE(CheckIdempotentSetOfInstances(unserializer, job)); ASSERT_TRUE(job.Serialize(s)); } @@ -899,10 +926,7 @@ ResourceModificationJob& tmp = dynamic_cast(*job); std::set instances; - for (size_t i = 0; i < tmp.GetInstancesCount(); i++) - { - instances.insert(tmp.GetInstance(i)); - } + tmp.GetInstances(instances); ASSERT_EQ(2u, instances.size()); ASSERT_TRUE(instances.find("toto") != instances.end());