comparison OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 4810:7afbb54bd028

merge storage-cache
author Alain Mazy <am@osimis.io>
date Tue, 23 Nov 2021 09:22:11 +0100
parents 0a38000b086d 4e765c18ace7
children 46bfa3a4fd63 cd6dc99e0470
comparison
equal deleted inserted replaced
4809:2ca4213fb50a 4810:7afbb54bd028
38 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h" 38 #include "../../../OrthancFramework/Sources/Compression/HierarchicalZipWriter.h"
39 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h" 39 #include "../../../OrthancFramework/Sources/DicomParsing/DicomDirWriter.h"
40 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h" 40 #include "../../../OrthancFramework/Sources/DicomParsing/FromDcmtkBridge.h"
41 #include "../../../OrthancFramework/Sources/Logging.h" 41 #include "../../../OrthancFramework/Sources/Logging.h"
42 #include "../../../OrthancFramework/Sources/OrthancException.h" 42 #include "../../../OrthancFramework/Sources/OrthancException.h"
43 #include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h"
43 #include "../OrthancConfiguration.h" 44 #include "../OrthancConfiguration.h"
44 #include "../ServerContext.h" 45 #include "../ServerContext.h"
45 46
46 #include <stdio.h> 47 #include <stdio.h>
47 48
86 87
87 return isZip64; 88 return isZip64;
88 } 89 }
89 90
90 91
92 class ArchiveJob::InstanceLoader : public boost::noncopyable
93 {
94 protected:
95 ServerContext& context_;
96 public:
97 InstanceLoader(ServerContext& context)
98 : context_(context)
99 {
100 }
101
102 virtual ~InstanceLoader()
103 {
104 }
105
106 virtual void PrepareDicom(const std::string& instanceId)
107 {
108
109 }
110
111 virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0;
112
113 virtual void Clear()
114 {
115 }
116 };
117
118 class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader
119 {
120 public:
121 SynchronousInstanceLoader(ServerContext& context)
122 : InstanceLoader(context)
123 {
124 }
125
126 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
127 {
128 context_.ReadDicom(dicom, instanceId);
129 }
130 };
131
132 class InstanceId : public Orthanc::IDynamicObject
133 {
134 private:
135 std::string id_;
136
137 public:
138 InstanceId(const std::string& id) : id_(id)
139 {
140 }
141
142 virtual ~InstanceId() ORTHANC_OVERRIDE
143 {
144 }
145
146 std::string GetId() const {return id_;};
147 };
148
149 class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader
150 {
151 Semaphore availableInstancesSemaphore_;
152 std::map<std::string, boost::shared_ptr<std::string>> availableInstances_;
153 boost::mutex availableInstancesMutex_;
154 SharedMessageQueue instancesToPreload_;
155 std::vector<boost::thread*> threads_;
156
157
158 public:
159 ThreadedInstanceLoader(ServerContext& context, size_t threadCount)
160 : InstanceLoader(context),
161 availableInstancesSemaphore_(0)
162 {
163 for (size_t i = 0; i < threadCount; i++)
164 {
165 threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
166 }
167 }
168
169 virtual ~ThreadedInstanceLoader()
170 {
171 Clear();
172 }
173
174 virtual void Clear() ORTHANC_OVERRIDE
175 {
176 for (size_t i = 0; i < threads_.size(); i++)
177 {
178 instancesToPreload_.Enqueue(NULL);
179 }
180
181 for (size_t i = 0; i < threads_.size(); i++)
182 {
183 if (threads_[i]->joinable())
184 {
185 threads_[i]->join();
186 }
187 delete threads_[i];
188 }
189
190 threads_.clear();
191 availableInstances_.clear();
192 }
193
194 static void PreloaderWorkerThread(ThreadedInstanceLoader* that)
195 {
196 while (true)
197 {
198 std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0)));
199 if (instanceId.get() == NULL) // that's the signal to exit the thread
200 {
201 return;
202 }
203
204 try
205 {
206 boost::shared_ptr<std::string> dicomContent(new std::string());
207 that->context_.ReadDicom(*dicomContent, instanceId->GetId());
208 {
209 boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
210 that->availableInstances_[instanceId->GetId()] = dicomContent;
211 }
212
213 that->availableInstancesSemaphore_.Release();
214 }
215 catch (OrthancException& e)
216 {
217 boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
218 // store a NULL result to notify that we could not read the instance
219 that->availableInstances_[instanceId->GetId()] = boost::shared_ptr<std::string>();
220 that->availableInstancesSemaphore_.Release();
221 }
222 }
223 }
224
225 virtual void PrepareDicom(const std::string& instanceId) ORTHANC_OVERRIDE
226 {
227 instancesToPreload_.Enqueue(new InstanceId(instanceId));
228 }
229
230 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
231 {
232 while (true)
233 {
234 // wait for an instance to be available but this might not be the one we are waiting for !
235 availableInstancesSemaphore_.Acquire();
236
237 boost::shared_ptr<std::string> dicomContent;
238 {
239 if (availableInstances_.find(instanceId) != availableInstances_.end())
240 {
241 // this is the instance we were waiting for
242 dicomContent = availableInstances_[instanceId];
243 availableInstances_.erase(instanceId);
244
245 if (dicomContent.get() == NULL) // there has been an error while reading the file
246 {
247 throw OrthancException(ErrorCode_InexistentItem);
248 }
249 dicom.swap(*dicomContent);
250
251 if (availableInstances_.size() > 0)
252 {
253 // we have just read the instance we were waiting for but there are still other instances available ->
254 // make sure the next GetDicom call does not wait !
255 availableInstancesSemaphore_.Release();
256 }
257 return;
258 }
259 // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when
260 // a new instance is available
261 }
262 }
263 }
264 };
265
266
91 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable 267 class ArchiveJob::ResourceIdentifiers : public boost::noncopyable
92 { 268 {
93 private: 269 private:
94 ResourceType level_; 270 ResourceType level_;
95 std::string patient_; 271 std::string patient_;
400 assert(type_ == Type_WriteInstance); 576 assert(type_ == Type_WriteInstance);
401 } 577 }
402 578
403 void Apply(HierarchicalZipWriter& writer, 579 void Apply(HierarchicalZipWriter& writer,
404 ServerContext& context, 580 ServerContext& context,
581 InstanceLoader& instanceLoader,
405 DicomDirWriter* dicomDir, 582 DicomDirWriter* dicomDir,
406 const std::string& dicomDirFolder, 583 const std::string& dicomDirFolder,
407 bool transcode, 584 bool transcode,
408 DicomTransferSyntax transferSyntax) const 585 DicomTransferSyntax transferSyntax) const
409 { 586 {
421 { 598 {
422 std::string content; 599 std::string content;
423 600
424 try 601 try
425 { 602 {
426 context.ReadDicom(content, instanceId_); 603 instanceLoader.GetDicom(content, instanceId_);
427 } 604 }
428 catch (OrthancException& e) 605 catch (OrthancException& e)
429 { 606 {
430 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; 607 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_;
431 return; 608 return;
492 }; 669 };
493 670
494 std::deque<Command*> commands_; 671 std::deque<Command*> commands_;
495 uint64_t uncompressedSize_; 672 uint64_t uncompressedSize_;
496 unsigned int instancesCount_; 673 unsigned int instancesCount_;
674 InstanceLoader& instanceLoader_;
497 675
498 676
499 void ApplyInternal(HierarchicalZipWriter& writer, 677 void ApplyInternal(HierarchicalZipWriter& writer,
500 ServerContext& context, 678 ServerContext& context,
679 InstanceLoader& instanceLoader,
501 size_t index, 680 size_t index,
502 DicomDirWriter* dicomDir, 681 DicomDirWriter* dicomDir,
503 const std::string& dicomDirFolder, 682 const std::string& dicomDirFolder,
504 bool transcode, 683 bool transcode,
505 DicomTransferSyntax transferSyntax) const 684 DicomTransferSyntax transferSyntax) const
507 if (index >= commands_.size()) 686 if (index >= commands_.size())
508 { 687 {
509 throw OrthancException(ErrorCode_ParameterOutOfRange); 688 throw OrthancException(ErrorCode_ParameterOutOfRange);
510 } 689 }
511 690
512 commands_[index]->Apply(writer, context, dicomDir, dicomDirFolder, transcode, transferSyntax); 691 commands_[index]->Apply(writer, context, instanceLoader, dicomDir, dicomDirFolder, transcode, transferSyntax);
513 } 692 }
514 693
515 public: 694 public:
516 ZipCommands() : 695 ZipCommands(InstanceLoader& instanceLoader) :
517 uncompressedSize_(0), 696 uncompressedSize_(0),
518 instancesCount_(0) 697 instancesCount_(0),
698 instanceLoader_(instanceLoader)
519 { 699 {
520 } 700 }
521 701
522 ~ZipCommands() 702 ~ZipCommands()
523 { 703 {
545 } 725 }
546 726
547 // "media" flavor (with DICOMDIR) 727 // "media" flavor (with DICOMDIR)
548 void Apply(HierarchicalZipWriter& writer, 728 void Apply(HierarchicalZipWriter& writer,
549 ServerContext& context, 729 ServerContext& context,
730 InstanceLoader& instanceLoader,
550 size_t index, 731 size_t index,
551 DicomDirWriter& dicomDir, 732 DicomDirWriter& dicomDir,
552 const std::string& dicomDirFolder, 733 const std::string& dicomDirFolder,
553 bool transcode, 734 bool transcode,
554 DicomTransferSyntax transferSyntax) const 735 DicomTransferSyntax transferSyntax) const
555 { 736 {
556 ApplyInternal(writer, context, index, &dicomDir, dicomDirFolder, transcode, transferSyntax); 737 ApplyInternal(writer, context, instanceLoader, index, &dicomDir, dicomDirFolder, transcode, transferSyntax);
557 } 738 }
558 739
559 // "archive" flavor (without DICOMDIR) 740 // "archive" flavor (without DICOMDIR)
560 void Apply(HierarchicalZipWriter& writer, 741 void Apply(HierarchicalZipWriter& writer,
561 ServerContext& context, 742 ServerContext& context,
743 InstanceLoader& instanceLoader,
562 size_t index, 744 size_t index,
563 bool transcode, 745 bool transcode,
564 DicomTransferSyntax transferSyntax) const 746 DicomTransferSyntax transferSyntax) const
565 { 747 {
566 ApplyInternal(writer, context, index, NULL, "", transcode, transferSyntax); 748 ApplyInternal(writer, context, instanceLoader, index, NULL, "", transcode, transferSyntax);
567 } 749 }
568 750
569 void AddOpenDirectory(const std::string& filename) 751 void AddOpenDirectory(const std::string& filename)
570 { 752 {
571 commands_.push_back(new Command(Type_OpenDirectory, filename)); 753 commands_.push_back(new Command(Type_OpenDirectory, filename));
578 760
579 void AddWriteInstance(const std::string& filename, 761 void AddWriteInstance(const std::string& filename,
580 const std::string& instanceId, 762 const std::string& instanceId,
581 uint64_t uncompressedSize) 763 uint64_t uncompressedSize)
582 { 764 {
765 instanceLoader_.PrepareDicom(instanceId);
583 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId)); 766 commands_.push_back(new Command(Type_WriteInstance, filename, instanceId));
584 instancesCount_ ++; 767 instancesCount_ ++;
585 uncompressedSize_ += uncompressedSize; 768 uncompressedSize_ += uncompressedSize;
586 } 769 }
587 770
745 928
746 class ArchiveJob::ZipWriterIterator : public boost::noncopyable 929 class ArchiveJob::ZipWriterIterator : public boost::noncopyable
747 { 930 {
748 private: 931 private:
749 ServerContext& context_; 932 ServerContext& context_;
933 InstanceLoader& instanceLoader_;
750 ZipCommands commands_; 934 ZipCommands commands_;
751 std::unique_ptr<HierarchicalZipWriter> zip_; 935 std::unique_ptr<HierarchicalZipWriter> zip_;
752 std::unique_ptr<DicomDirWriter> dicomDir_; 936 std::unique_ptr<DicomDirWriter> dicomDir_;
753 bool isMedia_; 937 bool isMedia_;
754 bool isStream_; 938 bool isStream_;
755 939
756 public: 940 public:
757 ZipWriterIterator(ServerContext& context, 941 ZipWriterIterator(ServerContext& context,
942 InstanceLoader& instanceLoader,
758 ArchiveIndex& archive, 943 ArchiveIndex& archive,
759 bool isMedia, 944 bool isMedia,
760 bool enableExtendedSopClass) : 945 bool enableExtendedSopClass) :
761 context_(context), 946 context_(context),
947 instanceLoader_(instanceLoader),
948 commands_(instanceLoader),
762 isMedia_(isMedia), 949 isMedia_(isMedia),
763 isStream_(false) 950 isStream_(false)
764 { 951 {
765 if (isMedia) 952 if (isMedia)
766 { 953 {
880 else 1067 else
881 { 1068 {
882 if (isMedia_) 1069 if (isMedia_)
883 { 1070 {
884 assert(dicomDir_.get() != NULL); 1071 assert(dicomDir_.get() != NULL);
885 commands_.Apply(*zip_, context_, index, *dicomDir_, 1072 commands_.Apply(*zip_, context_, instanceLoader_, index, *dicomDir_,
886 MEDIA_IMAGES_FOLDER, transcode, transferSyntax); 1073 MEDIA_IMAGES_FOLDER, transcode, transferSyntax);
887 } 1074 }
888 else 1075 else
889 { 1076 {
890 assert(dicomDir_.get() == NULL); 1077 assert(dicomDir_.get() == NULL);
891 commands_.Apply(*zip_, context_, index, transcode, transferSyntax); 1078 commands_.Apply(*zip_, context_, instanceLoader_, index, transcode, transferSyntax);
892 } 1079 }
893 } 1080 }
894 } 1081 }
895 1082
896 unsigned int GetInstancesCount() const 1083 unsigned int GetInstancesCount() const
915 currentStep_(0), 1102 currentStep_(0),
916 instancesCount_(0), 1103 instancesCount_(0),
917 uncompressedSize_(0), 1104 uncompressedSize_(0),
918 archiveSize_(0), 1105 archiveSize_(0),
919 transcode_(false), 1106 transcode_(false),
920 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit) 1107 transferSyntax_(DicomTransferSyntax_LittleEndianImplicit),
1108 loaderThreads_(0)
921 { 1109 {
922 } 1110 }
923 1111
924 1112
925 ArchiveJob::~ArchiveJob() 1113 ArchiveJob::~ArchiveJob()
991 transferSyntax_ = transferSyntax; 1179 transferSyntax_ = transferSyntax;
992 } 1180 }
993 } 1181 }
994 1182
995 1183
1184 void ArchiveJob::SetLoaderThreads(unsigned int loaderThreads)
1185 {
1186 if (writer_.get() != NULL) // Already started
1187 {
1188 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1189 }
1190 else
1191 {
1192 loaderThreads_ = loaderThreads;
1193 }
1194 }
1195
1196
996 void ArchiveJob::Reset() 1197 void ArchiveJob::Reset()
997 { 1198 {
998 throw OrthancException(ErrorCode_BadSequenceOfCalls, 1199 throw OrthancException(ErrorCode_BadSequenceOfCalls,
999 "Cannot resubmit the creation of an archive"); 1200 "Cannot resubmit the creation of an archive");
1000 } 1201 }
1001 1202
1002 1203
1003 void ArchiveJob::Start() 1204 void ArchiveJob::Start()
1004 { 1205 {
1206 if (loaderThreads_ == 0)
1207 {
1208 // default behaviour before loaderThreads was introducted in 1.9.8
1209 instanceLoader_.reset(new SynchronousInstanceLoader(context_));
1210 }
1211 else
1212 {
1213 instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_));
1214 }
1215
1005 if (writer_.get() != NULL) 1216 if (writer_.get() != NULL)
1006 { 1217 {
1007 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1218 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1008 } 1219 }
1009 else 1220 else
1021 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile()); 1232 asynchronousTarget_.reset(lock.GetConfiguration().CreateTemporaryFile());
1022 1233
1023 assert(asynchronousTarget_.get() != NULL); 1234 assert(asynchronousTarget_.get() != NULL);
1024 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file 1235 asynchronousTarget_->Touch(); // Make sure we can write to the temporary file
1025 1236
1026 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); 1237 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
1027 writer_->SetOutputFile(asynchronousTarget_->GetPath()); 1238 writer_->SetOutputFile(asynchronousTarget_->GetPath());
1028 } 1239 }
1029 } 1240 }
1030 else 1241 else
1031 { 1242 {
1032 assert(synchronousTarget_.get() != NULL); 1243 assert(synchronousTarget_.get() != NULL);
1033 1244
1034 writer_.reset(new ZipWriterIterator(context_, *archive_, isMedia_, enableExtendedSopClass_)); 1245 writer_.reset(new ZipWriterIterator(context_, *instanceLoader_, *archive_, isMedia_, enableExtendedSopClass_));
1035 writer_->AcquireOutputStream(synchronousTarget_.release()); 1246 writer_->AcquireOutputStream(synchronousTarget_.release());
1036 } 1247 }
1037 1248
1038 instancesCount_ = writer_->GetInstancesCount(); 1249 instancesCount_ = writer_->GetInstancesCount();
1039 uncompressedSize_ = writer_->GetUncompressedSize(); 1250 uncompressedSize_ = writer_->GetUncompressedSize();
1072 if (writer_.get() != NULL) 1283 if (writer_.get() != NULL)
1073 { 1284 {
1074 writer_->Close(); // Flush all the results 1285 writer_->Close(); // Flush all the results
1075 archiveSize_ = writer_->GetArchiveSize(); 1286 archiveSize_ = writer_->GetArchiveSize();
1076 writer_.reset(); 1287 writer_.reset();
1288 }
1289
1290 if (instanceLoader_.get() != NULL)
1291 {
1292 instanceLoader_->Clear();
1077 } 1293 }
1078 1294
1079 if (asynchronousTarget_.get() != NULL) 1295 if (asynchronousTarget_.get() != NULL)
1080 { 1296 {
1081 // Asynchronous behavior: Move the resulting file into the media archive 1297 // Asynchronous behavior: Move the resulting file into the media archive