changeset 5130:f2dcdbe05884

ResourceModification jobs can now use multiple threads
author Alain Mazy <am@osimis.io>
date Thu, 05 Jan 2023 17:24:43 +0100
parents ede035d48b8e
children e107ff622e6d
files NEWS OrthancFramework/Sources/DicomParsing/DicomModification.cpp OrthancFramework/Sources/DicomParsing/DicomModification.h OrthancServer/CMakeLists.txt OrthancServer/Resources/Configuration.json OrthancServer/Sources/OrthancConfiguration.cpp OrthancServer/Sources/OrthancConfiguration.h OrthancServer/Sources/OrthancInitialization.cpp OrthancServer/Sources/OrthancRestApi/OrthancRestAnonymizeModify.cpp OrthancServer/Sources/OrthancRestApi/OrthancRestApi.cpp OrthancServer/Sources/OrthancRestApi/OrthancRestApi.h OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp OrthancServer/Sources/ServerJobs/ResourceModificationJob.h OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h OrthancServer/UnitTestsSources/ServerJobsTests.cpp
diffstat 16 files changed, 888 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Mon Dec 19 20:00:21 2022 +0100
+++ b/NEWS	Thu Jan 05 17:24:43 2023 +0100
@@ -10,6 +10,8 @@
 * Made the HTTP Client errors more verbose by including the url in the logs.
 * Optimization: now using multiple threads to transcode files for asynchronous download of studies archive.
 * New configuration "KeepAliveTimeout" with a default value of 1 second.
+* ResourceModification jobs (/modify + /anonymize) can now use multiple threads to speed up processing
+  - New configuration "JobsEngineThreadsCount.ResourceModification" to configure the number of threads.
 
 REST API
 --------
