comparison OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 5108:a386dfb5b386

Optimization: now using multiple threads to transcode files for asynchronous download of studies archive
author Alain Mazy <am@osimis.io>
date Tue, 22 Nov 2022 12:54:10 +0100
parents df86d2505df8
children 027366cae766
comparison
equal deleted inserted replaced
5107:43461cc99838 5108:a386dfb5b386
80 80
81 class ArchiveJob::InstanceLoader : public boost::noncopyable 81 class ArchiveJob::InstanceLoader : public boost::noncopyable
82 { 82 {
83 protected: 83 protected:
84 ServerContext& context_; 84 ServerContext& context_;
85 bool transcode_;
86 DicomTransferSyntax transferSyntax_;
85 public: 87 public:
86 explicit InstanceLoader(ServerContext& context) 88 explicit InstanceLoader(ServerContext& context, bool transcode, DicomTransferSyntax transferSyntax)
87 : context_(context) 89 : context_(context),
90 transcode_(transcode),
91 transferSyntax_(transferSyntax)
88 { 92 {
89 } 93 }
90 94
91 virtual ~InstanceLoader() 95 virtual ~InstanceLoader()
92 { 96 {
93 } 97 }
94 98
95 virtual void PrepareDicom(const std::string& instanceId) 99 virtual void PrepareDicom(const std::string& instanceId)
96 { 100 {
97 101 }
102
103 bool TranscodeDicom(std::string& transcodedBuffer, const std::string& sourceBuffer, const std::string& instanceId)
104 {
105 if (transcode_)
106 {
107 std::set<DicomTransferSyntax> syntaxes;
108 syntaxes.insert(transferSyntax_);
109
110 IDicomTranscoder::DicomImage source, transcoded;
111 source.SetExternalBuffer(sourceBuffer);
112
113 if (context_.Transcode(transcoded, source, syntaxes, true /* allow new SOP instance UID */))
114 {
115 transcodedBuffer.assign(reinterpret_cast<const char*>(transcoded.GetBufferData()), transcoded.GetBufferSize());
116 return true;
117 }
118 else
119 {
120 LOG(INFO) << "Cannot transcode instance " << instanceId
121 << " to transfer syntax: " << GetTransferSyntaxUid(transferSyntax_);
122 }
123 }
124
125 return false;
98 } 126 }
99 127
100 virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0; 128 virtual void GetDicom(std::string& dicom, const std::string& instanceId) = 0;
101 129
102 virtual void Clear() 130 virtual void Clear()
105 }; 133 };
106 134
107 class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader 135 class ArchiveJob::SynchronousInstanceLoader : public ArchiveJob::InstanceLoader
108 { 136 {
109 public: 137 public:
110 explicit SynchronousInstanceLoader(ServerContext& context) 138 explicit SynchronousInstanceLoader(ServerContext& context, bool transcode, DicomTransferSyntax transferSyntax)
111 : InstanceLoader(context) 139 : InstanceLoader(context, transcode, transferSyntax)
112 { 140 {
113 } 141 }
114 142
115 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE 143 virtual void GetDicom(std::string& dicom, const std::string& instanceId) ORTHANC_OVERRIDE
116 { 144 {
117 context_.ReadDicom(dicom, instanceId); 145 context_.ReadDicom(dicom, instanceId);
146
147 if (transcode_)
148 {
149 std::string transcoded;
150 if (TranscodeDicom(transcoded, dicom, instanceId))
151 {
152 dicom.swap(transcoded);
153 }
154 }
155
118 } 156 }
119 }; 157 };
120 158
121 class InstanceId : public Orthanc::IDynamicObject 159 class InstanceId : public Orthanc::IDynamicObject
122 { 160 {
143 SharedMessageQueue instancesToPreload_; 181 SharedMessageQueue instancesToPreload_;
144 std::vector<boost::thread*> threads_; 182 std::vector<boost::thread*> threads_;
145 183
146 184
147 public: 185 public:
148 ThreadedInstanceLoader(ServerContext& context, size_t threadCount) 186 ThreadedInstanceLoader(ServerContext& context, size_t threadCount, bool transcode, DicomTransferSyntax transferSyntax)
149 : InstanceLoader(context), 187 : InstanceLoader(context, transcode, transferSyntax),
150 availableInstancesSemaphore_(0) 188 availableInstancesSemaphore_(0)
151 { 189 {
152 for (size_t i = 0; i < threadCount; i++) 190 for (size_t i = 0; i < threadCount; i++)
153 { 191 {
154 threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); 192 threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
192 230
193 try 231 try
194 { 232 {
195 boost::shared_ptr<std::string> dicomContent(new std::string()); 233 boost::shared_ptr<std::string> dicomContent(new std::string());
196 that->context_.ReadDicom(*dicomContent, instanceId->GetId()); 234 that->context_.ReadDicom(*dicomContent, instanceId->GetId());
235
236 if (that->transcode_)
237 {
238 boost::shared_ptr<std::string> transcodedDicom(new std::string());
239 if (that->TranscodeDicom(*transcodedDicom, *dicomContent, instanceId->GetId()))
240 {
241 dicomContent = transcodedDicom;
242 }
243 }
244
197 { 245 {
198 boost::mutex::scoped_lock lock(that->availableInstancesMutex_); 246 boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
199 that->availableInstances_[instanceId->GetId()] = dicomContent; 247 that->availableInstances_[instanceId->GetId()] = dicomContent;
200 } 248 }
201 249
595 { 643 {
596 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_; 644 LOG(WARNING) << "An instance was removed after the job was issued: " << instanceId_;
597 return; 645 return;
598 } 646 }
599 647
600 //boost::this_thread::sleep(boost::posix_time::milliseconds(300));
601
602 writer.OpenFile(filename_.c_str()); 648 writer.OpenFile(filename_.c_str());
603 649
604 bool transcodeSuccess = false; 650 bool transcodeSuccess = false;
605 651
606 std::unique_ptr<ParsedDicomFile> parsed; 652 std::unique_ptr<ParsedDicomFile> parsed;
607 653
608 if (transcode)
609 {
610 // New in Orthanc 1.7.0
611 std::set<DicomTransferSyntax> syntaxes;
612 syntaxes.insert(transferSyntax);
613
614 IDicomTranscoder::DicomImage source, transcoded;
615 source.SetExternalBuffer(content);
616
617 if (context.Transcode(transcoded, source, syntaxes, true /* allow new SOP instance UID */))
618 {
619 writer.Write(transcoded.GetBufferData(), transcoded.GetBufferSize());
620
621 if (dicomDir != NULL)
622 {
623 std::unique_ptr<ParsedDicomFile> tmp(transcoded.ReleaseAsParsedDicomFile());
624 dicomDir->Add(dicomDirFolder, filename_, *tmp);
625 }
626
627 transcodeSuccess = true;
628 }
629 else
630 {
631 LOG(INFO) << "Cannot transcode instance " << instanceId_
632 << " to transfer syntax: " << GetTransferSyntaxUid(transferSyntax);
633 }
634 }
635
636 if (!transcodeSuccess) 654 if (!transcodeSuccess)
637 { 655 {
638 writer.Write(content); 656 writer.Write(content);
639 657
640 if (dicomDir != NULL) 658 if (dicomDir != NULL)
1193 void ArchiveJob::Start() 1211 void ArchiveJob::Start()
1194 { 1212 {
1195 if (loaderThreads_ == 0) 1213 if (loaderThreads_ == 0)
1196 { 1214 {
1197 // default behaviour before loaderThreads was introduced in 1.10.0 1215 // default behaviour before loaderThreads was introduced in 1.10.0
1198 instanceLoader_.reset(new SynchronousInstanceLoader(context_)); 1216 instanceLoader_.reset(new SynchronousInstanceLoader(context_, transcode_, transferSyntax_));
1199 } 1217 }
1200 else 1218 else
1201 { 1219 {
1202 instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_)); 1220 instanceLoader_.reset(new ThreadedInstanceLoader(context_, loaderThreads_, transcode_, transferSyntax_));
1203 } 1221 }
1204 1222
1205 if (writer_.get() != NULL) 1223 if (writer_.get() != NULL)
1206 { 1224 {
1207 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1225 throw OrthancException(ErrorCode_BadSequenceOfCalls);