changeset 4797:4e765c18ace7 storage-cache

enable using multiple threads to load instances when generating zip archive/media
author Alain Mazy <am@osimis.io>
date Thu, 07 Oct 2021 13:31:36 +0200
parents 434843934307
children 96ab170294fd
files NEWS OrthancFramework/Sources/MultiThreading/Semaphore.cpp OrthancFramework/Sources/MultiThreading/Semaphore.h OrthancServer/Resources/Configuration.json OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp OrthancServer/Sources/ServerJobs/ArchiveJob.cpp OrthancServer/Sources/ServerJobs/ArchiveJob.h
diffstat 7 files changed, 279 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Thu Sep 30 12:14:19 2021 +0200
+++ b/NEWS	Thu Oct 07 13:31:36 2021 +0200
@@ -9,6 +9,8 @@
   retrieval of individual frames of multiframe instances.
 * New configuration option "MaximumStorageCacheSize" to configure the size of
   the new storage cache.
+* New configuration option "ZipLoaderThreads" to configure the number of threads used
+  to read instances from storage when creating a Zip archive/media.
 
 
 Maintenance
--- a/OrthancFramework/Sources/MultiThreading/Semaphore.cpp	Thu Sep 30 12:14:19 2021 +0200
+++ b/OrthancFramework/Sources/MultiThreading/Semaphore.cpp	Thu Oct 07 13:31:36 2021 +0200
@@ -31,10 +31,6 @@
   Semaphore::Semaphore(unsigned int availableResources) :
     availableResources_(availableResources)
   {
-    if (availableResources_ == 0)
-    {
-      throw OrthancException(ErrorCode_ParameterOutOfRange);
-    }
   }
 
   unsigned int Semaphore::GetAvailableResourcesCount() const
--- a/OrthancFramework/Sources/MultiThreading/Semaphore.h	Thu Sep 30 12:14:19 2021 +0200
+++ b/OrthancFramework/Sources/MultiThreading/Semaphore.h	Thu Oct 07 13:31:36 2021 +0200
@@ -36,16 +36,16 @@
     boost::mutex mutex_;
     boost::condition_variable condition_;
 
+  public:
+    explicit Semaphore(unsigned int availableResources);
+
+    unsigned int GetAvailableResourcesCount() const;
+
     void Release(unsigned int resourceCount = 1);
 
     void Acquire(unsigned int resourceCount = 1);
 
     bool TryAcquire(unsigned int resourceCount = 1);
-  public:
-    explicit Semaphore(unsigned int availableResources);
-
-    unsigned int GetAvailableResourcesCount() const;
-
 
     class Locker : public boost::noncopyable
     {
--- a/OrthancServer/Resources/Configuration.json	Thu Sep 30 12:14:19 2021 +0200
+++ b/OrthancServer/Resources/Configuration.json	Thu Oct 07 13:31:36 2021 +0200
@@ -834,5 +834,12 @@
   // disk space and might lead to HTTP timeouts on large archives). If
   // set to "true", the chunks of the ZIP file are progressively sent
   // as soon as one DICOM file gets compressed (new in Orthanc 1.9.4)
-  "SynchronousZipStream" : true
+  "SynchronousZipStream" : true,
+
+  // Default number of loader threads when generating Zip archive/media.
+  // A value of 0 means reading and writing are performed in sequence
+  // (default behaviour).  A value > 1 is meaningful only if the storage
+  // is a distributed network storage (e.g. object storage plugin).
+  // (new in Orthanc 1.9.8)
+  "ZipLoaderThreads": 0
 }
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp	Thu Sep 30 12:14:19 2021 +0200
+++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp	Thu Oct 07 13:31:36 2021 +0200
@@ -51,7 +51,9 @@
   static const char* const KEY_RESOURCES = "Resources";
   static const char* const KEY_EXTENDED = "Extended";
   static const char* const KEY_TRANSCODE = "Transcode";
