diff OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4819:70d2a97ca8cb openssl-3.x

integration mainline->openssl-3.x
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 25 Nov 2021 13:12:32 +0100
parents f0038043fb97 58637d39ce88
children 2e71a08eea15
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Mon Aug 30 22:21:24 2021 +0200
+++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Thu Nov 25 13:12:32 2021 +0100
@@ -28,6 +28,7 @@
 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h"
 #include "../../../OrthancFramework/Sources/Logging.h"
 #include "../../../OrthancFramework/Sources/OrthancException.h"
+#include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h"
 #include "../OrthancConfiguration.h"
 #include "../ServerContext.h"
 
@@ -76,6 +77,181 @@
   }
 
 
+  class ArchiveJob::InstanceLoader : public boost::noncopyable
+  {
+  protected:
+    ServerContext&                        context_;
+  public:
+    explicit InstanceLoader(ServerContext& context)
+    : context_(context)
+    {
+    }
+
+    virtual ~InstanceLoader()
+    {
+    }
+
+    virtual void PrepareDicom(const std::string& instanceId)
+    {
+
+    }
+
+    virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0;
+
+    virtual void Clear()
+    {
+    }
+  };
+
+  class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader
+  {
+  public:
+    explicit SynchronousInstanceLoader(ServerContext& context)
+    : InstanceLoader(context)
+    {
+    }
+
+    virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
+    {
+      context_.ReadDicom(dicom, instanceId);
+    }
+  };
+
+  class InstanceId : public Orthanc::IDynamicObject
+  {
+  private:
+    std::string id_;
+
+  public:
+    explicit InstanceId(const std::string& id) : id_(id)
+    {
+    }
+
+    virtual ~InstanceId() ORTHANC_OVERRIDE
+    {
+    }
+
+    std::string GetId() const {return id_;};
+  };
+
+  class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader
+  {
+    Semaphore                           availableInstancesSemaphore_;
+    std::map<std::string, boost::shared_ptr<std::string> >  availableInstances_;
+    boost::mutex                        availableInstancesMutex_;
+    SharedMessageQueue                  instancesToPreload_;
+    std::vector<boost::thread*>         threads_;
+
+
+  public:
+    ThreadedInstanceLoader(ServerContext& context, size_t threadCount)
+    : InstanceLoader(context),
+      availableInstancesSemaphore_(0)
+    {
+      for (size_t i = 0; i < threadCount; i++)
+      {
+        threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
+      }
+    }
+
+    virtual ~ThreadedInstanceLoader()
+    {
+      Clear();
+    }
+
+    virtual void Clear() ORTHANC_OVERRIDE
+    {
+      for (size_t i = 0; i < threads_.size(); i++)
+      {
+        instancesToPreload_.Enqueue(NULL);
+      }
+
+      for (size_t i = 0; i < threads_.size(); i++)
+      {
+        if (threads_[i]->joinable())
+        {
+          threads_[i]->join();
+        }
+        delete threads_[i];
+      }
+
+      threads_.clear();
+      availableInstances_.clear();
+    }
+
+    static void PreloaderWorkerThread(ThreadedInstanceLoader* that)
+    {
+      while (true)
+      {
+        std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0)));
+        if (instanceId.get() == NULL)  // that's the signal to exit the thread
+        {
+          return;
+        }
+
+        try
+        {
+          boost::shared_ptr<std::string> dicomContent(new std::string());
+          that->context_.ReadDicom(*dicomContent, instanceId->GetId());
+          {
+            boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
+            that->availableInstances_[instanceId->GetId()] = dicomContent;
+          }
+
+          that->availableInstancesSemaphore_.Release();
+        }
+        catch (OrthancException& e)
+        {
+          boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
+          // store a NULL result to notify that we could not read the instance
+          that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>(); 
+          that->availableInstancesSemaphore_.Release();
+        }
+      }
+    }
+
+    virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE
+    {
+      instancesToPreload_.Enqueue(new InstanceId(instanceId));
+    }
+
+    virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
+    {
+      while (true)
+      {
+        // wait for an instance to be available but this might not be the one we are waiting for !
+        availableInstancesSemaphore_.Acquire();
+
+        boost::shared_ptr<std::string> dicomContent;
+        {
+          if (availableInstances_.find(instanceId) != availableInstances_.end())
+          {
+            // this is the instance we were waiting for
+            dicomContent = availableInstances_[instanceId];
+            availableInstances_.erase(instanceId);
+
+            if (dicomContent.get() == NULL)  // there has been an error while reading the file
+            {
+              throw OrthancException(ErrorCode_InexistentItem);
+            }
+            dicom.swap(*dicomContent);
+
+            if (availableInstances_.size() > 0)
+            {
+              // we have just read the instance we were waiting for but there are still other instances available ->
+              // make sure the next GetDicom call does not wait !
+              availableInstancesSemaphore_.Release();
+            }
+            return;
+          }
+          // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when
+          // a new instance is available
+        }
+      }
+    }
+  };
+
+
   class ArchiveJob::ResourceIdentifiers : public boost::noncopyable
   {
   private:
@@ -390,6 +566,7 @@
         
       void Apply(HierarchicalZipWriter& writer,
                  ServerContext& context,
+                 InstanceLoader& instanceLoader,
                  DicomDirWriter* dicomDir,
                  const std::string& dicomDirFolder,
                  bool transcode,
@@ -411,7 +588,7 @@
 
             try
             {
-              context.ReadDicom(content, instanceId_);
+              instanceLoader.GetDicom(content, instanceId_);
             }
             catch (OrthancException& e)
             {
@@ -482,10 +659,12 @@
     std::deque<Command*>  commands_;
     uint64_t              uncompressedSize_;
     unsigned int          instancesCount_;
+    InstanceLoader&       instanceLoader_;
 
       
     void ApplyInternal(HierarchicalZipWriter& writer,
                        ServerContext& context,
+                       InstanceLoader& instanceLoader,
                        size_t index,
                        DicomDirWriter* dicomDir,
                        const std::string& dicomDirFolder,
@@ -497,13 +676,14 @@
         throw OrthancException(ErrorCode_ParameterOutOfRange);
       }
 
-      commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax);
+      commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax);
     }
       
   public:
-    ZipCommands() :
+    explicit ZipCommands(InstanceLoader& instanceLoader) :
       uncompressedSize_(0),
-      instancesCount_(0)
+      instancesCount_(0),
+      instanceLoader_(instanceLoader)
     {
     }
       
@@ -535,23 +715,25 @@
     // "media" flavor (with DICOMDIR)
     void Apply(HierarchicalZipWriter& writer,
                ServerContext& context,
+               InstanceLoader& instanceLoader,
                size_t index,
                DicomDirWriter& dicomDir,
                const std::string& dicomDirFolder,
                bool transcode,
                DicomTransferSyntax transferSyntax) const
     {
-      ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax);
+      ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax);
     }
 
     // "archive" flavor (without DICOMDIR)
     void Apply(HierarchicalZipWriter& writer,
                ServerContext& context,
+               InstanceLoader& instanceLoader,
                size_t index,
                bool transcode,
                DicomTransferSyntax transferSyntax) const
     {
-      ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax);
+      ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax);
     }
       
     void AddOpenDirectory(const std::string& filename)
@@ -568,6 +750,7 @@
                           const std::string& instanceId,
                           uint64_t uncompressedSize)
     {
+      instanceLoader_.PrepareDicom(instanceId);
       commands_.push_back(new Command(Type_WriteInstance, filename, instanceId));
       instancesCount_ ++;
       uncompressedSize_ += uncompressedSize;
@@ -735,6 +918,7 @@
   {
   private:
     ServerContext&                          context_;
+    InstanceLoader&                         instanceLoader_;
     ZipCommands                             commands_;
     std::unique_ptr<HierarchicalZipWriter>  zip_;
     std::unique_ptr<DicomDirWriter>         dicomDir_;
@@ -743,10 +927,13 @@
 
   public:
     ZipWriterIterator(ServerContext& context,
+                      InstanceLoader& instanceLoader,
                       ArchiveIndex& archive,
                       bool isMedia,
                       bool enableExtendedSopClass) :
       context_(context),
+      instanceLoader_(instanceLoader),
+      commands_(instanceLoader),
       isMedia_(isMedia),
       isStream_(false)
     {
@@ -870,13 +1057,13 @@
         if (isMedia_)
         {
           assert(dicomDir_.get() != NULL);
-          commands_.Apply(*zip_, context_, index, *dicomDir_,
+          commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_,
                           MEDIA_IMAGES_FOLDER, transcode, transferSyntax);
         }
         else
         {
           assert(dicomDir_.get() == NULL);
-          commands_.Apply(*zip_, context_, index, transcode, transferSyntax);
+          commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax);
         }
       }
     }
@@ -905,7 +1092,8 @@
     uncompressedSize_(0),
     archiveSize_(0),
     transcode_(false),
-    transferSyntax_(DicomTransferSyntax_LittleEndianImplicit)
+    transferSyntax_(DicomTransferSyntax_LittleEndianImplicit),
+    loaderThreads_(0)
   {
   }
 
@@ -981,6 +1169,19 @@
   }
 
   
+  void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads)
+  {
+    if (writer_.get() != NULL)   // Already started
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      loaderThreads_ = loaderThreads;
+    }
+  }
+
+
   void ArchiveJob::Reset()
   {
     throw OrthancException(ErrorCode_BadSequenceOfCalls,
@@ -990,6 +1191,16 @@
   
   void ArchiveJob::Start()
   {
+    if (loaderThreads_ == 0)
+    {
+      // default behaviour before loaderThreads was introducted in 1.9.8
+      instanceLoader_.reset(new SynchronousInstanceLoader(context_));
+    }
+    else
+    {
+      instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_));
+    }
+
     if (writer_.get() != NULL)
     {
       throw OrthancException(ErrorCode_BadSequenceOfCalls);
@@ -1011,7 +1222,7 @@
           assert(asynchronousTarget_.get() != NULL);
           asynchronousTarget_->Touch();  // Make sure we can write to the temporary file
           
-          writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_));
+          writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
           writer_->SetOutputFile(asynchronousTarget_->GetPath());
         }
       }
@@ -1019,7 +1230,7 @@
       {
         assert(synchronousTarget_.get() != NULL);
     
-        writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_));
+        writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
         writer_->AcquireOutputStream(synchronousTarget_.release());
       }
 
@@ -1064,6 +1275,11 @@
       writer_.reset();
     }
 
+    if (instanceLoader_.get() != NULL)
+    {
+      instanceLoader_->Clear();
+    }
+
     if (asynchronousTarget_.get() != NULL)
     {
       // Asynchronous behavior: Move the resulting file into the media archive
@@ -1184,6 +1400,7 @@
 
   bool ArchiveJob::GetOutput(std::string& output,
                              MimeType& mime,
+                             std::string& filename,
                              const std::string& key)
   {   
     if (key == "archive" &&
@@ -1196,6 +1413,7 @@
         const DynamicTemporaryFile& f = dynamic_cast<DynamicTemporaryFile&>(accessor.GetItem());
         f.GetFile().Read(output);
         mime = MimeType_Zip;
+        filename = "archive.zip";
         return true;
       }
       else