comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5136:e71b22a43c0b

Threaded modifications continued: call ReconstructInstance at the end of the modification to update the DB model
author Alain Mazy <am@osimis.io>
date Tue, 17 Jan 2023 17:54:38 +0100
parents 252385892197
children 15109c3f0f7d
comparison
equal deleted inserted replaced
5135:252385892197 5136:e71b22a43c0b
37 37
38 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, 38 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
39 bool hasPostProcessing, 39 bool hasPostProcessing,
40 bool keepSource, 40 bool keepSource,
41 size_t workersCount) : 41 size_t workersCount) :
42 processedInstancesCount_(0),
43 hasPostProcessing_(hasPostProcessing), 42 hasPostProcessing_(hasPostProcessing),
44 started_(false), 43 started_(false),
45 stopRequested_(false), 44 stopRequested_(false),
46 permissive_(false), 45 permissive_(false),
47 currentStep_(ThreadedJobStep_ProcessingInstances), 46 currentStep_(ThreadedJobStep_ProcessingInstances),
78 // no need to lock mutex here since we access variables used only by the "master" thread 77 // no need to lock mutex here since we access variables used only by the "master" thread
79 78
80 // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue 79 // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
81 for (size_t i = 0; i < instancesWorkers_.size(); i++) 80 for (size_t i = 0; i < instancesWorkers_.size(); i++)
82 { 81 {
83 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE)); 82 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
84 } 83 }
85 84
86 for (size_t i = 0; i < instancesWorkers_.size(); i++) 85 for (size_t i = 0; i < instancesWorkers_.size(); i++)
87 { 86 {
88 if (instancesWorkers_[i]->joinable()) 87 if (instancesWorkers_[i]->joinable())
141 try 140 try
142 { 141 {
143 if (currentStep_ == ThreadedJobStep_ProcessingInstances) 142 if (currentStep_ == ThreadedJobStep_ProcessingInstances)
144 { 143 {
145 // create the workers and enqueue all instances 144 // create the workers and enqueue all instances
146 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) 145 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
147 { 146 {
148 instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); 147 instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it));
149 } 148 }
150 149
151 InitWorkers(workersCount_); 150 InitWorkers(workersCount_);
152 // wait until all instances are processed by the workers 151 // wait until all instances are processed by the workers
153 WaitWorkersComplete(); 152 WaitWorkersComplete();
154 153
155 // check job has really completed !!! it might have been interrupted because of an error 154 // check job has really completed !!! it might have been interrupted because of an error
156 if ((processedInstancesCount_ != instances_.size()) 155 if ((processedInstances_.size() != instancesToProcess_.size())
157 || (!IsPermissive() && failedInstances_.size() > 0)) 156 || (!IsPermissive() && failedInstances_.size() > 0))
158 { 157 {
159 return JobStepResult::Failure(GetErrorCode(), NULL); 158 return JobStepResult::Failure(GetErrorCode(), NULL);
160 } 159 }
161 160
175 else if (currentStep_ == ThreadedJobStep_Cleanup) 174 else if (currentStep_ == ThreadedJobStep_Cleanup)
176 { 175 {
177 // clean after the post processing step 176 // clean after the post processing step
178 if (HasCleanupStep()) 177 if (HasCleanupStep())
179 { 178 {
180 for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) 179 for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
181 { 180 {
182 Json::Value tmp; 181 Json::Value tmp;
183 context_.DeleteResource(tmp, *it, ResourceType_Instance); 182 context_.DeleteResource(tmp, *it, ResourceType_Instance);
184 } 183 }
185 } 184 }
209 boost::recursive_mutex::scoped_lock lock(mutex_); 208 boost::recursive_mutex::scoped_lock lock(mutex_);
210 209
211 return hasPostProcessing_; 210 return hasPostProcessing_;
212 } 211 }
213 212
213 void ThreadedSetOfInstancesJob::PostProcessInstances()
214 {
215 if (HasPostProcessingStep())
216 {
217 throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances");
218 }
219 }
220
214 221
215 bool ThreadedSetOfInstancesJob::HasCleanupStep() const 222 bool ThreadedSetOfInstancesJob::HasCleanupStep() const
216 { 223 {
217 boost::recursive_mutex::scoped_lock lock(mutex_); 224 boost::recursive_mutex::scoped_lock lock(mutex_);
218 225
222 229
223 void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that) 230 void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that)
224 { 231 {
225 while (true) 232 while (true)
226 { 233 {
227 std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0))); 234 std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcessQueue_.Dequeue(0)));
228 if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean 235 if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean
229 || instanceId->GetValue() == EXIT_WORKER_MESSAGE) 236 || instanceId->GetValue() == EXIT_WORKER_MESSAGE)
230 { 237 {
231 return; 238 return;
232 } 239 }
233 240
241 bool processed = false;
242
234 try 243 try
235 { 244 {
236 bool processed = that->HandleInstance(instanceId->GetValue()); 245 processed = that->HandleInstance(instanceId->GetValue());
237
238 {
239 boost::recursive_mutex::scoped_lock lock(that->mutex_);
240
241 that->processedInstancesCount_++;
242 if (!processed)
243 {
244 that->failedInstances_.insert(instanceId->GetValue());
245 }
246 }
247 } 246 }
248 catch (const Orthanc::OrthancException& e) 247 catch (const Orthanc::OrthancException& e)
249 { 248 {
250 if (that->IsPermissive()) 249 if (that->IsPermissive())
251 { 250 {
254 else 253 else
255 { 254 {
256 LOG(ERROR) << "Error in a non-permissive job: " << e.What(); 255 LOG(ERROR) << "Error in a non-permissive job: " << e.What();
257 that->SetErrorCode(e.GetErrorCode()); 256 that->SetErrorCode(e.GetErrorCode());
258 that->StopWorkers(); 257 that->StopWorkers();
258 return;
259 } 259 }
260 } 260 }
261 catch (...) 261 catch (...)
262 { 262 {
263 LOG(ERROR) << "Native exception while executing a job"; 263 LOG(ERROR) << "Native exception while executing a job";
264 that->SetErrorCode(ErrorCode_InternalError); 264 that->SetErrorCode(ErrorCode_InternalError);
265 that->StopWorkers(); 265 that->StopWorkers();
266 } 266 return;
267 267 }
268
269 {
270 boost::recursive_mutex::scoped_lock lock(that->mutex_);
271
272 that->processedInstances_.insert(instanceId->GetValue());
273
274 if (!processed)
275 {
276 that->failedInstances_.insert(instanceId->GetValue());
277 }
278 }
279
268 } 280 }
269 } 281 }
270 282
271 283
272 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output, 284 bool ThreadedSetOfInstancesJob::GetOutput(std::string &output,
280 292
281 size_t ThreadedSetOfInstancesJob::GetInstancesCount() const 293 size_t ThreadedSetOfInstancesJob::GetInstancesCount() const
282 { 294 {
283 boost::recursive_mutex::scoped_lock lock(mutex_); 295 boost::recursive_mutex::scoped_lock lock(mutex_);
284 296
285 return instances_.size(); 297 return instancesToProcess_.size();
286 } 298 }
287 299
288 300
289 void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const 301 void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const
290 { 302 {
296 308
297 void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const 309 void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const
298 { 310 {
299 boost::recursive_mutex::scoped_lock lock(mutex_); 311 boost::recursive_mutex::scoped_lock lock(mutex_);
300 312
301 target = instances_; 313 target = instancesToProcess_;
302 } 314 }
303 315
304 316
305 bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const 317 bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const
306 { 318 {
316 328
317 started_ = true; 329 started_ = true;
318 } 330 }
319 331
320 332
333 // Reset is called when resubmitting a failed job
321 void ThreadedSetOfInstancesJob::Reset() 334 void ThreadedSetOfInstancesJob::Reset()
322 { 335 {
323 boost::recursive_mutex::scoped_lock lock(mutex_); 336 boost::recursive_mutex::scoped_lock lock(mutex_);
324 337
325 if (started_) 338 if (started_)
326 { 339 {
340 // TODO: cleanup the instances that have been generated during the previous run
327 currentStep_ = ThreadedJobStep_ProcessingInstances; 341 currentStep_ = ThreadedJobStep_ProcessingInstances;
328 stopRequested_ = false; 342 stopRequested_ = false;
329 processedInstancesCount_ = 0; 343 processedInstances_.clear();
330 failedInstances_.clear(); 344 failedInstances_.clear();
331 instancesToProcess_.Clear(); 345 instancesToProcessQueue_.Clear();
332 } 346 }
333 else 347 else
334 { 348 {
335 throw OrthancException(ErrorCode_BadSequenceOfCalls); 349 throw OrthancException(ErrorCode_BadSequenceOfCalls);
336 } 350 }
379 target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_); 393 target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_);
380 target[KEY_DESCRIPTION] = description_; 394 target[KEY_DESCRIPTION] = description_;
381 target[KEY_KEEP_SOURCE] = keepSource_; 395 target[KEY_KEEP_SOURCE] = keepSource_;
382 target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_); 396 target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_);
383 397
384 SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES); 398 SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES);
385 SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); 399 SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
386 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); 400 SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
387 401
388 return true; 402 return true;
389 } 403 }
391 405
392 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context, 406 ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
393 const Json::Value& source, 407 const Json::Value& source,
394 bool hasPostProcessing, 408 bool hasPostProcessing,
395 bool defaultKeepSource) : 409 bool defaultKeepSource) :
396 processedInstancesCount_(0),
397 hasPostProcessing_(hasPostProcessing), 410 hasPostProcessing_(hasPostProcessing),
398 started_(false), 411 started_(false),
399 stopRequested_(false), 412 stopRequested_(false),
400 permissive_(false), 413 permissive_(false),
401 currentStep_(ThreadedJobStep_ProcessingInstances), 414 currentStep_(ThreadedJobStep_ProcessingInstances),
422 workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT); 435 workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT);
423 } 436 }
424 437
425 if (source.isMember(KEY_INSTANCES)) 438 if (source.isMember(KEY_INSTANCES))
426 { 439 {
427 SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); 440 SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES);
428 } 441 }
429 442
430 if (source.isMember(KEY_CURRENT_STEP)) 443 if (source.isMember(KEY_CURRENT_STEP))
431 { 444 {
432 currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP)); 445 currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP));
456 return 1; 469 return 1;
457 } 470 }
458 else 471 else
459 { 472 {
460 size_t totalProgress = GetInstancesCount(); 473 size_t totalProgress = GetInstancesCount();
461 size_t currentProgress = processedInstancesCount_; 474 size_t currentProgress = processedInstances_.size();
462 475
463 if (HasPostProcessingStep()) 476 if (HasPostProcessingStep())
464 { 477 {
465 ++totalProgress; 478 ++totalProgress;
466 if (currentStep_ > ThreadedJobStep_PostProcessingInstances) 479 if (currentStep_ > ThreadedJobStep_PostProcessingInstances)
557 boost::recursive_mutex::scoped_lock lock(mutex_); 570 boost::recursive_mutex::scoped_lock lock(mutex_);
558 571
559 for (std::list<std::string>::const_iterator 572 for (std::list<std::string>::const_iterator
560 it = instances.begin(); it != instances.end(); ++it) 573 it = instances.begin(); it != instances.end(); ++it)
561 { 574 {
562 instances_.insert(*it); 575 instancesToProcess_.insert(*it);
563 } 576 }
564 } 577 }
565 578
566 579
567 void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource) 580 void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource)