diff OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5134:6aa41d86b948
fix ModificationJob state machine
| author   | Alain Mazy <am@osimis.io>       |
|----------|---------------------------------|
| date     | Tue, 10 Jan 2023 11:46:00 +0100 |
| parents  | f2dcdbe05884                    |
| children | 252385892197                    |
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Fri Jan 06 12:28:36 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Tue Jan 10 11:46:00 2023 +0100
@@ -44,10 +44,11 @@
     started_(false),
     stopRequested_(false),
     permissive_(false),
-    currentStep_(ThreadedJobStep_BeforeStart),
+    currentStep_(ThreadedJobStep_ProcessingInstances),
     workersCount_(workersCount),
     context_(context),
-    keepSource_(keepSource)
+    keepSource_(keepSource),
+    errorCode_(ErrorCode_Success)
   {
   }
@@ -57,6 +58,7 @@
     // no need to lock mutex here since we access variables used only by the "master" thread
     StopWorkers();
+    WaitWorkersComplete();
   }
@@ -95,10 +97,9 @@
   void ThreadedSetOfInstancesJob::StopWorkers()
   {
-    // no need to lock mutex here since we access variables used or set only by the "master" thread
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
     stopRequested_ = true;
-
-    WaitWorkersComplete();
   }
@@ -112,6 +113,7 @@
     {
       // deallocate resources
       StopWorkers();
+      WaitWorkersComplete();
     }
     else if (reason == JobStopReason_Paused)
     {
@@ -138,16 +140,25 @@
     try
     {
-      if (currentStep_ == ThreadedJobStep_BeforeStart)
-      {
-        currentStep_ = ThreadedJobStep_ProcessingInstances;
-      }
-
       if (currentStep_ == ThreadedJobStep_ProcessingInstances)
       {
+        // create the workers and enqueue all instances
+        for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+        {
+          instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
+        }
+
+        InitWorkers(workersCount_);
+
         // wait until all instances are processed by the workers
         WaitWorkersComplete();
-
+
+        // check job has really completed !!! it might have been interrupted because of an error
+        if ((processedInstancesCount_ != instances_.size())
+            || (!IsPermissive() && failedInstances_.size() > 0))
+        {
+          return JobStepResult::Failure(GetErrorCode(), NULL);
+        }
+
         currentStep_ = ThreadedJobStep_PostProcessingInstances;
         return JobStepResult::Continue();
       }
@@ -220,17 +231,40 @@
         return;
       }

-      bool processed = that->HandleInstance(instanceId->GetValue());
-
+      try
       {
-        boost::recursive_mutex::scoped_lock lock(that->mutex_);
+        bool processed = that->HandleInstance(instanceId->GetValue());

-        that->processedInstancesCount_++;
-        if (!processed)
         {
-          that->failedInstances_.insert(instanceId->GetValue());
+          boost::recursive_mutex::scoped_lock lock(that->mutex_);
+
+          that->processedInstancesCount_++;
+          if (!processed)
+          {
+            that->failedInstances_.insert(instanceId->GetValue());
+          }
         }
       }
+      catch (const Orthanc::OrthancException& e)
+      {
+        if (that->IsPermissive())
+        {
+          LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
+        }
+        else
+        {
+          LOG(ERROR) << "Error in a non-permissive job: " << e.What();
+          that->SetErrorCode(e.GetErrorCode());
+          that->StopWorkers();
+        }
+      }
+      catch (...)
+      {
+        LOG(ERROR) << "Native exception while executing a job";
+        that->SetErrorCode(ErrorCode_InternalError);
+        that->StopWorkers();
+      }
+
     }
   }
@@ -281,14 +315,6 @@
     boost::recursive_mutex::scoped_lock lock(mutex_);

     started_ = true;
-
-    // create the workers and enqueue all instances
-    for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
-    {
-      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
-    }
-
-    InitWorkers(workersCount_);
   }
@@ -298,7 +324,7 @@
     if (started_)
     {
-      currentStep_ = ThreadedJobStep_BeforeStart;
+      currentStep_ = ThreadedJobStep_ProcessingInstances;
       stopRequested_ = false;
       processedInstancesCount_ = 0;
       failedInstances_.clear();
@@ -372,10 +398,11 @@
     started_(false),
     stopRequested_(false),
     permissive_(false),
-    currentStep_(ThreadedJobStep_BeforeStart),
+    currentStep_(ThreadedJobStep_ProcessingInstances),
     workersCount_(1),
     context_(context),
-    keepSource_(defaultKeepSource)
+    keepSource_(defaultKeepSource),
+    errorCode_(ErrorCode_Success)
   {
     SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
@@ -399,6 +426,11 @@
     {
       SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
     }
+
+    if (source.isMember(KEY_CURRENT_STEP))
+    {
+      currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP));
+    }
   }
@@ -475,6 +507,19 @@
     return description_;
   }

+  void ThreadedSetOfInstancesJob::SetErrorCode(ErrorCode errorCode)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    errorCode_ = errorCode;
+  }
+
+  ErrorCode ThreadedSetOfInstancesJob::GetErrorCode() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return errorCode_;
+  }

   bool ThreadedSetOfInstancesJob::IsPermissive() const
   {
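
As a reading aid: the heart of the change in WorkerThread() is that HandleInstance() is now wrapped in a try/catch. A permissive job only logs the exception, whereas a non-permissive job records the error code (SetErrorCode()), asks the other workers to stop (StopWorkers()), and the completeness check added to Step() then fails the whole job with that code. The following is a minimal, self-contained sketch of this pattern using std::thread and std::mutex instead of Orthanc's job primitives; every name in it is hypothetical and it is not the Orthanc implementation.

```cpp
#include <atomic>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for ThreadedSetOfInstancesJob::HandleInstance().
static bool HandleInstance(const std::string& instance)
{
  if (instance == "bad")
  {
    throw std::runtime_error("cannot process " + instance);
  }
  return true;
}

int main()
{
  const std::vector<std::string> instances = { "a", "b", "bad", "c" };
  const bool permissive = false;
  const size_t workersCount = 2;

  std::atomic<size_t> next(0);            // index of the next instance to process
  std::atomic<bool> stopRequested(false); // analogous to stopRequested_
  std::mutex mutex;                       // protects the three fields below
  size_t processedCount = 0;
  std::vector<std::string> failed;
  std::string errorMessage;               // analogous to errorCode_

  auto worker = [&]()
  {
    while (!stopRequested)
    {
      const size_t i = next++;
      if (i >= instances.size())
      {
        return;  // nothing left to process
      }

      try
      {
        const bool ok = HandleInstance(instances[i]);

        std::lock_guard<std::mutex> lock(mutex);
        processedCount++;
        if (!ok)
        {
          failed.push_back(instances[i]);
        }
      }
      catch (const std::exception& e)
      {
        std::lock_guard<std::mutex> lock(mutex);
        if (permissive)
        {
          // a permissive job notes the failure and keeps going
          processedCount++;
          failed.push_back(instances[i]);
        }
        else
        {
          if (errorMessage.empty())
          {
            errorMessage = e.what();       // analogous to SetErrorCode()
          }
          stopRequested = true;            // analogous to StopWorkers()
        }
      }
    }
  };

  std::vector<std::thread> workers;
  for (size_t w = 0; w < workersCount; ++w)
  {
    workers.emplace_back(worker);
  }
  for (std::thread& t : workers)
  {
    t.join();  // analogous to WaitWorkersComplete()
  }

  // Mirrors the completeness check added to Step(): succeed only if every
  // instance was processed and, for a non-permissive job, none failed.
  if (processedCount != instances.size() || (!permissive && !failed.empty()))
  {
    std::cout << "Job failed: "
              << (errorMessage.empty() ? std::string("some instances failed") : errorMessage)
              << std::endl;
    return 1;
  }

  std::cout << "Job succeeded" << std::endl;
  return 0;
}
```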
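The patch also simplifies the job's state machine: ThreadedJobStep_BeforeStart is gone, a job now starts directly in ThreadedJobStep_ProcessingInstances (the workers are created on the first call to Step() rather than in Start()), and the current step is serialized under KEY_CURRENT_STEP so a deserialized job resumes in the right phase. The sketch below shows the general shape of such a step-driven state machine with simplified, hypothetical types (DemoJob, JobStepCode, the driving loop); the real job has additional steps and runs under the Orthanc jobs engine.

```cpp
#include <cstdio>

// Hypothetical, simplified stand-ins for the job's step enumeration.
enum ThreadedJobStep
{
  ThreadedJobStep_ProcessingInstances,      // initial state after this patch
  ThreadedJobStep_PostProcessingInstances,
  ThreadedJobStep_Done                      // placeholder final state
};

enum JobStepCode
{
  JobStepCode_Continue,
  JobStepCode_Success,
  JobStepCode_Failure
};

struct DemoJob
{
  ThreadedJobStep currentStep = ThreadedJobStep_ProcessingInstances;
  bool workerReportedError = false;   // would be set by a worker thread

  JobStepCode Step()
  {
    switch (currentStep)
    {
      case ThreadedJobStep_ProcessingInstances:
        // In the real job: enqueue the instances, create the workers, wait
        // for them, then fail here if a worker recorded an error code.
        if (workerReportedError)
        {
          return JobStepCode_Failure;
        }
        currentStep = ThreadedJobStep_PostProcessingInstances;
        return JobStepCode_Continue;

      case ThreadedJobStep_PostProcessingInstances:
        currentStep = ThreadedJobStep_Done;
        return JobStepCode_Success;

      default:
        return JobStepCode_Failure;
    }
  }
};

int main()
{
  DemoJob job;

  // Hypothetical driving loop: the jobs engine keeps calling Step() until
  // the job reports success or failure.
  JobStepCode code = JobStepCode_Continue;
  while (code == JobStepCode_Continue)
  {
    code = job.Step();
  }

  std::printf("job finished: %s\n",
              code == JobStepCode_Success ? "success" : "failure");
  return 0;
}
```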