-  
+
+  static const char* const CONFIG_LOADER_THREADS = "ZipLoaderThreads";
+
   static void AddResourcesOfInterestFromArray(ArchiveJob& job,
                                               const Json::Value& resources)
   {
@@ -123,6 +125,7 @@
                                bool& transcode,              /* out */
                                DicomTransferSyntax& syntax,  /* out */
                                int& priority,                /* out */
+                               unsigned int& loaderThreads,  /* out */
                                const Json::Value& body,      /* in */
                                const bool defaultExtended    /* in */)
   {
@@ -151,6 +154,12 @@
     {
       transcode = false;
     }
+
+    {
+      OrthancConfiguration::ReaderLock lock;
+      loaderThreads = lock.GetConfiguration().GetUnsignedIntegerParameter(CONFIG_LOADER_THREADS, 0);  // New in Orthanc 1.9.8
+    }
+   
   }
 
 
@@ -554,8 +563,9 @@
       bool synchronous, extended, transcode;
       DicomTransferSyntax transferSyntax;
       int priority;
+      unsigned int loaderThreads;
       GetJobParameters(synchronous, extended, transcode, transferSyntax,
-                       priority, body, DEFAULT_IS_EXTENDED);
+                       priority, loaderThreads, body, DEFAULT_IS_EXTENDED);
       
       std::unique_ptr<ArchiveJob> job(new ArchiveJob(context, IS_MEDIA, extended));
       AddResourcesOfInterest(*job, body);
@@ -565,6 +575,8 @@
         job->SetTranscode(transferSyntax);
       }
       
+      job->SetLoaderThreads(loaderThreads);
+
       SubmitJob(call.GetOutput(), context, job, priority, synchronous, "Archive.zip");
     }
     else
@@ -578,6 +590,8 @@
   template <bool IS_MEDIA>
   static void CreateSingleGet(RestApiGetCall& call)
   {
+    static const char* const TRANSCODE = "transcode";
+
     if (call.IsDocumentation())
     {
       ResourceType t = StringToResourceType(call.GetFullUri()[0].c_str());
@@ -591,7 +605,7 @@
                         "which might *not* be desirable to archive large amount of data, as it might "
                         "lead to network timeouts. Prefer the asynchronous version using `POST` method.")
         .SetUriArgument("id", "Orthanc identifier of the " + r + " of interest")
-        .SetHttpGetArgument("transcode", RestApiCallDocumentation::Type_String,
+        .SetHttpGetArgument(TRANSCODE, RestApiCallDocumentation::Type_String,
                             "If present, the DICOM files in the archive will be transcoded to the provided "
                             "transfer syntax: https://book.orthanc-server.com/faq/transcoding.html", false)
         .AddAnswerType(MimeType_Zip, "ZIP file containing the archive");
@@ -621,12 +635,17 @@
     std::unique_ptr<ArchiveJob> job(new ArchiveJob(context, IS_MEDIA, extended));
     job->AddResource(id);
 
-    static const char* const TRANSCODE = "transcode";
     if (call.HasArgument(TRANSCODE))
     {
       job->SetTranscode(GetTransferSyntax(call.GetArgument(TRANSCODE, "")));
     }
 
+    {
+      OrthancConfiguration::ReaderLock lock;
+      unsigned int loaderThreads = lock.GetConfiguration().GetUnsignedIntegerParameter(CONFIG_LOADER_THREADS, 0);  // New in Orthanc 1.9.8
+      job->SetLoaderThreads(loaderThreads);
+    }
+
     SubmitJob(call.GetOutput(), context, job, 0 /* priority */,
               true /* synchronous */, id + ".zip");
   }
@@ -660,8 +679,9 @@
       bool synchronous, extended, transcode;
       DicomTransferSyntax transferSyntax;
       int priority;
+      unsigned int loaderThreads;
       GetJobParameters(synchronous, extended, transcode, transferSyntax,
-                       priority, body, false /* by default, not extented */);
+                       priority, loaderThreads, body, false /* by default, not extented */);
       
       std::unique_ptr<ArchiveJob> job(new ArchiveJob(context, IS_MEDIA, extended));
       job->AddResource(id);
