Mercurial > hg > orthanc
comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5145:df040c83796c
correctly report ThreadedSetOfInstancesJob progress
author | Alain Mazy <am@osimis.io> |
---|---|
date | Fri, 27 Jan 2023 11:23:35 +0100 |
parents | 15109c3f0f7d |
children | 4a0dfa23d28c |
comparison
equal
deleted
inserted
replaced
5144:9fc5bf6f3c75 | 5145:df040c83796c |
---|---|
41 size_t workersCount) : | 41 size_t workersCount) : |
42 hasPostProcessing_(hasPostProcessing), | 42 hasPostProcessing_(hasPostProcessing), |
43 started_(false), | 43 started_(false), |
44 stopRequested_(false), | 44 stopRequested_(false), |
45 permissive_(false), | 45 permissive_(false), |
46 currentStep_(ThreadedJobStep_ProcessingInstances), | 46 currentStep_(ThreadedJobStep_NotStarted), |
47 workersCount_(workersCount), | 47 workersCount_(workersCount), |
48 context_(context), | 48 context_(context), |
49 keepSource_(keepSource), | 49 keepSource_(keepSource), |
50 errorCode_(ErrorCode_Success) | 50 errorCode_(ErrorCode_Success) |
51 { | 51 { |
96 | 96 |
97 void ThreadedSetOfInstancesJob::StopWorkers() | 97 void ThreadedSetOfInstancesJob::StopWorkers() |
98 { | 98 { |
99 boost::recursive_mutex::scoped_lock lock(mutex_); | 99 boost::recursive_mutex::scoped_lock lock(mutex_); |
100 | 100 |
101 instancesToProcessQueue_.Clear(); | |
101 stopRequested_ = true; | 102 stopRequested_ = true; |
102 } | 103 } |
103 | 104 |
104 | 105 |
105 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) | 106 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) |
137 return JobStepResult::Success(); | 138 return JobStepResult::Success(); |
138 } | 139 } |
139 | 140 |
140 try | 141 try |
141 { | 142 { |
142 if (currentStep_ == ThreadedJobStep_ProcessingInstances) | 143 if (currentStep_ == ThreadedJobStep_NotStarted) |
143 { | 144 { |
144 // create the workers and enqueue all instances | 145 // create the workers and enqueue all instances |
145 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) | 146 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) |
146 { | 147 { |
147 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it)); | 148 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it)); |
148 } | 149 } |
149 | 150 |
150 InitWorkers(workersCount_); | 151 InitWorkers(workersCount_); |
152 currentStep_ = ThreadedJobStep_ProcessingInstances; | |
153 } | |
154 else if (currentStep_ == ThreadedJobStep_ProcessingInstances) | |
155 { | |
151 // wait until all instances are processed by the workers | 156 // wait until all instances are processed by the workers |
152 WaitWorkersComplete(); | 157 if (instancesToProcessQueue_.GetSize() != 0) |
153 | 158 { |
154 // check job has really completed !!! it might have been interrupted because of an error | 159 return JobStepResult::Continue(); |
155 if ((processedInstances_.size() != instancesToProcess_.size()) | 160 } |
156 || (!IsPermissive() && failedInstances_.size() > 0)) | 161 else |
157 { | 162 { |
158 return JobStepResult::Failure(GetErrorCode(), NULL); | 163 WaitWorkersComplete(); |
159 } | 164 |
160 | 165 // check job has really completed !!! it might have been interrupted because of an error |
161 currentStep_ = ThreadedJobStep_PostProcessingInstances; | 166 if ((processedInstances_.size() != instancesToProcess_.size()) |
162 return JobStepResult::Continue(); | 167 || (!IsPermissive() && failedInstances_.size() > 0)) |
168 { | |
169 return JobStepResult::Failure(GetErrorCode(), NULL); | |
170 } | |
171 | |
172 currentStep_ = ThreadedJobStep_PostProcessingInstances; | |
173 return JobStepResult::Continue(); | |
174 } | |
163 } | 175 } |
164 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) | 176 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) |
165 { | 177 { |
166 if (HasPostProcessingStep()) | 178 if (HasPostProcessingStep()) |
167 { | 179 { |
336 boost::recursive_mutex::scoped_lock lock(mutex_); | 348 boost::recursive_mutex::scoped_lock lock(mutex_); |
337 | 349 |
338 if (started_) | 350 if (started_) |
339 { | 351 { |
340 // TODO: cleanup the instances that have been generated during the previous run | 352 // TODO: cleanup the instances that have been generated during the previous run |
341 currentStep_ = ThreadedJobStep_ProcessingInstances; | 353 currentStep_ = ThreadedJobStep_NotStarted; |
342 stopRequested_ = false; | 354 stopRequested_ = false; |
343 processedInstances_.clear(); | 355 processedInstances_.clear(); |
344 failedInstances_.clear(); | 356 failedInstances_.clear(); |
345 instancesToProcessQueue_.Clear(); | 357 instancesToProcessQueue_.Clear(); |
346 } | 358 } |
409 bool defaultKeepSource) : | 421 bool defaultKeepSource) : |
410 hasPostProcessing_(hasPostProcessing), | 422 hasPostProcessing_(hasPostProcessing), |
411 started_(false), | 423 started_(false), |
412 stopRequested_(false), | 424 stopRequested_(false), |
413 permissive_(false), | 425 permissive_(false), |
414 currentStep_(ThreadedJobStep_ProcessingInstances), | 426 currentStep_(ThreadedJobStep_NotStarted), |
415 workersCount_(1), | 427 workersCount_(1), |
416 context_(context), | 428 context_(context), |
417 keepSource_(defaultKeepSource), | 429 keepSource_(defaultKeepSource), |
418 errorCode_(ErrorCode_Success) | 430 errorCode_(ErrorCode_Success) |
419 { | 431 { |