comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5134:6aa41d86b948

fix ModificationJob state machine
author Alain Mazy <am@osimis.io>
date Tue, 10 Jan 2023 11:46:00 +0100
parents f2dcdbe05884
children 252385892197
comparison
equal deleted inserted replaced
5133:7dacd2c3c009 5134:6aa41d86b948
42 processedInstancesCount_(0), 42 processedInstancesCount_(0),
43 hasPostProcessing_(hasPostProcessing), 43 hasPostProcessing_(hasPostProcessing),
44 started_(false), 44 started_(false),
45 stopRequested_(false), 45 stopRequested_(false),
46 permissive_(false), 46 permissive_(false),
47 currentStep_(ThreadedJobStep_BeforeStart), 47 currentStep_(ThreadedJobStep_ProcessingInstances),
48 workersCount_(workersCount), 48 workersCount_(workersCount),
49 context_(context), 49 context_(context),
50 keepSource_(keepSource) 50 keepSource_(keepSource),
51 errorCode_(ErrorCode_Success)
51 { 52 {
52 } 53 }
53 54
54 55
55 ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob() 56 ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob()
56 { 57 {
57 // no need to lock mutex here since we access variables used only by the "master" thread 58 // no need to lock mutex here since we access variables used only by the "master" thread
58 59
59 StopWorkers(); 60 StopWorkers();
61 WaitWorkersComplete();
60 } 62 }
61 63
62 64
63 void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount) 65 void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount)
64 { 66 {
93 } 95 }
94 96
95 97
96 void ThreadedSetOfInstancesJob::StopWorkers() 98 void ThreadedSetOfInstancesJob::StopWorkers()
97 { 99 {
98 // no need to lock mutex here since we access variables used or set only by the "master" thread 100 boost::recursive_mutex::scoped_lock lock(mutex_);
99 101
100 stopRequested_ = true; 102 stopRequested_ = true;
101 WaitWorkersComplete();
102 } 103 }
103 104
104 105
105 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) 106 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason)
106 { 107 {
110 reason == JobStopReason_Failure || 111 reason == JobStopReason_Failure ||
111 reason == JobStopReason_Retry) 112 reason == JobStopReason_Retry)
112 { 113 {
113 // deallocate resources 114 // deallocate resources
114 StopWorkers(); 115 StopWorkers();
116 WaitWorkersComplete();
115 } 117 }
116 else if (reason == JobStopReason_Paused) 118 else if (reason == JobStopReason_Paused)
117 { 119 {
118 // keep resources allocated. 120 // keep resources allocated.
119 // note that, right now, since all instances are queued from the start, this kind of job is not paused while in ProcessingInstances state 121 // note that, right now, since all instances are queued from the start, this kind of job is not paused while in ProcessingInstances state
136 return JobStepResult::Success(); 138 return JobStepResult::Success();
137 } 139 }
138 140
139 try 141 try
140 { 142 {
141 if (currentStep_ == ThreadedJobStep_BeforeStart)
142 {
143 currentStep_ = ThreadedJobStep_ProcessingInstances;
144 }
145
146 if (currentStep_ == ThreadedJobStep_ProcessingInstances) 143 if (currentStep_ == ThreadedJobStep_ProcessingInstances)
147 { 144 {
145 // create the workers and enqueue all instances
146 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
147 {
148 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
149 }
150
151 InitWorkers(workersCount_);
148 // wait until all instances are processed by the workers 152 // wait until all instances are processed by the workers
149 WaitWorkersComplete(); 153 WaitWorkersComplete();
150 154
155 // check that the job has really completed: it might have been interrupted because of an error
156 if ((processedInstancesCount_ != instances_.size())
157 || (!IsPermissive() && failedInstances_.size() > 0))
158 {
159 return JobStepResult::Failure(GetErrorCode(), NULL);
160 }
161
151 currentStep_ = ThreadedJobStep_PostProcessingInstances; 162 currentStep_ = ThreadedJobStep_PostProcessingInstances;
152 return JobStepResult::Continue(); 163 return JobStepResult::Continue();
153 } 164 }
154 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) 165 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances)
155 { 166 {
218 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) 229 || instanceId->GetValue() == EXIT_WORKER_MESSAGE)
219 { 230 {
220 return; 231 return;
221 } 232 }
222 233
223 bool processed = that->HandleInstance(instanceId->GetValue()); 234 try
224 235 {
225 { 236 bool processed = that->HandleInstance(instanceId->GetValue());
226 boost::recursive_mutex::scoped_lock lock(that->mutex_); 237
227 238 {
228 that->processedInstancesCount_++; 239 boost::recursive_mutex::scoped_lock lock(that->mutex_);
229 if (!processed) 240
230 { 241 that->processedInstancesCount_++;
231 that->failedInstances_.insert(instanceId->GetValue()); 242 if (!processed)
232 } 243 {
233 } 244 that->failedInstances_.insert(instanceId->GetValue());
245 }
246 }
247 }
248 catch (const Orthanc::OrthancException& e)
249 {
250 if (that->IsPermissive())
251 {
252 LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
253 }
254 else
255 {
256 LOG(ERROR) << "Error in a non-permissive job: " << e.What();
257 that->SetErrorCode(e.GetErrorCode());
258 that->StopWorkers();
259 }
260 }
261 catch (...)
262 {
263 LOG(ERROR) << "Native exception while executing a job";
264 that->SetErrorCode(ErrorCode_InternalError);
265 that->StopWorkers();
266 }
267
234 } 268 }
235 } 269 }
236 270
237 271
238 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, 272 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output,
279 void ThreadedSetOfInstancesJob::Start() 313 void ThreadedSetOfInstancesJob::Start()
280 { 314 {
281 boost::recursive_mutex::scoped_lock lock(mutex_); 315 boost::recursive_mutex::scoped_lock lock(mutex_);
282 316
283 started_ = true; 317 started_ = true;
284
285 // create the workers and enqueue all instances
286 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
287 {
288 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
289 }
290
291 InitWorkers(workersCount_);
292 } 318 }
293 319
294 320
295 void ThreadedSetOfInstancesJob::Reset() 321 void ThreadedSetOfInstancesJob::Reset()
296 { 322 {
297 boost::recursive_mutex::scoped_lock lock(mutex_); 323 boost::recursive_mutex::scoped_lock lock(mutex_);
298 324
299 if (started_) 325 if (started_)
300 { 326 {
301 currentStep_ = ThreadedJobStep_BeforeStart; 327 currentStep_ = ThreadedJobStep_ProcessingInstances;
302 stopRequested_ = false; 328 stopRequested_ = false;
303 processedInstancesCount_ = 0; 329 processedInstancesCount_ = 0;
304 failedInstances_.clear(); 330 failedInstances_.clear();
305 instancesToProcess_.Clear(); 331 instancesToProcess_.Clear();
306 } 332 }
370 processedInstancesCount_(0), 396 processedInstancesCount_(0),
371 hasPostProcessing_(hasPostProcessing), 397 hasPostProcessing_(hasPostProcessing),
372 started_(false), 398 started_(false),
373 stopRequested_(false), 399 stopRequested_(false),
374 permissive_(false), 400 permissive_(false),
375 currentStep_(ThreadedJobStep_BeforeStart), 401 currentStep_(ThreadedJobStep_ProcessingInstances),
376 workersCount_(1), 402 workersCount_(1),
377 context_(context), 403 context_(context),
378 keepSource_(defaultKeepSource) 404 keepSource_(defaultKeepSource),
405 errorCode_(ErrorCode_Success)
379 { 406 {
380 SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES); 407 SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
381 408
382 if (source.isMember(KEY_PARENT_RESOURCES)) 409 if (source.isMember(KEY_PARENT_RESOURCES))
383 { 410 {
396 } 423 }
397 424
398 if (source.isMember(KEY_INSTANCES)) 425 if (source.isMember(KEY_INSTANCES))
399 { 426 {
400 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); 427 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
428 }
429
430 if (source.isMember(KEY_CURRENT_STEP))
431 {
432 currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP));
401 } 433 }
402 } 434 }
403 435
404 436
405 void ThreadedSetOfInstancesJob::SetKeepSource(bool keep) 437 void ThreadedSetOfInstancesJob::SetKeepSource(bool keep)
473 boost::recursive_mutex::scoped_lock lock(mutex_); 505 boost::recursive_mutex::scoped_lock lock(mutex_);
474 506
475 return description_; 507 return description_;
476 } 508 }
477 509
510 void ThreadedSetOfInstancesJob::SetErrorCode(ErrorCode errorCode)
511 {
512 boost::recursive_mutex::scoped_lock lock(mutex_);
513
514 errorCode_ = errorCode;
515 }
516
517 ErrorCode ThreadedSetOfInstancesJob::GetErrorCode() const
518 {
519 boost::recursive_mutex::scoped_lock lock(mutex_);
520
521 return errorCode_;
522 }
478 523
479 bool ThreadedSetOfInstancesJob::IsPermissive() const 524 bool ThreadedSetOfInstancesJob::IsPermissive() const
480 { 525 {
481 boost::recursive_mutex::scoped_lock lock(mutex_); 526 boost::recursive_mutex::scoped_lock lock(mutex_);
482 527