@@ -671,6 +691,8 @@
         job->SetTranscode(transferSyntax);
       }
 
+      job->SetLoaderThreads(loaderThreads);
+
       SubmitJob(call.GetOutput(), context, job, priority, synchronous, id + ".zip");
     }
     else
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Thu Sep 30 12:14:19 2021 +0200
+++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Thu Oct 07 13:31:36 2021 +0200
@@ -40,6 +40,7 @@
 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h"
 #include "../../../OrthancFramework/Sources/Logging.h"
 #include "../../../OrthancFramework/Sources/OrthancException.h"
+#include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h"
 #include "../OrthancConfiguration.h"
 #include "../ServerContext.h"
 
@@ -88,6 +89,181 @@
   }
 
 
+  // Abstract source of DICOM content for the archive job.  The ZIP
+  // commands announce each instance through "PrepareDicom()" when the
+  // command is queued, and later fetch its content through "GetDicom()"
+  // when the command is applied.
+  class ArchiveJob::InstanceLoader : public boost::noncopyable
+  {
+  protected:
+    // Server context that the concrete loaders read the DICOM files from
+    ServerContext&                        context_;
+  public:
+    // "explicit" prevents a ServerContext from silently converting to a loader
+    explicit InstanceLoader(ServerContext& context)
+    : context_(context)
+    {
+    }
+
+    virtual ~InstanceLoader()
+    {
+    }
+
+    // Hint that "instanceId" will be requested later on through
+    // "GetDicom()".  The default implementation ignores the hint.
+    virtual void PrepareDicom(const std::string& instanceId)
+    {
+
+    }
+
+    // Stores the content of the DICOM instance "instanceId" into "dicom"
+    virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0;
+
+    // Releases whatever the loader still holds.  The default
+    // implementation has nothing to release.
+    virtual void Clear()
+    {
+    }
+  };
+
+  // Loader that reads each instance from the server context at the very
+  // moment it is requested, in the calling thread.  It inherits the no-op
+  // "PrepareDicom()" and "Clear()" of the base class.
+  class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader
+  {
+  public:
+    // "explicit" prevents a ServerContext from silently converting to a loader
+    explicit SynchronousInstanceLoader(ServerContext& context)
+    : InstanceLoader(context)
+    {
+    }
+
+    // Reads "instanceId" through "ServerContext::ReadDicom()"; whatever
+    // that call throws is propagated to the caller
+    virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
+    {
+      context_.ReadDicom(dicom, instanceId);
+    }
+  };
+
+  // Message wrapping the identifier of one instance, so that it can be
+  // stored in a queue of "IDynamicObject" items
+  class InstanceId : public Orthanc::IDynamicObject
+  {
+  private:
+    std::string id_;
+
+  public:
+    // "explicit" prevents a plain string from silently converting to a message
+    explicit InstanceId(const std::string& id) : id_(id)
+    {
+    }
+
+    virtual ~InstanceId() ORTHANC_OVERRIDE
+    {
+    }
+
+    // Returns a reference instead of a copy: the identifier is only read
+    // while the message is still alive
+    const std::string& GetId() const
+    {
+      return id_;
+    }
+  };
+
+  // Loader that reads the instances ahead of time using a pool of worker
+  // threads.  "PrepareDicom()" queues an instance identifier, the workers
+  // read the corresponding files into "availableInstances_", and
+  // "GetDicom()" waits until the requested instance shows up in that map.
+  class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader
+  {
+    typedef std::map<std::string, boost::shared_ptr<std::string> >  AvailableInstances;
+
+    // Signalled by the workers each time an entry is added to "availableInstances_"
+    Semaphore                           availableInstancesSemaphore_;
+    // Loaded content indexed by instance ID; a NULL pointer records a failed read
+    AvailableInstances                  availableInstances_;
+    // Protects "availableInstances_", which is shared with the worker threads
+    boost::mutex                        availableInstancesMutex_;
+    // Identifiers waiting to be read; a NULL item tells one worker to exit
+    SharedMessageQueue                  instancesToPreload_;
+    std::vector<boost::thread*>         threads_;
+
+
+  public:
+    // Starts "threadCount" worker threads; the semaphore starts at zero
+    // because no instance is available yet
+    ThreadedInstanceLoader(ServerContext& context, size_t threadCount)
+    : InstanceLoader(context),
+      availableInstancesSemaphore_(0)
+    {
+      for (size_t i = 0; i < threadCount; i++)
+      {
+        threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
+      }
+    }
+
+    virtual ~ThreadedInstanceLoader()
+    {
+      Clear();
+    }
+
+    // Stops and joins all the workers, then drops the content that was
+    // loaded but never requested.  Calling it a second time is harmless,
+    // as "threads_" is empty by then.
+    virtual void Clear() ORTHANC_OVERRIDE
+    {
+      // One NULL item per worker: each worker exits on the first one it dequeues
+      for (size_t i = 0; i < threads_.size(); i++)
+      {
+        instancesToPreload_.Enqueue(NULL);
+      }
+
+      for (size_t i = 0; i < threads_.size(); i++)
+      {
+        if (threads_[i]->joinable())
+        {
+          threads_[i]->join();
+        }
+        delete threads_[i];
+      }
+
+      threads_.clear();
+
+      // No worker is running anymore, so the map can be touched without the mutex
+      availableInstances_.clear();
+    }
+
+    // Entry point of the worker threads: reads the queued instances one
+    // after the other until a NULL item is dequeued
+    static void PreloaderWorkerThread(ThreadedInstanceLoader* that)
+    {
+      while (true)
+      {
+        // NOTE(review): a timeout of 0 presumably means "wait until an item
+        // is available" - confirm against SharedMessageQueue::Dequeue()
+        std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0)));
+        if (instanceId.get() == NULL)  // that's the signal to exit the thread
+        {
+          return;
+        }
+
+        const std::string id = instanceId->GetId();
+
+        boost::shared_ptr<std::string> dicomContent;
+        try
+        {
+          dicomContent.reset(new std::string());
+          that->context_.ReadDicom(*dicomContent, id);
+        }
+        catch (...)
+        {
+          // Whatever went wrong (OrthancException or anything else), never
+          // let an exception escape from the thread: store a NULL result
+          // to notify that we could not read the instance
+          dicomContent.reset();
+        }
+
+        {
+          boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
+          that->availableInstances_[id] = dicomContent;
+        }
+
+        that->availableInstancesSemaphore_.Release();
+      }
+    }
+
+    // Queues "instanceId" so that one of the workers reads it in the background
+    virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE
+    {
+      instancesToPreload_.Enqueue(new InstanceId(instanceId));
+    }
+
+    // Blocks until "instanceId" has been processed by a worker, then moves
+    // its content into "dicom".  Throws ErrorCode_InexistentItem if the
+    // worker failed to read the instance.  The instance must have been
+    // announced through "PrepareDicom()", otherwise this call never returns.
+    virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
+    {
+      while (true)
+      {
+        // wait for an instance to be available but this might not be the one we are waiting for !
+        availableInstancesSemaphore_.Acquire();
+
+        boost::shared_ptr<std::string> dicomContent;
+        {
+          // The map is concurrently modified by the workers: lock it
+          boost::mutex::scoped_lock lock(availableInstancesMutex_);
+
+          AvailableInstances::iterator found = availableInstances_.find(instanceId);
+          if (found == availableInstances_.end())
+          {
+            // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when
+            // a new instance is available
+            continue;
+          }
+
+          // this is the instance we were waiting for
+          dicomContent = found->second;
+          availableInstances_.erase(found);
+
+          if (!availableInstances_.empty())
+          {
+            // we have just read the instance we were waiting for but there are still other instances available ->
+            // make sure the next GetDicom call does not wait !  This must also be done if the
+            // current instance is in error, otherwise the next call could block forever.
+            availableInstancesSemaphore_.Release();
+          }
+        }
+
+        if (dicomContent.get() == NULL)  // there has been an error while reading the file
+        {
+          throw OrthancException(ErrorCode_InexistentItem);
+        }
+
+        dicom.swap(*dicomContent);
+        return;
+      }
+    }
+  };
+
+
   class ArchiveJob::ResourceIdentifiers : public boost::noncopyable
   {
   private:
@@ -402,6 +578,7 @@
         
       void Apply(HierarchicalZipWriter& writer,
                  ServerContext& context,
+                 InstanceLoader& instanceLoader,
                  DicomDirWriter* dicomDir,
                  const std::string& dicomDirFolder,
                  bool transcode,
@@ -423,7 +600,7 @@
 
             try
             {
-              context.ReadDicom(content, instanceId_);
+              instanceLoader.GetDicom(content, instanceId_);
             }
             catch (OrthancException& e)
             {
@@ -494,10 +671,12 @@
     std::deque<Command*>  commands_;
     uint64_t              uncompressedSize_;
     unsigned int          instancesCount_;
+    InstanceLoader&       instanceLoader_;
 
       
     void ApplyInternal(HierarchicalZipWriter& writer,
                        ServerContext& context,
+                       InstanceLoader& instanceLoader,
                        size_t index,
                        DicomDirWriter* dicomDir,
                        const std::string& dicomDirFolder,
@@ -509,13 +688,14 @@
         throw OrthancException(ErrorCode_ParameterOutOfRange);
       }
 
-      commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax);
+      commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax);
     }
       
   public:
