Mercurial > hg > orthanc
diff OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp @ 4674:cdab941fe17d
ZIP archive/media generated in synchronous mode are now streamed by default
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 03 Jun 2021 17:40:15 +0200 |
parents | d9473bd5ed43 |
children | 13efc0967cea |
line wrap: on
line diff
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp Wed Jun 02 18:01:17 2021 +0200 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp Thu Jun 03 17:40:15 2021 +0200 @@ -34,13 +34,17 @@ #include "../PrecompiledHeadersServer.h" #include "OrthancRestApi.h" +#include "../../../OrthancFramework/Sources/Compression/ZipWriter.h" #include "../../../OrthancFramework/Sources/HttpServer/FilesystemHttpSender.h" +#include "../../../OrthancFramework/Sources/Logging.h" #include "../../../OrthancFramework/Sources/OrthancException.h" #include "../../../OrthancFramework/Sources/SerializationToolbox.h" #include "../OrthancConfiguration.h" #include "../ServerContext.h" #include "../ServerJobs/ArchiveJob.h" +#include <boost/filesystem/fstream.hpp> + namespace Orthanc { @@ -150,6 +154,271 @@ } + namespace + { + class SynchronousZipChunk : public IDynamicObject + { + private: + std::string chunk_; + bool done_; + + public: + static SynchronousZipChunk* CreateDone() + { + std::unique_ptr<SynchronousZipChunk> item(new SynchronousZipChunk); + item->done_ = true; + return item.release(); + } + + static SynchronousZipChunk* CreateChunk(const std::string& chunk) + { + std::unique_ptr<SynchronousZipChunk> item(new SynchronousZipChunk); + item->done_ = false; + item->chunk_ = chunk; + return item.release(); + } + + bool IsDone() const + { + return done_; + } + + void SwapString(std::string& target) + { + if (done_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + target.swap(chunk_); + } + } + }; + + + class SynchronousZipStream : public ZipWriter::IOutputStream + { + private: + boost::shared_ptr<SharedMessageQueue> queue_; + uint64_t archiveSize_; + + public: + SynchronousZipStream(const boost::shared_ptr<SharedMessageQueue>& queue) : + queue_(queue), + archiveSize_(0) + { + } + + uint64_t GetArchiveSize() const + { + return archiveSize_; + } + + virtual void Write(const std::string& chunk) ORTHANC_OVERRIDE + { + if (queue_.unique()) + { + throw OrthancException(ErrorCode_NetworkProtocol, + "HTTP client has disconnected while creating an archive in synchronous mode"); + } + else + { + queue_->Enqueue(SynchronousZipChunk::CreateChunk(chunk)); + archiveSize_ += chunk.size(); + } + } + + virtual void Close() ORTHANC_OVERRIDE + { + queue_->Enqueue(SynchronousZipChunk::CreateDone()); + } + }; + + + class SynchronousZipSender : public IHttpStreamAnswer + { + private: + ServerContext& context_; + std::string jobId_; + boost::shared_ptr<SharedMessageQueue> queue_; + std::string filename_; + bool done_; + std::string chunk_; + + public: + SynchronousZipSender(ServerContext& context, + const std::string& jobId, + const boost::shared_ptr<SharedMessageQueue>& queue, + const std::string& filename) : + context_(context), + jobId_(jobId), + queue_(queue), + filename_(filename), + done_(false) + { + } + + virtual HttpCompression SetupHttpCompression(bool gzipAllowed, + bool deflateAllowed) ORTHANC_OVERRIDE + { + // This function is not called by HttpOutput::AnswerWithoutBuffering() + throw OrthancException(ErrorCode_InternalError); + } + + virtual bool HasContentFilename(std::string& filename) ORTHANC_OVERRIDE + { + filename = filename_; + return true; + } + + virtual std::string GetContentType() ORTHANC_OVERRIDE + { + return EnumerationToString(MimeType_Zip); + } + + virtual uint64_t GetContentLength() ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); + } + + virtual bool ReadNextChunk() ORTHANC_OVERRIDE + { + for (;;) + { + std::unique_ptr<IDynamicObject> obj(queue_->Dequeue(100)); + + if (obj.get() == NULL) + { + // Check that the job is still active, which indicates + // that more data might still be returned + JobState state; + if (context_.GetJobsEngine().GetRegistry().GetState(state, jobId_) && + (state != JobState_Pending || + state != JobState_Running || + state != JobState_Success)) + { + continue; + } + else + { + return false; + } + } + else + { + SynchronousZipChunk& item = dynamic_cast<SynchronousZipChunk&>(*obj); + if (item.IsDone()) + { + done_ = true; + } + else + { + item.SwapString(chunk_); + done_ = false; + } + + return !done_; + } + } + } + + virtual const char* GetChunkContent() ORTHANC_OVERRIDE + { + if (done_) + { + throw OrthancException(ErrorCode_InternalError); + } + else + { + return (chunk_.empty() ? NULL : chunk_.c_str()); + } + } + + virtual size_t GetChunkSize() ORTHANC_OVERRIDE + { + if (done_) + { + throw OrthancException(ErrorCode_InternalError); + } + else + { + return chunk_.size(); + } + } + }; + + + class SynchronousTemporaryStream : public ZipWriter::IOutputStream + { + private: + boost::shared_ptr<TemporaryFile> temp_; + boost::filesystem::ofstream file_; + uint64_t archiveSize_; + + public: + SynchronousTemporaryStream(const boost::shared_ptr<TemporaryFile>& temp) : + temp_(temp), + archiveSize_(0) + { + file_.open(temp_->GetPath(), std::ofstream::out | std::ofstream::binary); + if (!file_.good()) + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + + uint64_t GetArchiveSize() const + { + return archiveSize_; + } + + virtual void Write(const std::string& chunk) ORTHANC_OVERRIDE + { + if (!chunk.empty()) + { + try + { + file_.write(chunk.c_str(), chunk.size()); + + if (!file_.good()) + { + file_.close(); + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + catch (boost::filesystem::filesystem_error&) + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + catch (...) // To catch "std::system_error&" in C++11 + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + + archiveSize_ += chunk.size(); + } + + virtual void Close() ORTHANC_OVERRIDE + { + try + { + file_.close(); + } + catch (boost::filesystem::filesystem_error&) + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + catch (...) // To catch "std::system_error&" in C++11 + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + }; + } + + static void SubmitJob(RestApiOutput& output, ServerContext& context, std::unique_ptr<ArchiveJob>& job, @@ -166,26 +435,54 @@ if (synchronous) { - boost::shared_ptr<TemporaryFile> tmp; - + bool streaming; + { OrthancConfiguration::ReaderLock lock; - tmp.reset(lock.GetConfiguration().CreateTemporaryFile()); + streaming = lock.GetConfiguration().GetBooleanParameter("SynchronousZipStream", true); // New in Orthanc 1.9.4 } - job->SetSynchronousTarget(tmp); - - Json::Value publicContent; - context.GetJobsEngine().GetRegistry().SubmitAndWait - (publicContent, job.release(), priority); - + if (streaming) + { + LOG(INFO) << "Streaming a ZIP archive"; + boost::shared_ptr<SharedMessageQueue> queue(new SharedMessageQueue); + + job->AcquireSynchronousTarget(new SynchronousZipStream(queue)); + + std::string jobId; + context.GetJobsEngine().GetRegistry().Submit(jobId, job.release(), priority); + + SynchronousZipSender sender(context, jobId, queue, filename); + output.AnswerWithoutBuffering(sender); + + // If we reach this line, this means that + // "SynchronousZipSender::ReadNextChunk()" has returned "false" + } + else { - // The archive is now created: Prepare the sending of the ZIP file - FilesystemHttpSender sender(tmp->GetPath(), MimeType_Zip); - sender.SetContentFilename(filename); + // This was the only behavior in Orthanc <= 1.9.3 + LOG(INFO) << "Not streaming a ZIP archive (use of a temporary file)"; + boost::shared_ptr<TemporaryFile> tmp; + + { + OrthancConfiguration::ReaderLock lock; + tmp.reset(lock.GetConfiguration().CreateTemporaryFile()); + } + + job->AcquireSynchronousTarget(new SynchronousTemporaryStream(tmp)); - // Send the ZIP - output.AnswerStream(sender); + Json::Value publicContent; + context.GetJobsEngine().GetRegistry().SubmitAndWait + (publicContent, job.release(), priority); + + { + // The archive is now created: Prepare the sending of the ZIP file + FilesystemHttpSender sender(tmp->GetPath(), MimeType_Zip); + sender.SetContentFilename(filename); + + // Send the ZIP + output.AnswerStream(sender); + } } } else @@ -201,12 +498,13 @@ { call.GetDocumentation() .SetRequestField("Synchronous", RestApiCallDocumentation::Type_Boolean, - "If `true`, create the archive in synchronous mode, which means that the HTTP answer will directly " - "contain the ZIP file. This is the default, easy behavior, but it is *not* be desirable to archive " - "large amount of data, as it might lead to network timeouts.", false) + "If `true`, create the archive in synchronous mode, which means that the HTTP answer will directly " + "contain the ZIP file. This is the default, easy behavior. However, if global configuration option " + "\"SynchronousZipStream\" is set to \"false\", asynchronous transfers should be prefered for " + "large amount of data, as the creation of the temporary file might lead to network timeouts.", false) .SetRequestField("Asynchronous", RestApiCallDocumentation::Type_Boolean, "If `true`, create the archive in asynchronous mode, which means that a job is submitted to create " - "the archive in background. Prefer this flavor wherever possible.", false) + "the archive in background.", false) .SetRequestField(KEY_TRANSCODE, RestApiCallDocumentation::Type_String, "If present, the DICOM files in the archive will be transcoded to the provided " "transfer syntax: https://book.orthanc-server.com/faq/transcoding.html", false)