comparison OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4819:70d2a97ca8cb openssl-3.x

integration mainline->openssl-3.x
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 25 Nov 2021 13:12:32 +0100
parents f0038043fb97 58637d39ce88
children 2e71a08eea15
comparison
equal deleted inserted replaced
4785:61da49321754 4819:70d2a97ca8cb
26 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h" 26 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h"
27 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h" 27 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h"
28 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" 28 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h"
29 #include "../../../OrthancFramework/Sources/Logging.h" 29 #include "../../../OrthancFramework/Sources/Logging.h"
30 #include "../../../OrthancFramework/Sources/OrthancException.h" 30 #include "../../../OrthancFramework/Sources/OrthancException.h"
31 #include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h"
31 #include "../OrthancConfiguration.h" 32 #include "../OrthancConfiguration.h"
32 #include "../ServerContext.h" 33 #include "../ServerContext.h"
33 34
34 #include <stdio.h> 35 #include <stdio.h>
35 36
74 75
75 return isZip64; 76 return isZip64;
76 } 77 }
77 78
78 79
80 class ArchiveJob::InstanceLoader : public boost::noncopyable
81 {
82 protected:
83 ServerContext& context_;
84 public:
85 explicit InstanceLoader(ServerContext& context)
86 : context_(context)
87 {
88 }
89
90 virtual ~InstanceLoader()
91 {
92 }
93
94 virtual void PrepareDicom(const std::string& instanceId)
95 {
96
97 }
98
99 virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0;
100
101 virtual void Clear()
102 {
103 }
104 };
105
106 class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader
107 {
108 public:
109 explicit SynchronousInstanceLoader(ServerContext& context)
110 : InstanceLoader(context)
111 {
112 }
113
114 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
115 {
116 context_.ReadDicom(dicom, instanceId);
117 }
118 };
119
120 class InstanceId : public Orthanc::IDynamicObject
121 {
122 private:
123 std::string id_;
124
125 public:
126 explicit InstanceId(const std::string& id) : id_(id)
127 {
128 }
129
130 virtual ~InstanceId() ORTHANC_OVERRIDE
131 {
132 }
133
134 std::string GetId() const {return id_;};
135 };
136
137 class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader
138 {
139 Semaphore availableInstancesSemaphore_;
140 std::map<std::string, boost::shared_ptr<std::string> > availableInstances_;
141 boost::mutex availableInstancesMutex_;
142 SharedMessageQueue instancesToPreload_;
143 std::vector<boost::thread*> threads_;
144
145
146 public:
147 ThreadedInstanceLoader(ServerContext& context, size_t threadCount)
148 : InstanceLoader(context),
149 availableInstancesSemaphore_(0)
150 {
151 for (size_t i = 0; i < threadCount; i++)
152 {
153 threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
154 }
155 }
156
157 virtual ~ThreadedInstanceLoader()
158 {
159 Clear();
160 }
161
162 virtual void Clear() ORTHANC_OVERRIDE
163 {
164 for (size_t i = 0; i < threads_.size(); i++)
165 {
166 instancesToPreload_.Enqueue(NULL);
167 }
168
169 for (size_t i = 0; i < threads_.size(); i++)
170 {
171 if (threads_[i]->joinable())
172 {
173 threads_[i]->join();
174 }
175 delete threads_[i];
176 }
177
178 threads_.clear();
179 availableInstances_.clear();
180 }
181
182 static void PreloaderWorkerThread(ThreadedInstanceLoader* that)
183 {
184 while (true)
185 {
186 std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0)));
187 if (instanceId.get() == NULL) // that's the signal to exit the thread
188 {
189 return;
190 }
191
192 try
193 {
194 boost::shared_ptr<std::string> dicomContent(new std::string());
195 that->context_.ReadDicom(*dicomContent, instanceId->GetId());
196 {
197 boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
198 that->availableInstances_[instanceId->GetId()] = dicomContent;
199 }
200
201 that->availableInstancesSemaphore_.Release();
202 }
203 catch (OrthancException& e)
204 {
205 boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
206 // store a NULL result to notify that we could not read the instance
207 that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>();
208 that->availableInstancesSemaphore_.Release();
209 }
210 }
211 }
212
213 virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE
214 {
215 instancesToPreload_.Enqueue(new InstanceId(instanceId));
216 }
217
218 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
219 {
220 while (true)
221 {
222 // wait for an instance to be available but this might not be the one we are waiting for !
223 availableInstancesSemaphore_.Acquire();
224
225 boost::shared_ptr<std::string> dicomContent;
226 {
227 if (availableInstances_.find(instanceId) != availableInstances_.end())
228 {
229 // this is the instance we were waiting for
230 dicomContent = availableInstances_[instanceId];
231 availableInstances_.erase(instanceId);
232
233 if (dicomContent.get() == NULL) // there has been an error while reading the file
234 {
235 throw OrthancException(ErrorCode_InexistentItem);
236 }
237 dicom.swap(*dicomContent);
238
239 if (availableInstances_.size() > 0)
240 {
241 // we have just read the instance we were waiting for but there are still other instances available ->
242 // make sure the next GetDicom call does not wait !
243 availableInstancesSemaphore_.Release();
244 }
245 return;
246 }
247 // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when
248 // a new instance is available
249 }
250 }
251 }
252 };
253
254
79 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable 255 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable
80 { 256 {
81 private: 257 private:
82 ResourceType level_; 258 ResourceType level_;
83 std::string patient_; 259 std::string patient_;
388 assert(type_ == Type_WriteInstance); 564 assert(type_ == Type_WriteInstance);
389 } 565 }
390 566
391 void Apply(HierarchicalZipWriter& writer, 567 void Apply(HierarchicalZipWriter& writer,
392 ServerContext& context, 568 ServerContext& context,
569 InstanceLoader& instanceLoader,
393 DicomDirWriter* dicomDir, 570 DicomDirWriter* dicomDir,
394 const std::string& dicomDirFolder, 571 const std::string& dicomDirFolder,
395 bool transcode, 572 bool transcode,
396 DicomTransferSyntax transferSyntax) const 573 DicomTransferSyntax transferSyntax) const
397 { 574 {
409 { 586 {
410 std::string content; 587 std::string content;
411 588
412 try 589 try
413 { 590 {
414 context.ReadDicom(content, instanceId_); 591 instanceLoader.GetDicom(content, instanceId_);
415 } 592 }
416 catch (OrthancException& e) 593 catch (OrthancException& e)
417 { 594 {
418 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; 595 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_;
419 return; 596 return;
480 }; 657 };
481 658
482 std::deque<Command*> commands_; 659 std::deque<Command*> commands_;
483 uint64_t uncompressedSize_; 660 uint64_t uncompressedSize_;
484 unsigned int instancesCount_; 661 unsigned int instancesCount_;
662 InstanceLoader& instanceLoader_;
485 663
486 664
487 void ApplyInternal(HierarchicalZipWriter& writer, 665 void ApplyInternal(HierarchicalZipWriter& writer,
488 ServerContext& context, 666 ServerContext& context,
667 InstanceLoader& instanceLoader,
489 size_t index, 668 size_t index,
490 DicomDirWriter* dicomDir, 669 DicomDirWriter* dicomDir,
491 const std::string& dicomDirFolder, 670 const std::string& dicomDirFolder,
492 bool transcode, 671 bool transcode,
493 DicomTransferSyntax transferSyntax) const 672 DicomTransferSyntax transferSyntax) const
495 if (index >= commands_.size()) 674 if (index >= commands_.size())
496 { 675 {
497 throw OrthancException(ErrorCode_ParameterOutOfRange); 676 throw OrthancException(ErrorCode_ParameterOutOfRange);
498 } 677 }
499 678
500 commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); 679 commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax);
501 } 680 }
502 681
503 public: 682 public:
504 ZipCommands() : 683 explicit ZipCommands(InstanceLoader& instanceLoader) :
505 uncompressedSize_(0), 684 uncompressedSize_(0),
506 instancesCount_(0) 685 instancesCount_(0),
686 instanceLoader_(instanceLoader)
507 { 687 {
508 } 688 }
509 689
510 ~ZipCommands() 690 ~ZipCommands()
511 { 691 {
533 } 713 }
534 714
535 // "media" flavor (with DICOMDIR) 715 // "media" flavor (with DICOMDIR)
536 void Apply(HierarchicalZipWriter& writer, 716 void Apply(HierarchicalZipWriter& writer,
537 ServerContext& context, 717 ServerContext& context,
718 InstanceLoader& instanceLoader,
538 size_t index, 719 size_t index,
539 DicomDirWriter& dicomDir, 720 DicomDirWriter& dicomDir,
540 const std::string& dicomDirFolder, 721 const std::string& dicomDirFolder,
541 bool transcode, 722 bool transcode,
542 DicomTransferSyntax transferSyntax) const 723 DicomTransferSyntax transferSyntax) const
543 { 724 {
544 ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); 725 ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax);
545 } 726 }
546 727
547 // "archive" flavor (without DICOMDIR) 728 // "archive" flavor (without DICOMDIR)
548 void Apply(HierarchicalZipWriter& writer, 729 void Apply(HierarchicalZipWriter& writer,
549 ServerContext& context, 730 ServerContext& context,
731 InstanceLoader& instanceLoader,
550 size_t index, 732 size_t index,
551 bool transcode, 733 bool transcode,
552 DicomTransferSyntax transferSyntax) const 734 DicomTransferSyntax transferSyntax) const
553 { 735 {
554 ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); 736 ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax);
555 } 737 }
556 738
557 void AddOpenDirectory(const std::string& filename) 739 void AddOpenDirectory(const std::string& filename)
558 { 740 {
559 commands_.push_back(new Command(Type_OpenDirectory, filename)); 741 commands_.push_back(new Command(Type_OpenDirectory, filename));
566 748
567 void AddWriteInstance(const std::string& filename, 749 void AddWriteInstance(const std::string& filename,
568 const std::string& instanceId, 750 const std::string& instanceId,
569 uint64_t uncompressedSize) 751 uint64_t uncompressedSize)
570 { 752 {
753 instanceLoader_.PrepareDicom(instanceId);
571 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); 754 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId));
572 instancesCount_ ++; 755 instancesCount_ ++;
573 uncompressedSize_ += uncompressedSize; 756 uncompressedSize_ += uncompressedSize;
574 } 757 }
575 758
733 916
734 class ArchiveJob::ZipWriterIterator : public boost::noncopyable 917 class ArchiveJob::ZipWriterIterator : public boost::noncopyable
735 { 918 {
736 private: 919 private:
737 ServerContext& context_; 920 ServerContext& context_;
921 InstanceLoader& instanceLoader_;
738 ZipCommands commands_; 922 ZipCommands commands_;
739 std::unique_ptr<HierarchicalZipWriter> zip_; 923 std::unique_ptr<HierarchicalZipWriter> zip_;
740 std::unique_ptr<DicomDirWriter> dicomDir_; 924 std::unique_ptr<DicomDirWriter> dicomDir_;
741 bool isMedia_; 925 bool isMedia_;
742 bool isStream_; 926 bool isStream_;
743 927
744 public: 928 public:
745 ZipWriterIterator(ServerContext& context, 929 ZipWriterIterator(ServerContext& context,
930 InstanceLoader& instanceLoader,
746 ArchiveIndex& archive, 931 ArchiveIndex& archive,
747 bool isMedia, 932 bool isMedia,
748 bool enableExtendedSopClass) : 933 bool enableExtendedSopClass) :
749 context_(context), 934 context_(context),
935 instanceLoader_(instanceLoader),
936 commands_(instanceLoader),
750 isMedia_(isMedia), 937 isMedia_(isMedia),
751 isStream_(false) 938 isStream_(false)
752 { 939 {
753 if (isMedia) 940 if (isMedia)
754 { 941 {
868 else 1055 else
869 { 1056 {
870 if (isMedia_) 1057 if (isMedia_)
871 { 1058 {
872 assert(dicomDir_.get() != NULL); 1059 assert(dicomDir_.get() != NULL);
873 commands_.Apply(*zip_, context_, index, *dicomDir_, 1060 commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_,
874 MEDIA_IMAGES_FOLDER, transcode, transferSyntax); 1061 MEDIA_IMAGES_FOLDER, transcode, transferSyntax);
875 } 1062 }
876 else 1063 else
877 { 1064 {
878 assert(dicomDir_.get() == NULL); 1065 assert(dicomDir_.get() == NULL);
879 commands_.Apply(*zip_, context_, index, transcode, transferSyntax); 1066 commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax);
880 } 1067 }
881 } 1068 }
882 } 1069 }
883 1070
884 unsigned int GetInstancesCount() const 1071 unsigned int GetInstancesCount() const
903 currentStep_(0), 1090 currentStep_(0),
904 instancesCount_(0), 1091 instancesCount_(0),
905 uncompressedSize_(0), 1092 uncompressedSize_(0),
906 archiveSize_(0), 1093 archiveSize_(0),
907 transcode_(false), 1094 transcode_(false),
908 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) 1095 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit),
1096 loaderThreads_(0)
909 { 1097 {
910 } 1098 }
911 1099
912 1100
913 ArchiveJob::~ArchiveJob() 1101 ArchiveJob::~ArchiveJob()
979 transferSyntax_ = transferSyntax; 1167 transferSyntax_ = transferSyntax;
980 } 1168 }
981 } 1169 }
982 1170
983 1171
1172 void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads)
1173 {
1174 if (writer_.get() != NULL) // Already started
1175 {
1176 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1177 }
1178 else
1179 {
1180 loaderThreads_ = loaderThreads;
1181 }
1182 }
1183
1184
984 void ArchiveJob::Reset() 1185 void ArchiveJob::Reset()
985 { 1186 {
986 throw OrthancException(ErrorCode_BadSequenceOfCalls, 1187 throw OrthancException(ErrorCode_BadSequenceOfCalls,
987 "Cannot resubmit the creation of an archive"); 1188 "Cannot resubmit the creation of an archive");
988 } 1189 }
989 1190
990 1191
991 void ArchiveJob::Start() 1192 void ArchiveJob::Start()
992 { 1193 {
1194 if (loaderThreads_ == 0)
1195 {
1196 // default behaviour before loaderThreads was introducted in 1.9.8
1197 instanceLoader_.reset(new SynchronousInstanceLoader(context_));
1198 }
1199 else
1200 {
1201 instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_));
1202 }
1203
993 if (writer_.get() != NULL) 1204 if (writer_.get() != NULL)
994 { 1205 {
995 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1206 throw OrthancException(ErrorCode_BadSequenceOfCalls);
996 } 1207 }
997 else 1208 else
1009 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); 1220 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile());
1010 1221
1011 assert(asynchronousTarget_.get() != NULL); 1222 assert(asynchronousTarget_.get() != NULL);
1012 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file 1223 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file
1013 1224
1014 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); 1225 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
1015 writer_->SetOutputFile(asynchronousTarget_->GetPath()); 1226 writer_->SetOutputFile(asynchronousTarget_->GetPath());
1016 } 1227 }
1017 } 1228 }
1018 else 1229 else
1019 { 1230 {
1020 assert(synchronousTarget_.get() != NULL); 1231 assert(synchronousTarget_.get() != NULL);
1021 1232
1022 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); 1233 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
1023 writer_->AcquireOutputStream(synchronousTarget_.release()); 1234 writer_->AcquireOutputStream(synchronousTarget_.release());
1024 } 1235 }
1025 1236
1026 instancesCount_ = writer_->GetInstancesCount(); 1237 instancesCount_ = writer_->GetInstancesCount();
1027 uncompressedSize_ = writer_->GetUncompressedSize(); 1238 uncompressedSize_ = writer_->GetUncompressedSize();
1060 if (writer_.get() != NULL) 1271 if (writer_.get() != NULL)
1061 { 1272 {
1062 writer_->Close(); // Flush all the results 1273 writer_->Close(); // Flush all the results
1063 archiveSize_ = writer_->GetArchiveSize(); 1274 archiveSize_ = writer_->GetArchiveSize();
1064 writer_.reset(); 1275 writer_.reset();
1276 }
1277
1278 if (instanceLoader_.get() != NULL)
1279 {
1280 instanceLoader_->Clear();
1065 } 1281 }
1066 1282
1067 if (asynchronousTarget_.get() != NULL) 1283 if (asynchronousTarget_.get() != NULL)
1068 { 1284 {
1069 // Asynchronous behavior: Move the resulting file into the media archive 1285 // Asynchronous behavior: Move the resulting file into the media archive
1182 } 1398 }
1183 1399
1184 1400
1185 bool ArchiveJob::GetOutput(std::string& output, 1401 bool ArchiveJob::GetOutput(std::string& output,
1186 MimeType& mime, 1402 MimeType& mime,
1403 std::string& filename,
1187 const std::string& key) 1404 const std::string& key)
1188 { 1405 {
1189 if (key == "archive" && 1406 if (key == "archive" &&
1190 !mediaArchiveId_.empty()) 1407 !mediaArchiveId_.empty())
1191 { 1408 {
1194 if (accessor.IsValid()) 1411 if (accessor.IsValid())
1195 { 1412 {
1196 const DynamicTemporaryFile& f = dynamic_cast<DynamicTemporaryFile&>(accessor.GetItem()); 1413 const DynamicTemporaryFile& f = dynamic_cast<DynamicTemporaryFile&>(accessor.GetItem());
1197 f.GetFile().Read(output); 1414 f.GetFile().Read(output);
1198 mime = MimeType_Zip; 1415 mime = MimeType_Zip;
1416 filename = "archive.zip";
1199 return true; 1417 return true;
1200 } 1418 }
1201 else 1419 else
1202 { 1420 {
1203 return false; 1421 return false;