changeset 5134:6aa41d86b948

fix ModificationJob state machine
author Alain Mazy <am@osimis.io>
date Tue, 10 Jan 2023 11:46:00 +0100
parents 7dacd2c3c009
children 252385892197
files OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h
diffstat 3 files changed, 78 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp	Fri Jan 06 12:28:36 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp	Tue Jan 10 11:46:00 2023 +0100
@@ -176,7 +176,6 @@
       
     LOG(INFO) << "Modifying instance in a job: " << instance;
 
-
     /**
      * Retrieve the original instance from the DICOM cache.
      **/
@@ -280,7 +279,7 @@
 
     std::string modifiedInstance;
     ServerContext::StoreResult result = GetContext().Store(modifiedInstance, *toStore, StoreInstanceMode_Default);
-    if (result.GetStatus() != StoreStatus_Success)
+    if (result.GetStatus() != StoreStatus_Success && result.GetStatus() != StoreStatus_AlreadyStored) // when retrying a job, we might save the same data again
     {
       throw OrthancException(ErrorCode_CannotStoreInstance,
                              "Error while storing a modified instance " + instance);
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Fri Jan 06 12:28:36 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Tue Jan 10 11:46:00 2023 +0100
@@ -44,10 +44,11 @@
     started_(false),
     stopRequested_(false),
     permissive_(false),
-    currentStep_(ThreadedJobStep_BeforeStart),
+    currentStep_(ThreadedJobStep_ProcessingInstances),
     workersCount_(workersCount),
     context_(context),
-    keepSource_(keepSource)
+    keepSource_(keepSource),
+    errorCode_(ErrorCode_Success)
   {
   }
 
@@ -57,6 +58,7 @@
     // no need to lock mutex here since we access variables used only by the "master" thread
 
     StopWorkers();
+    WaitWorkersComplete();
   }
 
 
@@ -95,10 +97,9 @@
 
   void ThreadedSetOfInstancesJob::StopWorkers()
   {
-    // no need to lock mutex here since we access variables used or set only by the "master" thread
+    boost::recursive_mutex::scoped_lock lock(mutex_);
 
     stopRequested_ = true;
-    WaitWorkersComplete();
   }
 
 
@@ -112,6 +113,7 @@
     {
       // deallocate resources
       StopWorkers();
+      WaitWorkersComplete();
     }
     else if (reason == JobStopReason_Paused)
     {
@@ -138,16 +140,25 @@
     
     try
     {
-      if (currentStep_ == ThreadedJobStep_BeforeStart)
-      {
-        currentStep_ = ThreadedJobStep_ProcessingInstances;
-      }
-
       if (currentStep_ == ThreadedJobStep_ProcessingInstances)
       {
+        // create the workers and enqueue all instances
+        for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+        {
+          instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
+        }
+
+        InitWorkers(workersCount_);
         // wait until all instances are processed by the workers
         WaitWorkersComplete();
-        
+
+        // check job has really completed !!! it might have been interrupted because of an error
+        if ((processedInstancesCount_ != instances_.size())
+          || (!IsPermissive() && failedInstances_.size() > 0))
+        {
+          return JobStepResult::Failure(GetErrorCode(), NULL);
+        }
+
         currentStep_ = ThreadedJobStep_PostProcessingInstances;
         return JobStepResult::Continue();
       }
@@ -220,17 +231,40 @@
         return;
       }
 
-      bool processed = that->HandleInstance(instanceId->GetValue());
-
+      try
       {
-        boost::recursive_mutex::scoped_lock lock(that->mutex_);
+        bool processed = that->HandleInstance(instanceId->GetValue());
 
-        that->processedInstancesCount_++;
-        if (!processed)
         {
-          that->failedInstances_.insert(instanceId->GetValue()); 
+          boost::recursive_mutex::scoped_lock lock(that->mutex_);
+
+          that->processedInstancesCount_++;
+          if (!processed)
+          {
+            that->failedInstances_.insert(instanceId->GetValue()); 
+          }
         }
       }
+      catch (const Orthanc::OrthancException& e)
+      {
+        if (that->IsPermissive())
+        {
+          LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
+        }
+        else
+        {
+          LOG(ERROR) << "Error in a non-permissive job: " << e.What();
+          that->SetErrorCode(e.GetErrorCode());
+          that->StopWorkers();
+        }
+      }
+      catch (...)
+      {
+        LOG(ERROR) << "Native exception while executing a job";
+        that->SetErrorCode(ErrorCode_InternalError);
+        that->StopWorkers();
+      }
+      
     }
   }
 
@@ -281,14 +315,6 @@
     boost::recursive_mutex::scoped_lock lock(mutex_);
 
     started_ = true;
-
-    // create the workers and enqueue all instances
-    for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
-    {
-      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
-    }
-
-    InitWorkers(workersCount_);
   }
 
 
@@ -298,7 +324,7 @@
 
     if (started_)
     {
-      currentStep_ = ThreadedJobStep_BeforeStart;
+      currentStep_ = ThreadedJobStep_ProcessingInstances;
       stopRequested_ = false;
       processedInstancesCount_ = 0;
       failedInstances_.clear();
@@ -372,10 +398,11 @@
     started_(false),
     stopRequested_(false),
     permissive_(false),
-    currentStep_(ThreadedJobStep_BeforeStart),
+    currentStep_(ThreadedJobStep_ProcessingInstances),
     workersCount_(1),
     context_(context),
-    keepSource_(defaultKeepSource)
+    keepSource_(defaultKeepSource),
+    errorCode_(ErrorCode_Success)
   {
     SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
 
@@ -399,6 +426,11 @@
     {
       SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
     }
+
+    if (source.isMember(KEY_CURRENT_STEP))
+    {
+      currentStep_ = static_cast<ThreadedJobStep>(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP));
+    }
   }
 
 
@@ -475,6 +507,19 @@
     return description_;
   }
 
+  void ThreadedSetOfInstancesJob::SetErrorCode(ErrorCode errorCode)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    errorCode_ = errorCode;
+  }
+
+  ErrorCode ThreadedSetOfInstancesJob::GetErrorCode() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return errorCode_;
+  }
 
   bool ThreadedSetOfInstancesJob::IsPermissive() const
   {
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h	Fri Jan 06 12:28:36 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h	Tue Jan 10 11:46:00 2023 +0100
@@ -41,7 +41,6 @@
   public:
     enum ThreadedJobStep  // cannot use "Step" since there is a method with this name !
     {
-      ThreadedJobStep_BeforeStart,
       ThreadedJobStep_ProcessingInstances,
       ThreadedJobStep_PostProcessingInstances,
       ThreadedJobStep_Cleanup,
@@ -67,6 +66,7 @@
 
     ServerContext&          context_;
     bool                    keepSource_;
+    ErrorCode               errorCode_;
   protected:
     mutable boost::recursive_mutex mutex_;
 
@@ -102,6 +102,10 @@
 
     bool HasCleanupStep() const;
 
+    void SetErrorCode(ErrorCode errorCode);
+
+    ErrorCode GetErrorCode() const;
+
   public:
 
     ThreadedJobStep GetCurrentStep() const;