Mercurial > hg > orthanc
diff OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4674:cdab941fe17d
ZIP archive/media generated in synchronous mode are now streamed by default
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 03 Jun 2021 17:40:15 +0200 |
parents | b02dc8303cf6 |
children | f0038043fb97 4e765c18ace7 0a38000b086d |
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Wed Jun 02 18:01:17 2021 +0200 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Thu Jun 03 17:40:15 2021 +0200 @@ -57,6 +57,8 @@ static const char* const KEY_INSTANCES_COUNT = "InstancesCount"; static const char* const KEY_UNCOMPRESSED_SIZE_MB = "UncompressedSizeMB"; static const char* const KEY_ARCHIVE_SIZE_MB = "ArchiveSizeMB"; +static const char* const KEY_UNCOMPRESSED_SIZE = "UncompressedSize"; +static const char* const KEY_ARCHIVE_SIZE = "ArchiveSize"; static const char* const KEY_TRANSCODE = "Transcode"; @@ -749,15 +751,16 @@ std::unique_ptr<HierarchicalZipWriter> zip_; std::unique_ptr<DicomDirWriter> dicomDir_; bool isMedia_; + bool isStream_; public: - ZipWriterIterator(const TemporaryFile& target, - ServerContext& context, + ZipWriterIterator(ServerContext& context, ArchiveIndex& archive, bool isMedia, bool enableExtendedSopClass) : context_(context), - isMedia_(isMedia) + isMedia_(isMedia), + isStream_(false) { if (isMedia) { @@ -777,11 +780,73 @@ archive.Expand(context.GetIndex()); archive.Apply(visitor); } + } - zip_.reset(new HierarchicalZipWriter(target.GetPath().c_str())); - zip_->SetZip64(commands_.IsZip64()); + void SetOutputFile(const std::string& path) + { + if (zip_.get() == NULL) + { + zip_.reset(new HierarchicalZipWriter(path.c_str())); + zip_->SetZip64(commands_.IsZip64()); + isStream_ = false; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + void AcquireOutputStream(ZipWriter::IOutputStream* output) + { + std::unique_ptr<ZipWriter::IOutputStream> protection(output); + + if (zip_.get() == NULL) + { + zip_.reset(new HierarchicalZipWriter(protection.release(), commands_.IsZip64())); + isStream_ = true; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } } - + + void CancelStream() + { + if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (isStream_) + { + zip_->CancelStream(); + } + } + + void Close() + { + if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + zip_->Close(); + } + } + + uint64_t GetArchiveSize() const + { + if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return zip_->GetArchiveSize(); + } + } + size_t GetStepsCount() const { return commands_.GetSize() + 1; @@ -795,6 +860,10 @@ { throw OrthancException(ErrorCode_ParameterOutOfRange); } + else if (zip_.get() == NULL) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } else if (index == commands_.GetSize()) { // Last step: Add the DICOMDIR @@ -862,9 +931,11 @@ } - void ArchiveJob::SetSynchronousTarget(boost::shared_ptr<TemporaryFile>& target) + void ArchiveJob::AcquireSynchronousTarget(ZipWriter::IOutputStream* target) { - if (target.get() == NULL) + std::unique_ptr<ZipWriter::IOutputStream> protection(target); + + if (target == NULL) { throw OrthancException(ErrorCode_NullPointer); } @@ -876,7 +947,7 @@ } else { - synchronousTarget_ = target; + synchronousTarget_.reset(protection.release()); } } @@ -931,35 +1002,42 @@ void ArchiveJob::Start() { - TemporaryFile* target = NULL; // (*) - - if (synchronousTarget_.get() == NULL) - { - { - OrthancConfiguration::ReaderLock lock; - asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); - } - - target = asynchronousTarget_.get(); - } - else - { - target = synchronousTarget_.get(); - } - - assert(target != NULL); - target->Touch(); // Make sure we can write to the temporary file - if (writer_.get() != NULL) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } + else + { + if (synchronousTarget_.get() == NULL) + { + if (asynchronousTarget_.get() != NULL) + { + // It is up to this method to create the asynchronous target + throw OrthancException(ErrorCode_InternalError); + } + else + { + OrthancConfiguration::ReaderLock lock; + asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); + + assert(asynchronousTarget_.get() != NULL); + asynchronousTarget_->Touch(); // Make sure we can write to the temporary file + + writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_->SetOutputFile(asynchronousTarget_->GetPath()); + } + } + else + { + assert(synchronousTarget_.get() != NULL); + + writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); + writer_->AcquireOutputStream(synchronousTarget_.release()); + } - writer_.reset(new ZipWriterIterator(*target, context_, *archive_, - isMedia_, enableExtendedSopClass_)); - - instancesCount_ = writer_->GetInstancesCount(); - uncompressedSize_ = writer_->GetUncompressedSize(); + instancesCount_ = writer_->GetInstancesCount(); + uncompressedSize_ = writer_->GetUncompressedSize(); + } } @@ -989,37 +1067,15 @@ } - void ArchiveJob::RefreshArchiveSize() - { - if (synchronousTarget_.get() != NULL) - { - archiveSize_ = synchronousTarget_->GetFileSize(); - } - - if (asynchronousTarget_.get() != NULL) - { - archiveSize_ = asynchronousTarget_->GetFileSize(); - } - } - - void ArchiveJob::FinalizeTarget() { - writer_.reset(); // Flush all the results - - RefreshArchiveSize(); - - if (synchronousTarget_.get() != NULL) + if (writer_.get() != NULL) { - /** - * Synchronous behavior: Release the reference to the temporary - * file. It is now up to the caller to deal with the shared - * pointer. This is a fix in Orthanc 1.9.3. - * https://groups.google.com/g/orthanc-users/c/tpP2fkRAd9o/m/5SGpEHbGCQAJ - **/ - synchronousTarget_.reset(); + writer_->Close(); // Flush all the results + archiveSize_ = writer_->GetArchiveSize(); + writer_.reset(); } - + if (asynchronousTarget_.get() != NULL) { // Asynchronous behavior: Move the resulting file into the media archive @@ -1033,14 +1089,6 @@ { assert(writer_.get() != NULL); - if (synchronousTarget_.get() != NULL && - synchronousTarget_.unique()) - { - LOG(WARNING) << "A client has disconnected while creating an archive"; - return JobStepResult::Failure(ErrorCode_NetworkProtocol, - "A client has disconnected while creating an archive"); - } - if (writer_->GetStepsCount() == 0) { FinalizeTarget(); @@ -1048,7 +1096,16 @@ } else { - writer_->RunStep(currentStep_, transcode_, transferSyntax_); + try + { + writer_->RunStep(currentStep_, transcode_, transferSyntax_); + } + catch (Orthanc::OrthancException& e) + { + LOG(ERROR) << "Error while creating an archive: " << e.What(); + writer_->CancelStream(); + throw; + } currentStep_ ++; @@ -1059,7 +1116,7 @@ } else { - RefreshArchiveSize(); + archiveSize_ = writer_->GetArchiveSize(); return JobStepResult::Continue(); } } @@ -1077,6 +1134,8 @@ reason == JobStopReason_Failure || reason == JobStopReason_Retry) { + writer_->CancelStream(); + // First delete the writer, as it holds a reference to "(a)synchronousTarget_", cf. (*) writer_.reset(); @@ -1124,6 +1183,10 @@ value[KEY_ARCHIVE_SIZE_MB] = static_cast<unsigned int>(archiveSize_ / MEGA_BYTES); + // New in Orthanc 1.9.4 + value[KEY_ARCHIVE_SIZE] = boost::lexical_cast<std::string>(archiveSize_); + value[KEY_UNCOMPRESSED_SIZE] = boost::lexical_cast<std::string>(uncompressedSize_); + if (transcode_) { value[KEY_TRANSCODE] = GetTransferSyntaxUid(transferSyntax_);