Mercurial > hg > orthanc
comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5130:f2dcdbe05884
ResourceModification jobs can now use multiple threads
author | Alain Mazy <am@osimis.io> |
---|---|
date | Thu, 05 Jan 2023 17:24:43 +0100 |
parents | |
children | 6aa41d86b948 |
comparison
equal
deleted
inserted
replaced
5128:ede035d48b8e | 5130:f2dcdbe05884 |
---|---|
1 /** | |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5 * Copyright (C) 2017-2022 Osimis S.A., Belgium | |
6 * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium | |
7 * | |
8 * This program is free software: you can redistribute it and/or | |
9 * modify it under the terms of the GNU Lesser General Public License | |
10 * as published by the Free Software Foundation, either version 3 of | |
11 * the License, or (at your option) any later version. | |
12 * | |
13 * This program is distributed in the hope that it will be useful, but | |
14 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 * Lesser General Public License for more details. | |
17 * | |
18 * You should have received a copy of the GNU Lesser General Public | |
19 * License along with this program. If not, see | |
20 * <http://www.gnu.org/licenses/>. | |
21 **/ | |
22 | |
23 | |
24 #include "ThreadedSetOfInstancesJob.h" | |
25 | |
26 #include "../../../OrthancFramework/Sources/Logging.h" | |
27 #include "../../../OrthancFramework/Sources/OrthancException.h" | |
28 #include "../../../OrthancFramework/Sources/SerializationToolbox.h" | |
29 #include "../ServerContext.h" | |
30 | |
31 #include <boost/lexical_cast.hpp> | |
32 #include <cassert> | |
33 | |
34 namespace Orthanc | |
35 { | |
// Sentinel value pushed onto the queue of instances to tell a worker
// thread that it must leave its processing loop
static const char* const EXIT_WORKER_MESSAGE = "exit";
37 | |
38 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, | |
39 bool hasPostProcessing, | |
40 bool keepSource, | |
41 size_t workersCount) : | |
42 processedInstancesCount_(0), | |
43 hasPostProcessing_(hasPostProcessing), | |
44 started_(false), | |
45 stopRequested_(false), | |
46 permissive_(false), | |
47 currentStep_(ThreadedJobStep_BeforeStart), | |
48 workersCount_(workersCount), | |
49 context_(context), | |
50 keepSource_(keepSource) | |
51 { | |
52 } | |
53 | |
54 | |
55 ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob() | |
56 { | |
57 // no need to lock mutex here since we access variables used only by the "master" thread | |
58 | |
59 StopWorkers(); | |
60 } | |
61 | |
62 | |
63 void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount) | |
64 { | |
65 // no need to lock mutex here since we access variables used only by the "master" thread | |
66 | |
67 for (size_t i = 0; i < workersCount; i++) | |
68 { | |
69 instancesWorkers_.push_back(boost::shared_ptr<boost::thread>(new boost::thread(InstanceWorkerThread, this))); | |
70 } | |
71 } | |
72 | |
73 | |
74 void ThreadedSetOfInstancesJob::WaitWorkersComplete() | |
75 { | |
76 // no need to lock mutex here since we access variables used only by the "master" thread | |
77 | |
78 // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue | |
79 for (size_t i = 0; i < instancesWorkers_.size(); i++) | |
80 { | |
81 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE)); | |
82 } | |
83 | |
84 for (size_t i = 0; i < instancesWorkers_.size(); i++) | |
85 { | |
86 if (instancesWorkers_[i]->joinable()) | |
87 { | |
88 instancesWorkers_[i]->join(); | |
89 } | |
90 } | |
91 | |
92 instancesWorkers_.clear(); | |
93 } | |
94 | |
95 | |
96 void ThreadedSetOfInstancesJob::StopWorkers() | |
97 { | |
98 // no need to lock mutex here since we access variables used or set only by the "master" thread | |
99 | |
100 stopRequested_ = true; | |
101 WaitWorkersComplete(); | |
102 } | |
103 | |
104 | |
105 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) | |
106 { | |
107 // no need to lock mutex here since we access variables used or set only by the "master" thread | |
108 | |
109 if (reason == JobStopReason_Canceled || | |
110 reason == JobStopReason_Failure || | |
111 reason == JobStopReason_Retry) | |
112 { | |
113 // deallocate resources | |
114 StopWorkers(); | |
115 } | |
116 else if (reason == JobStopReason_Paused) | |
117 { | |
118 // keep resources allocated. | |
119 // note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state | |
120 } | |
121 } | |
122 | |
123 | |
124 JobStepResult ThreadedSetOfInstancesJob::Step(const std::string& jobId) | |
125 { | |
126 // no need to lock mutex here since we access variables used or set only by the "master" thread | |
127 | |
128 if (!started_) | |
129 { | |
130 throw OrthancException(ErrorCode_InternalError); | |
131 } | |
132 | |
133 if (GetInstancesCount() == 0) | |
134 { | |
135 // No instances to handle: We're done | |
136 return JobStepResult::Success(); | |
137 } | |
138 | |
139 try | |
140 { | |
141 if (currentStep_ == ThreadedJobStep_BeforeStart) | |
142 { | |
143 currentStep_ = ThreadedJobStep_ProcessingInstances; | |
144 } | |
145 | |
146 if (currentStep_ == ThreadedJobStep_ProcessingInstances) | |
147 { | |
148 // wait until all instances are processed by the workers | |
149 WaitWorkersComplete(); | |
150 | |
151 currentStep_ = ThreadedJobStep_PostProcessingInstances; | |
152 return JobStepResult::Continue(); | |
153 } | |
154 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) | |
155 { | |
156 if (HasPostProcessingStep()) | |
157 { | |
158 PostProcessInstances(); | |
159 } | |
160 | |
161 currentStep_ = ThreadedJobStep_Cleanup; | |
162 return JobStepResult::Continue(); | |
163 } | |
164 else if (currentStep_ == ThreadedJobStep_Cleanup) | |
165 { | |
166 // clean after the post processing step | |
167 if (HasCleanupStep()) | |
168 { | |
169 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) | |
170 { | |
171 Json::Value tmp; | |
172 context_.DeleteResource(tmp, *it, ResourceType_Instance); | |
173 } | |
174 } | |
175 | |
176 currentStep_ = ThreadedJobStep_Done; | |
177 return JobStepResult::Success(); | |
178 } | |
179 } | |
180 catch (OrthancException& e) | |
181 { | |
182 if (permissive_) | |
183 { | |
184 LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What(); | |
185 } | |
186 else | |
187 { | |
188 return JobStepResult::Failure(e); | |
189 } | |
190 } | |
191 | |
192 return JobStepResult::Continue(); | |
193 } | |
194 | |
195 | |
196 bool ThreadedSetOfInstancesJob::HasPostProcessingStep() const | |
197 { | |
198 boost::recursive_mutex::scoped_lock lock(mutex_); | |
199 | |
200 return hasPostProcessing_; | |
201 } | |
202 | |
203 | |
204 bool ThreadedSetOfInstancesJob::HasCleanupStep() const | |
205 { | |
206 boost::recursive_mutex::scoped_lock lock(mutex_); | |
207 | |
208 return !keepSource_; | |
209 } | |
210 | |
211 | |
212 void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that) | |
213 { | |
214 while (true) | |
215 { | |
216 std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0))); | |
217 if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean | |
218 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) | |
219 { | |
220 return; | |
221 } | |
222 | |
223 bool processed = that->HandleInstance(instanceId->GetValue()); | |
224 | |
225 { | |
226 boost::recursive_mutex::scoped_lock lock(that->mutex_); | |
227 | |
228 that->processedInstancesCount_++; | |
229 if (!processed) | |
230 { | |
231 that->failedInstances_.insert(instanceId->GetValue()); | |
232 } | |
233 } | |
234 } | |
235 } | |
236 | |
237 | |
238 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, | |
239 MimeType &mime, | |
240 std::string& filename, | |
241 const std::string &key) | |
242 { | |
243 return false; | |
244 } | |
245 | |
246 | |
247 size_t ThreadedSetOfInstancesJob::GetInstancesCount() const | |
248 { | |
249 boost::recursive_mutex::scoped_lock lock(mutex_); | |
250 | |
251 return instances_.size(); | |
252 } | |
253 | |
254 | |
255 void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const | |
256 { | |
257 boost::recursive_mutex::scoped_lock lock(mutex_); | |
258 | |
259 target = failedInstances_; | |
260 } | |
261 | |
262 | |
263 void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const | |
264 { | |
265 boost::recursive_mutex::scoped_lock lock(mutex_); | |
266 | |
267 target = instances_; | |
268 } | |
269 | |
270 | |
271 bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const | |
272 { | |
273 boost::recursive_mutex::scoped_lock lock(mutex_); | |
274 | |
275 return failedInstances_.find(instance) != failedInstances_.end(); | |
276 } | |
277 | |
278 | |
279 void ThreadedSetOfInstancesJob::Start() | |
280 { | |
281 boost::recursive_mutex::scoped_lock lock(mutex_); | |
282 | |
283 started_ = true; | |
284 | |
285 // create the workers and enqueue all instances | |
286 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) | |
287 { | |
288 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); | |
289 } | |
290 | |
291 InitWorkers(workersCount_); | |
292 } | |
293 | |
294 | |
295 void ThreadedSetOfInstancesJob::Reset() | |
296 { | |
297 boost::recursive_mutex::scoped_lock lock(mutex_); | |
298 | |
299 if (started_) | |
300 { | |
301 currentStep_ = ThreadedJobStep_BeforeStart; | |
302 stopRequested_ = false; | |
303 processedInstancesCount_ = 0; | |
304 failedInstances_.clear(); | |
305 instancesToProcess_.Clear(); | |
306 } | |
307 else | |
308 { | |
309 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
310 } | |
311 } | |
312 | |
313 | |
// Names of the fields in the JSON documents that are written by
// "GetPublicContent()" and "Serialize()", and that are read back by
// the unserialization constructor
static const char* const KEY_FAILED_INSTANCES = "FailedInstances";
static const char* const KEY_PARENT_RESOURCES = "ParentResources";
static const char* const KEY_DESCRIPTION = "Description";
static const char* const KEY_PERMISSIVE = "Permissive";
static const char* const KEY_CURRENT_STEP = "CurrentStep";
static const char* const KEY_TYPE = "Type";
static const char* const KEY_INSTANCES = "Instances";
static const char* const KEY_INSTANCES_COUNT = "InstancesCount";
static const char* const KEY_FAILED_INSTANCES_COUNT = "FailedInstancesCount";
static const char* const KEY_KEEP_SOURCE = "KeepSource";
static const char* const KEY_WORKERS_COUNT = "WorkersCount";
325 | |
326 | |
327 void ThreadedSetOfInstancesJob::GetPublicContent(Json::Value& target) | |
328 { | |
329 boost::recursive_mutex::scoped_lock lock(mutex_); | |
330 | |
331 target[KEY_DESCRIPTION] = GetDescription(); | |
332 target[KEY_INSTANCES_COUNT] = static_cast<uint32_t>(GetInstancesCount()); | |
333 target[KEY_FAILED_INSTANCES_COUNT] = static_cast<uint32_t>(failedInstances_.size()); | |
334 | |
335 if (!parentResources_.empty()) | |
336 { | |
337 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); | |
338 } | |
339 } | |
340 | |
341 | |
342 bool ThreadedSetOfInstancesJob::Serialize(Json::Value& target) | |
343 { | |
344 boost::recursive_mutex::scoped_lock lock(mutex_); | |
345 | |
346 target = Json::objectValue; | |
347 | |
348 std::string type; | |
349 GetJobType(type); | |
350 target[KEY_TYPE] = type; | |
351 | |
352 target[KEY_PERMISSIVE] = permissive_; | |
353 target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_); | |
354 target[KEY_DESCRIPTION] = description_; | |
355 target[KEY_KEEP_SOURCE] = keepSource_; | |
356 target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_); | |
357 | |
358 SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES); | |
359 SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); | |
360 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); | |
361 | |
362 return true; | |
363 } | |
364 | |
365 | |
366 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, | |
367 const Json::Value& source, | |
368 bool hasPostProcessing, | |
369 bool defaultKeepSource) : | |
370 processedInstancesCount_(0), | |
371 hasPostProcessing_(hasPostProcessing), | |
372 started_(false), | |
373 stopRequested_(false), | |
374 permissive_(false), | |
375 currentStep_(ThreadedJobStep_BeforeStart), | |
376 workersCount_(1), | |
377 context_(context), | |
378 keepSource_(defaultKeepSource) | |
379 { | |
380 SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES); | |
381 | |
382 if (source.isMember(KEY_PARENT_RESOURCES)) | |
383 { | |
384 // Backward compatibility with Orthanc <= 1.5.6 | |
385 SerializationToolbox::ReadSetOfStrings(parentResources_, source, KEY_PARENT_RESOURCES); | |
386 } | |
387 | |
388 if (source.isMember(KEY_KEEP_SOURCE)) | |
389 { | |
390 keepSource_ = SerializationToolbox::ReadBoolean(source, KEY_KEEP_SOURCE); | |
391 } | |
392 | |
393 if (source.isMember(KEY_WORKERS_COUNT)) | |
394 { | |
395 workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT); | |
396 } | |
397 | |
398 if (source.isMember(KEY_INSTANCES)) | |
399 { | |
400 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); | |
401 } | |
402 } | |
403 | |
404 | |
405 void ThreadedSetOfInstancesJob::SetKeepSource(bool keep) | |
406 { | |
407 boost::recursive_mutex::scoped_lock lock(mutex_); | |
408 | |
409 if (IsStarted()) | |
410 { | |
411 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
412 } | |
413 | |
414 keepSource_ = keep; | |
415 } | |
416 | |
417 | |
418 float ThreadedSetOfInstancesJob::GetProgress() | |
419 { | |
420 boost::recursive_mutex::scoped_lock lock(mutex_); | |
421 | |
422 if (GetInstancesCount() == 0) | |
423 { | |
424 return 1; | |
425 } | |
426 else | |
427 { | |
428 size_t totalProgress = GetInstancesCount(); | |
429 size_t currentProgress = processedInstancesCount_; | |
430 | |
431 if (HasPostProcessingStep()) | |
432 { | |
433 ++totalProgress; | |
434 if (currentStep_ > ThreadedJobStep_PostProcessingInstances) | |
435 { | |
436 ++currentProgress; | |
437 } | |
438 } | |
439 | |
440 if (HasCleanupStep()) | |
441 { | |
442 ++totalProgress; | |
443 if (currentStep_ > ThreadedJobStep_Cleanup) | |
444 { | |
445 ++currentProgress; | |
446 } | |
447 } | |
448 | |
449 return (static_cast<float>(currentProgress) / | |
450 static_cast<float>(totalProgress)); | |
451 } | |
452 } | |
453 | |
454 | |
455 ThreadedSetOfInstancesJob::ThreadedJobStep ThreadedSetOfInstancesJob::GetCurrentStep() const | |
456 { | |
457 boost::recursive_mutex::scoped_lock lock(mutex_); | |
458 | |
459 return currentStep_; | |
460 } | |
461 | |
462 | |
463 void ThreadedSetOfInstancesJob::SetDescription(const std::string &description) | |
464 { | |
465 boost::recursive_mutex::scoped_lock lock(mutex_); | |
466 | |
467 description_ = description; | |
468 } | |
469 | |
470 | |
471 const std::string& ThreadedSetOfInstancesJob::GetDescription() const | |
472 { | |
473 boost::recursive_mutex::scoped_lock lock(mutex_); | |
474 | |
475 return description_; | |
476 } | |
477 | |
478 | |
479 bool ThreadedSetOfInstancesJob::IsPermissive() const | |
480 { | |
481 boost::recursive_mutex::scoped_lock lock(mutex_); | |
482 | |
483 return permissive_; | |
484 } | |
485 | |
486 | |
487 void ThreadedSetOfInstancesJob::SetPermissive(bool permissive) | |
488 { | |
489 boost::recursive_mutex::scoped_lock lock(mutex_); | |
490 | |
491 if (started_) | |
492 { | |
493 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
494 } | |
495 else | |
496 { | |
497 permissive_ = permissive; | |
498 } | |
499 } | |
500 | |
501 | |
502 bool ThreadedSetOfInstancesJob::IsStarted() const | |
503 { | |
504 boost::recursive_mutex::scoped_lock lock(mutex_); | |
505 | |
506 return started_; | |
507 } | |
508 | |
509 | |
510 void ThreadedSetOfInstancesJob::AddInstances(const std::list<std::string>& instances) | |
511 { | |
512 boost::recursive_mutex::scoped_lock lock(mutex_); | |
513 | |
514 for (std::list<std::string>::const_iterator | |
515 it = instances.begin(); it != instances.end(); ++it) | |
516 { | |
517 instances_.insert(*it); | |
518 } | |
519 } | |
520 | |
521 | |
522 void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource) | |
523 { | |
524 boost::recursive_mutex::scoped_lock lock(mutex_); | |
525 | |
526 parentResources_.insert(resource); | |
527 } | |
528 | |
529 } |