-    ZipCommands() :
+    ZipCommands(InstanceLoader& instanceLoader) :
       uncompressedSize_(0),
-      instancesCount_(0)
+      instancesCount_(0),
+      instanceLoader_(instanceLoader)
     {
     }
       
@@ -547,23 +727,25 @@
     // "media" flavor (with DICOMDIR)
     void Apply(HierarchicalZipWriter& writer,
                ServerContext& context,
+               InstanceLoader& instanceLoader,
                size_t index,
                DicomDirWriter& dicomDir,
                const std::string& dicomDirFolder,
                bool transcode,
                DicomTransferSyntax transferSyntax) const
     {
-      ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax);
+      ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax);
     }
 
     // "archive" flavor (without DICOMDIR)
     void Apply(HierarchicalZipWriter& writer,
                ServerContext& context,
+               InstanceLoader& instanceLoader,
                size_t index,
                bool transcode,
                DicomTransferSyntax transferSyntax) const
     {
-      ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax);
+      ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax);
     }
       
     void AddOpenDirectory(const std::string& filename)
@@ -580,6 +762,7 @@
                           const std::string& instanceId,
                           uint64_t uncompressedSize)
     {
+      instanceLoader_.PrepareDicom(instanceId);
       commands_.push_back(new Command(Type_WriteInstance, filename, instanceId));
       instancesCount_ ++;
       uncompressedSize_ += uncompressedSize;
@@ -747,6 +930,7 @@
   {
   private:
     ServerContext&                          context_;
+    InstanceLoader&                         instanceLoader_;
     ZipCommands                             commands_;
     std::unique_ptr<HierarchicalZipWriter>  zip_;
     std::unique_ptr<DicomDirWriter>         dicomDir_;
@@ -755,10 +939,13 @@
 
   public:
     ZipWriterIterator(ServerContext& context,
+                      InstanceLoader& instanceLoader,
                       ArchiveIndex& archive,
                       bool isMedia,
                       bool enableExtendedSopClass) :
       context_(context),
+      instanceLoader_(instanceLoader),
+      commands_(instanceLoader),
       isMedia_(isMedia),
       isStream_(false)
     {
@@ -882,13 +1069,13 @@
         if (isMedia_)
         {
           assert(dicomDir_.get() != NULL);
-          commands_.Apply(*zip_, context_, index, *dicomDir_,
+          commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_,
                           MEDIA_IMAGES_FOLDER, transcode, transferSyntax);
         }
         else
         {
           assert(dicomDir_.get() == NULL);
-          commands_.Apply(*zip_, context_, index, transcode, transferSyntax);
+          commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax);
         }
       }
     }
