Mercurial > hg > orthanc
comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5134:6aa41d86b948
fix ModificationJob state machine
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 10 Jan 2023 11:46:00 +0100 |
parents | f2dcdbe05884 |
children | 252385892197 |
comparison legend: equal, deleted, inserted, replaced
5133:7dacd2c3c009 | 5134:6aa41d86b948 |
---|---|
42 processedInstancesCount_(0), | 42 processedInstancesCount_(0), |
43 hasPostProcessing_(hasPostProcessing), | 43 hasPostProcessing_(hasPostProcessing), |
44 started_(false), | 44 started_(false), |
45 stopRequested_(false), | 45 stopRequested_(false), |
46 permissive_(false), | 46 permissive_(false), |
47 currentStep_(ThreadedJobStep_BeforeStart), | 47 currentStep_(ThreadedJobStep_ProcessingInstances), |
48 workersCount_(workersCount), | 48 workersCount_(workersCount), |
49 context_(context), | 49 context_(context), |
50 keepSource_(keepSource) | 50 keepSource_(keepSource), |
51 errorCode_(ErrorCode_Success) | |
51 { | 52 { |
52 } | 53 } |
53 | 54 |
54 | 55 |
55 ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob() | 56 ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob() |
56 { | 57 { |
57 // no need to lock mutex here since we access variables used only by the "master" thread | 58 // no need to lock mutex here since we access variables used only by the "master" thread |
58 | 59 |
59 StopWorkers(); | 60 StopWorkers(); |
61 WaitWorkersComplete(); | |
60 } | 62 } |
61 | 63 |
62 | 64 |
63 void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount) | 65 void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount) |
64 { | 66 { |
93 } | 95 } |
94 | 96 |
95 | 97 |
96 void ThreadedSetOfInstancesJob::StopWorkers() | 98 void ThreadedSetOfInstancesJob::StopWorkers() |
97 { | 99 { |
98 // no need to lock mutex here since we access variables used or set only by the "master" thread | 100 boost::recursive_mutex::scoped_lock lock(mutex_); |
99 | 101 |
100 stopRequested_ = true; | 102 stopRequested_ = true; |
101 WaitWorkersComplete(); | |
102 } | 103 } |
103 | 104 |
104 | 105 |
105 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) | 106 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) |
106 { | 107 { |
110 reason == JobStopReason_Failure || | 111 reason == JobStopReason_Failure || |
111 reason == JobStopReason_Retry) | 112 reason == JobStopReason_Retry) |
112 { | 113 { |
113 // deallocate resources | 114 // deallocate resources |
114 StopWorkers(); | 115 StopWorkers(); |
116 WaitWorkersComplete(); | |
115 } | 117 } |
116 else if (reason == JobStopReason_Paused) | 118 else if (reason == JobStopReason_Paused) |
117 { | 119 { |
118 // keep resources allocated. | 120 // keep resources allocated. |
119 // note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state | 121 // note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state |
136 return JobStepResult::Success(); | 138 return JobStepResult::Success(); |
137 } | 139 } |
138 | 140 |
139 try | 141 try |
140 { | 142 { |
141 if (currentStep_ == ThreadedJobStep_BeforeStart) | |
142 { | |
143 currentStep_ = ThreadedJobStep_ProcessingInstances; | |
144 } | |
145 | |
146 if (currentStep_ == ThreadedJobStep_ProcessingInstances) | 143 if (currentStep_ == ThreadedJobStep_ProcessingInstances) |
147 { | 144 { |
145 // create the workers and enqueue all instances | |
146 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) | |
147 { | |
148 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); | |
149 } | |
150 | |
151 InitWorkers(workersCount_); | |
148 // wait until all instances are processed by the workers | 152 // wait until all instances are processed by the workers |
149 WaitWorkersComplete(); | 153 WaitWorkersComplete(); |
150 | 154 |
155 // check job has really completed !!! it might have been interrupted because of an error | |
156 if ((processedInstancesCount_ != instances_.size()) | |
157 || (!IsPermissive() && failedInstances_.size() > 0)) | |
158 { | |
159 return JobStepResult::Failure(GetErrorCode(), NULL); | |
160 } | |
161 | |
151 currentStep_ = ThreadedJobStep_PostProcessingInstances; | 162 currentStep_ = ThreadedJobStep_PostProcessingInstances; |
152 return JobStepResult::Continue(); | 163 return JobStepResult::Continue(); |
153 } | 164 } |
154 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) | 165 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) |
155 { | 166 { |
218 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) | 229 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) |
219 { | 230 { |
220 return; | 231 return; |
221 } | 232 } |
222 | 233 |
223 bool processed = that->HandleInstance(instanceId->GetValue()); | 234 try |
224 | 235 { |
225 { | 236 bool processed = that->HandleInstance(instanceId->GetValue()); |
226 boost::recursive_mutex::scoped_lock lock(that->mutex_); | 237 |
227 | 238 { |
228 that->processedInstancesCount_++; | 239 boost::recursive_mutex::scoped_lock lock(that->mutex_); |
229 if (!processed) | 240 |
230 { | 241 that->processedInstancesCount_++; |
231 that->failedInstances_.insert(instanceId->GetValue()); | 242 if (!processed) |
232 } | 243 { |
233 } | 244 that->failedInstances_.insert(instanceId->GetValue()); |
245 } | |
246 } | |
247 } | |
248 catch (const Orthanc::OrthancException& e) | |
249 { | |
250 if (that->IsPermissive()) | |
251 { | |
252 LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What(); | |
253 } | |
254 else | |
255 { | |
256 LOG(ERROR) << "Error in a non-permissive job: " << e.What(); | |
257 that->SetErrorCode(e.GetErrorCode()); | |
258 that->StopWorkers(); | |
259 } | |
260 } | |
261 catch (...) | |
262 { | |
263 LOG(ERROR) << "Native exception while executing a job"; | |
263 that->SetErrorCode(ErrorCode_InternalError); | |
265 that->StopWorkers(); | |
266 } | |
267 | |
234 } | 268 } |
235 } | 269 } |
236 | 270 |
237 | 271 |
238 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, | 272 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, |
279 void ThreadedSetOfInstancesJob::Start() | 313 void ThreadedSetOfInstancesJob::Start() |
280 { | 314 { |
281 boost::recursive_mutex::scoped_lock lock(mutex_); | 315 boost::recursive_mutex::scoped_lock lock(mutex_); |
282 | 316 |
283 started_ = true; | 317 started_ = true; |
284 | |
285 // create the workers and enqueue all instances | |
286 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) | |
287 { | |
288 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); | |
289 } | |
290 | |
291 InitWorkers(workersCount_); | |
292 } | 318 } |
293 | 319 |
294 | 320 |
295 void ThreadedSetOfInstancesJob::Reset() | 321 void ThreadedSetOfInstancesJob::Reset() |
296 { | 322 { |
297 boost::recursive_mutex::scoped_lock lock(mutex_); | 323 boost::recursive_mutex::scoped_lock lock(mutex_); |
298 | 324 |
299 if (started_) | 325 if (started_) |
300 { | 326 { |
301 currentStep_ = ThreadedJobStep_BeforeStart; | 327 currentStep_ = ThreadedJobStep_ProcessingInstances; |
302 stopRequested_ = false; | 328 stopRequested_ = false; |
303 processedInstancesCount_ = 0; | 329 processedInstancesCount_ = 0; |
304 failedInstances_.clear(); | 330 failedInstances_.clear(); |
305 instancesToProcess_.Clear(); | 331 instancesToProcess_.Clear(); |
306 } | 332 } |
370 processedInstancesCount_(0), | 396 processedInstancesCount_(0), |
371 hasPostProcessing_(hasPostProcessing), | 397 hasPostProcessing_(hasPostProcessing), |
372 started_(false), | 398 started_(false), |
373 stopRequested_(false), | 399 stopRequested_(false), |
374 permissive_(false), | 400 permissive_(false), |
375 currentStep_(ThreadedJobStep_BeforeStart), | 401 currentStep_(ThreadedJobStep_ProcessingInstances), |
376 workersCount_(1), | 402 workersCount_(1), |
377 context_(context), | 403 context_(context), |
378 keepSource_(defaultKeepSource) | 404 keepSource_(defaultKeepSource), |
405 errorCode_(ErrorCode_Success) | |
379 { | 406 { |
380 SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES); | 407 SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES); |
381 | 408 |
382 if (source.isMember(KEY_PARENT_RESOURCES)) | 409 if (source.isMember(KEY_PARENT_RESOURCES)) |
383 { | 410 { |
396 } | 423 } |
397 | 424 |
398 if (source.isMember(KEY_INSTANCES)) | 425 if (source.isMember(KEY_INSTANCES)) |
399 { | 426 { |
400 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); | 427 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); |
428 } | |
429 | |
430 if (source.isMember(KEY_CURRENT_STEP)) | |
431 { | |
432 currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP)); | |
401 } | 433 } |
402 } | 434 } |
403 | 435 |
404 | 436 |
405 void ThreadedSetOfInstancesJob::SetKeepSource(bool keep) | 437 void ThreadedSetOfInstancesJob::SetKeepSource(bool keep) |
473 boost::recursive_mutex::scoped_lock lock(mutex_); | 505 boost::recursive_mutex::scoped_lock lock(mutex_); |
474 | 506 |
475 return description_; | 507 return description_; |
476 } | 508 } |
477 | 509 |
510 void ThreadedSetOfInstancesJob::SetErrorCode(ErrorCode errorCode) | |
511 { | |
512 boost::recursive_mutex::scoped_lock lock(mutex_); | |
513 | |
514 errorCode_ = errorCode; | |
515 } | |
516 | |
517 ErrorCode ThreadedSetOfInstancesJob::GetErrorCode() const | |
518 { | |
519 boost::recursive_mutex::scoped_lock lock(mutex_); | |
520 | |
521 return errorCode_; | |
522 } | |
478 | 523 |
479 bool ThreadedSetOfInstancesJob::IsPermissive() const | 524 bool ThreadedSetOfInstancesJob::IsPermissive() const |
480 { | 525 { |
481 boost::recursive_mutex::scoped_lock lock(mutex_); | 526 boost::recursive_mutex::scoped_lock lock(mutex_); |
482 | 527 |