Mercurial > hg > orthanc
comparison OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4819:70d2a97ca8cb openssl-3.x
integration mainline->openssl-3.x
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 25 Nov 2021 13:12:32 +0100 |
parents | f0038043fb97 58637d39ce88 |
children | 2e71a08eea15 |
comparison
equal
deleted
inserted
replaced
4785:61da49321754 | 4819:70d2a97ca8cb |
---|---|
26 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h" | 26 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h" |
27 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h" | 27 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h" |
28 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" | 28 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" |
29 #include "../../../OrthancFramework/Sources/Logging.h" | 29 #include "../../../OrthancFramework/Sources/Logging.h" |
30 #include "../../../OrthancFramework/Sources/OrthancException.h" | 30 #include "../../../OrthancFramework/Sources/OrthancException.h" |
31 #include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h" | |
31 #include "../OrthancConfiguration.h" | 32 #include "../OrthancConfiguration.h" |
32 #include "../ServerContext.h" | 33 #include "../ServerContext.h" |
33 | 34 |
34 #include <stdio.h> | 35 #include <stdio.h> |
35 | 36 |
74 | 75 |
75 return isZip64; | 76 return isZip64; |
76 } | 77 } |
77 | 78 |
78 | 79 |
80 class ArchiveJob::InstanceLoader : public boost::noncopyable | |
81 { | |
82 protected: | |
83 ServerContext& context_; | |
84 public: | |
85 explicit InstanceLoader(ServerContext& context) | |
86 : context_(context) | |
87 { | |
88 } | |
89 | |
90 virtual ~InstanceLoader() | |
91 { | |
92 } | |
93 | |
94 virtual void PrepareDicom(const std::string& instanceId) | |
95 { | |
96 | |
97 } | |
98 | |
99 virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0; | |
100 | |
101 virtual void Clear() | |
102 { | |
103 } | |
104 }; | |
105 | |
106 class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader | |
107 { | |
108 public: | |
109 explicit SynchronousInstanceLoader(ServerContext& context) | |
110 : InstanceLoader(context) | |
111 { | |
112 } | |
113 | |
114 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE | |
115 { | |
116 context_.ReadDicom(dicom, instanceId); | |
117 } | |
118 }; | |
119 | |
120 class InstanceId : public Orthanc::IDynamicObject | |
121 { | |
122 private: | |
123 std::string id_; | |
124 | |
125 public: | |
126 explicit InstanceId(const std::string& id) : id_(id) | |
127 { | |
128 } | |
129 | |
130 virtual ~InstanceId() ORTHANC_OVERRIDE | |
131 { | |
132 } | |
133 | |
134 std::string GetId() const {return id_;}; | |
135 }; | |
136 | |
137 class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader | |
138 { | |
139 Semaphore availableInstancesSemaphore_; | |
140 std::map<std::string, boost::shared_ptr<std::string> > availableInstances_; | |
141 boost::mutex availableInstancesMutex_; | |
142 SharedMessageQueue instancesToPreload_; | |
143 std::vector<boost::thread*> threads_; | |
144 | |
145 | |
146 public: | |
147 ThreadedInstanceLoader(ServerContext& context, size_t threadCount) | |
148 : InstanceLoader(context), | |
149 availableInstancesSemaphore_(0) | |
150 { | |
151 for (size_t i = 0; i < threadCount; i++) | |
152 { | |
153 threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); | |
154 } | |
155 } | |
156 | |
157 virtual ~ThreadedInstanceLoader() | |
158 { | |
159 Clear(); | |
160 } | |
161 | |
162 virtual void Clear() ORTHANC_OVERRIDE | |
163 { | |
164 for (size_t i = 0; i < threads_.size(); i++) | |
165 { | |
166 instancesToPreload_.Enqueue(NULL); | |
167 } | |
168 | |
169 for (size_t i = 0; i < threads_.size(); i++) | |
170 { | |
171 if (threads_[i]->joinable()) | |
172 { | |
173 threads_[i]->join(); | |
174 } | |
175 delete threads_[i]; | |
176 } | |
177 | |
178 threads_.clear(); | |
179 availableInstances_.clear(); | |
180 } | |
181 | |
182 static void PreloaderWorkerThread(ThreadedInstanceLoader* that) | |
183 { | |
184 while (true) | |
185 { | |
186 std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0))); | |
187 if (instanceId.get() == NULL) // that's the signal to exit the thread | |
188 { | |
189 return; | |
190 } | |
191 | |
192 try | |
193 { | |
194 boost::shared_ptr<std::string> dicomContent(new std::string()); | |
195 that->context_.ReadDicom(*dicomContent, instanceId->GetId()); | |
196 { | |
197 boost::mutex::scoped_lock lock(that->availableInstancesMutex_); | |
198 that->availableInstances_[instanceId->GetId()] = dicomContent; | |
199 } | |
200 | |
201 that->availableInstancesSemaphore_.Release(); | |
202 } | |
203 catch (OrthancException& e) | |
204 { | |
205 boost::mutex::scoped_lock lock(that->availableInstancesMutex_); | |
206 // store a NULL result to notify that we could not read the instance | |
207 that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>(); | |
208 that->availableInstancesSemaphore_.Release(); | |
209 } | |
210 } | |
211 } | |
212 | |
213 virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE | |
214 { | |
215 instancesToPreload_.Enqueue(new InstanceId(instanceId)); | |
216 } | |
217 | |
218 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE | |
219 { | |
220 while (true) | |
221 { | |
222 // wait for an instance to be available but this might not be the one we are waiting for ! | |
223 availableInstancesSemaphore_.Acquire(); | |
224 | |
225 boost::shared_ptr<std::string> dicomContent; | |
226 { | |
227 if (availableInstances_.find(instanceId) != availableInstances_.end()) | |
228 { | |
229 // this is the instance we were waiting for | |
230 dicomContent = availableInstances_[instanceId]; | |
231 availableInstances_.erase(instanceId); | |
232 | |
233 if (dicomContent.get() == NULL) // there has been an error while reading the file | |
234 { | |
235 throw OrthancException(ErrorCode_InexistentItem); | |
236 } | |
237 dicom.swap(*dicomContent); | |
238 | |
239 if (availableInstances_.size() > 0) | |
240 { | |
241 // we have just read the instance we were waiting for but there are still other instances available -> | |
242 // make sure the next GetDicom call does not wait ! | |
243 availableInstancesSemaphore_.Release(); | |
244 } | |
245 return; | |
246 } | |
247 // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when | |
248 // a new instance is available | |
249 } | |
250 } | |
251 } | |
252 }; | |
253 | |
254 | |
79 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable | 255 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable |
80 { | 256 { |
81 private: | 257 private: |
82 ResourceType level_; | 258 ResourceType level_; |
83 std::string patient_; | 259 std::string patient_; |
388 assert(type_ == Type_WriteInstance); | 564 assert(type_ == Type_WriteInstance); |
389 } | 565 } |
390 | 566 |
391 void Apply(HierarchicalZipWriter& writer, | 567 void Apply(HierarchicalZipWriter& writer, |
392 ServerContext& context, | 568 ServerContext& context, |
569 InstanceLoader& instanceLoader, | |
393 DicomDirWriter* dicomDir, | 570 DicomDirWriter* dicomDir, |
394 const std::string& dicomDirFolder, | 571 const std::string& dicomDirFolder, |
395 bool transcode, | 572 bool transcode, |
396 DicomTransferSyntax transferSyntax) const | 573 DicomTransferSyntax transferSyntax) const |
397 { | 574 { |
409 { | 586 { |
410 std::string content; | 587 std::string content; |
411 | 588 |
412 try | 589 try |
413 { | 590 { |
414 context.ReadDicom(content, instanceId_); | 591 instanceLoader.GetDicom(content, instanceId_); |
415 } | 592 } |
416 catch (OrthancException& e) | 593 catch (OrthancException& e) |
417 { | 594 { |
418 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; | 595 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; |
419 return; | 596 return; |
480 }; | 657 }; |
481 | 658 |
482 std::deque<Command*> commands_; | 659 std::deque<Command*> commands_; |
483 uint64_t uncompressedSize_; | 660 uint64_t uncompressedSize_; |
484 unsigned int instancesCount_; | 661 unsigned int instancesCount_; |
662 InstanceLoader& instanceLoader_; | |
485 | 663 |
486 | 664 |
487 void ApplyInternal(HierarchicalZipWriter& writer, | 665 void ApplyInternal(HierarchicalZipWriter& writer, |
488 ServerContext& context, | 666 ServerContext& context, |
667 InstanceLoader& instanceLoader, | |
489 size_t index, | 668 size_t index, |
490 DicomDirWriter* dicomDir, | 669 DicomDirWriter* dicomDir, |
491 const std::string& dicomDirFolder, | 670 const std::string& dicomDirFolder, |
492 bool transcode, | 671 bool transcode, |
493 DicomTransferSyntax transferSyntax) const | 672 DicomTransferSyntax transferSyntax) const |
495 if (index >= commands_.size()) | 674 if (index >= commands_.size()) |
496 { | 675 { |
497 throw OrthancException(ErrorCode_ParameterOutOfRange); | 676 throw OrthancException(ErrorCode_ParameterOutOfRange); |
498 } | 677 } |
499 | 678 |
500 commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); | 679 commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax); |
501 } | 680 } |
502 | 681 |
503 public: | 682 public: |
504 ZipCommands() : | 683 explicit ZipCommands(InstanceLoader& instanceLoader) : |
505 uncompressedSize_(0), | 684 uncompressedSize_(0), |
506 instancesCount_(0) | 685 instancesCount_(0), |
686 instanceLoader_(instanceLoader) | |
507 { | 687 { |
508 } | 688 } |
509 | 689 |
510 ~ZipCommands() | 690 ~ZipCommands() |
511 { | 691 { |
533 } | 713 } |
534 | 714 |
535 // "media" flavor (with DICOMDIR) | 715 // "media" flavor (with DICOMDIR) |
536 void Apply(HierarchicalZipWriter& writer, | 716 void Apply(HierarchicalZipWriter& writer, |
537 ServerContext& context, | 717 ServerContext& context, |
718 InstanceLoader& instanceLoader, | |
538 size_t index, | 719 size_t index, |
539 DicomDirWriter& dicomDir, | 720 DicomDirWriter& dicomDir, |
540 const std::string& dicomDirFolder, | 721 const std::string& dicomDirFolder, |
541 bool transcode, | 722 bool transcode, |
542 DicomTransferSyntax transferSyntax) const | 723 DicomTransferSyntax transferSyntax) const |
543 { | 724 { |
544 ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); | 725 ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); |
545 } | 726 } |
546 | 727 |
547 // "archive" flavor (without DICOMDIR) | 728 // "archive" flavor (without DICOMDIR) |
548 void Apply(HierarchicalZipWriter& writer, | 729 void Apply(HierarchicalZipWriter& writer, |
549 ServerContext& context, | 730 ServerContext& context, |
731 InstanceLoader& instanceLoader, | |
550 size_t index, | 732 size_t index, |
551 bool transcode, | 733 bool transcode, |
552 DicomTransferSyntax transferSyntax) const | 734 DicomTransferSyntax transferSyntax) const |
553 { | 735 { |
554 ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); | 736 ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax); |
555 } | 737 } |
556 | 738 |
557 void AddOpenDirectory(const std::string& filename) | 739 void AddOpenDirectory(const std::string& filename) |
558 { | 740 { |
559 commands_.push_back(new Command(Type_OpenDirectory, filename)); | 741 commands_.push_back(new Command(Type_OpenDirectory, filename)); |
566 | 748 |
567 void AddWriteInstance(const std::string& filename, | 749 void AddWriteInstance(const std::string& filename, |
568 const std::string& instanceId, | 750 const std::string& instanceId, |
569 uint64_t uncompressedSize) | 751 uint64_t uncompressedSize) |
570 { | 752 { |
753 instanceLoader_.PrepareDicom(instanceId); | |
571 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); | 754 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); |
572 instancesCount_ ++; | 755 instancesCount_ ++; |
573 uncompressedSize_ += uncompressedSize; | 756 uncompressedSize_ += uncompressedSize; |
574 } | 757 } |
575 | 758 |
733 | 916 |
734 class ArchiveJob::ZipWriterIterator : public boost::noncopyable | 917 class ArchiveJob::ZipWriterIterator : public boost::noncopyable |
735 { | 918 { |
736 private: | 919 private: |
737 ServerContext& context_; | 920 ServerContext& context_; |
921 InstanceLoader& instanceLoader_; | |
738 ZipCommands commands_; | 922 ZipCommands commands_; |
739 std::unique_ptr<HierarchicalZipWriter> zip_; | 923 std::unique_ptr<HierarchicalZipWriter> zip_; |
740 std::unique_ptr<DicomDirWriter> dicomDir_; | 924 std::unique_ptr<DicomDirWriter> dicomDir_; |
741 bool isMedia_; | 925 bool isMedia_; |
742 bool isStream_; | 926 bool isStream_; |
743 | 927 |
744 public: | 928 public: |
745 ZipWriterIterator(ServerContext& context, | 929 ZipWriterIterator(ServerContext& context, |
930 InstanceLoader& instanceLoader, | |
746 ArchiveIndex& archive, | 931 ArchiveIndex& archive, |
747 bool isMedia, | 932 bool isMedia, |
748 bool enableExtendedSopClass) : | 933 bool enableExtendedSopClass) : |
749 context_(context), | 934 context_(context), |
935 instanceLoader_(instanceLoader), | |
936 commands_(instanceLoader), | |
750 isMedia_(isMedia), | 937 isMedia_(isMedia), |
751 isStream_(false) | 938 isStream_(false) |
752 { | 939 { |
753 if (isMedia) | 940 if (isMedia) |
754 { | 941 { |
868 else | 1055 else |
869 { | 1056 { |
870 if (isMedia_) | 1057 if (isMedia_) |
871 { | 1058 { |
872 assert(dicomDir_.get() != NULL); | 1059 assert(dicomDir_.get() != NULL); |
873 commands_.Apply(*zip_, context_, index, *dicomDir_, | 1060 commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_, |
874 MEDIA_IMAGES_FOLDER, transcode, transferSyntax); | 1061 MEDIA_IMAGES_FOLDER, transcode, transferSyntax); |
875 } | 1062 } |
876 else | 1063 else |
877 { | 1064 { |
878 assert(dicomDir_.get() == NULL); | 1065 assert(dicomDir_.get() == NULL); |
879 commands_.Apply(*zip_, context_, index, transcode, transferSyntax); | 1066 commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax); |
880 } | 1067 } |
881 } | 1068 } |
882 } | 1069 } |
883 | 1070 |
884 unsigned int GetInstancesCount() const | 1071 unsigned int GetInstancesCount() const |
903 currentStep_(0), | 1090 currentStep_(0), |
904 instancesCount_(0), | 1091 instancesCount_(0), |
905 uncompressedSize_(0), | 1092 uncompressedSize_(0), |
906 archiveSize_(0), | 1093 archiveSize_(0), |
907 transcode_(false), | 1094 transcode_(false), |
908 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) | 1095 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit), |
1096 loaderThreads_(0) | |
909 { | 1097 { |
910 } | 1098 } |
911 | 1099 |
912 | 1100 |
913 ArchiveJob::~ArchiveJob() | 1101 ArchiveJob::~ArchiveJob() |
979 transferSyntax_ = transferSyntax; | 1167 transferSyntax_ = transferSyntax; |
980 } | 1168 } |
981 } | 1169 } |
982 | 1170 |
983 | 1171 |
1172 void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads) | |
1173 { | |
1174 if (writer_.get() != NULL) // Already started | |
1175 { | |
1176 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1177 } | |
1178 else | |
1179 { | |
1180 loaderThreads_ = loaderThreads; | |
1181 } | |
1182 } | |
1183 | |
1184 | |
984 void ArchiveJob::Reset() | 1185 void ArchiveJob::Reset() |
985 { | 1186 { |
986 throw OrthancException(ErrorCode_BadSequenceOfCalls, | 1187 throw OrthancException(ErrorCode_BadSequenceOfCalls, |
987 "Cannot resubmit the creation of an archive"); | 1188 "Cannot resubmit the creation of an archive"); |
988 } | 1189 } |
989 | 1190 |
990 | 1191 |
991 void ArchiveJob::Start() | 1192 void ArchiveJob::Start() |
992 { | 1193 { |
1194 if (loaderThreads_ == 0) | |
1195 { | |
1196 // default behaviour before loaderThreads was introducted in 1.9.8 | |
1197 instanceLoader_.reset(new SynchronousInstanceLoader(context_)); | |
1198 } | |
1199 else | |
1200 { | |
1201 instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_)); | |
1202 } | |
1203 | |
993 if (writer_.get() != NULL) | 1204 if (writer_.get() != NULL) |
994 { | 1205 { |
995 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 1206 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
996 } | 1207 } |
997 else | 1208 else |
1009 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); | 1220 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); |
1010 | 1221 |
1011 assert(asynchronousTarget_.get() != NULL); | 1222 assert(asynchronousTarget_.get() != NULL); |
1012 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file | 1223 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file |
1013 | 1224 |
1014 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); | 1225 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); |
1015 writer_->SetOutputFile(asynchronousTarget_->GetPath()); | 1226 writer_->SetOutputFile(asynchronousTarget_->GetPath()); |
1016 } | 1227 } |
1017 } | 1228 } |
1018 else | 1229 else |
1019 { | 1230 { |
1020 assert(synchronousTarget_.get() != NULL); | 1231 assert(synchronousTarget_.get() != NULL); |
1021 | 1232 |
1022 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); | 1233 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); |
1023 writer_->AcquireOutputStream(synchronousTarget_.release()); | 1234 writer_->AcquireOutputStream(synchronousTarget_.release()); |
1024 } | 1235 } |
1025 | 1236 |
1026 instancesCount_ = writer_->GetInstancesCount(); | 1237 instancesCount_ = writer_->GetInstancesCount(); |
1027 uncompressedSize_ = writer_->GetUncompressedSize(); | 1238 uncompressedSize_ = writer_->GetUncompressedSize(); |
1060 if (writer_.get() != NULL) | 1271 if (writer_.get() != NULL) |
1061 { | 1272 { |
1062 writer_->Close(); // Flush all the results | 1273 writer_->Close(); // Flush all the results |
1063 archiveSize_ = writer_->GetArchiveSize(); | 1274 archiveSize_ = writer_->GetArchiveSize(); |
1064 writer_.reset(); | 1275 writer_.reset(); |
1276 } | |
1277 | |
1278 if (instanceLoader_.get() != NULL) | |
1279 { | |
1280 instanceLoader_->Clear(); | |
1065 } | 1281 } |
1066 | 1282 |
1067 if (asynchronousTarget_.get() != NULL) | 1283 if (asynchronousTarget_.get() != NULL) |
1068 { | 1284 { |
1069 // Asynchronous behavior: Move the resulting file into the media archive | 1285 // Asynchronous behavior: Move the resulting file into the media archive |
1182 } | 1398 } |
1183 | 1399 |
1184 | 1400 |
1185 bool ArchiveJob::GetOutput(std::string& output, | 1401 bool ArchiveJob::GetOutput(std::string& output, |
1186 MimeType& mime, | 1402 MimeType& mime, |
1403 std::string& filename, | |
1187 const std::string& key) | 1404 const std::string& key) |
1188 { | 1405 { |
1189 if (key == "archive" && | 1406 if (key == "archive" && |
1190 !mediaArchiveId_.empty()) | 1407 !mediaArchiveId_.empty()) |
1191 { | 1408 { |
1194 if (accessor.IsValid()) | 1411 if (accessor.IsValid()) |
1195 { | 1412 { |
1196 const DynamicTemporaryFile& f = dynamic_cast<DynamicTemporaryFile&>(accessor.GetItem()); | 1413 const DynamicTemporaryFile& f = dynamic_cast<DynamicTemporaryFile&>(accessor.GetItem()); |
1197 f.GetFile().Read(output); | 1414 f.GetFile().Read(output); |
1198 mime = MimeType_Zip; | 1415 mime = MimeType_Zip; |
1416 filename = "archive.zip"; | |
1199 return true; | 1417 return true; |
1200 } | 1418 } |
1201 else | 1419 else |
1202 { | 1420 { |
1203 return false; | 1421 return false; |