diff OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp @ 4674:cdab941fe17d

ZIP archive/media generated in synchronous mode are now streamed by default
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 Jun 2021 17:40:15 +0200
parents d9473bd5ed43
children 13efc0967cea
line wrap: on
line diff
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp	Wed Jun 02 18:01:17 2021 +0200
+++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp	Thu Jun 03 17:40:15 2021 +0200
@@ -34,13 +34,17 @@
 #include "../PrecompiledHeadersServer.h"
 #include "OrthancRestApi.h"
 
+#include "../../../OrthancFramework/Sources/Compression/ZipWriter.h"
 #include "../../../OrthancFramework/Sources/HttpServer/FilesystemHttpSender.h"
+#include "../../../OrthancFramework/Sources/Logging.h"
 #include "../../../OrthancFramework/Sources/OrthancException.h"
 #include "../../../OrthancFramework/Sources/SerializationToolbox.h"
 #include "../OrthancConfiguration.h"
 #include "../ServerContext.h"
 #include "../ServerJobs/ArchiveJob.h"
 
+#include <boost/filesystem/fstream.hpp>
+
 
 namespace Orthanc
 {
@@ -150,6 +154,271 @@
   }
 
 
+  namespace
+  {
+    class SynchronousZipChunk : public IDynamicObject
+    {
+    private:
+      std::string  chunk_;
+      bool         done_;
+
+    public:
+      static SynchronousZipChunk* CreateDone()
+      {
+        std::unique_ptr<SynchronousZipChunk> item(new SynchronousZipChunk);
+        item->done_ = true;
+        return item.release();
+      }
+
+      static SynchronousZipChunk* CreateChunk(const std::string& chunk)
+      {
+        std::unique_ptr<SynchronousZipChunk> item(new SynchronousZipChunk);
+        item->done_ = false;
+        item->chunk_ = chunk;
+        return item.release();
+      }
+
+      bool IsDone() const
+      {
+        return done_;
+      }
+
+      void SwapString(std::string& target)
+      {
+        if (done_)
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else
+        {
+          target.swap(chunk_);
+        }
+      }
+    };
+
+    
+    class SynchronousZipStream : public ZipWriter::IOutputStream
+    {
+    private:
+      boost::shared_ptr<SharedMessageQueue>  queue_;
+      uint64_t                               archiveSize_;
+
+    public:
+      SynchronousZipStream(const boost::shared_ptr<SharedMessageQueue>& queue) :
+        queue_(queue),
+        archiveSize_(0)
+      {
+      }
+
+      uint64_t GetArchiveSize() const
+      {
+        return archiveSize_;
+      }
+
+      virtual void Write(const std::string& chunk) ORTHANC_OVERRIDE
+      {
+        if (queue_.unique())
+        {
+          throw OrthancException(ErrorCode_NetworkProtocol,
+                                 "HTTP client has disconnected while creating an archive in synchronous mode");
+        }
+        else
+        {
+          queue_->Enqueue(SynchronousZipChunk::CreateChunk(chunk));
+          archiveSize_ += chunk.size();
+        }
+      }
+
+      virtual void Close() ORTHANC_OVERRIDE
+      {
+        queue_->Enqueue(SynchronousZipChunk::CreateDone());
+      }
+    };
+
+
+    class SynchronousZipSender : public IHttpStreamAnswer
+    {
+    private:
+      ServerContext&                         context_;
+      std::string                            jobId_;
+      boost::shared_ptr<SharedMessageQueue>  queue_;
+      std::string                            filename_;
+      bool                                   done_;
+      std::string                            chunk_;
+
+    public:
+      SynchronousZipSender(ServerContext& context,
+                           const std::string& jobId,
+                           const boost::shared_ptr<SharedMessageQueue>& queue,
+                           const std::string& filename) :
+        context_(context),
+        jobId_(jobId),
+        queue_(queue),
+        filename_(filename),
+        done_(false)
+      {
+      }
+
+      virtual HttpCompression SetupHttpCompression(bool gzipAllowed,
+                                                   bool deflateAllowed) ORTHANC_OVERRIDE
+      {
+        // This function is not called by HttpOutput::AnswerWithoutBuffering()
+        throw OrthancException(ErrorCode_InternalError);
+      }
+
+      virtual bool HasContentFilename(std::string& filename) ORTHANC_OVERRIDE
+      {
+        filename = filename_;
+        return true;
+      }
+
+      virtual std::string GetContentType() ORTHANC_OVERRIDE
+      {
+        return EnumerationToString(MimeType_Zip);
+      }
+
+      virtual uint64_t GetContentLength() ORTHANC_OVERRIDE
+      {
+        throw OrthancException(ErrorCode_InternalError);
+      }
+
+      virtual bool ReadNextChunk() ORTHANC_OVERRIDE
+      {
+        for (;;)
+        {
+          std::unique_ptr<IDynamicObject> obj(queue_->Dequeue(100));
+        
+          if (obj.get() == NULL)
+          {
+            // Check that the job is still active, which indicates
+            // that more data might still be returned
+            JobState state;
+            if (context_.GetJobsEngine().GetRegistry().GetState(state, jobId_) &&
+                (state != JobState_Pending ||
+                 state != JobState_Running ||
+                 state != JobState_Success))
+            {
+              continue;
+            }
+            else
+            {
+              return false;
+            }
+          }
+          else
+          {
+            SynchronousZipChunk& item = dynamic_cast<SynchronousZipChunk&>(*obj);
+            if (item.IsDone())
+            {
+              done_ = true;
+            }
+            else
+            {
+              item.SwapString(chunk_);
+              done_ = false;
+            }
+
+            return !done_;
+          }
+        }
+      }
+
+      virtual const char* GetChunkContent() ORTHANC_OVERRIDE
+      {
+        if (done_)
+        {
+          throw OrthancException(ErrorCode_InternalError);
+        }
+        else
+        {
+          return (chunk_.empty() ? NULL : chunk_.c_str());
+        }
+      }
+      
+      virtual size_t GetChunkSize() ORTHANC_OVERRIDE
+      {
+        if (done_)
+        {
+          throw OrthancException(ErrorCode_InternalError);
+        }
+        else
+        {
+          return chunk_.size();
+        }
+      }
+    };
+
+    
+    class SynchronousTemporaryStream : public ZipWriter::IOutputStream
+    {
+    private:
+      boost::shared_ptr<TemporaryFile>  temp_;
+      boost::filesystem::ofstream       file_;
+      uint64_t                          archiveSize_;
+
+    public:
+      SynchronousTemporaryStream(const boost::shared_ptr<TemporaryFile>& temp) :
+        temp_(temp),
+        archiveSize_(0)
+      {
+        file_.open(temp_->GetPath(), std::ofstream::out | std::ofstream::binary);
+        if (!file_.good())
+        {
+          throw OrthancException(ErrorCode_CannotWriteFile);
+        }
+      }
+      
+      uint64_t GetArchiveSize() const
+      {
+        return archiveSize_;
+      }
+
+      virtual void Write(const std::string& chunk) ORTHANC_OVERRIDE
+      {
+        if (!chunk.empty())
+        {
+          try
+          {
+            file_.write(chunk.c_str(), chunk.size());
+            
+            if (!file_.good())
+            {
+              file_.close();
+              throw OrthancException(ErrorCode_CannotWriteFile);
+            }
+          }
+          catch (boost::filesystem::filesystem_error&)
+          {
+            throw OrthancException(ErrorCode_CannotWriteFile);
+          }
+          catch (...)  // To catch "std::system_error&" in C++11
+          {
+            throw OrthancException(ErrorCode_CannotWriteFile);
+          }
+        }
+        
+        archiveSize_ += chunk.size();
+      }
+
+      virtual void Close() ORTHANC_OVERRIDE
+      {
+        try
+        {
+          file_.close();
+        }
+        catch (boost::filesystem::filesystem_error&)
+        {
+          throw OrthancException(ErrorCode_CannotWriteFile);
+        }
+        catch (...)  // To catch "std::system_error&" in C++11
+        {
+          throw OrthancException(ErrorCode_CannotWriteFile);
+        }
+      }
+    };
+  }
+
+  
   static void SubmitJob(RestApiOutput& output,
                         ServerContext& context,
                         std::unique_ptr<ArchiveJob>& job,
@@ -166,26 +435,54 @@
 
     if (synchronous)
     {
-      boost::shared_ptr<TemporaryFile> tmp;
-
+      bool streaming;
+      
       {
         OrthancConfiguration::ReaderLock lock;
-        tmp.reset(lock.GetConfiguration().CreateTemporaryFile());
+        streaming = lock.GetConfiguration().GetBooleanParameter("SynchronousZipStream", true);  // New in Orthanc 1.9.4
       }
 
-      job->SetSynchronousTarget(tmp);
-    
-      Json::Value publicContent;
-      context.GetJobsEngine().GetRegistry().SubmitAndWait
-        (publicContent, job.release(), priority);
-      
+      if (streaming)
+      {
+        LOG(INFO) << "Streaming a ZIP archive";
+        boost::shared_ptr<SharedMessageQueue> queue(new SharedMessageQueue);
+
+        job->AcquireSynchronousTarget(new SynchronousZipStream(queue));
+
+        std::string jobId;
+        context.GetJobsEngine().GetRegistry().Submit(jobId, job.release(), priority);
+
+        SynchronousZipSender sender(context, jobId, queue, filename);
+        output.AnswerWithoutBuffering(sender);
+
+        // If we reach this line, this means that
+        // "SynchronousZipSender::ReadNextChunk()" has returned "false"
+      }
+      else
       {
-        // The archive is now created: Prepare the sending of the ZIP file
-        FilesystemHttpSender sender(tmp->GetPath(), MimeType_Zip);
-        sender.SetContentFilename(filename);
+        // This was the only behavior in Orthanc <= 1.9.3
+        LOG(INFO) << "Not streaming a ZIP archive (use of a temporary file)";
+        boost::shared_ptr<TemporaryFile> tmp;
+
+        {
+          OrthancConfiguration::ReaderLock lock;
+          tmp.reset(lock.GetConfiguration().CreateTemporaryFile());
+        }
+
+        job->AcquireSynchronousTarget(new SynchronousTemporaryStream(tmp));
 
-        // Send the ZIP
-        output.AnswerStream(sender);
+        Json::Value publicContent;
+        context.GetJobsEngine().GetRegistry().SubmitAndWait
+          (publicContent, job.release(), priority);
+      
+        {
+          // The archive is now created: Prepare the sending of the ZIP file
+          FilesystemHttpSender sender(tmp->GetPath(), MimeType_Zip);
+          sender.SetContentFilename(filename);
+
+          // Send the ZIP
+          output.AnswerStream(sender);
+        }
       }
     }
     else
@@ -201,12 +498,13 @@
   {
     call.GetDocumentation()
       .SetRequestField("Synchronous", RestApiCallDocumentation::Type_Boolean,
-                     "If `true`, create the archive in synchronous mode, which means that the HTTP answer will directly "
-                     "contain the ZIP file. This is the default, easy behavior, but it is *not* be desirable to archive "
-                     "large amount of data, as it might lead to network timeouts.", false)
+                       "If `true`, create the archive in synchronous mode, which means that the HTTP answer will directly "
+                       "contain the ZIP file. This is the default, easy behavior. However, if global configuration option "
+                       "\"SynchronousZipStream\" is set to \"false\", asynchronous transfers should be prefered for "
+                       "large amount of data, as the creation of the temporary file might lead to network timeouts.", false)
       .SetRequestField("Asynchronous", RestApiCallDocumentation::Type_Boolean,
                        "If `true`, create the archive in asynchronous mode, which means that a job is submitted to create "
-                       "the archive in background. Prefer this flavor wherever possible.", false)
+                       "the archive in background.", false)
       .SetRequestField(KEY_TRANSCODE, RestApiCallDocumentation::Type_String,
                        "If present, the DICOM files in the archive will be transcoded to the provided "
                        "transfer syntax: https://book.orthanc-server.com/faq/transcoding.html", false)