diff OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4674:cdab941fe17d

ZIP archive/media generated in synchronous mode are now streamed by default
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 Jun 2021 17:40:15 +0200
parents b02dc8303cf6
children f0038043fb97 4e765c18ace7 0a38000b086d
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Wed Jun 02 18:01:17 2021 +0200
+++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Thu Jun 03 17:40:15 2021 +0200
@@ -57,6 +57,8 @@
 static const char* const KEY_INSTANCES_COUNT = "InstancesCount";
 static const char* const KEY_UNCOMPRESSED_SIZE_MB = "UncompressedSizeMB";
 static const char* const KEY_ARCHIVE_SIZE_MB = "ArchiveSizeMB";
+static const char* const KEY_UNCOMPRESSED_SIZE = "UncompressedSize";
+static const char* const KEY_ARCHIVE_SIZE = "ArchiveSize";
 static const char* const KEY_TRANSCODE = "Transcode";
 
 
@@ -749,15 +751,16 @@
     std::unique_ptr<HierarchicalZipWriter>  zip_;
     std::unique_ptr<DicomDirWriter>         dicomDir_;
     bool                                    isMedia_;
+    bool                                    isStream_;
 
   public:
-    ZipWriterIterator(const TemporaryFile& target,
-                      ServerContext& context,
+    ZipWriterIterator(ServerContext& context,
                       ArchiveIndex& archive,
                       bool isMedia,
                       bool enableExtendedSopClass) :
       context_(context),
-      isMedia_(isMedia)
+      isMedia_(isMedia),
+      isStream_(false)
     {
       if (isMedia)
       {
@@ -777,11 +780,73 @@
         archive.Expand(context.GetIndex());
         archive.Apply(visitor);
       }
+    }
 
-      zip_.reset(new HierarchicalZipWriter(target.GetPath().c_str()));
-      zip_->SetZip64(commands_.IsZip64());
+    void SetOutputFile(const std::string& path)
+    {
+      if (zip_.get() == NULL)
+      {
+        zip_.reset(new HierarchicalZipWriter(path.c_str()));
+        zip_->SetZip64(commands_.IsZip64());
+        isStream_ = false;
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
+    void AcquireOutputStream(ZipWriter::IOutputStream* output)
+    {
+      std::unique_ptr<ZipWriter::IOutputStream> protection(output);
+
+      if (zip_.get() == NULL)
+      {
+        zip_.reset(new HierarchicalZipWriter(protection.release(), commands_.IsZip64()));
+        isStream_ = true;
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
     }
-      
+
+    void CancelStream()
+    {
+      if (zip_.get() == NULL)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else if (isStream_)
+      {
+        zip_->CancelStream();
+      }
+    }
+
+    void Close()
+    {
+      if (zip_.get() == NULL)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        zip_->Close();
+      }
+    }
+
+    uint64_t GetArchiveSize() const
+    {
+      if (zip_.get() == NULL)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        return zip_->GetArchiveSize();
+      }
+    }
+
     size_t GetStepsCount() const
     {
       return commands_.GetSize() + 1;
@@ -795,6 +860,10 @@
       {
         throw OrthancException(ErrorCode_ParameterOutOfRange);
       }
+      else if (zip_.get() == NULL)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
       else if (index == commands_.GetSize())
       {
         // Last step: Add the DICOMDIR
@@ -862,9 +931,11 @@
   }
 
 
-  void ArchiveJob::SetSynchronousTarget(boost::shared_ptr<TemporaryFile>& target)
+  void ArchiveJob::AcquireSynchronousTarget(ZipWriter::IOutputStream* target)
   {
-    if (target.get() == NULL)
+    std::unique_ptr<ZipWriter::IOutputStream> protection(target);
+    
+    if (target == NULL)
     {
       throw OrthancException(ErrorCode_NullPointer);
     }
@@ -876,7 +947,7 @@
     }
     else
     {
-      synchronousTarget_ = target;
+      synchronousTarget_.reset(protection.release());
     }
   }
 
@@ -931,35 +1002,42 @@
   
   void ArchiveJob::Start()
   {
-    TemporaryFile* target = NULL;  // (*)
-    
-    if (synchronousTarget_.get() == NULL)
-    {
-      {
-        OrthancConfiguration::ReaderLock lock;
-        asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile());
-      }
-
-      target = asynchronousTarget_.get();
-    }
-    else
-    {
-      target = synchronousTarget_.get();
-    }
-
-    assert(target != NULL);
-    target->Touch();  // Make sure we can write to the temporary file
-    
     if (writer_.get() != NULL)
     {
       throw OrthancException(ErrorCode_BadSequenceOfCalls);
     }
+    else
+    {
+      if (synchronousTarget_.get() == NULL)
+      {
+        if (asynchronousTarget_.get() != NULL)
+        {
+          // It is up to this method to create the asynchronous target
+          throw OrthancException(ErrorCode_InternalError);
+        }
+        else
+        {
+          OrthancConfiguration::ReaderLock lock;
+          asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile());
+          
+          assert(asynchronousTarget_.get() != NULL);
+          asynchronousTarget_->Touch();  // Make sure we can write to the temporary file
+          
+          writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_));
+          writer_->SetOutputFile(asynchronousTarget_->GetPath());
+        }
+      }
+      else
+      {
+        assert(synchronousTarget_.get() != NULL);
+    
+        writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_));
+        writer_->AcquireOutputStream(synchronousTarget_.release());
+      }
 
