comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5130:f2dcdbe05884

ResourceModification jobs can now use multiple threads
author Alain Mazy <am@osimis.io>
date Thu, 05 Jan 2023 17:24:43 +0100
parents
children 6aa41d86b948
comparison
equal deleted inserted replaced
5128:ede035d48b8e 5130:f2dcdbe05884
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2022 Osimis S.A., Belgium
6 * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
7 *
8 * This program is free software: you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public License
10 * as published by the Free Software Foundation, either version 3 of
11 * the License, or (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this program. If not, see
20 * <http://www.gnu.org/licenses/>.
21 **/
22
23
24 #include "ThreadedSetOfInstancesJob.h"
25
26 #include "../../../OrthancFramework/Sources/Logging.h"
27 #include "../../../OrthancFramework/Sources/OrthancException.h"
28 #include "../../../OrthancFramework/Sources/SerializationToolbox.h"
29 #include "../ServerContext.h"
30
31 #include <boost/lexical_cast.hpp>
32 #include <cassert>
33
34 namespace Orthanc
35 {
// Sentinel payload pushed onto the work queue (once per worker) so that a
// worker blocked on the queue wakes up and leaves its loop.  The pointer
// itself is const so that the sentinel cannot be re-pointed by accident.
static const char* const EXIT_WORKER_MESSAGE = "exit";
37
38 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
39 bool hasPostProcessing,
40 bool keepSource,
41 size_t workersCount) :
42 processedInstancesCount_(0),
43 hasPostProcessing_(hasPostProcessing),
44 started_(false),
45 stopRequested_(false),
46 permissive_(false),
47 currentStep_(ThreadedJobStep_BeforeStart),
48 workersCount_(workersCount),
49 context_(context),
50 keepSource_(keepSource)
51 {
52 }
53
54
55 ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob()
56 {
57 // no need to lock mutex here since we access variables used only by the "master" thread
58
59 StopWorkers();
60 }
61
62
63 void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount)
64 {
65 // no need to lock mutex here since we access variables used only by the "master" thread
66
67 for (size_t i = 0; i < workersCount; i++)
68 {
69 instancesWorkers_.push_back(boost::shared_ptr<boost::thread>(new boost::thread(InstanceWorkerThread, this)));
70 }
71 }
72
73
74 void ThreadedSetOfInstancesJob::WaitWorkersComplete()
75 {
76 // no need to lock mutex here since we access variables used only by the "master" thread
77
78 // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
79 for (size_t i = 0; i < instancesWorkers_.size(); i++)
80 {
81 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
82 }
83
84 for (size_t i = 0; i < instancesWorkers_.size(); i++)
85 {
86 if (instancesWorkers_[i]->joinable())
87 {
88 instancesWorkers_[i]->join();
89 }
90 }
91
92 instancesWorkers_.clear();
93 }
94
95
96 void ThreadedSetOfInstancesJob::StopWorkers()
97 {
98 // no need to lock mutex here since we access variables used or set only by the "master" thread
99
100 stopRequested_ = true;
101 WaitWorkersComplete();
102 }
103
104
105 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason)
106 {
107 // no need to lock mutex here since we access variables used or set only by the "master" thread
108
109 if (reason == JobStopReason_Canceled ||
110 reason == JobStopReason_Failure ||
111 reason == JobStopReason_Retry)
112 {
113 // deallocate resources
114 StopWorkers();
115 }
116 else if (reason == JobStopReason_Paused)
117 {
118 // keep resources allocated.
119 // note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state
120 }
121 }
122
123
124 JobStepResult ThreadedSetOfInstancesJob::Step(const std::string& jobId)
125 {
126 // no need to lock mutex here since we access variables used or set only by the "master" thread
127
128 if (!started_)
129 {
130 throw OrthancException(ErrorCode_InternalError);
131 }
132
133 if (GetInstancesCount() == 0)
134 {
135 // No instances to handle: We're done
136 return JobStepResult::Success();
137 }
138
139 try
140 {
141 if (currentStep_ == ThreadedJobStep_BeforeStart)
142 {
143 currentStep_ = ThreadedJobStep_ProcessingInstances;
144 }
145
146 if (currentStep_ == ThreadedJobStep_ProcessingInstances)
147 {
148 // wait until all instances are processed by the workers
149 WaitWorkersComplete();
150
151 currentStep_ = ThreadedJobStep_PostProcessingInstances;
152 return JobStepResult::Continue();
153 }
154 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances)
155 {
156 if (HasPostProcessingStep())
157 {
158 PostProcessInstances();
159 }
160
161 currentStep_ = ThreadedJobStep_Cleanup;
162 return JobStepResult::Continue();
163 }
164 else if (currentStep_ == ThreadedJobStep_Cleanup)
165 {
166 // clean after the post processing step
167 if (HasCleanupStep())
168 {
169 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
170 {
171 Json::Value tmp;
172 context_.DeleteResource(tmp, *it, ResourceType_Instance);
173 }
174 }
175
176 currentStep_ = ThreadedJobStep_Done;
177 return JobStepResult::Success();
178 }
179 }
180 catch (OrthancException& e)
181 {
182 if (permissive_)
183 {
184 LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
185 }
186 else
187 {
188 return JobStepResult::Failure(e);
189 }
190 }
191
192 return JobStepResult::Continue();
193 }
194
195
196 bool ThreadedSetOfInstancesJob::HasPostProcessingStep() const
197 {
198 boost::recursive_mutex::scoped_lock lock(mutex_);
199
200 return hasPostProcessing_;
201 }
202
203
204 bool ThreadedSetOfInstancesJob::HasCleanupStep() const
205 {
206 boost::recursive_mutex::scoped_lock lock(mutex_);
207
208 return !keepSource_;
209 }
210
211
212 void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that)
213 {
214 while (true)
215 {
216 std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0)));
217 if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean
218 || instanceId->GetValue() == EXIT_WORKER_MESSAGE)
219 {
220 return;
221 }
222
223 bool processed = that->HandleInstance(instanceId->GetValue());
224
225 {
226 boost::recursive_mutex::scoped_lock lock(that->mutex_);
227
228 that->processedInstancesCount_++;
229 if (!processed)
230 {
231 that->failedInstances_.insert(instanceId->GetValue());
232 }
233 }
234 }
235 }
236
237
238 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output,
239 MimeType &mime,
240 std::string& filename,
241 const std::string &key)
242 {
243 return false;
244 }
245
246
247 size_t ThreadedSetOfInstancesJob::GetInstancesCount() const
248 {
249 boost::recursive_mutex::scoped_lock lock(mutex_);
250
251 return instances_.size();
252 }
253
254
255 void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const
256 {
257 boost::recursive_mutex::scoped_lock lock(mutex_);
258
259 target = failedInstances_;
260 }
261
262
263 void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const
264 {
265 boost::recursive_mutex::scoped_lock lock(mutex_);
266
267 target = instances_;
268 }
269
270
271 bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const
272 {
273 boost::recursive_mutex::scoped_lock lock(mutex_);
274
275 return failedInstances_.find(instance) != failedInstances_.end();
276 }
277
278
279 void ThreadedSetOfInstancesJob::Start()
280 {
281 boost::recursive_mutex::scoped_lock lock(mutex_);
282
283 started_ = true;
284
285 // create the workers and enqueue all instances
286 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
287 {
288 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
289 }
290
291 InitWorkers(workersCount_);
292 }
293
294
295 void ThreadedSetOfInstancesJob::Reset()
296 {
297 boost::recursive_mutex::scoped_lock lock(mutex_);
298
299 if (started_)
300 {
301 currentStep_ = ThreadedJobStep_BeforeStart;
302 stopRequested_ = false;
303 processedInstancesCount_ = 0;
304 failedInstances_.clear();
305 instancesToProcess_.Clear();
306 }
307 else
308 {
309 throw OrthancException(ErrorCode_BadSequenceOfCalls);
310 }
311 }
312
313
// Names of the JSON fields used by the public content and by the
// serialization of the job.  The pointers themselves are const so that a
// key cannot be re-pointed by accident.
static const char* const KEY_FAILED_INSTANCES = "FailedInstances";
static const char* const KEY_PARENT_RESOURCES = "ParentResources";
static const char* const KEY_DESCRIPTION = "Description";
static const char* const KEY_PERMISSIVE = "Permissive";
static const char* const KEY_CURRENT_STEP = "CurrentStep";
static const char* const KEY_TYPE = "Type";
static const char* const KEY_INSTANCES = "Instances";
static const char* const KEY_INSTANCES_COUNT = "InstancesCount";
static const char* const KEY_FAILED_INSTANCES_COUNT = "FailedInstancesCount";
static const char* const KEY_KEEP_SOURCE = "KeepSource";
static const char* const KEY_WORKERS_COUNT = "WorkersCount";
325
326
327 void ThreadedSetOfInstancesJob::GetPublicContent(Json::Value& target)
328 {
329 boost::recursive_mutex::scoped_lock lock(mutex_);
330
331 target[KEY_DESCRIPTION] = GetDescription();
332 target[KEY_INSTANCES_COUNT] = static_cast<uint32_t>(GetInstancesCount());
333 target[KEY_FAILED_INSTANCES_COUNT] = static_cast<uint32_t>(failedInstances_.size());
334
335 if (!parentResources_.empty())
336 {
337 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
338 }
339 }
340
341
342 bool ThreadedSetOfInstancesJob::Serialize(Json::Value& target)
343 {
344 boost::recursive_mutex::scoped_lock lock(mutex_);
345
346 target = Json::objectValue;
347
348 std::string type;
349 GetJobType(type);
350 target[KEY_TYPE] = type;
351
352 target[KEY_PERMISSIVE] = permissive_;
353 target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_);
354 target[KEY_DESCRIPTION] = description_;
355 target[KEY_KEEP_SOURCE] = keepSource_;
356 target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_);
357
358 SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES);
359 SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
360 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
361
362 return true;
363 }
364
365
366 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
367 const Json::Value& source,
368 bool hasPostProcessing,
369 bool defaultKeepSource) :
370 processedInstancesCount_(0),
371 hasPostProcessing_(hasPostProcessing),
372 started_(false),
373 stopRequested_(false),
374 permissive_(false),
375 currentStep_(ThreadedJobStep_BeforeStart),
376 workersCount_(1),
377 context_(context),
378 keepSource_(defaultKeepSource)
379 {
380 SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
381
382 if (source.isMember(KEY_PARENT_RESOURCES))
383 {
384 // Backward compatibility with Orthanc <= 1.5.6
385 SerializationToolbox::ReadSetOfStrings(parentResources_, source, KEY_PARENT_RESOURCES);
386 }
387
388 if (source.isMember(KEY_KEEP_SOURCE))
389 {
390 keepSource_ = SerializationToolbox::ReadBoolean(source, KEY_KEEP_SOURCE);
391 }
392
393 if (source.isMember(KEY_WORKERS_COUNT))
394 {
395 workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT);
396 }
397
398 if (source.isMember(KEY_INSTANCES))
399 {
400 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
401 }
402 }
403
404
405 void ThreadedSetOfInstancesJob::SetKeepSource(bool keep)
406 {
407 boost::recursive_mutex::scoped_lock lock(mutex_);
408
409 if (IsStarted())
410 {
411 throw OrthancException(ErrorCode_BadSequenceOfCalls);
412 }
413
414 keepSource_ = keep;
415 }
416
417
418 float ThreadedSetOfInstancesJob::GetProgress()
419 {
420 boost::recursive_mutex::scoped_lock lock(mutex_);
421
422 if (GetInstancesCount() == 0)
423 {
424 return 1;
425 }
426 else
427 {
428 size_t totalProgress = GetInstancesCount();
429 size_t currentProgress = processedInstancesCount_;
430
431 if (HasPostProcessingStep())
432 {
433 ++totalProgress;
434 if (currentStep_ > ThreadedJobStep_PostProcessingInstances)
435 {
436 ++currentProgress;
437 }
438 }
439
440 if (HasCleanupStep())
441 {
442 ++totalProgress;
443 if (currentStep_ > ThreadedJobStep_Cleanup)
444 {
445 ++currentProgress;
446 }
447 }
448
449 return (static_cast<float>(currentProgress) /
450 static_cast<float>(totalProgress));
451 }
452 }
453
454
455 ThreadedSetOfInstancesJob::ThreadedJobStep ThreadedSetOfInstancesJob::GetCurrentStep() const
456 {
457 boost::recursive_mutex::scoped_lock lock(mutex_);
458
459 return currentStep_;
460 }
461
462
463 void ThreadedSetOfInstancesJob::SetDescription(const std::string &description)
464 {
465 boost::recursive_mutex::scoped_lock lock(mutex_);
466
467 description_ = description;
468 }
469
470
471 const std::string& ThreadedSetOfInstancesJob::GetDescription() const
472 {
473 boost::recursive_mutex::scoped_lock lock(mutex_);
474
475 return description_;
476 }
477
478
479 bool ThreadedSetOfInstancesJob::IsPermissive() const
480 {
481 boost::recursive_mutex::scoped_lock lock(mutex_);
482
483 return permissive_;
484 }
485
486
487 void ThreadedSetOfInstancesJob::SetPermissive(bool permissive)
488 {
489 boost::recursive_mutex::scoped_lock lock(mutex_);
490
491 if (started_)
492 {
493 throw OrthancException(ErrorCode_BadSequenceOfCalls);
494 }
495 else
496 {
497 permissive_ = permissive;
498 }
499 }
500
501
502 bool ThreadedSetOfInstancesJob::IsStarted() const
503 {
504 boost::recursive_mutex::scoped_lock lock(mutex_);
505
506 return started_;
507 }
508
509
510 void ThreadedSetOfInstancesJob::AddInstances(const std::list<std::string>& instances)
511 {
512 boost::recursive_mutex::scoped_lock lock(mutex_);
513
514 for (std::list<std::string>::const_iterator
515 it = instances.begin(); it != instances.end(); ++it)
516 {
517 instances_.insert(*it);
518 }
519 }
520
521
522 void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource)
523 {
524 boost::recursive_mutex::scoped_lock lock(mutex_);
525
526 parentResources_.insert(resource);
527 }
528
529 }