--- a/OrthancFramework/Sources/DicomParsing/DicomModification.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancFramework/Sources/DicomParsing/DicomModification.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -439,6 +439,8 @@
                                                         const std::string& mapped,
                                                         ResourceType level)
   {
+    boost::recursive_mutex::scoped_lock lock(uidMapMutex_);
+
     UidMap::const_iterator previous = uidMap_.find(std::make_pair(level, original));
 
     if (previous == uidMap_.end())
@@ -450,6 +452,8 @@
   std::string DicomModification::MapDicomIdentifier(const std::string& original,
                                                     ResourceType level)
   {
+    boost::recursive_mutex::scoped_lock lock(uidMapMutex_);
+
     const std::string stripped = Toolbox::StripSpaces(original);
     
     std::string mapped;
@@ -684,6 +688,8 @@
 
   void DicomModification::SetLevel(ResourceType level)
   {
+    boost::recursive_mutex::scoped_lock lock(uidMapMutex_);
+
     uidMap_.clear();
     level_ = level;
 
@@ -839,6 +845,8 @@
 
   void DicomModification::SetupAnonymization(DicomVersion version)
   {
+    boost::recursive_mutex::scoped_lock lock(uidMapMutex_);
+
     isAnonymization_ = true;
     
     removals_.clear();
@@ -1423,6 +1431,8 @@
   
   void DicomModification::Serialize(Json::Value& value) const
   {
+    boost::recursive_mutex::scoped_lock lock(uidMapMutex_);
+
     if (identifierGenerator_ != NULL)
     {
       throw OrthancException(ErrorCode_InternalError,
@@ -1548,6 +1558,8 @@
                                             const Json::Value& serialized,
                                             const char* field)
   {
+    boost::recursive_mutex::scoped_lock lock(uidMapMutex_);
+
     if (!serialized.isMember(field) ||
         serialized[field].type() != Json::objectValue)
     {
--- a/OrthancFramework/Sources/DicomParsing/DicomModification.h	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancFramework/Sources/DicomParsing/DicomModification.h	Thu Jan 05 17:24:43 2023 +0100
@@ -24,6 +24,7 @@
 #pragma once
 
 #include "ParsedDicomFile.h"
+#include <boost/thread/recursive_mutex.hpp>
 
 #include <list>
 
@@ -126,6 +127,7 @@
 
     typedef std::map< std::pair<ResourceType, std::string>, std::string>  UidMap;
     
+    mutable boost::recursive_mutex      uidMapMutex_;
     SetOfTags removals_;
     SetOfTags clearings_;
     Replacements replacements_;
--- a/OrthancServer/CMakeLists.txt	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/CMakeLists.txt	Thu Jan 05 17:24:43 2023 +0100
@@ -136,6 +136,7 @@
   ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/ResourceModificationJob.cpp
   ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/SplitStudyJob.cpp
   ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/StorageCommitmentScpJob.cpp
+  ${CMAKE_SOURCE_DIR}/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp  
   ${CMAKE_SOURCE_DIR}/Sources/ServerToolbox.cpp
   ${CMAKE_SOURCE_DIR}/Sources/SliceOrdering.cpp
   ${CMAKE_SOURCE_DIR}/Sources/StorageCommitmentReports.cpp
--- a/OrthancServer/Resources/Configuration.json	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Resources/Configuration.json	Thu Jan 05 17:24:43 2023 +0100
@@ -65,7 +65,7 @@
   // The period (in seconds) between 2 calls of the "OnHeartBeat"
   // lua callback.  O means the heart beat is disabled.
   // TODO: text below for Orthanc book:
-  // Note: that the period is not actually the delay between
+  // Note: that the period is actually not the delay between
   // the end of an execution and the triggering of the next one.
   // Since there is only one lua context, if other lua code is being
   // executed, the heart beat might be delayed even more.
@@ -86,6 +86,13 @@
   // this value to "1".
   "ConcurrentJobs" : 2,
 
+  // Defines the number of threads that are used to execute each type of
+  // jobs (for the jobs that can be parallelized).
+  // A value of "0" indicates to use all the available CPU logical cores.
+  // (new in Orthanc 1.11.3)
+  "JobsEngineThreadsCount" : {
+    "ResourceModification": 1     // for /anonymize, /modify
+  },
 
   /**
    * Configuration of the HTTP server
@@ -884,7 +891,8 @@
   "ZipLoaderThreads": 0,
 
   // Extra Main Dicom tags that are stored in DB together with all default
-  // Main Dicom tags that are already stored (TODO: see book new page). 
+  // Main Dicom tags that are already stored.
+  // see https://book.orthanc-server.com/faq/main-dicom-tags.html 
   // (new in Orthanc 1.11.0)
   // Sequences tags are not supported.
   /**
@@ -909,7 +917,7 @@
 
   // Enables/disables warnings in the logs.
   // "true" enables a warning.  All warnings are enabled by default
-  // TODO: see book new page
+  // see https://book.orthanc-server.com/faq/main-dicom-tags.html#warnings
   // (new in Orthanc 1.11.0)
   "Warnings" : {
     // A "RequestedTags" has been read from storage which is slower than
@@ -922,7 +930,7 @@
     // saved with another "ExtraMainDicomTags" configuration which means that
     // your response might be incomplete/inconsistent.
     // You should call patients|studies|series|instances/../reconstruct to rebuild
-    // the DB.  TODO: also check for "rebuild DB" plugin
+    // the DB.  You may also check for the "Housekeeper" plugin
     "W002_InconsistentDicomTagsInDb": true
   }
 
--- a/OrthancServer/Sources/OrthancConfiguration.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/OrthancConfiguration.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -44,6 +44,7 @@
 static const char* const TEMPORARY_DIRECTORY = "TemporaryDirectory";
 static const char* const DATABASE_SERVER_IDENTIFIER = "DatabaseServerIdentifier";
 static const char* const WARNINGS = "Warnings";
+static const char* const JOBS_ENGINE_THREADS_COUNT = "JobsEngineThreadsCount";
 
 namespace Orthanc
 {
@@ -273,6 +274,55 @@
     }
   }
 
+  void OrthancConfiguration::LoadJobsEngineThreadsCount()
+  {
+    // default values
+    jobsEngineThreadsCount_["ResourceModification"] = 1;
+
+    if (json_.isMember(JOBS_ENGINE_THREADS_COUNT))
+    {
+      const Json::Value& source = json_[JOBS_ENGINE_THREADS_COUNT];
+      if (source.type() != Json::objectValue)
+      {
+        throw OrthancException(ErrorCode_BadFileFormat,
+                               "Bad format of the \"" + std::string(JOBS_ENGINE_THREADS_COUNT) +
+                               "\" configuration section");
+      }
+
+      Json::Value::Members members = source.getMemberNames();
+
+      for (size_t i = 0; i < members.size(); i++)
+      {
+        const std::string& name = members[i];
+        if (!source[name].isUInt())
+        {
+          throw OrthancException(ErrorCode_BadFileFormat,
+                                 "Bad format for \"" + std::string(JOBS_ENGINE_THREADS_COUNT) + "." + name + 
+                                 "\".  It should be an unsigned integer");
+        }
+        jobsEngineThreadsCount_[name] = source[name].asUInt();
+      }      
+    }
+  }
+
+  unsigned int OrthancConfiguration::GetJobsEngineWorkersThread(const std::string& jobType) const
+  {
+    unsigned int workersThread = 1;
+    
+    const JobsEngineThreadsCount::const_iterator it = jobsEngineThreadsCount_.find(jobType);
+    if (it != jobsEngineThreadsCount_.end())
+    {
+      workersThread = it->second;
+    }
+
+    if (workersThread == 0)
+    {
+      workersThread = SystemToolbox::GetHardwareConcurrency();
+    }
+
+    return workersThread;
+  }
+
   void OrthancConfiguration::LoadPeers()
   {
     if (GetBooleanParameter(ORTHANC_PEERS_IN_DB, false))
--- a/OrthancServer/Sources/OrthancConfiguration.h	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/OrthancConfiguration.h	Thu Jan 05 17:24:43 2023 +0100
@@ -51,6 +51,7 @@
   private:
     typedef std::map<std::string, RemoteModalityParameters>   Modalities;
     typedef std::map<std::string, WebServiceParameters>       Peers;
+    typedef std::map<std::string, unsigned int>               JobsEngineThreadsCount;
 
     boost::shared_mutex      mutex_;
     Json::Value              json_;
@@ -60,6 +61,7 @@
     const char*              configurationFileArg_;
     Modalities               modalities_;
     Peers                    peers_;
+    JobsEngineThreadsCount   jobsEngineThreadsCount_;
     ServerIndex*             serverIndex_;
     std::set<Warnings>       disabledWarnings_;
 
@@ -160,6 +162,10 @@
 
     void LoadWarnings();
 
+    void LoadJobsEngineThreadsCount();
+
+    unsigned int GetJobsEngineWorkersThread(const std::string& jobType) const;
+
     void RegisterFont(ServerResources::FileResourceId resource);
 
     bool LookupStringParameter(std::string& target,
--- a/OrthancServer/Sources/OrthancInitialization.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/OrthancInitialization.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -354,6 +354,7 @@
     LoadCustomDictionary(lock.GetJson());
 
     lock.GetConfiguration().LoadWarnings();
+    lock.GetConfiguration().LoadJobsEngineThreadsCount();
 
     LoadMainDicomTags(lock.GetJson());  // New in Orthanc 1.11.0
 
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestAnonymizeModify.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestAnonymizeModify.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -333,6 +333,15 @@
     }
   }
 
+  static void SetKeepSource(ThreadedSetOfInstancesJob& job,
+                            const Json::Value& body)
+  {
+    if (body.isMember(KEEP_SOURCE))
+    {
+      job.SetKeepSource(SerializationToolbox::ReadBoolean(body, KEEP_SOURCE));
+    }
+  }
+
 
   static void SubmitModificationJob(std::unique_ptr<DicomModification>& modification,
                                     bool isAnonymization,
@@ -343,8 +352,14 @@
                                     const std::set<std::string>& resources)
   {
     ServerContext& context = OrthancRestApi::GetContext(call);
+    unsigned int workersCount = 0;
 
-    std::unique_ptr<ResourceModificationJob> job(new ResourceModificationJob(context));
+    {
+      OrthancConfiguration::ReaderLock lock;
+      workersCount = lock.GetConfiguration().GetJobsEngineWorkersThread("ResourceModification");
+    }
+
+    std::unique_ptr<ResourceModificationJob> job(new ResourceModificationJob(context, workersCount));
 
     if (isSingleResource)  // This notably configures the output format
     {
@@ -366,12 +381,14 @@
     for (std::set<std::string>::const_iterator
            it = resources.begin(); it != resources.end(); ++it)
     {
-      context.AddChildInstances(*job, *it);
+      std::list<std::string> instances;
+      context.GetIndex().GetChildInstances(instances, *it);
+      job->AddInstances(instances);
+
+      job->AddParentResource(*it);
     }
     
-    job->AddTrailingStep();
-
-    OrthancRestApi::GetApi(call).SubmitCommandsJob
+    OrthancRestApi::GetApi(call).SubmitThreadedInstancesJob
       (call, job.release(), true /* synchronous by default */, body);
   }
 
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -434,7 +434,32 @@
     SubmitGenericJob(call, raii.release(), isDefaultSynchronous, body);
   }
 
-  
+  void OrthancRestApi::SubmitThreadedInstancesJob(RestApiPostCall& call,
+                                                  ThreadedSetOfInstancesJob* job,
+                                                  bool isDefaultSynchronous,
+                                                  const Json::Value& body) const
+  {
+    std::unique_ptr<ThreadedSetOfInstancesJob> raii(job);
+    
+    if (body.type() != Json::objectValue)
+    {
+      throw OrthancException(ErrorCode_BadFileFormat);
+    }
+
+    job->SetDescription("REST API");
+    
+    if (body.isMember(KEY_PERMISSIVE))
+    {
+      job->SetPermissive(SerializationToolbox::ReadBoolean(body, KEY_PERMISSIVE));
+    }
+    else
+    {
+      job->SetPermissive(false);
+    }
+
+    SubmitGenericJob(call, raii.release(), isDefaultSynchronous, body);
+  }  
+
   void OrthancRestApi::DocumentSubmitGenericJob(RestApiPostCall& call)
   {
     call.GetDocumentation()
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.h	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestApi.h	Thu Jan 05 17:24:43 2023 +0100
@@ -26,6 +26,7 @@
 #include "../../../OrthancFramework/Sources/JobsEngine/SetOfCommandsJob.h"
 #include "../../../OrthancFramework/Sources/MetricsRegistry.h"
 #include "../../../OrthancFramework/Sources/RestApi/RestApi.h"
+#include "../ServerJobs/ThreadedSetOfInstancesJob.h"
 #include "../ServerEnumerations.h"
 
 #include <set>
@@ -130,6 +131,12 @@
                            bool isDefaultSynchronous,
                            const Json::Value& body) const;
 
+    void SubmitThreadedInstancesJob(RestApiPostCall& call,
+                                    ThreadedSetOfInstancesJob* job,
+                                    bool isDefaultSynchronous,
+                                    const Json::Value& body) const;
+
+
     static void DocumentSubmitGenericJob(RestApiPostCall& call);
 
     static void DocumentSubmitCommandsJob(RestApiPostCall& call);
--- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -293,14 +293,18 @@
      **/
     // assert(modifiedInstance == modifiedHasher.HashInstance());
 
-    output_->Update(modifiedHasher);
+    {
+      boost::recursive_mutex::scoped_lock lock(outputMutex_);
+
+      output_->Update(modifiedHasher);
+    }
 
     return true;
   }
 
 
-  ResourceModificationJob::ResourceModificationJob(ServerContext& context) :
-    CleaningInstancesJob(context, true /* by default, keep source */),
+  ResourceModificationJob::ResourceModificationJob(ServerContext& context, unsigned int workersCount) :
+    ThreadedSetOfInstancesJob(context, false /* no post processing step */, true /* by default, keep source */, workersCount),
     isAnonymization_(false),
     transcode_(false),
     transferSyntax_(DicomTransferSyntax_LittleEndianExplicit)  // dummy initialization
@@ -445,6 +449,8 @@
     }
     else
     {
+      boost::recursive_mutex::scoped_lock lock(outputMutex_);
+
       assert(output_.get() != NULL);
       return output_->IsSingleResource();
     }
@@ -453,6 +459,8 @@
 
   ResourceType ResourceModificationJob::GetOutputLevel() const
   {
+    boost::recursive_mutex::scoped_lock lock(outputMutex_);
+
     if (IsSingleResourceModification())
     {
       assert(modification_.get() != NULL &&
@@ -469,7 +477,9 @@
 
   void ResourceModificationJob::GetPublicContent(Json::Value& value)
   {
-    CleaningInstancesJob::GetPublicContent(value);
+    boost::recursive_mutex::scoped_lock lock(outputMutex_);
+
+    ThreadedSetOfInstancesJob::GetPublicContent(value);
 
     value["IsAnonymization"] = isAnonymization_;
 
@@ -495,7 +505,7 @@
 
   ResourceModificationJob::ResourceModificationJob(ServerContext& context,
                                                    const Json::Value& serialized) :
-    CleaningInstancesJob(context, serialized, true /* by default, keep source */),
+    ThreadedSetOfInstancesJob(context, serialized, false /* no post processing step */, true /* by default, keep source */),
     transferSyntax_(DicomTransferSyntax_LittleEndianExplicit)  // dummy initialization
   {
     assert(serialized.type() == Json::objectValue);
@@ -565,7 +575,7 @@
     {
       throw OrthancException(ErrorCode_BadSequenceOfCalls);
     }
-    else if (!CleaningInstancesJob::Serialize(value))
+    else if (!ThreadedSetOfInstancesJob::Serialize(value))
     {
       return false;
     }
--- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h	Thu Jan 05 17:24:43 2023 +0100
@@ -23,14 +23,16 @@
 #pragma once
 
 #include "../../../OrthancFramework/Sources/DicomParsing/DicomModification.h"
+#include "../../../OrthancFramework/Sources/MultiThreading/RunnableWorkersPool.h"
 #include "../DicomInstanceOrigin.h"
-#include "CleaningInstancesJob.h"
+#include "ThreadedSetOfInstancesJob.h"
+#include <boost/thread/recursive_mutex.hpp>
 
 namespace Orthanc
 {
   class ServerContext;
   
-  class ResourceModificationJob : public CleaningInstancesJob
+  class ResourceModificationJob : public ThreadedSetOfInstancesJob
   {
   private:
     class IOutput : public boost::noncopyable
@@ -49,6 +51,8 @@
     
     class SingleOutput;
     class MultipleOutputs;
+
+    mutable boost::recursive_mutex      outputMutex_;
     
     std::unique_ptr<DicomModification>  modification_;
     boost::shared_ptr<IOutput>          output_;
@@ -58,10 +62,10 @@
     DicomTransferSyntax                 transferSyntax_;
 
   protected:
-    virtual bool HandleInstance(const std::string& instance) ORTHANC_OVERRIDE;
+    virtual bool HandleInstance(const std::string& instance) ORTHANC_OVERRIDE; // from ThreadedSetOfInstancesJob
     
   public:
-    explicit ResourceModificationJob(ServerContext& context);
+    explicit ResourceModificationJob(ServerContext& context, unsigned int workersCount);
 
     ResourceModificationJob(ServerContext& context,
                             const Json::Value& serialized);
@@ -109,10 +113,6 @@
     // Only possible if "IsSingleResourceModification()"
     ResourceType GetOutputLevel() const;
 
-    virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE
-    {
-    }
-
     virtual void GetJobType(std::string& target) ORTHANC_OVERRIDE
     {
       target = "ResourceModification";
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -0,0 +1,529 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2022 Osimis S.A., Belgium
+ * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ThreadedSetOfInstancesJob.h"
+
+#include "../../../OrthancFramework/Sources/Logging.h"
+#include "../../../OrthancFramework/Sources/OrthancException.h"
+#include "../../../OrthancFramework/Sources/SerializationToolbox.h"
+#include "../ServerContext.h"
+
+#include <boost/lexical_cast.hpp>
+#include <cassert>
+
+namespace Orthanc
+{
+  static const char* EXIT_WORKER_MESSAGE = "exit";
+
+   ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
+                                                        bool hasPostProcessing,
+                                                        bool keepSource,
+                                                        size_t workersCount) :
+    processedInstancesCount_(0),
+    hasPostProcessing_(hasPostProcessing),
+    started_(false),
+    stopRequested_(false),
+    permissive_(false),
+    currentStep_(ThreadedJobStep_BeforeStart),
+    workersCount_(workersCount),
+    context_(context),
+    keepSource_(keepSource)
+  {
+  }
+
+
+  ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob()
+  {
+    // no need to lock mutex here since we access variables used only by the "master" thread
+
+    StopWorkers();
+  }
+
+
+  void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount)
+  {
+    // no need to lock mutex here since we access variables used only by the "master" thread
+
+    for (size_t i = 0; i < workersCount; i++)
+    {
+      instancesWorkers_.push_back(boost::shared_ptr<boost::thread>(new boost::thread(InstanceWorkerThread, this)));
+    }
+  }
+
+
+  void ThreadedSetOfInstancesJob::WaitWorkersComplete()
+  {
+    // no need to lock mutex here since we access variables used only by the "master" thread
+
+    // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
+    for (size_t i = 0; i < instancesWorkers_.size(); i++)
+    {
+      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
+    }
+
+    for (size_t i = 0; i < instancesWorkers_.size(); i++)
+    {
+      if (instancesWorkers_[i]->joinable())
+      {
+        instancesWorkers_[i]->join();
+      }
+    }
+
+    instancesWorkers_.clear();
+  }
+
+
+  void ThreadedSetOfInstancesJob::StopWorkers()
+  {
+    // no need to lock mutex here since we access variables used or set only by the "master" thread
+
+    stopRequested_ = true;
+    WaitWorkersComplete();
+  }
+
+
+  void ThreadedSetOfInstancesJob::Stop(JobStopReason reason)
+  {
+    // no need to lock mutex here since we access variables used or set only by the "master" thread
+
+    if (reason == JobStopReason_Canceled ||
+        reason == JobStopReason_Failure ||
+        reason == JobStopReason_Retry)
+    {
+      // deallocate resources
+      StopWorkers();
+    }
+    else if (reason == JobStopReason_Paused)
+    {
+      // keep resources allocated.
+      // note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state
+    }
+  }
+
+
+  JobStepResult ThreadedSetOfInstancesJob::Step(const std::string& jobId)
+  {
+    // no need to lock mutex here since we access variables used or set only by the "master" thread
+
+    if (!started_)
+    {
+      throw OrthancException(ErrorCode_InternalError);
+    }
+
+    if (GetInstancesCount() == 0)
+    {
+      // No instances to handle: We're done
+      return JobStepResult::Success();
+    }
+    
+    try
+    {
+      if (currentStep_ == ThreadedJobStep_BeforeStart)
+      {
+        currentStep_ = ThreadedJobStep_ProcessingInstances;
+      }
+
+      if (currentStep_ == ThreadedJobStep_ProcessingInstances)
+      {
+        // wait until all instances are processed by the workers
+        WaitWorkersComplete();
+        
+        currentStep_ = ThreadedJobStep_PostProcessingInstances;
+        return JobStepResult::Continue();
+      }
+      else if (currentStep_ == ThreadedJobStep_PostProcessingInstances)
+      {
+        if (HasPostProcessingStep())
+        {
+          PostProcessInstances();
+        }
+
+        currentStep_ = ThreadedJobStep_Cleanup;
+        return JobStepResult::Continue();
+      }
+      else if (currentStep_ == ThreadedJobStep_Cleanup)
+      {
+        // clean after the post processing step
+        if (HasCleanupStep())
+        {
+          for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+          {
+            Json::Value tmp;
+            context_.DeleteResource(tmp, *it, ResourceType_Instance);
+          }
+        }
+
+        currentStep_ = ThreadedJobStep_Done;
+        return JobStepResult::Success();
+      }
+    }
+    catch (OrthancException& e)
+    {
+      if (permissive_)
+      {
+        LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
+      }
+      else
+      {
+        return JobStepResult::Failure(e);
+      }
+    }
+
+    return JobStepResult::Continue();
+  }
+
+
+  bool ThreadedSetOfInstancesJob::HasPostProcessingStep() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return hasPostProcessing_;
+  }
+
+
+  bool ThreadedSetOfInstancesJob::HasCleanupStep() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return !keepSource_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that)
+  {
+    while (true)
+    {
+      std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0)));
+      if (that->stopRequested_                // no lock(mutex) to access this variable, this is safe since it's just reading a boolean
+        || instanceId->GetValue() == EXIT_WORKER_MESSAGE)
+      {
+        return;
+      }
+
+      bool processed = that->HandleInstance(instanceId->GetValue());
+
+      {
+        boost::recursive_mutex::scoped_lock lock(that->mutex_);
+
+        that->processedInstancesCount_++;
+        if (!processed)
+        {
+          that->failedInstances_.insert(instanceId->GetValue()); 
+        }
+      }
+    }
+  }
+
+
+  bool ThreadedSetOfInstancesJob::GetOutput(std::string &output,
+                                            MimeType &mime,
+                                            std::string& filename,
+                                            const std::string &key)
+  {
+    return false;
+  }
+
+
+  size_t ThreadedSetOfInstancesJob::GetInstancesCount() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+    
+    return instances_.size();
+  }
+
+
+  void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target = failedInstances_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target = instances_;
+  }
+
+
+  bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return failedInstances_.find(instance) != failedInstances_.end();
+  }
+
+
+  void ThreadedSetOfInstancesJob::Start()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    started_ = true;
+
+    // create the workers and enqueue all instances
+    for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+    {
+      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
+    }
+
+    InitWorkers(workersCount_);
+  }
+
+
+  void ThreadedSetOfInstancesJob::Reset()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (started_)
+    {
+      currentStep_ = ThreadedJobStep_BeforeStart;
+      stopRequested_ = false;
+      processedInstancesCount_ = 0;
+      failedInstances_.clear();
+      instancesToProcess_.Clear();
+    }
+    else
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+  }
+
+
+  static const char* KEY_FAILED_INSTANCES = "FailedInstances";
+  static const char* KEY_PARENT_RESOURCES = "ParentResources";
+  static const char* KEY_DESCRIPTION = "Description";
+  static const char* KEY_PERMISSIVE = "Permissive";
+  static const char* KEY_CURRENT_STEP = "CurrentStep";
+  static const char* KEY_TYPE = "Type";
+  static const char* KEY_INSTANCES = "Instances";
+  static const char* KEY_INSTANCES_COUNT = "InstancesCount";
+  static const char* KEY_FAILED_INSTANCES_COUNT = "FailedInstancesCount";
+  static const char* KEY_KEEP_SOURCE = "KeepSource";
+  static const char* KEY_WORKERS_COUNT = "WorkersCount";
+
+
+  void ThreadedSetOfInstancesJob::GetPublicContent(Json::Value& target)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target[KEY_DESCRIPTION] = GetDescription();
+    target[KEY_INSTANCES_COUNT] = static_cast<uint32_t>(GetInstancesCount());
+    target[KEY_FAILED_INSTANCES_COUNT] = static_cast<uint32_t>(failedInstances_.size());
+
+    if (!parentResources_.empty())
+    {
+      SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
+    }
+  }
+
+
+  bool ThreadedSetOfInstancesJob::Serialize(Json::Value& target)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target = Json::objectValue;
+
+    std::string type;
+    GetJobType(type);
+    target[KEY_TYPE] = type;
+    
+    target[KEY_PERMISSIVE] = permissive_;
+    target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_);
+    target[KEY_DESCRIPTION] = description_;
+    target[KEY_KEEP_SOURCE] = keepSource_;
+    target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_);
+    
+    SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES);
+    SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
+    SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
+
+    return true;
+  }
+
+
+  ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
+                                                       const Json::Value& source,
+                                                       bool hasPostProcessing,
+                                                       bool defaultKeepSource) :
+    processedInstancesCount_(0),
+    hasPostProcessing_(hasPostProcessing),
+    started_(false),
+    stopRequested_(false),
+    permissive_(false),
+    currentStep_(ThreadedJobStep_BeforeStart),
+    workersCount_(1),
+    context_(context),
+    keepSource_(defaultKeepSource)
+  {
+    SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
+
+    if (source.isMember(KEY_PARENT_RESOURCES))
+    {
+      // Backward compatibility with Orthanc <= 1.5.6
+      SerializationToolbox::ReadSetOfStrings(parentResources_, source, KEY_PARENT_RESOURCES);
+    }
+    
+    if (source.isMember(KEY_KEEP_SOURCE))
+    {
+      keepSource_ = SerializationToolbox::ReadBoolean(source, KEY_KEEP_SOURCE);
+    }
+
+    if (source.isMember(KEY_WORKERS_COUNT))
+    {
+      workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT);
+    }
+
+    if (source.isMember(KEY_INSTANCES))
+    {
+      SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
+    }
+  }
+
+
+  void ThreadedSetOfInstancesJob::SetKeepSource(bool keep)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (IsStarted())
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    keepSource_ = keep;
+  }
+
+
+  float ThreadedSetOfInstancesJob::GetProgress()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (GetInstancesCount() == 0)
+    {
+      return 1;
+    }
+    else
+    {
+      size_t totalProgress = GetInstancesCount();
+      size_t currentProgress = processedInstancesCount_;
+      
+      if (HasPostProcessingStep())
+      {
+        ++totalProgress;
+        if (currentStep_ > ThreadedJobStep_PostProcessingInstances)
+        {
+          ++currentProgress;
+        }
+      }
+
+      if (HasCleanupStep())
+      {
+        ++totalProgress;
+        if (currentStep_ > ThreadedJobStep_Cleanup)
+        {
+          ++currentProgress;
+        }
+      }
+
+      return (static_cast<float>(currentProgress) /
+              static_cast<float>(totalProgress));
+    }
+  }
+
+
+  ThreadedSetOfInstancesJob::ThreadedJobStep ThreadedSetOfInstancesJob::GetCurrentStep() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return currentStep_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::SetDescription(const std::string &description)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    description_ = description;
+  }
+
+
+  const std::string& ThreadedSetOfInstancesJob::GetDescription() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return description_;
+  }
+
+
+  bool ThreadedSetOfInstancesJob::IsPermissive() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return permissive_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::SetPermissive(bool permissive)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (started_)
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      permissive_ = permissive;
+    }
+  }
+
+
+  bool ThreadedSetOfInstancesJob::IsStarted() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return started_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::AddInstances(const std::list<std::string>& instances)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    for (std::list<std::string>::const_iterator
+           it = instances.begin(); it != instances.end(); ++it)
+    {
+      instances_.insert(*it);
+    }
+  }
+
+
+  void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    parentResources_.insert(resource);
+  }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h	Thu Jan 05 17:24:43 2023 +0100
@@ -0,0 +1,158 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2022 Osimis S.A., Belgium
+ * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "../../../OrthancFramework/Sources/Compatibility.h"  // For ORTHANC_OVERRIDE
+#include "../../../OrthancFramework/Sources/JobsEngine/IJob.h"
+#include "../../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h"
+
+#include <set>
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread.hpp>
+
+namespace Orthanc
+{
+  class ServerContext;
+
+  // This class is a threaded version of SetOfInstancesJob merged with CleaningInstancesJob
+  class ORTHANC_PUBLIC ThreadedSetOfInstancesJob : public IJob
+  {
+  public:
+    enum ThreadedJobStep  // cannot use "Step" since there is a method with this name !
+    {
+      ThreadedJobStep_BeforeStart,
+      ThreadedJobStep_ProcessingInstances,
+      ThreadedJobStep_PostProcessingInstances,
+      ThreadedJobStep_Cleanup,
+      ThreadedJobStep_Done
+    };
+
+  private:
+    std::set<std::string>  failedInstances_;
+    std::set<std::string>  parentResources_;
+    
+    size_t                              processedInstancesCount_;
+    SharedMessageQueue                  instancesToProcess_;
+    std::vector<boost::shared_ptr<boost::thread> >         instancesWorkers_;
+    std::set<std::string>               instances_;
+
+    bool                    hasPostProcessing_;  // final step before "KeepSource" cleanup
+    bool                    started_;
+    bool                    stopRequested_;
+    bool                    permissive_;
+    ThreadedJobStep         currentStep_;
+    std::string             description_;
+    size_t                  workersCount_;
+
+    ServerContext&          context_;
+    bool                    keepSource_;
+  protected:
+    mutable boost::recursive_mutex mutex_;
+
+  public:
+    ThreadedSetOfInstancesJob(ServerContext& context,
+                              bool hasTrailingStep,
+                              bool keepSource,
+                              size_t workersCount);
+
+    explicit ThreadedSetOfInstancesJob(ServerContext& context,
+                                       const Json::Value& source,
+                                       bool hasTrailingStep,
+                                       bool defaultKeepSource);
+
+    virtual ~ThreadedSetOfInstancesJob();
+
+  protected:
+    virtual bool HandleInstance(const std::string& instance) = 0;
+
+    virtual void PostProcessInstances() {}
+
+    void InitWorkers(size_t workersCount);
+
+    void StopWorkers();
+
+    void WaitWorkersComplete();
+
+    static void InstanceWorkerThread(ThreadedSetOfInstancesJob* that);
+
+    const std::string& GetInstance(size_t index) const;
+
+    bool HasPostProcessingStep() const;
+
+    bool HasCleanupStep() const;
+
+  public:
+
+    ThreadedJobStep GetCurrentStep() const;
+
+    void SetDescription(const std::string& description);
+
+    const std::string& GetDescription() const;
+
+    void SetKeepSource(bool keep);
+
+    void GetInstances(std::set<std::string>& target) const;
+
+    void GetFailedInstances(std::set<std::string>& target) const;
+
+    size_t GetInstancesCount() const;
+
+    void AddInstances(const std::list<std::string>& instances);
+
+    void AddParentResource(const std::string &resource);
+
+    bool IsPermissive() const;
+
+    void SetPermissive(bool permissive);
+
+    virtual void Reset() ORTHANC_OVERRIDE;
+    
+    virtual void Start() ORTHANC_OVERRIDE;
+    
+    virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE;
+
+    virtual float GetProgress() ORTHANC_OVERRIDE;
+
+    bool IsStarted() const;
+
+    virtual JobStepResult Step(const std::string& jobId) ORTHANC_OVERRIDE;
+    
+    virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE;
+    
+    virtual bool Serialize(Json::Value& target) ORTHANC_OVERRIDE;
+
+    virtual bool GetOutput(std::string& output,
+                           MimeType& mime,
+                           std::string& filename,
+                           const std::string& key) ORTHANC_OVERRIDE;
+
+    bool IsFailedInstance(const std::string& instance) const;
+
+    ServerContext& GetContext() const
+    {
+      return context_;
+    }
+
+  };
+}
--- a/OrthancServer/UnitTestsSources/ServerJobsTests.cpp	Mon Dec 19 20:00:21 2022 +0100
+++ b/OrthancServer/UnitTestsSources/ServerJobsTests.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -325,6 +325,34 @@
 }
 
 