-    writer_.reset(new ZipWriterIterator(*target, context_, *archive_,
-                                        isMedia_, enableExtendedSopClass_));
-
-    instancesCount_ = writer_->GetInstancesCount();
-    uncompressedSize_ = writer_->GetUncompressedSize();
+      instancesCount_ = writer_->GetInstancesCount();
+      uncompressedSize_ = writer_->GetUncompressedSize();
+    }
   }
 
 
@@ -989,37 +1067,15 @@
   }
   
 
-  void ArchiveJob::RefreshArchiveSize()
-  {
-    if (synchronousTarget_.get() != NULL)
-    {
-      archiveSize_ = synchronousTarget_->GetFileSize();
-    }
-        
-    if (asynchronousTarget_.get() != NULL)
-    {
-      archiveSize_ = asynchronousTarget_->GetFileSize();
-    }
-  }
-    
-
   void ArchiveJob::FinalizeTarget()
   {
-    writer_.reset();  // Flush all the results
-
-    RefreshArchiveSize();
-
-    if (synchronousTarget_.get() != NULL)
+    if (writer_.get() != NULL)
     {
-      /**
-       * Synchronous behavior: Release the reference to the temporary
-       * file. It is now up to the caller to deal with the shared
-       * pointer. This is a fix in Orthanc 1.9.3.
-       * https://groups.google.com/g/orthanc-users/c/tpP2fkRAd9o/m/5SGpEHbGCQAJ
-       **/
-      synchronousTarget_.reset();
+      writer_->Close();  // Flush all the results
+      archiveSize_ = writer_->GetArchiveSize();
+      writer_.reset();
     }
-    
+
     if (asynchronousTarget_.get() != NULL)
     {
       // Asynchronous behavior: Move the resulting file into the media archive
@@ -1033,14 +1089,6 @@
   {
     assert(writer_.get() != NULL);
 
-    if (synchronousTarget_.get() != NULL &&
-        synchronousTarget_.unique())
-    {
-      LOG(WARNING) << "A client has disconnected while creating an archive";
-      return JobStepResult::Failure(ErrorCode_NetworkProtocol,
-                                    "A client has disconnected while creating an archive");
-    }
-
     if (writer_->GetStepsCount() == 0)
     {
       FinalizeTarget();
@@ -1048,7 +1096,16 @@
     }
     else
     {
-      writer_->RunStep(currentStep_, transcode_, transferSyntax_);
+      try
+      {
+        writer_->RunStep(currentStep_, transcode_, transferSyntax_);
+      }
+      catch (Orthanc::OrthancException& e)
+      {
+        LOG(ERROR) << "Error while creating an archive: " << e.What();
+        writer_->CancelStream();
+        throw;
+      }
 
       currentStep_ ++;
 
@@ -1059,7 +1116,7 @@
       }
       else
       {
-        RefreshArchiveSize();
+        archiveSize_ = writer_->GetArchiveSize();
         return JobStepResult::Continue();
       }
     }
@@ -1077,6 +1134,8 @@
         reason == JobStopReason_Failure ||
         reason == JobStopReason_Retry)
     {
+      writer_->CancelStream();
+      
       // First delete the writer, as it holds a reference to "(a)synchronousTarget_", cf. (*)
       writer_.reset();
       
@@ -1124,6 +1183,10 @@
     value[KEY_ARCHIVE_SIZE_MB] =
       static_cast<unsigned int>(archiveSize_ / MEGA_BYTES);
 
+    // New in Orthanc 1.9.4
+    value[KEY_ARCHIVE_SIZE] = boost::lexical_cast<std::string>(archiveSize_);
+    value[KEY_UNCOMPRESSED_SIZE] = boost::lexical_cast<std::string>(uncompressedSize_);
+
     if (transcode_)
     {
       value[KEY_TRANSCODE] = GetTransferSyntaxUid(transferSyntax_);