comparison OrthancServer/Sources/ServerJobs/ArchiveJob.cpp @ 5302:ad3cd5ec2074

Reduced memory usage while downloading archives when "ZipLoaderThreads" > 0
author Alain Mazy <am@osimis.io>
date Wed, 24 May 2023 11:44:21 +0200
parents 0ea402b4d901
children b5c502bcaf99
comparison
equal deleted inserted replaced
5298:76dc541c5dda 5302:ad3cd5ec2074
174 }; 174 };
175 175
176 class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader 176 class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader
177 { 177 {
178 Semaphore availableInstancesSemaphore_; 178 Semaphore availableInstancesSemaphore_;
179 Semaphore bufferedInstancesSemaphore_;
179 std::map<std::string, boost::shared_ptr<std::string> > availableInstances_; 180 std::map<std::string, boost::shared_ptr<std::string> > availableInstances_;
180 boost::mutex availableInstancesMutex_; 181 boost::mutex availableInstancesMutex_;
181 SharedMessageQueue instancesToPreload_; 182 SharedMessageQueue instancesToPreload_;
182 std::vector<boost::thread*> threads_; 183 std::vector<boost::thread*> threads_;
183 184
184 185
185 public: 186 public:
186 ThreadedInstanceLoader(ServerContext& context, size_t threadCount, bool transcode, DicomTransferSyntax transferSyntax) 187 ThreadedInstanceLoader(ServerContext& context, size_t threadCount, bool transcode, DicomTransferSyntax transferSyntax)
187 : InstanceLoader(context, transcode, transferSyntax), 188 : InstanceLoader(context, transcode, transferSyntax),
188 availableInstancesSemaphore_(0) 189 availableInstancesSemaphore_(0),
190 bufferedInstancesSemaphore_(3*threadCount)
189 { 191 {
190 for (size_t i = 0; i < threadCount; i++) 192 for (size_t i = 0; i < threadCount; i++)
191 { 193 {
192 threads_.push_back(new boost::thread(PreloaderWorkerThread, this)); 194 threads_.push_back(new boost::thread(PreloaderWorkerThread, this));
193 } 195 }
225 std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0))); 227 std::unique_ptr<InstanceId> instanceId(dynamic_cast<InstanceId*>(that->instancesToPreload_.Dequeue(0)));
226 if (instanceId.get() == NULL) // that's the signal to exit the thread 228 if (instanceId.get() == NULL) // that's the signal to exit the thread
227 { 229 {
228 return; 230 return;
229 } 231 }
232
233 // wait for the consumers (zip writer): there is no need to accumulate instances in memory if the loaders are faster than the writers
234 that->bufferedInstancesSemaphore_.Acquire();
230 235
231 try 236 try
232 { 237 {
233 boost::shared_ptr<std::string> dicomContent(new std::string()); 238 boost::shared_ptr<std::string> dicomContent(new std::string());
234 that->context_.ReadDicom(*dicomContent, instanceId->GetId()); 239 that->context_.ReadDicom(*dicomContent, instanceId->GetId());
268 { 273 {
269 while (true) 274 while (true)
270 { 275 {
271 // wait for an instance to be available but this might not be the one we are waiting for ! 276 // wait for an instance to be available but this might not be the one we are waiting for !
272 availableInstancesSemaphore_.Acquire(); 277 availableInstancesSemaphore_.Acquire();
278 bufferedInstancesSemaphore_.Release(); // unlock the "flow" of loaders
273 279
274 boost::shared_ptr<std::string> dicomContent; 280 boost::shared_ptr<std::string> dicomContent;
275 { 281 {
276 if (availableInstances_.find(instanceId) != availableInstances_.end()) 282 if (availableInstances_.find(instanceId) != availableInstances_.end())
277 { 283 {