Mercurial > hg > orthanc
comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5136:e71b22a43c0b
Threaded modifications continued: call ReconstructInstance at the end of the modification to update the DB model
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 17 Jan 2023 17:54:38 +0100 |
parents | 252385892197 |
children | 15109c3f0f7d |
comparison
equal
deleted
inserted
replaced
5135:252385892197 | 5136:e71b22a43c0b |
---|---|
37 | 37 |
38 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, | 38 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, |
39 bool hasPostProcessing, | 39 bool hasPostProcessing, |
40 bool keepSource, | 40 bool keepSource, |
41 size_t workersCount) : | 41 size_t workersCount) : |
42 processedInstancesCount_(0), | |
43 hasPostProcessing_(hasPostProcessing), | 42 hasPostProcessing_(hasPostProcessing), |
44 started_(false), | 43 started_(false), |
45 stopRequested_(false), | 44 stopRequested_(false), |
46 permissive_(false), | 45 permissive_(false), |
47 currentStep_(ThreadedJobStep_ProcessingInstances), | 46 currentStep_(ThreadedJobStep_ProcessingInstances), |
78 // no need to lock mutex here since we access variables used only by the "master" thread | 77 // no need to lock mutex here since we access variables used only by the "master" thread |
79 | 78 |
80 // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue | 79 // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue |
81 for (size_t i = 0; i < instancesWorkers_.size(); i++) | 80 for (size_t i = 0; i < instancesWorkers_.size(); i++) |
82 { | 81 { |
83 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE)); | 82 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE)); |
84 } | 83 } |
85 | 84 |
86 for (size_t i = 0; i < instancesWorkers_.size(); i++) | 85 for (size_t i = 0; i < instancesWorkers_.size(); i++) |
87 { | 86 { |
88 if (instancesWorkers_[i]->joinable()) | 87 if (instancesWorkers_[i]->joinable()) |
141 try | 140 try |
142 { | 141 { |
143 if (currentStep_ == ThreadedJobStep_ProcessingInstances) | 142 if (currentStep_ == ThreadedJobStep_ProcessingInstances) |
144 { | 143 { |
145 // create the workers and enqueue all instances | 144 // create the workers and enqueue all instances |
146 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) | 145 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) |
147 { | 146 { |
148 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); | 147 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it)); |
149 } | 148 } |
150 | 149 |
151 InitWorkers(workersCount_); | 150 InitWorkers(workersCount_); |
152 // wait until all instances are processed by the workers | 151 // wait until all instances are processed by the workers |
153 WaitWorkersComplete(); | 152 WaitWorkersComplete(); |
154 | 153 |
155 // check job has really completed !!! it might have been interrupted because of an error | 154 // check job has really completed !!! it might have been interrupted because of an error |
156 if ((processedInstancesCount_ != instances_.size()) | 155 if ((processedInstances_.size() != instancesToProcess_.size()) |
157 || (!IsPermissive() && failedInstances_.size() > 0)) | 156 || (!IsPermissive() && failedInstances_.size() > 0)) |
158 { | 157 { |
159 return JobStepResult::Failure(GetErrorCode(), NULL); | 158 return JobStepResult::Failure(GetErrorCode(), NULL); |
160 } | 159 } |
161 | 160 |
175 else if (currentStep_ == ThreadedJobStep_Cleanup) | 174 else if (currentStep_ == ThreadedJobStep_Cleanup) |
176 { | 175 { |
177 // clean after the post processing step | 176 // clean after the post processing step |
178 if (HasCleanupStep()) | 177 if (HasCleanupStep()) |
179 { | 178 { |
180 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) | 179 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) |
181 { | 180 { |
182 Json::Value tmp; | 181 Json::Value tmp; |
183 context_.DeleteResource(tmp, *it, ResourceType_Instance); | 182 context_.DeleteResource(tmp, *it, ResourceType_Instance); |
184 } | 183 } |
185 } | 184 } |
209 boost::recursive_mutex::scoped_lock lock(mutex_); | 208 boost::recursive_mutex::scoped_lock lock(mutex_); |
210 | 209 |
211 return hasPostProcessing_; | 210 return hasPostProcessing_; |
212 } | 211 } |
213 | 212 |
213 void ThreadedSetOfInstancesJob::PostProcessInstances() | |
214 { | |
215 if (HasPostProcessingStep()) | |
216 { | |
217 throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances"); | |
218 } | |
219 } | |
220 | |
214 | 221 |
215 bool ThreadedSetOfInstancesJob::HasCleanupStep() const | 222 bool ThreadedSetOfInstancesJob::HasCleanupStep() const |
216 { | 223 { |
217 boost::recursive_mutex::scoped_lock lock(mutex_); | 224 boost::recursive_mutex::scoped_lock lock(mutex_); |
218 | 225 |
222 | 229 |
223 void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that) | 230 void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that) |
224 { | 231 { |
225 while (true) | 232 while (true) |
226 { | 233 { |
227 std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0))); | 234 std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcessQueue_.Dequeue(0))); |
228 if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean | 235 if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean |
229 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) | 236 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) |
230 { | 237 { |
231 return; | 238 return; |
232 } | 239 } |
233 | 240 |
241 bool processed = false; | |
242 | |
234 try | 243 try |
235 { | 244 { |
236 bool processed = that->HandleInstance(instanceId->GetValue()); | 245 processed = that->HandleInstance(instanceId->GetValue()); |
237 | |
238 { | |
239 boost::recursive_mutex::scoped_lock lock(that->mutex_); | |
240 | |
241 that->processedInstancesCount_++; | |
242 if (!processed) | |
243 { | |
244 that->failedInstances_.insert(instanceId->GetValue()); | |
245 } | |
246 } | |
247 } | 246 } |
248 catch (const Orthanc::OrthancException& e) | 247 catch (const Orthanc::OrthancException& e) |
249 { | 248 { |
250 if (that->IsPermissive()) | 249 if (that->IsPermissive()) |
251 { | 250 { |
254 else | 253 else |
255 { | 254 { |
256 LOG(ERROR) << "Error in a non-permissive job: " << e.What(); | 255 LOG(ERROR) << "Error in a non-permissive job: " << e.What(); |
257 that->SetErrorCode(e.GetErrorCode()); | 256 that->SetErrorCode(e.GetErrorCode()); |
258 that->StopWorkers(); | 257 that->StopWorkers(); |
258 return; | |
259 } | 259 } |
260 } | 260 } |
261 catch (...) | 261 catch (...) |
262 { | 262 { |
263 LOG(ERROR) << "Native exception while executing a job"; | 263 LOG(ERROR) << "Native exception while executing a job"; |
264 that->SetErrorCode(ErrorCode_InternalError); | 264 that->SetErrorCode(ErrorCode_InternalError); |
265 that->StopWorkers(); | 265 that->StopWorkers(); |
266 } | 266 return; |
267 | 267 } |
268 | |
269 { | |
270 boost::recursive_mutex::scoped_lock lock(that->mutex_); | |
271 | |
272 that->processedInstances_.insert(instanceId->GetValue()); | |
273 | |
274 if (!processed) | |
275 { | |
276 that->failedInstances_.insert(instanceId->GetValue()); | |
277 } | |
278 } | |
279 | |
268 } | 280 } |
269 } | 281 } |
270 | 282 |
271 | 283 |
272 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, | 284 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, |
280 | 292 |
281 size_t ThreadedSetOfInstancesJob::GetInstancesCount() const | 293 size_t ThreadedSetOfInstancesJob::GetInstancesCount() const |
282 { | 294 { |
283 boost::recursive_mutex::scoped_lock lock(mutex_); | 295 boost::recursive_mutex::scoped_lock lock(mutex_); |
284 | 296 |
285 return instances_.size(); | 297 return instancesToProcess_.size(); |
286 } | 298 } |
287 | 299 |
288 | 300 |
289 void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const | 301 void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const |
290 { | 302 { |
296 | 308 |
297 void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const | 309 void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const |
298 { | 310 { |
299 boost::recursive_mutex::scoped_lock lock(mutex_); | 311 boost::recursive_mutex::scoped_lock lock(mutex_); |
300 | 312 |
301 target = instances_; | 313 target = instancesToProcess_; |
302 } | 314 } |
303 | 315 |
304 | 316 |
305 bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const | 317 bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const |
306 { | 318 { |
316 | 328 |
317 started_ = true; | 329 started_ = true; |
318 } | 330 } |
319 | 331 |
320 | 332 |
333 // Reset is called when resubmitting a failed job | |
321 void ThreadedSetOfInstancesJob::Reset() | 334 void ThreadedSetOfInstancesJob::Reset() |
322 { | 335 { |
323 boost::recursive_mutex::scoped_lock lock(mutex_); | 336 boost::recursive_mutex::scoped_lock lock(mutex_); |
324 | 337 |
325 if (started_) | 338 if (started_) |
326 { | 339 { |
340 // TODO: cleanup the instances that have been generated during the previous run | |
327 currentStep_ = ThreadedJobStep_ProcessingInstances; | 341 currentStep_ = ThreadedJobStep_ProcessingInstances; |
328 stopRequested_ = false; | 342 stopRequested_ = false; |
329 processedInstancesCount_ = 0; | 343 processedInstances_.clear(); |
330 failedInstances_.clear(); | 344 failedInstances_.clear(); |
331 instancesToProcess_.Clear(); | 345 instancesToProcessQueue_.Clear(); |
332 } | 346 } |
333 else | 347 else |
334 { | 348 { |
335 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 349 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
336 } | 350 } |
379 target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_); | 393 target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_); |
380 target[KEY_DESCRIPTION] = description_; | 394 target[KEY_DESCRIPTION] = description_; |
381 target[KEY_KEEP_SOURCE] = keepSource_; | 395 target[KEY_KEEP_SOURCE] = keepSource_; |
382 target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_); | 396 target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_); |
383 | 397 |
384 SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES); | 398 SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES); |
385 SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); | 399 SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); |
386 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); | 400 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); |
387 | 401 |
388 return true; | 402 return true; |
389 } | 403 } |
391 | 405 |
392 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, | 406 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, |
393 const Json::Value& source, | 407 const Json::Value& source, |
394 bool hasPostProcessing, | 408 bool hasPostProcessing, |
395 bool defaultKeepSource) : | 409 bool defaultKeepSource) : |
396 processedInstancesCount_(0), | |
397 hasPostProcessing_(hasPostProcessing), | 410 hasPostProcessing_(hasPostProcessing), |
398 started_(false), | 411 started_(false), |
399 stopRequested_(false), | 412 stopRequested_(false), |
400 permissive_(false), | 413 permissive_(false), |
401 currentStep_(ThreadedJobStep_ProcessingInstances), | 414 currentStep_(ThreadedJobStep_ProcessingInstances), |
422 workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT); | 435 workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT); |
423 } | 436 } |
424 | 437 |
425 if (source.isMember(KEY_INSTANCES)) | 438 if (source.isMember(KEY_INSTANCES)) |
426 { | 439 { |
427 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); | 440 SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES); |
428 } | 441 } |
429 | 442 |
430 if (source.isMember(KEY_CURRENT_STEP)) | 443 if (source.isMember(KEY_CURRENT_STEP)) |
431 { | 444 { |
432 currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP)); | 445 currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP)); |
456 return 1; | 469 return 1; |
457 } | 470 } |
458 else | 471 else |
459 { | 472 { |
460 size_t totalProgress = GetInstancesCount(); | 473 size_t totalProgress = GetInstancesCount(); |
461 size_t currentProgress = processedInstancesCount_; | 474 size_t currentProgress = processedInstances_.size(); |
462 | 475 |
463 if (HasPostProcessingStep()) | 476 if (HasPostProcessingStep()) |
464 { | 477 { |
465 ++totalProgress; | 478 ++totalProgress; |
466 if (currentStep_ > ThreadedJobStep_PostProcessingInstances) | 479 if (currentStep_ > ThreadedJobStep_PostProcessingInstances) |
557 boost::recursive_mutex::scoped_lock lock(mutex_); | 570 boost::recursive_mutex::scoped_lock lock(mutex_); |
558 | 571 |
559 for (std::list<std::string>::const_iterator | 572 for (std::list<std::string>::const_iterator |
560 it = instances.begin(); it != instances.end(); ++it) | 573 it = instances.begin(); it != instances.end(); ++it) |
561 { | 574 { |
562 instances_.insert(*it); | 575 instancesToProcess_.insert(*it); |
563 } | 576 } |
564 } | 577 } |
565 | 578 |
566 | 579 |
567 void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource) | 580 void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource) |