@@ -917,7 +1104,8 @@
     uncompressedSize_(0),
     archiveSize_(0),
     transcode_(false),
-    transferSyntax_(DicomTransferSyntax_LittleEndianImplicit)
+    transferSyntax_(DicomTransferSyntax_LittleEndianImplicit),
+    loaderThreads_(0)
   {
   }
 
@@ -993,6 +1181,19 @@
   }
 
   
+  // Sets the number of loader threads the job will use.  This is only
+  // allowed as long as no writer exists, i.e. before the job is started;
+  // otherwise ErrorCode_BadSequenceOfCalls is thrown.
+  void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads)
+  {
+    const bool alreadyStarted = (writer_.get() != NULL);
+
+    if (alreadyStarted)
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    loaderThreads_ = loaderThreads;
+  }
+
+
   void ArchiveJob::Reset()
   {
     throw OrthancException(ErrorCode_BadSequenceOfCalls,
@@ -1002,6 +1203,16 @@
   
   void ArchiveJob::Start()
   {
+    if (loaderThreads_ == 0)
+    {
+      // default behaviour before loaderThreads was introduced in 1.9.8
+      instanceLoader_.reset(new SynchronousInstanceLoader(context_));
+    }
+    else
+    {
+      instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_));
+    }
+
     if (writer_.get() != NULL)
     {
       throw OrthancException(ErrorCode_BadSequenceOfCalls);
@@ -1023,7 +1234,7 @@
           assert(asynchronousTarget_.get() != NULL);
           asynchronousTarget_->Touch();  // Make sure we can write to the temporary file
           
-          writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_));
+          writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
           writer_->SetOutputFile(asynchronousTarget_->GetPath());
         }
       }
