# HG changeset patch # User Sebastien Jodogne # Date 1622734815 -7200 # Node ID cdab941fe17d87fe14bed7a3c9f8055f3a67945c # Parent ac66afbdda584c3706bd6ac1a2e68820b5363452 ZIP archive/media generated in synchronous mode are now streamed by default diff -r ac66afbdda58 -r cdab941fe17d NEWS --- a/NEWS Wed Jun 02 18:01:17 2021 +0200 +++ b/NEWS Thu Jun 03 17:40:15 2021 +0200 @@ -1,9 +1,15 @@ Pending changes in the mainline =============================== +General +------- + +* New configuration option "SynchronousZipStream" to disable streaming of ZIP + REST API -------- +* ZIP archive/media generated in synchronous mode are now streamed by default * "Replace" tags in "/modify" and "/anonymize" now supports value representation AT * "/jobs/..." has new field "ErrorDetails" to help identify the cause of an error diff -r ac66afbdda58 -r cdab941fe17d OrthancServer/Resources/Configuration.json --- a/OrthancServer/Resources/Configuration.json Wed Jun 02 18:01:17 2021 +0200 +++ b/OrthancServer/Resources/Configuration.json Thu Jun 03 17:40:15 2021 +0200 @@ -800,5 +800,14 @@ // modifications in the case of multiple writers. The database // back-end must support this option, which is notably *not* yet the // case of the built-in SQLite index. (new in Orthanc 1.9.2) - "CheckRevisions" : false + "CheckRevisions" : false, + + // Whether Orthanc streams ZIP archive/media to the HTTP + // client. Setting this option to "false" corresponds to the + // behavior of Orthanc <= 1.9.3: The ZIP is first entirely written + // to a temporary file, then sent to the client (which necessitates + // disk space and might lead to HTTP timeouts on large archives). If + // set to "true", the chunks of the ZIP file are progressively sent + // as soon as one DICOM file gets compressed (new in Orthanc 1.9.4) + "SynchronousZipStream" : true } diff -r ac66afbdda58 -r cdab941fe17d OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp --- a/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp Wed Jun 02 18:01:17 2021 +0200 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestArchive.cpp Thu Jun 03 17:40:15 2021 +0200 @@ -34,13 +34,17 @@ #include "../PrecompiledHeadersServer.h" #include "OrthancRestApi.h" +#include "../../../OrthancFramework/Sources/Compression/ZipWriter.h" #include "../../../OrthancFramework/Sources/HttpServer/FilesystemHttpSender.h" +#include "../../../OrthancFramework/Sources/Logging.h" #include "../../../OrthancFramework/Sources/OrthancException.h" #include "../../../OrthancFramework/Sources/SerializationToolbox.h" #include "../OrthancConfiguration.h" #include "../ServerContext.h" #include "../ServerJobs/ArchiveJob.h" +#include + namespace Orthanc { @@ -150,6 +154,271 @@ } + namespace + { + class SynchronousZipChunk : public IDynamicObject + { + private: + std::string chunk_; + bool done_; + + public: + static SynchronousZipChunk* CreateDone() + { + std::unique_ptr item(new SynchronousZipChunk); + item->done_ = true; + return item.release(); + } + + static SynchronousZipChunk* CreateChunk(const std::string& chunk) + { + std::unique_ptr item(new SynchronousZipChunk); + item->done_ = false; + item->chunk_ = chunk; + return item.release(); + } + + bool IsDone() const + { + return done_; + } + + void SwapString(std::string& target) + { + if (done_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + target.swap(chunk_); + } + } + }; + + + class SynchronousZipStream : public ZipWriter::IOutputStream + { + private: + boost::shared_ptr queue_; + uint64_t archiveSize_; + + public: + SynchronousZipStream(const boost::shared_ptr& queue) : + queue_(queue), + archiveSize_(0) + { + } + + uint64_t GetArchiveSize() const + { + return archiveSize_; + } + + virtual void Write(const std::string& chunk) ORTHANC_OVERRIDE + { + if (queue_.unique()) + { + throw OrthancException(ErrorCode_NetworkProtocol, + "HTTP client has disconnected while creating an archive in synchronous mode"); + } + else + { + queue_->Enqueue(SynchronousZipChunk::CreateChunk(chunk)); + archiveSize_ += chunk.size(); + } + } + + virtual void Close() ORTHANC_OVERRIDE + { + queue_->Enqueue(SynchronousZipChunk::CreateDone()); + } + }; + + + class SynchronousZipSender : public IHttpStreamAnswer + { + private: + ServerContext& context_; + std::string jobId_; + boost::shared_ptr queue_; + std::string filename_; + bool done_; + std::string chunk_; + + public: + SynchronousZipSender(ServerContext& context, + const std::string& jobId, + const boost::shared_ptr& queue, + const std::string& filename) : + context_(context), + jobId_(jobId), + queue_(queue), + filename_(filename), + done_(false) + { + } + + virtual HttpCompression SetupHttpCompression(bool gzipAllowed, + bool deflateAllowed) ORTHANC_OVERRIDE + { + // This function is not called by HttpOutput::AnswerWithoutBuffering() + throw OrthancException(ErrorCode_InternalError); + } + + virtual bool HasContentFilename(std::string& filename) ORTHANC_OVERRIDE + { + filename = filename_; + return true; + } + + virtual std::string GetContentType() ORTHANC_OVERRIDE + { + return EnumerationToString(MimeType_Zip); + } + + virtual uint64_t GetContentLength() ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); + } + + virtual bool ReadNextChunk() ORTHANC_OVERRIDE + { + for (;;) + { + std::unique_ptr obj(queue_->Dequeue(100)); + + if (obj.get() == NULL) + { + // Check that the job is still active, which indicates + // that more data might still be returned + JobState state; + if (context_.GetJobsEngine().GetRegistry().GetState(state, jobId_) && + (state != JobState_Pending || + state != JobState_Running || + state != JobState_Success)) + { + continue; + } + else + { + return false; + } + } + else + { + SynchronousZipChunk& item = dynamic_cast(*obj); + if (item.IsDone()) + { + done_ = true; + } + else + { + item.SwapString(chunk_); + done_ = false; + } + + return !done_; + } + } + } + + virtual const char* GetChunkContent() ORTHANC_OVERRIDE + { + if (done_) + { + throw OrthancException(ErrorCode_InternalError); + } + else + { + return (chunk_.empty() ? NULL : chunk_.c_str()); + } + } + + virtual size_t GetChunkSize() ORTHANC_OVERRIDE + { + if (done_) + { + throw OrthancException(ErrorCode_InternalError); + } + else + { + return chunk_.size(); + } + } + }; + + + class SynchronousTemporaryStream : public ZipWriter::IOutputStream + { + private: + boost::shared_ptr temp_; + boost::filesystem::ofstream file_; + uint64_t archiveSize_; + + public: + SynchronousTemporaryStream(const boost::shared_ptr& temp) : + temp_(temp), + archiveSize_(0) + { + file_.open(temp_->GetPath(), std::ofstream::out | std::ofstream::binary); + if (!file_.good()) + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + + uint64_t GetArchiveSize() const + { + return archiveSize_; + } + + virtual void Write(const std::string& chunk) ORTHANC_OVERRIDE + { + if (!chunk.empty()) + { + try + { + file_.write(chunk.c_str(), chunk.size()); + + if (!file_.good()) + { + file_.close(); + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + catch (boost::filesystem::filesystem_error&) + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + catch (...) // To catch "std::system_error&" in C++11 + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + + archiveSize_ += chunk.size(); + } + + virtual void Close() ORTHANC_OVERRIDE + { + try + { + file_.close(); + } + catch (boost::filesystem::filesystem_error&) + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + catch (...) // To catch "std::system_error&" in C++11 + { + throw OrthancException(ErrorCode_CannotWriteFile); + } + } + }; + } + + static void SubmitJob(RestApiOutput& output, ServerContext& context, std::unique_ptr& job, @@ -166,26 +435,54 @@ if (synchronous) { - boost::shared_ptr tmp; - + bool streaming; + { OrthancConfiguration::ReaderLock lock; - tmp.reset(lock.GetConfiguration().CreateTemporaryFile()); + streaming = lock.GetConfiguration().GetBooleanParameter("SynchronousZipStream", true); // New in Orthanc 1.9.4 } - job->SetSynchronousTarget(tmp); - - Json::Value publicContent; - context.GetJobsEngine().GetRegistry().SubmitAndWait - (publicContent, job.release(), priority); - + if (streaming) + { + LOG(INFO) << "Streaming a ZIP archive"; + boost::shared_ptr queue(new SharedMessageQueue); + + job->AcquireSynchronousTarget(new SynchronousZipStream(queue)); + + std::string jobId; + context.GetJobsEngine().GetRegistry().Submit(jobId, job.release(), priority); + + SynchronousZipSender sender(context, jobId, queue, filename); + output.AnswerWithoutBuffering(sender); + + // If we reach this line, this means that + // "SynchronousZipSender::ReadNextChunk()" has returned "false" + } + else { - // The archive is now created: Prepare the sending of the ZIP file - FilesystemHttpSender sender(tmp->GetPath(), MimeType_Zip); - sender.SetContentFilename(filename); + // This was the only behavior in Orthanc <= 1.9.3 + LOG(INFO) << "Not streaming a ZIP archive (use of a temporary file)"; + boost::shared_ptr tmp; + + { + OrthancConfiguration::ReaderLock lock; + tmp.reset(lock.GetConfiguration().CreateTemporaryFile()); + } + + job->AcquireSynchronousTarget(new SynchronousTemporaryStream(tmp)); - // Send the ZIP - output.AnswerStream(sender); + Json::Value publicContent; + context.GetJobsEngine().GetRegistry().SubmitAndWait + (publicContent, job.release(), priority); + + { + // The archive is now created: Prepare the sending of the ZIP file + FilesystemHttpSender sender(tmp->GetPath(), MimeType_Zip); + sender.SetContentFilename(filename); + + // Send the ZIP + output.AnswerStream(sender); + } } } else @@ -201,12 +498,13 @@ { call.GetDocumentation() .SetRequestField("Synchronous", RestApiCallDocumentation::Type_Boolean, - "If `true`, create the archive in synchronous mode, which means that the HTTP answer will directly " - "contain the ZIP file. This is the default, easy behavior, but it is *not* be desirable to archive " - "large amount of data, as it might lead to network timeouts.", false) + "If `true`, create the archive in synchronous mode, which means that the HTTP answer will directly " + "contain the ZIP file. This is the default, easy behavior. However, if global configuration option " + "\"SynchronousZipStream\" is set to \"false\", asynchronous transfers should be prefered for " + "large amount of data, as the creation of the temporary file might lead to network timeouts.", false) .SetRequestField("Asynchronous", RestApiCallDocumentation::Type_Boolean, "If `true`, create the archive in asynchronous mode, which means that a job is submitted to create " - "the archive in background. Prefer this flavor wherever possible.", false) + "the archive in background.", false) .SetRequestField(KEY_TRANSCODE, RestApiCallDocumentation::Type_String, "If present, the DICOM files in the archive will be transcoded to the provided " "transfer syntax: https://book.orthanc-server.com/faq/transcoding.html", false) diff -r ac66afbdda58 -r cdab941fe17d OrthancServer/Sources/ServerJobs/ArchiveJob.cpp --- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Wed Jun 02 18:01:17 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Jun 03 17:40:15 2021 +0200 @@ -57,6 +57,8 @@ static const char* const KEY_INSTANCES_COUNT = "InstancesCount"; static const char* const KEY_UNCOMPRESSED_SIZE_MB = "UncompressedSizeMB"; static const char* const KEY_ARCHIVE_SIZE_MB = "ArchiveSizeMB"; +static const char* const KEY_UNCOMPRESSED_SIZE = "UncompressedSize"; +static const char* const KEY_ARCHIVE_SIZE = "ArchiveSize"; static const char* const KEY_TRANSCODE = "Transcode"; @@ -749,15 +751,16 @@ std::unique_ptr zip_; std::unique_ptr dicomDir_; bool isMedia_; + bool isStream_; public: - ZipWriterIterator(const TemporaryFile& target, - ServerContext& context, + ZipWriterIterator(ServerContext& context, ArchiveIndex& archive, bool isMedia, bool enableExtendedSopClass) : context_(context), - isMedia_(isMedia) + isMedia_(isMedia), + isStream_(false) { if (isMedia) { @@ -777,11 +780,73 @@ archive.Expand(context.GetIndex()); archive.Apply(visitor); } + } - zip_.reset(new HierarchicalZipWriter(target.GetPath().c_str())); - zip_->SetZip64(commands_.IsZip64()); + void SetOutputFile(const std::string& path) + { + if (zip_.get() == NULL) + { + zip_.reset(new HierarchicalZipWriter(path.c_str())); + zip_->SetZip64(commands_.IsZip64()); + isStream_ = false; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + void AcquireOutputStream(ZipWriter::IOutputStream* output) + { + std::unique_ptr protection(output); + + if (zip_.get() == NULL) + { + zip_.reset(new HierarchicalZipWriter(protection.release(), commands_.IsZip64())); + isStream_ = true; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } } - + + void CancelStream() + { + if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (isStream_) + { + zip_->CancelStream(); + } + } + + void Close() + { + if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + zip_->Close(); + } + } + + uint64_t GetArchiveSize() const + { + if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return zip_->GetArchiveSize(); + } + } + size_t GetStepsCount() const { return commands_.GetSize() + 1; @@ -795,6 +860,10 @@ { throw OrthancException(ErrorCode_ParameterOutOfRange); } + else if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } else if (index == commands_.GetSize()) { // Last step: Add the DICOMDIR @@ -862,9 +931,11 @@ } - void ArchiveJob::SetSynchronousTarget(boost::shared_ptr& target) + void ArchiveJob::AcquireSynchronousTarget(ZipWriter::IOutputStream* target) { - if (target.get() == NULL) + std::unique_ptr protection(target); + + if (target == NULL) { throw OrthancException(ErrorCode_NullPointer); } @@ -876,7 +947,7 @@ } else { - synchronousTarget_ = target; + synchronousTarget_.reset(protection.release()); } } @@ -931,35 +1002,42 @@ void ArchiveJob::Start() { - TemporaryFile* target = NULL; // (*) - - if (synchronousTarget_.get() == NULL) - { - { - OrthancConfiguration::ReaderLock lock; - asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); - } - - target = asynchronousTarget_.get(); - } - else - { - target = synchronousTarget_.get(); - } - - assert(target != NULL); - target->Touch(); // Make sure we can write to the temporary file - if (writer_.get() != NULL) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } + else + { + if (synchronousTarget_.get() == NULL) + { + if (asynchronousTarget_.get() != NULL) + { + // It is up to this method to create the asynchronous target + throw OrthancException(ErrorCode_InternalError); + } + else + { + OrthancConfiguration::ReaderLock lock; + asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); + + assert(asynchronousTarget_.get() != NULL); + asynchronousTarget_->Touch(); // Make sure we can write to the temporary file + + writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_->SetOutputFile(asynchronousTarget_->GetPath()); + } + } + else + { + assert(synchronousTarget_.get() != NULL); + + writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_->AcquireOutputStream(synchronousTarget_.release()); + } - writer_.reset(new ZipWriterIterator(*target, context_, *archive_, - isMedia_, enableExtendedSopClass_)); - - instancesCount_ = writer_->GetInstancesCount(); - uncompressedSize_ = writer_->GetUncompressedSize(); + instancesCount_ = writer_->GetInstancesCount(); + uncompressedSize_ = writer_->GetUncompressedSize(); + } } @@ -989,37 +1067,15 @@ } - void ArchiveJob::RefreshArchiveSize() - { - if (synchronousTarget_.get() != NULL) - { - archiveSize_ = synchronousTarget_->GetFileSize(); - } - - if (asynchronousTarget_.get() != NULL) - { - archiveSize_ = asynchronousTarget_->GetFileSize(); - } - } - - void ArchiveJob::FinalizeTarget() { - writer_.reset(); // Flush all the results - - RefreshArchiveSize(); - - if (synchronousTarget_.get() != NULL) + if (writer_.get() != NULL) { - /** - * Synchronous behavior: Release the reference to the temporary - * file. It is now up to the caller to deal with the shared - * pointer. This is a fix in Orthanc 1.9.3. - * https://groups.google.com/g/orthanc-users/c/tpP2fkRAd9o/m/5SGpEHbGCQAJ - **/ - synchronousTarget_.reset(); + writer_->Close(); // Flush all the results + archiveSize_ = writer_->GetArchiveSize(); + writer_.reset(); } - + if (asynchronousTarget_.get() != NULL) { // Asynchronous behavior: Move the resulting file into the media archive @@ -1033,14 +1089,6 @@ { assert(writer_.get() != NULL); - if (synchronousTarget_.get() != NULL && - synchronousTarget_.unique()) - { - LOG(WARNING) << "A client has disconnected while creating an archive"; - return JobStepResult::Failure(ErrorCode_NetworkProtocol, - "A client has disconnected while creating an archive"); - } - if (writer_->GetStepsCount() == 0) { FinalizeTarget(); @@ -1048,7 +1096,16 @@ } else { - writer_->RunStep(currentStep_, transcode_, transferSyntax_); + try + { + writer_->RunStep(currentStep_, transcode_, transferSyntax_); + } + catch (Orthanc::OrthancException& e) + { + LOG(ERROR) << "Error while creating an archive: " << e.What(); + writer_->CancelStream(); + throw; + } currentStep_ ++; @@ -1059,7 +1116,7 @@ } else { - RefreshArchiveSize(); + archiveSize_ = writer_->GetArchiveSize(); return JobStepResult::Continue(); } } @@ -1077,6 +1134,8 @@ reason == JobStopReason_Failure || reason == JobStopReason_Retry) { + writer_->CancelStream(); + // First delete the writer, as it holds a reference to "(a)synchronousTarget_", cf. (*) writer_.reset(); @@ -1124,6 +1183,10 @@ value[KEY_ARCHIVE_SIZE_MB] = static_cast(archiveSize_ / MEGA_BYTES); + // New in Orthanc 1.9.4 + value[KEY_ARCHIVE_SIZE] = boost::lexical_cast(archiveSize_); + value[KEY_UNCOMPRESSED_SIZE] = boost::lexical_cast(uncompressedSize_); + if (transcode_) { value[KEY_TRANSCODE] = GetTransferSyntaxUid(transferSyntax_); diff -r ac66afbdda58 -r cdab941fe17d OrthancServer/Sources/ServerJobs/ArchiveJob.h --- a/OrthancServer/Sources/ServerJobs/ArchiveJob.h Wed Jun 02 18:01:17 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.h Thu Jun 03 17:40:15 2021 +0200 @@ -34,6 +34,7 @@ #pragma once #include "../../../OrthancFramework/Sources/Compatibility.h" +#include "../../../OrthancFramework/Sources/Compression/ZipWriter.h" #include "../../../OrthancFramework/Sources/JobsEngine/IJob.h" #include "../../../OrthancFramework/Sources/TemporaryFile.h" @@ -55,7 +56,7 @@ class ZipCommands; class ZipWriterIterator; - boost::shared_ptr synchronousTarget_; + std::unique_ptr synchronousTarget_; // Only valid before "Start()" std::unique_ptr asynchronousTarget_; ServerContext& context_; boost::shared_ptr archive_; @@ -74,8 +75,6 @@ bool transcode_; DicomTransferSyntax transferSyntax_; - void RefreshArchiveSize(); - void FinalizeTarget(); public: @@ -84,8 +83,8 @@ bool enableExtendedSopClass); virtual ~ArchiveJob(); - - void SetSynchronousTarget(boost::shared_ptr& synchronousTarget); + + void AcquireSynchronousTarget(ZipWriter::IOutputStream* synchronousTarget); void SetDescription(const std::string& description);