Mercurial > hg > orthanc
comparison OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4810:7afbb54bd028
merge storage-cache
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 23 Nov 2021 09:22:11 +0100 |
parents | 0a38000b086d 4e765c18ace7 |
children | 46bfa3a4fd63 cd6dc99e0470 |
comparison
equal
deleted
inserted
replaced
4809:2ca4213fb50a | 4810:7afbb54bd028 |
---|---|
38 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h" | 38 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h" |
39 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h" | 39 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h" |
40 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" | 40 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" |
41 #include "../../../OrthancFramework/Sources/Logging.h" | 41 #include "../../../OrthancFramework/Sources/Logging.h" |
42 #include "../../../OrthancFramework/Sources/OrthancException.h" | 42 #include "../../../OrthancFramework/Sources/OrthancException.h" |
43 #include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h" | |
43 #include "../OrthancConfiguration.h" | 44 #include "../OrthancConfiguration.h" |
44 #include "../ServerContext.h" | 45 #include "../ServerContext.h" |
45 | 46 |
46 #include <stdio.h> | 47 #include <stdio.h> |
47 | 48 |
86 | 87 |
87 return isZip64; | 88 return isZip64; |
88 } | 89 } |
89 | 90 |
90 | 91 |
92 class ArchiveJob::InstanceLoader : public boost::noncopyable | |
93 { | |
94 protected: | |
95 ServerContext& context_; | |
96 public: | |
97 InstanceLoader(ServerContext& context) | |
98 : context_(context) | |
99 { | |
100 } | |
101 | |
102 virtual ~InstanceLoader() | |
103 { | |
104 } | |
105 | |
106 virtual void PrepareDicom(const std::string& instanceId) | |
107 { | |
108 | |
109 } | |
110 | |
111 virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0; | |
112 | |
113 virtual void Clear() | |
114 { | |
115 } | |
116 }; | |
117 | |
118 class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader | |
119 { | |
120 public: | |
121 SynchronousInstanceLoader(ServerContext& context) | |
122 : InstanceLoader(context) | |
123 { | |
124 } | |
125 | |
126 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE | |
127 { | |
128 context_.ReadDicom(dicom, instanceId); | |
129 } | |
130 }; | |
131 | |
132 class InstanceId : public Orthanc::IDynamicObject | |
133 { | |
134 private: | |
135 std::string id_; | |
136 | |
137 public: | |
138 InstanceId(const std::string& id) : id_(id) | |
139 { | |
140 } | |
141 | |
142 virtual ~InstanceId() ORTHANC_OVERRIDE | |
143 { | |
144 } | |
145 | |
146 std::string GetId() const {return id_;}; | |
147 }; | |
148 | |
149 class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader | |
150 { | |
151 Semaphore availableInstancesSemaphore_; | |
152 std::map<std::string, boost::shared_ptr<std::string>> availableInstances_; | |
153 boost::mutex availableInstancesMutex_; | |
154 SharedMessageQueue instancesToPreload_; | |
155 std::vector<boost::thread*> threads_; | |
156 | |
157 | |
158 public: | |
159 ThreadedInstanceLoader(ServerContext& context, size_t threadCount) | |
160 : InstanceLoader(context), | |
161 availableInstancesSemaphore_(0) | |
162 { | |
163 for (size_t i = 0; i < threadCount; i++) | |
164 { | |
165 threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); | |
166 } | |
167 } | |
168 | |
169 virtual ~ThreadedInstanceLoader() | |
170 { | |
171 Clear(); | |
172 } | |
173 | |
174 virtual void Clear() ORTHANC_OVERRIDE | |
175 { | |
176 for (size_t i = 0; i < threads_.size(); i++) | |
177 { | |
178 instancesToPreload_.Enqueue(NULL); | |
179 } | |
180 | |
181 for (size_t i = 0; i < threads_.size(); i++) | |
182 { | |
183 if (threads_[i]->joinable()) | |
184 { | |
185 threads_[i]->join(); | |
186 } | |
187 delete threads_[i]; | |
188 } | |
189 | |
190 threads_.clear(); | |
191 availableInstances_.clear(); | |
192 } | |
193 | |
194 static void PreloaderWorkerThread(ThreadedInstanceLoader* that) | |
195 { | |
196 while (true) | |
197 { | |
198 std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0))); | |
199 if (instanceId.get() == NULL) // that's the signal to exit the thread | |
200 { | |
201 return; | |
202 } | |
203 | |
204 try | |
205 { | |
206 boost::shared_ptr<std::string> dicomContent(new std::string()); | |
207 that->context_.ReadDicom(*dicomContent, instanceId->GetId()); | |
208 { | |
209 boost::mutex::scoped_lock lock(that->availableInstancesMutex_); | |
210 that->availableInstances_[instanceId->GetId()] = dicomContent; | |
211 } | |
212 | |
213 that->availableInstancesSemaphore_.Release(); | |
214 } | |
215 catch (OrthancException& e) | |
216 { | |
217 boost::mutex::scoped_lock lock(that->availableInstancesMutex_); | |
218 // store a NULL result to notify that we could not read the instance | |
219 that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>(); | |
220 that->availableInstancesSemaphore_.Release(); | |
221 } | |
222 } | |
223 } | |
224 | |
225 virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE | |
226 { | |
227 instancesToPreload_.Enqueue(new InstanceId(instanceId)); | |
228 } | |
229 | |
230 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE | |
231 { | |
232 while (true) | |
233 { | |
234 // wait for an instance to be available but this might not be the one we are waiting for ! | |
235 availableInstancesSemaphore_.Acquire(); | |
236 | |
237 boost::shared_ptr<std::string> dicomContent; | |
238 { | |
239 if (availableInstances_.find(instanceId) != availableInstances_.end()) | |
240 { | |
241 // this is the instance we were waiting for | |
242 dicomContent = availableInstances_[instanceId]; | |
243 availableInstances_.erase(instanceId); | |
244 | |
245 if (dicomContent.get() == NULL) // there has been an error while reading the file | |
246 { | |
247 throw OrthancException(ErrorCode_InexistentItem); | |
248 } | |
249 dicom.swap(*dicomContent); | |
250 | |
251 if (availableInstances_.size() > 0) | |
252 { | |
253 // we have just read the instance we were waiting for but there are still other instances available -> | |
254 // make sure the next GetDicom call does not wait ! | |
255 availableInstancesSemaphore_.Release(); | |
256 } | |
257 return; | |
258 } | |
259 // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when | |
260 // a new instance is available | |
261 } | |
262 } | |
263 } | |
264 }; | |
265 | |
266 | |
91 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable | 267 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable |
92 { | 268 { |
93 private: | 269 private: |
94 ResourceType level_; | 270 ResourceType level_; |
95 std::string patient_; | 271 std::string patient_; |
400 assert(type_ == Type_WriteInstance); | 576 assert(type_ == Type_WriteInstance); |
401 } | 577 } |
402 | 578 |
403 void Apply(HierarchicalZipWriter& writer, | 579 void Apply(HierarchicalZipWriter& writer, |
404 ServerContext& context, | 580 ServerContext& context, |
581 InstanceLoader& instanceLoader, | |
405 DicomDirWriter* dicomDir, | 582 DicomDirWriter* dicomDir, |
406 const std::string& dicomDirFolder, | 583 const std::string& dicomDirFolder, |
407 bool transcode, | 584 bool transcode, |
408 DicomTransferSyntax transferSyntax) const | 585 DicomTransferSyntax transferSyntax) const |
409 { | 586 { |
421 { | 598 { |
422 std::string content; | 599 std::string content; |
423 | 600 |
424 try | 601 try |
425 { | 602 { |
426 context.ReadDicom(content, instanceId_); | 603 instanceLoader.GetDicom(content, instanceId_); |
427 } | 604 } |
428 catch (OrthancException& e) | 605 catch (OrthancException& e) |
429 { | 606 { |
430 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; | 607 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; |
431 return; | 608 return; |
492 }; | 669 }; |
493 | 670 |
494 std::deque<Command*> commands_; | 671 std::deque<Command*> commands_; |
495 uint64_t uncompressedSize_; | 672 uint64_t uncompressedSize_; |
496 unsigned int instancesCount_; | 673 unsigned int instancesCount_; |
674 InstanceLoader& instanceLoader_; | |
497 | 675 |
498 | 676 |
499 void ApplyInternal(HierarchicalZipWriter& writer, | 677 void ApplyInternal(HierarchicalZipWriter& writer, |
500 ServerContext& context, | 678 ServerContext& context, |
679 InstanceLoader& instanceLoader, | |
501 size_t index, | 680 size_t index, |
502 DicomDirWriter* dicomDir, | 681 DicomDirWriter* dicomDir, |
503 const std::string& dicomDirFolder, | 682 const std::string& dicomDirFolder, |
504 bool transcode, | 683 bool transcode, |
505 DicomTransferSyntax transferSyntax) const | 684 DicomTransferSyntax transferSyntax) const |
507 if (index >= commands_.size()) | 686 if (index >= commands_.size()) |
508 { | 687 { |
509 throw OrthancException(ErrorCode_ParameterOutOfRange); | 688 throw OrthancException(ErrorCode_ParameterOutOfRange); |
510 } | 689 } |
511 | 690 |
512 commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); | 691 commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax); |
513 } | 692 } |
514 | 693 |
515 public: | 694 public: |
516 ZipCommands() : | 695 ZipCommands(InstanceLoader& instanceLoader) : |
517 uncompressedSize_(0), | 696 uncompressedSize_(0), |
518 instancesCount_(0) | 697 instancesCount_(0), |
698 instanceLoader_(instanceLoader) | |
519 { | 699 { |
520 } | 700 } |
521 | 701 |
522 ~ZipCommands() | 702 ~ZipCommands() |
523 { | 703 { |
545 } | 725 } |
546 | 726 |
547 // "media" flavor (with DICOMDIR) | 727 // "media" flavor (with DICOMDIR) |
548 void Apply(HierarchicalZipWriter& writer, | 728 void Apply(HierarchicalZipWriter& writer, |
549 ServerContext& context, | 729 ServerContext& context, |
730 InstanceLoader& instanceLoader, | |
550 size_t index, | 731 size_t index, |
551 DicomDirWriter& dicomDir, | 732 DicomDirWriter& dicomDir, |
552 const std::string& dicomDirFolder, | 733 const std::string& dicomDirFolder, |
553 bool transcode, | 734 bool transcode, |
554 DicomTransferSyntax transferSyntax) const | 735 DicomTransferSyntax transferSyntax) const |
555 { | 736 { |
556 ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); | 737 ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); |
557 } | 738 } |
558 | 739 |
559 // "archive" flavor (without DICOMDIR) | 740 // "archive" flavor (without DICOMDIR) |
560 void Apply(HierarchicalZipWriter& writer, | 741 void Apply(HierarchicalZipWriter& writer, |
561 ServerContext& context, | 742 ServerContext& context, |
743 InstanceLoader& instanceLoader, | |
562 size_t index, | 744 size_t index, |
563 bool transcode, | 745 bool transcode, |
564 DicomTransferSyntax transferSyntax) const | 746 DicomTransferSyntax transferSyntax) const |
565 { | 747 { |
566 ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); | 748 ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax); |
567 } | 749 } |
568 | 750 |
569 void AddOpenDirectory(const std::string& filename) | 751 void AddOpenDirectory(const std::string& filename) |
570 { | 752 { |
571 commands_.push_back(new Command(Type_OpenDirectory, filename)); | 753 commands_.push_back(new Command(Type_OpenDirectory, filename)); |
578 | 760 |
579 void AddWriteInstance(const std::string& filename, | 761 void AddWriteInstance(const std::string& filename, |
580 const std::string& instanceId, | 762 const std::string& instanceId, |
581 uint64_t uncompressedSize) | 763 uint64_t uncompressedSize) |
582 { | 764 { |
765 instanceLoader_.PrepareDicom(instanceId); | |
583 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); | 766 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); |
584 instancesCount_ ++; | 767 instancesCount_ ++; |
585 uncompressedSize_ += uncompressedSize; | 768 uncompressedSize_ += uncompressedSize; |
586 } | 769 } |
587 | 770 |
745 | 928 |
746 class ArchiveJob::ZipWriterIterator : public boost::noncopyable | 929 class ArchiveJob::ZipWriterIterator : public boost::noncopyable |
747 { | 930 { |
748 private: | 931 private: |
749 ServerContext& context_; | 932 ServerContext& context_; |
933 InstanceLoader& instanceLoader_; | |
750 ZipCommands commands_; | 934 ZipCommands commands_; |
751 std::unique_ptr<HierarchicalZipWriter> zip_; | 935 std::unique_ptr<HierarchicalZipWriter> zip_; |
752 std::unique_ptr<DicomDirWriter> dicomDir_; | 936 std::unique_ptr<DicomDirWriter> dicomDir_; |
753 bool isMedia_; | 937 bool isMedia_; |
754 bool isStream_; | 938 bool isStream_; |
755 | 939 |
756 public: | 940 public: |
757 ZipWriterIterator(ServerContext& context, | 941 ZipWriterIterator(ServerContext& context, |
942 InstanceLoader& instanceLoader, | |
758 ArchiveIndex& archive, | 943 ArchiveIndex& archive, |
759 bool isMedia, | 944 bool isMedia, |
760 bool enableExtendedSopClass) : | 945 bool enableExtendedSopClass) : |
761 context_(context), | 946 context_(context), |
947 instanceLoader_(instanceLoader), | |
948 commands_(instanceLoader), | |
762 isMedia_(isMedia), | 949 isMedia_(isMedia), |
763 isStream_(false) | 950 isStream_(false) |
764 { | 951 { |
765 if (isMedia) | 952 if (isMedia) |
766 { | 953 { |
880 else | 1067 else |
881 { | 1068 { |
882 if (isMedia_) | 1069 if (isMedia_) |
883 { | 1070 { |
884 assert(dicomDir_.get() != NULL); | 1071 assert(dicomDir_.get() != NULL); |
885 commands_.Apply(*zip_, context_, index, *dicomDir_, | 1072 commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_, |
886 MEDIA_IMAGES_FOLDER, transcode, transferSyntax); | 1073 MEDIA_IMAGES_FOLDER, transcode, transferSyntax); |
887 } | 1074 } |
888 else | 1075 else |
889 { | 1076 { |
890 assert(dicomDir_.get() == NULL); | 1077 assert(dicomDir_.get() == NULL); |
891 commands_.Apply(*zip_, context_, index, transcode, transferSyntax); | 1078 commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax); |
892 } | 1079 } |
893 } | 1080 } |
894 } | 1081 } |
895 | 1082 |
896 unsigned int GetInstancesCount() const | 1083 unsigned int GetInstancesCount() const |
915 currentStep_(0), | 1102 currentStep_(0), |
916 instancesCount_(0), | 1103 instancesCount_(0), |
917 uncompressedSize_(0), | 1104 uncompressedSize_(0), |
918 archiveSize_(0), | 1105 archiveSize_(0), |
919 transcode_(false), | 1106 transcode_(false), |
920 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) | 1107 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit), |
1108 loaderThreads_(0) | |
921 { | 1109 { |
922 } | 1110 } |
923 | 1111 |
924 | 1112 |
925 ArchiveJob::~ArchiveJob() | 1113 ArchiveJob::~ArchiveJob() |
991 transferSyntax_ = transferSyntax; | 1179 transferSyntax_ = transferSyntax; |
992 } | 1180 } |
993 } | 1181 } |
994 | 1182 |
995 | 1183 |
1184 void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads) | |
1185 { | |
1186 if (writer_.get() != NULL) // Already started | |
1187 { | |
1188 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1189 } | |
1190 else | |
1191 { | |
1192 loaderThreads_ = loaderThreads; | |
1193 } | |
1194 } | |
1195 | |
1196 | |
996 void ArchiveJob::Reset() | 1197 void ArchiveJob::Reset() |
997 { | 1198 { |
998 throw OrthancException(ErrorCode_BadSequenceOfCalls, | 1199 throw OrthancException(ErrorCode_BadSequenceOfCalls, |
999 "Cannot resubmit the creation of an archive"); | 1200 "Cannot resubmit the creation of an archive"); |
1000 } | 1201 } |
1001 | 1202 |
1002 | 1203 |
1003 void ArchiveJob::Start() | 1204 void ArchiveJob::Start() |
1004 { | 1205 { |
1206 if (loaderThreads_ == 0) | |
1207 { | |
1208 // default behaviour before loaderThreads was introducted in 1.9.8 | |
1209 instanceLoader_.reset(new SynchronousInstanceLoader(context_)); | |
1210 } | |
1211 else | |
1212 { | |
1213 instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_)); | |
1214 } | |
1215 | |
1005 if (writer_.get() != NULL) | 1216 if (writer_.get() != NULL) |
1006 { | 1217 { |
1007 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 1218 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
1008 } | 1219 } |
1009 else | 1220 else |
1021 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); | 1232 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); |
1022 | 1233 |
1023 assert(asynchronousTarget_.get() != NULL); | 1234 assert(asynchronousTarget_.get() != NULL); |
1024 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file | 1235 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file |
1025 | 1236 |
1026 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); | 1237 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); |
1027 writer_->SetOutputFile(asynchronousTarget_->GetPath()); | 1238 writer_->SetOutputFile(asynchronousTarget_->GetPath()); |
1028 } | 1239 } |
1029 } | 1240 } |
1030 else | 1241 else |
1031 { | 1242 { |
1032 assert(synchronousTarget_.get() != NULL); | 1243 assert(synchronousTarget_.get() != NULL); |
1033 | 1244 |
1034 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); | 1245 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_)); |
1035 writer_->AcquireOutputStream(synchronousTarget_.release()); | 1246 writer_->AcquireOutputStream(synchronousTarget_.release()); |
1036 } | 1247 } |
1037 | 1248 |
1038 instancesCount_ = writer_->GetInstancesCount(); | 1249 instancesCount_ = writer_->GetInstancesCount(); |
1039 uncompressedSize_ = writer_->GetUncompressedSize(); | 1250 uncompressedSize_ = writer_->GetUncompressedSize(); |
1072 if (writer_.get() != NULL) | 1283 if (writer_.get() != NULL) |
1073 { | 1284 { |
1074 writer_->Close(); // Flush all the results | 1285 writer_->Close(); // Flush all the results |
1075 archiveSize_ = writer_->GetArchiveSize(); | 1286 archiveSize_ = writer_->GetArchiveSize(); |
1076 writer_.reset(); | 1287 writer_.reset(); |
1288 } | |
1289 | |
1290 if (instanceLoader_.get() != NULL) | |
1291 { | |
1292 instanceLoader_->Clear(); | |
1077 } | 1293 } |
1078 | 1294 |
1079 if (asynchronousTarget_.get() != NULL) | 1295 if (asynchronousTarget_.get() != NULL) |
1080 { | 1296 { |
1081 // Asynchronous behavior: Move the resulting file into the media archive | 1297 // Asynchronous behavior: Move the resulting file into the media archive |