+static bool CheckIdempotentSetOfInstances(IJobUnserializer& unserializer,
+                                          ThreadedSetOfInstancesJob& job)
+{
+  Json::Value a = 42;
+  
+  if (!job.Serialize(a))
+  {
+    return false;
+  }
+  else
+  {
+    std::unique_ptr<ThreadedSetOfInstancesJob> unserialized
+      (dynamic_cast<ThreadedSetOfInstancesJob*>(unserializer.UnserializeJob(a)));
+  
+    Json::Value b = 43;
+    if (unserialized->Serialize(b))
+    {    
+      return (CheckSameJson(a, b) &&
+              job.GetCurrentStep() == unserialized->GetCurrentStep() &&
+              job.GetInstancesCount() == unserialized->GetInstancesCount() );
+    }
+    else
+    {
+      return false;
+    }
+  }
+}
+
 static bool CheckIdempotentSerialization(IJobUnserializer& unserializer,
                                          IJobOperation& operation)
 {
@@ -813,7 +841,7 @@
     modification->SetupAnonymization(DicomVersion_2008);
     modification->SetLevel(ResourceType_Series);
 
-    ResourceModificationJob job(GetContext());
+    ResourceModificationJob job(GetContext(), 1);
     ASSERT_THROW(job.IsSingleResourceModification(), OrthancException);
     job.SetSingleResourceModification(modification.release(), ResourceType_Patient, true);
     job.SetOrigin(DicomInstanceOrigin::FromLua());
@@ -821,7 +849,6 @@
     ASSERT_TRUE(job.IsSingleResourceModification());
     ASSERT_EQ(ResourceType_Patient, job.GetOutputLevel());
 
-    job.AddTrailingStep();  // Necessary since 1.7.0
     ASSERT_TRUE(CheckIdempotentSetOfInstances(unserializer, job));
     ASSERT_TRUE(job.Serialize(s));
   }
