diff OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5136:e71b22a43c0b

Threaded modifications continued: call ReconstructInstance at the end of the modification to update the DB model
author Alain Mazy <am@osimis.io>
date Tue, 17 Jan 2023 17:54:38 +0100
parents 252385892197
children 15109c3f0f7d
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Wed Jan 11 11:14:00 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Tue Jan 17 17:54:38 2023 +0100
@@ -39,7 +39,6 @@
                                                         bool hasPostProcessing,
                                                         bool keepSource,
                                                         size_t workersCount) :
-    processedInstancesCount_(0),
     hasPostProcessing_(hasPostProcessing),
     started_(false),
     stopRequested_(false),
@@ -80,7 +79,7 @@
     // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
     for (size_t i = 0; i < instancesWorkers_.size(); i++)
     {
-      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
+      instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
     }
 
     for (size_t i = 0; i < instancesWorkers_.size(); i++)
@@ -143,9 +142,9 @@
       if (currentStep_ == ThreadedJobStep_ProcessingInstances)
       {
         // create the workers and enqueue all instances
-        for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+        for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
         {
-          instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
+          instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it));
         }
 
         InitWorkers(workersCount_);
@@ -153,7 +152,7 @@
         WaitWorkersComplete();
 
         // check job has really completed !!! it might have been interrupted because of an error
-        if ((processedInstancesCount_ != instances_.size())
+        if ((processedInstances_.size() != instancesToProcess_.size())
           || (!IsPermissive() && failedInstances_.size() > 0))
         {
           return JobStepResult::Failure(GetErrorCode(), NULL);
@@ -177,7 +176,7 @@
         // clean after the post processing step
         if (HasCleanupStep())
         {
-          for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+          for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
           {
             Json::Value tmp;
             context_.DeleteResource(tmp, *it, ResourceType_Instance);
@@ -211,6 +210,14 @@
     return hasPostProcessing_;
   }
 
+  void ThreadedSetOfInstancesJob::PostProcessInstances()
+  {
+    if (HasPostProcessingStep())
+    {
+      throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances");
+    }
+  }
+
 
   bool ThreadedSetOfInstancesJob::HasCleanupStep() const
   {
@@ -224,26 +231,18 @@
   {
     while (true)
     {
-      std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0)));
+      std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcessQueue_.Dequeue(0)));
       if (that->stopRequested_                // no lock(mutex) to access this variable, this is safe since it's just reading a boolean
         || instanceId->GetValue() == EXIT_WORKER_MESSAGE)
       {
         return;
       }
 
+      bool processed = false;
+      
       try
       {
-        bool processed = that->HandleInstance(instanceId->GetValue());
-
-        {
-          boost::recursive_mutex::scoped_lock lock(that->mutex_);
-
-          that->processedInstancesCount_++;
-          if (!processed)
-          {
-            that->failedInstances_.insert(instanceId->GetValue()); 
-          }
-        }
+        processed = that->HandleInstance(instanceId->GetValue());
       }
       catch (const Orthanc::OrthancException& e)
       {
@@ -256,6 +255,7 @@
           LOG(ERROR) << "Error in a non-permissive job: " << e.What();
           that->SetErrorCode(e.GetErrorCode());
           that->StopWorkers();
+          return;
         }
       }
       catch (...)
@@ -263,8 +263,20 @@
         LOG(ERROR) << "Native exception while executing a job";
         that->SetErrorCode(ErrorCode_InternalError);
         that->StopWorkers();
+        return;
       }
-      
+
+      {
+        boost::recursive_mutex::scoped_lock lock(that->mutex_);
+        
+        that->processedInstances_.insert(instanceId->GetValue());
+
+        if (!processed)
+        {
+          that->failedInstances_.insert(instanceId->GetValue()); 
+        }
+      }
+
     }
   }
 
@@ -282,7 +294,7 @@
   {
     boost::recursive_mutex::scoped_lock lock(mutex_);
     
-    return instances_.size();
+    return instancesToProcess_.size();
   }
 
 
@@ -298,7 +310,7 @@
   {
     boost::recursive_mutex::scoped_lock lock(mutex_);
 
-    target = instances_;
+    target = instancesToProcess_;
   }
 
 
@@ -318,17 +330,19 @@
   }
 
 
+  // Reset is called when resubmitting a failed job
   void ThreadedSetOfInstancesJob::Reset()
   {
     boost::recursive_mutex::scoped_lock lock(mutex_);
 
     if (started_)
     {
+      // TODO: cleanup the instances that have been generated during the previous run
       currentStep_ = ThreadedJobStep_ProcessingInstances;
       stopRequested_ = false;
-      processedInstancesCount_ = 0;
+      processedInstances_.clear();
       failedInstances_.clear();
-      instancesToProcess_.Clear();
+      instancesToProcessQueue_.Clear();
     }
     else
     {
@@ -381,7 +395,7 @@
     target[KEY_KEEP_SOURCE] = keepSource_;
     target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_);
     
-    SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES);
+    SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES);
     SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
     SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
 
@@ -393,7 +407,6 @@
                                                        const Json::Value& source,
                                                        bool hasPostProcessing,
                                                        bool defaultKeepSource) :
-    processedInstancesCount_(0),
     hasPostProcessing_(hasPostProcessing),
     started_(false),
     stopRequested_(false),
@@ -424,7 +437,7 @@
 
     if (source.isMember(KEY_INSTANCES))
     {
-      SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
+      SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES);
     }
 
     if (source.isMember(KEY_CURRENT_STEP))
@@ -458,7 +471,7 @@
     else
     {
       size_t totalProgress = GetInstancesCount();
-      size_t currentProgress = processedInstancesCount_;
+      size_t currentProgress = processedInstances_.size();
       
       if (HasPostProcessingStep())
       {
@@ -559,7 +572,7 @@
     for (std::list<std::string>::const_iterator
            it = instances.begin(); it != instances.end(); ++it)
     {
-      instances_.insert(*it);
+      instancesToProcess_.insert(*it);
     }
   }