Mercurial > hg > orthanc
changeset 5134:6aa41d86b948
fix ModificationJob state machine
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 10 Jan 2023 11:46:00 +0100 |
parents | 7dacd2c3c009 |
children | 252385892197 |
files | OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h |
diffstat | 3 files changed, 78 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp Fri Jan 06 12:28:36 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp Tue Jan 10 11:46:00 2023 +0100 @@ -176,7 +176,6 @@ LOG(INFO) << "Modifying instance in a job: " << instance; - /** * Retrieve the original instance from the DICOM cache. **/ @@ -280,7 +279,7 @@ std::string modifiedInstance; ServerContext::StoreResult result = GetContext().Store(modifiedInstance, *toStore, StoreInstanceMode_Default); - if (result.GetStatus() != StoreStatus_Success) + if (result.GetStatus() != StoreStatus_Success && result.GetStatus() != StoreStatus_AlreadyStored) // when retrying a job, we might save the same data again { throw OrthancException(ErrorCode_CannotStoreInstance, "Error while storing a modified instance " + instance);
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Fri Jan 06 12:28:36 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Tue Jan 10 11:46:00 2023 +0100 @@ -44,10 +44,11 @@ started_(false), stopRequested_(false), permissive_(false), - currentStep_(ThreadedJobStep_BeforeStart), + currentStep_(ThreadedJobStep_ProcessingInstances), workersCount_(workersCount), context_(context), - keepSource_(keepSource) + keepSource_(keepSource), + errorCode_(ErrorCode_Success) { } @@ -57,6 +58,7 @@ // no need to lock mutex here since we access variables used only by the "master" thread StopWorkers(); + WaitWorkersComplete(); } @@ -95,10 +97,9 @@ void ThreadedSetOfInstancesJob::StopWorkers() { - // no need to lock mutex here since we access variables used or set only by the "master" thread + boost::recursive_mutex::scoped_lock lock(mutex_); stopRequested_ = true; - WaitWorkersComplete(); } @@ -112,6 +113,7 @@ { // deallocate resources StopWorkers(); + WaitWorkersComplete(); } else if (reason == JobStopReason_Paused) { @@ -138,16 +140,25 @@ try { - if (currentStep_ == ThreadedJobStep_BeforeStart) - { - currentStep_ = ThreadedJobStep_ProcessingInstances; - } - if (currentStep_ == ThreadedJobStep_ProcessingInstances) { + // create the workers and enqueue all instances + for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + { + instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); + } + + InitWorkers(workersCount_); // wait until all instances are processed by the workers WaitWorkersComplete(); - + + // check job has really completed !!! it might have been interrupted because of an error + if ((processedInstancesCount_ != instances_.size()) + || (!IsPermissive() && failedInstances_.size() > 0)) + { + return JobStepResult::Failure(GetErrorCode(), NULL); + } + currentStep_ = ThreadedJobStep_PostProcessingInstances; return JobStepResult::Continue(); } @@ -220,17 +231,40 @@ return; } - bool processed = that->HandleInstance(instanceId->GetValue()); - + try { - boost::recursive_mutex::scoped_lock lock(that->mutex_); + bool processed = that->HandleInstance(instanceId->GetValue()); - that->processedInstancesCount_++; - if (!processed) { - that->failedInstances_.insert(instanceId->GetValue()); + boost::recursive_mutex::scoped_lock lock(that->mutex_); + + that->processedInstancesCount_++; + if (!processed) + { + that->failedInstances_.insert(instanceId->GetValue()); + } } } + catch (const Orthanc::OrthancException& e) + { + if (that->IsPermissive()) + { + LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What(); + } + else + { + LOG(ERROR) << "Error in a non-permissive job: " << e.What(); + that->SetErrorCode(e.GetErrorCode()); + that->StopWorkers(); + } + } + catch (...) + { + LOG(ERROR) << "Native exception while executing a job"; + that->SetErrorCode(e.GetErrorCode()); + that->StopWorkers(); + } + } } @@ -281,14 +315,6 @@ boost::recursive_mutex::scoped_lock lock(mutex_); started_ = true; - - // create the workers and enqueue all instances - for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) - { - instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); - } - - InitWorkers(workersCount_); } @@ -298,7 +324,7 @@ if (started_) { - currentStep_ = ThreadedJobStep_BeforeStart; + currentStep_ = ThreadedJobStep_ProcessingInstances; stopRequested_ = false; processedInstancesCount_ = 0; failedInstances_.clear(); @@ -372,10 +398,11 @@ started_(false), stopRequested_(false), permissive_(false), - currentStep_(ThreadedJobStep_BeforeStart), + currentStep_(ThreadedJobStep_ProcessingInstances), workersCount_(1), context_(context), - keepSource_(defaultKeepSource) + keepSource_(defaultKeepSource), + errorCode_(ErrorCode_Success) { SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES); @@ -399,6 +426,11 @@ { SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); } + + if (source.isMember(KEY_CURRENT_STEP)) + { + currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP)); + } } @@ -475,6 +507,19 @@ return description_; } + void ThreadedSetOfInstancesJob::SetErrorCode(ErrorCode errorCode) + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + errorCode_ = errorCode; + } + + ErrorCode ThreadedSetOfInstancesJob::GetErrorCode() const + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + return errorCode_; + } bool ThreadedSetOfInstancesJob::IsPermissive() const {
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h Fri Jan 06 12:28:36 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h Tue Jan 10 11:46:00 2023 +0100 @@ -41,7 +41,6 @@ public: enum ThreadedJobStep // cannot use "Step" since there is a method with this name ! { - ThreadedJobStep_BeforeStart, ThreadedJobStep_ProcessingInstances, ThreadedJobStep_PostProcessingInstances, ThreadedJobStep_Cleanup, @@ -67,6 +66,7 @@ ServerContext& context_; bool keepSource_; + ErrorCode errorCode_; protected: mutable boost::recursive_mutex mutex_; @@ -102,6 +102,10 @@ bool HasCleanupStep() const; + void SetErrorCode(ErrorCode errorCode); + + ErrorCode GetErrorCode() const; + public: ThreadedJobStep GetCurrentStep() const;