@@ -1031,7 +1242,7 @@
       {
         assert(synchronousTarget_.get() != NULL);
     
-        writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_));
+        writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
         writer_->AcquireOutputStream(synchronousTarget_.release());
       }
 
@@ -1076,6 +1287,11 @@
       writer_.reset();
     }
 
+    if (instanceLoader_.get() != NULL)
+    {
+      instanceLoader_->Clear();
+    }
+
     if (asynchronousTarget_.get() != NULL)
     {
       // Asynchronous behavior: Move the resulting file into the media archive
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.h	Thu Sep 30 12:14:19 2021 +0200
+++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.h	Thu Oct 07 13:31:36 2021 +0200
@@ -55,10 +55,14 @@
     class ResourceIdentifiers;
     class ZipCommands;
     class ZipWriterIterator;
-    
+    class InstanceLoader;
+    class SynchronousInstanceLoader;
+    class ThreadedInstanceLoader;
+
     std::unique_ptr<ZipWriter::IOutputStream>  synchronousTarget_;  // Only valid before "Start()"
     std::unique_ptr<TemporaryFile>        asynchronousTarget_;
     ServerContext&                        context_;
+    std::unique_ptr<InstanceLoader>       instanceLoader_;
     boost::shared_ptr<ArchiveIndex>       archive_;
     bool                                  isMedia_;
     bool                                  enableExtendedSopClass_;
@@ -75,6 +79,9 @@
     bool                 transcode_;
     DicomTransferSyntax  transferSyntax_;
 
+    // New in Orthanc 1.9.8
+    unsigned int         loaderThreads_;
+
     void FinalizeTarget();
     
   public:
@@ -97,6 +104,8 @@
 
     void SetTranscode(DicomTransferSyntax transferSyntax);
 
+    void SetLoaderThreads(unsigned int loaderThreads);
+
     virtual void Reset() ORTHANC_OVERRIDE;
 
     virtual void Start() ORTHANC_OVERRIDE;