@@ -858,12 +885,11 @@
   }
 
   {
-    ResourceModificationJob job(GetContext());
+    ResourceModificationJob job(GetContext(), 2);
     ASSERT_THROW(job.SetTranscode("nope"), OrthancException);
     job.SetTranscode(DicomTransferSyntax_JPEGProcess1);
     job.SetSingleResourceModification(new DicomModification, ResourceType_Study, false);
 
-    job.AddTrailingStep();  // Necessary since 1.7.0
     ASSERT_TRUE(CheckIdempotentSetOfInstances(unserializer, job));
     ASSERT_TRUE(job.Serialize(s));
   }
@@ -883,11 +909,12 @@
   }
 
   {
-    ResourceModificationJob job(GetContext());
+    ResourceModificationJob job(GetContext(), 2);
     job.SetMultipleResourcesModification(new DicomModification, true);
-    job.AddInstance("toto");
-    job.AddInstance("tutu");
-    job.AddTrailingStep();  // Necessary since 1.7.0
+    std::list<std::string> instances;
+    instances.push_back("toto");
+    instances.push_back("tutu");
+    job.AddInstances(instances);
     ASSERT_TRUE(CheckIdempotentSetOfInstances(unserializer, job));
     ASSERT_TRUE(job.Serialize(s));
   }
@@ -899,10 +926,7 @@
     ResourceModificationJob& tmp = dynamic_cast<ResourceModificationJob&>(*job);
 
     std::set<std::string> instances;
-    for (size_t i = 0; i < tmp.GetInstancesCount(); i++)
-    {
-      instances.insert(tmp.GetInstance(i));
-    }
+    tmp.GetInstances(instances);
     
     ASSERT_EQ(2u, instances.size());
     ASSERT_TRUE(instances.find("toto") != instances.end());