comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5145:df040c83796c

correctly report ThreadedSetOfInstancesJob progress
author Alain Mazy <am@osimis.io>
date Fri, 27 Jan 2023 11:23:35 +0100
parents 15109c3f0f7d
children 4a0dfa23d28c
comparison
equal deleted inserted replaced
5144:9fc5bf6f3c75 5145:df040c83796c
41 size_t workersCount) : 41 size_t workersCount) :
42 hasPostProcessing_(hasPostProcessing), 42 hasPostProcessing_(hasPostProcessing),
43 started_(false), 43 started_(false),
44 stopRequested_(false), 44 stopRequested_(false),
45 permissive_(false), 45 permissive_(false),
46 currentStep_(ThreadedJobStep_ProcessingInstances), 46 currentStep_(ThreadedJobStep_NotStarted),
47 workersCount_(workersCount), 47 workersCount_(workersCount),
48 context_(context), 48 context_(context),
49 keepSource_(keepSource), 49 keepSource_(keepSource),
50 errorCode_(ErrorCode_Success) 50 errorCode_(ErrorCode_Success)
51 { 51 {
96 96
97 void ThreadedSetOfInstancesJob::StopWorkers() 97 void ThreadedSetOfInstancesJob::StopWorkers()
98 { 98 {
99 boost::recursive_mutex::scoped_lock lock(mutex_); 99 boost::recursive_mutex::scoped_lock lock(mutex_);
100 100
101 instancesToProcessQueue_.Clear();
101 stopRequested_ = true; 102 stopRequested_ = true;
102 } 103 }
103 104
104 105
105 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason) 106 void ThreadedSetOfInstancesJob::Stop(JobStopReason reason)
137 return JobStepResult::Success(); 138 return JobStepResult::Success();
138 } 139 }
139 140
140 try 141 try
141 { 142 {
142 if (currentStep_ == ThreadedJobStep_ProcessingInstances) 143 if (currentStep_ == ThreadedJobStep_NotStarted)
143 { 144 {
144 // create the workers and enqueue all instances 145 // create the workers and enqueue all instances
145 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) 146 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
146 { 147 {
147 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it)); 148 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it));
148 } 149 }
149 150
150 InitWorkers(workersCount_); 151 InitWorkers(workersCount_);
152 currentStep_ = ThreadedJobStep_ProcessingInstances;
153 }
154 else if (currentStep_ == ThreadedJobStep_ProcessingInstances)
155 {
151 // wait until all instances are processed by the workers 156 // wait until all instances are processed by the workers
152 WaitWorkersComplete(); 157 if (instancesToProcessQueue_.GetSize() != 0)
153 158 {
154 // check job has really completed !!! it might have been interrupted because of an error 159 return JobStepResult::Continue();
155 if ((processedInstances_.size() != instancesToProcess_.size()) 160 }
156 || (!IsPermissive() && failedInstances_.size() > 0)) 161 else
157 { 162 {
158 return JobStepResult::Failure(GetErrorCode(), NULL); 163 WaitWorkersComplete();
159 } 164
160 165 // check job has really completed !!! it might have been interrupted because of an error
161 currentStep_ = ThreadedJobStep_PostProcessingInstances; 166 if ((processedInstances_.size() != instancesToProcess_.size())
162 return JobStepResult::Continue(); 167 || (!IsPermissive() && failedInstances_.size() > 0))
168 {
169 return JobStepResult::Failure(GetErrorCode(), NULL);
170 }
171
172 currentStep_ = ThreadedJobStep_PostProcessingInstances;
173 return JobStepResult::Continue();
174 }
163 } 175 }
164 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances) 176 else if (currentStep_ == ThreadedJobStep_PostProcessingInstances)
165 { 177 {
166 if (HasPostProcessingStep()) 178 if (HasPostProcessingStep())
167 { 179 {
336 boost::recursive_mutex::scoped_lock lock(mutex_); 348 boost::recursive_mutex::scoped_lock lock(mutex_);
337 349
338 if (started_) 350 if (started_)
339 { 351 {
340 // TODO: cleanup the instances that have been generated during the previous run 352 // TODO: cleanup the instances that have been generated during the previous run
341 currentStep_ = ThreadedJobStep_ProcessingInstances; 353 currentStep_ = ThreadedJobStep_NotStarted;
342 stopRequested_ = false; 354 stopRequested_ = false;
343 processedInstances_.clear(); 355 processedInstances_.clear();
344 failedInstances_.clear(); 356 failedInstances_.clear();
345 instancesToProcessQueue_.Clear(); 357 instancesToProcessQueue_.Clear();
346 } 358 }
409 bool defaultKeepSource) : 421 bool defaultKeepSource) :
410 hasPostProcessing_(hasPostProcessing), 422 hasPostProcessing_(hasPostProcessing),
411 started_(false), 423 started_(false),
412 stopRequested_(false), 424 stopRequested_(false),
413 permissive_(false), 425 permissive_(false),
414 currentStep_(ThreadedJobStep_ProcessingInstances), 426 currentStep_(ThreadedJobStep_NotStarted),
415 workersCount_(1), 427 workersCount_(1),
416 context_(context), 428 context_(context),
417 keepSource_(defaultKeepSource), 429 keepSource_(defaultKeepSource),
418 errorCode_(ErrorCode_Success) 430 errorCode_(ErrorCode_Success)
419 { 431 {