changeset 5136:e71b22a43c0b

Threaded modifications continued: call ReconstructInstance at the end of the modification to update the DB model
author Alain Mazy <am@osimis.io>
date Tue, 17 Jan 2023 17:54:38 +0100
parents 252385892197
children 15109c3f0f7d
files OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp OrthancServer/Sources/ServerJobs/ResourceModificationJob.h OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h
diffstat 4 files changed, 81 insertions(+), 36 deletions(-)
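In summary, the post-processing step becomes a real hook: ThreadedSetOfInstancesJob gains a default PostProcessInstances() that throws if a job declares a post-processing step without overriding it, and ResourceModificationJob now enables that step and overrides the hook to call ReconstructInstance(). A minimal standalone sketch of this contract, with hypothetical class names and no Orthanc dependencies (not the actual implementation, which follows in the diff below):

#include <iostream>
#include <set>
#include <stdexcept>
#include <string>

// Hypothetical base job: a subclass that declares a post-processing step is
// expected to override PostProcessInstances(); this mirrors the default
// implementation added to ThreadedSetOfInstancesJob in this changeset.
class SetOfInstancesJobSketch
{
private:
  bool hasPostProcessing_;

protected:
  virtual bool HandleInstance(const std::string& instance) = 0;

  virtual void PostProcessInstances()
  {
    if (hasPostProcessing_)
    {
      throw std::logic_error("Job with post-processing should override PostProcessInstances");
    }
  }

public:
  explicit SetOfInstancesJobSketch(bool hasPostProcessing) :
    hasPostProcessing_(hasPostProcessing)
  {
  }

  virtual ~SetOfInstancesJobSketch()
  {
  }

  void Run(const std::set<std::string>& instances)
  {
    for (std::set<std::string>::const_iterator it = instances.begin(); it != instances.end(); ++it)
    {
      HandleInstance(*it);
    }

    PostProcessInstances();   // runs exactly once, after all instances have been handled
  }
};

// Hypothetical modification job enabling the post-processing step, as
// ResourceModificationJob now does in its constructor.
class ModificationJobSketch : public SetOfInstancesJobSketch
{
protected:
  virtual bool HandleInstance(const std::string& instance)
  {
    std::cout << "modifying " << instance << std::endl;
    return true;
  }

  virtual void PostProcessInstances()
  {
    // in Orthanc this is where ReconstructInstance() updates the DB model
    std::cout << "post-processing step" << std::endl;
  }

public:
  ModificationJobSketch() :
    SetOfInstancesJobSketch(true /* post-processing step */)
  {
  }
};

int main()
{
  std::set<std::string> instances;
  instances.insert("instance-a");
  instances.insert("instance-b");

  ModificationJobSketch job;
  job.Run(instances);
  return 0;
}

This compiles with any C++11 compiler; the real classes additionally manage the worker threads, serialization and the keep-source/cleanup step shown in the hunks below.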
--- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp	Wed Jan 11 11:14:00 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp	Tue Jan 17 17:54:38 2023 +0100
@@ -161,8 +161,32 @@
       return false;
     }
   };
+
+  // Reset is called when resubmitting a failed job
+  void ResourceModificationJob::Reset()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    // TODO: clean up the instances that have been generated during the previous run
+    modifiedInstances_.clear();
+
+    ThreadedSetOfInstancesJob::Reset();
+  }
+
+  void ResourceModificationJob::PostProcessInstances()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    // reconstruct the parents' MainDicomTags in case one of them has changed
+    if (modifiedInstances_.size() > 0)
+    {
+      ServerContext::DicomCacheLocker locker(GetContext(), *(modifiedInstances_.begin()));
+      ParsedDicomFile& modifiedDicom = locker.GetDicom();
+
+      GetContext().GetIndex().ReconstructInstance(modifiedDicom);
+    }
     
-
+  }
 
   bool ResourceModificationJob::HandleInstance(const std::string& instance)
   {
@@ -296,6 +320,7 @@
       boost::recursive_mutex::scoped_lock lock(outputMutex_);
 
       output_->Update(modifiedHasher);
+      modifiedInstances_.insert(modifiedInstance);
     }
 
     return true;
@@ -303,7 +328,7 @@
 
 
   ResourceModificationJob::ResourceModificationJob(ServerContext& context, unsigned int workersCount) :
-    ThreadedSetOfInstancesJob(context, false /* no post processing step */, true /* by default, keep source */, workersCount),
+    ThreadedSetOfInstancesJob(context, true /* post processing step */, true /* by default, keep source */, workersCount),
     isAnonymization_(false),
     transcode_(false),
     transferSyntax_(DicomTransferSyntax_LittleEndianExplicit)  // dummy initialization
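The same pattern, as a self-contained sketch with hypothetical names and standard C++ threads instead of the boost primitives used above: HandleInstance() runs on several worker threads and records the generated instance ids under a shared mutex, and PostProcessInstances() later uses one representative id, which is where DicomCacheLocker and ReconstructInstance() come in in the real job.

#include <iostream>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Orthanc job: member names mirror the diff,
// but the implementation is simplified to plain C++11.
class ModificationSketch
{
private:
  std::mutex            mutex_;
  std::set<std::string> modifiedInstances_;   // ids of the newly generated instances

public:
  bool HandleInstance(const std::string& instance)
  {
    std::string modified = "new-" + instance;   // stands in for the actual DICOM modification

    {
      std::lock_guard<std::mutex> lock(mutex_);
      modifiedInstances_.insert(modified);      // same bookkeeping as in the hunk above
    }

    return true;
  }

  void PostProcessInstances()
  {
    std::lock_guard<std::mutex> lock(mutex_);

    if (!modifiedInstances_.empty())
    {
      // in Orthanc, this is where DicomCacheLocker + ReconstructInstance()
      // refresh the parents' MainDicomTags in the database
      std::cout << "reconstruct from: " << *modifiedInstances_.begin() << std::endl;
    }
  }
};

int main()
{
  ModificationSketch job;
  std::vector<std::thread> workers;

  for (int i = 0; i < 4; i++)
  {
    workers.push_back(std::thread([&job, i]() {
      job.HandleInstance("instance-" + std::to_string(i));
    }));
  }

  for (size_t i = 0; i < workers.size(); i++)
  {
    workers[i].join();
  }

  job.PostProcessInstances();   // runs once, after all workers have finished
  return 0;
}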
--- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h	Wed Jan 11 11:14:00 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h	Tue Jan 17 17:54:38 2023 +0100
@@ -60,10 +60,14 @@
     DicomInstanceOrigin                 origin_;
     bool                                transcode_;
     DicomTransferSyntax                 transferSyntax_;
+    std::set<std::string>               modifiedInstances_;   // the instance ids of the newly generated instances
+
 
   protected:
     virtual bool HandleInstance(const std::string& instance) ORTHANC_OVERRIDE; // from ThreadedSetOfInstancesJob
     
+    virtual void PostProcessInstances();
+
   public:
     explicit ResourceModificationJob(ServerContext& context, unsigned int workersCount);
 
@@ -121,5 +125,7 @@
     virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE;
     
     virtual bool Serialize(Json::Value& value) ORTHANC_OVERRIDE;
+
+    virtual void Reset() ORTHANC_OVERRIDE;
   };
 }
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Wed Jan 11 11:14:00 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Tue Jan 17 17:54:38 2023 +0100
@@ -39,7 +39,6 @@
                                                         bool hasPostProcessing,
                                                         bool keepSource,
                                                         size_t workersCount) :
-    processedInstancesCount_(0),
     hasPostProcessing_(hasPostProcessing),
     started_(false),
     stopRequested_(false),
@@ -80,7 +79,7 @@
     // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
     for (size_t i = 0; i < instancesWorkers_.size(); i++)
     {
-      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
+      instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
     }
 
     for (size_t i = 0; i < instancesWorkers_.size(); i++)
@@ -143,9 +142,9 @@
       if (currentStep_ == ThreadedJobStep_ProcessingInstances)
       {
         // create the workers and enqueue all instances
-        for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+        for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
         {
-          instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
+          instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it));
         }
 
         InitWorkers(workersCount_);
@@ -153,7 +152,7 @@
         WaitWorkersComplete();
 
         // check job has really completed !!! it might have been interrupted because of an error
-        if ((processedInstancesCount_ != instances_.size())
+        if ((processedInstances_.size() != instancesToProcess_.size())
           || (!IsPermissive() && failedInstances_.size() > 0))
         {
           return JobStepResult::Failure(GetErrorCode(), NULL);
@@ -177,7 +176,7 @@
         // clean after the post processing step
         if (HasCleanupStep())
         {
-          for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+          for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
           {
             Json::Value tmp;
             context_.DeleteResource(tmp, *it, ResourceType_Instance);
@@ -211,6 +210,14 @@
     return hasPostProcessing_;
   }
 
+  void ThreadedSetOfInstancesJob::PostProcessInstances()
+  {
+    if (HasPostProcessingStep())
+    {
+      throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances");
+    }
+  }
+
 
   bool ThreadedSetOfInstancesJob::HasCleanupStep() const
   {
@@ -224,26 +231,18 @@
   {
     while (true)
     {
-      std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0)));
+      std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcessQueue_.Dequeue(0)));
       if (that->stopRequested_                // no lock(mutex) to access this variable, this is safe since it's just reading a boolean
         || instanceId->GetValue() == EXIT_WORKER_MESSAGE)
       {
         return;
       }
 
+      bool processed = false;
+      
       try
       {
-        bool processed = that->HandleInstance(instanceId->GetValue());
-
-        {
-          boost::recursive_mutex::scoped_lock lock(that->mutex_);
-
-          that->processedInstancesCount_++;
-          if (!processed)
-          {
-            that->failedInstances_.insert(instanceId->GetValue()); 
-          }
-        }
+        processed = that->HandleInstance(instanceId->GetValue());
       }
       catch (const Orthanc::OrthancException& e)
       {
@@ -256,6 +255,7 @@
           LOG(ERROR) << "Error in a non-permissive job: " << e.What();
           that->SetErrorCode(e.GetErrorCode());
           that->StopWorkers();
+          return;
         }
       }
       catch (...)
@@ -263,8 +263,20 @@
         LOG(ERROR) << "Native exception while executing a job";
         that->SetErrorCode(ErrorCode_InternalError);
         that->StopWorkers();
+        return;
       }
-      
+
+      {
+        boost::recursive_mutex::scoped_lock lock(that->mutex_);
+        
+        that->processedInstances_.insert(instanceId->GetValue());
+
+        if (!processed)
+        {
+          that->failedInstances_.insert(instanceId->GetValue()); 
+        }
+      }
+
     }
   }
 
@@ -282,7 +294,7 @@
   {
     boost::recursive_mutex::scoped_lock lock(mutex_);
     
-    return instances_.size();
+    return instancesToProcess_.size();
   }
 
 
@@ -298,7 +310,7 @@
   {
     boost::recursive_mutex::scoped_lock lock(mutex_);
 
-    target = instances_;
+    target = instancesToProcess_;
   }
 
 
@@ -318,17 +330,19 @@
   }
 
 
+  // Reset is called when resubmitting a failed job
   void ThreadedSetOfInstancesJob::Reset()
   {
     boost::recursive_mutex::scoped_lock lock(mutex_);
 
     if (started_)
     {
+      // TODO: clean up the instances that have been generated during the previous run
       currentStep_ = ThreadedJobStep_ProcessingInstances;
       stopRequested_ = false;
-      processedInstancesCount_ = 0;
+      processedInstances_.clear();
       failedInstances_.clear();
-      instancesToProcess_.Clear();
+      instancesToProcessQueue_.Clear();
     }
     else
     {
@@ -381,7 +395,7 @@
     target[KEY_KEEP_SOURCE] = keepSource_;
     target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_);
     
-    SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES);
+    SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES);
     SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
     SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
 
@@ -393,7 +407,6 @@
                                                        const Json::Value& source,
                                                        bool hasPostProcessing,
                                                        bool defaultKeepSource) :
-    processedInstancesCount_(0),
     hasPostProcessing_(hasPostProcessing),
     started_(false),
     stopRequested_(false),
@@ -424,7 +437,7 @@
 
     if (source.isMember(KEY_INSTANCES))
     {
-      SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
+      SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES);
     }
 
     if (source.isMember(KEY_CURRENT_STEP))
@@ -458,7 +471,7 @@
     else
     {
       size_t totalProgress = GetInstancesCount();
-      size_t currentProgress = processedInstancesCount_;
+      size_t currentProgress = processedInstances_.size();
       
       if (HasPostProcessingStep())
       {
@@ -559,7 +572,7 @@
     for (std::list<std::string>::const_iterator
            it = instances.begin(); it != instances.end(); ++it)
     {
-      instances_.insert(*it);
+      instancesToProcess_.insert(*it);
     }
   }
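A self-contained sketch of the reworked worker loop, with hypothetical names and std::thread plus a minimal queue in place of Orthanc's SharedMessageQueue: workers exit on a sentinel message or on a fatal error, and the processed/failed bookkeeping now inserts instance ids into sets under the shared mutex instead of incrementing a counter inside the try block.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <thread>
#include <vector>

static const std::string EXIT_MESSAGE = "";   // sentinel, like EXIT_WORKER_MESSAGE

// Minimal thread-safe queue standing in for Orthanc's SharedMessageQueue
class StringQueue
{
private:
  std::mutex              mutex_;
  std::condition_variable cond_;
  std::queue<std::string> queue_;

public:
  void Enqueue(const std::string& s)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(s);
    cond_.notify_one();
  }

  std::string Dequeue()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this]() { return !queue_.empty(); });
    std::string s = queue_.front();
    queue_.pop();
    return s;
  }
};

int main()
{
  StringQueue           toProcess;
  std::mutex            mutex;
  std::set<std::string> processed, failed;

  // Worker loop mirroring the reworked InstancesWorker(): the bookkeeping is
  // done outside the try/catch, and both sets are filled under the shared mutex
  auto worker = [&]() {
    while (true)
    {
      std::string id = toProcess.Dequeue();
      if (id == EXIT_MESSAGE)
      {
        return;
      }

      bool ok = false;
      try
      {
        ok = (id != "bad");   // stands in for HandleInstance()
      }
      catch (...)
      {
        return;               // fatal error: leave the loop, like StopWorkers() + return
      }

      std::lock_guard<std::mutex> lock(mutex);
      processed.insert(id);
      if (!ok)
      {
        failed.insert(id);
      }
    }
  };

  toProcess.Enqueue("a");
  toProcess.Enqueue("bad");
  toProcess.Enqueue("b");

  const size_t workersCount = 2;
  std::vector<std::thread> workers;
  for (size_t i = 0; i < workersCount; i++)
  {
    toProcess.Enqueue(EXIT_MESSAGE);          // one exit message per worker
    workers.push_back(std::thread(worker));
  }
  for (size_t i = 0; i < workers.size(); i++)
  {
    workers[i].join();
  }

  std::cout << "processed: " << processed.size()
            << ", failed: " << failed.size() << std::endl;
  return 0;
}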
 
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h	Wed Jan 11 11:14:00 2023 +0100
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h	Tue Jan 17 17:54:38 2023 +0100
@@ -48,13 +48,14 @@
     };
 
   private:
-    std::set<std::string>  failedInstances_;
-    std::set<std::string>  parentResources_;
+    std::set<std::string>               instancesToProcess_;  // the list of source instance ids to process
+    std::set<std::string>               failedInstances_;     // the list of source instance ids that failed processing
+    std::set<std::string>               processedInstances_;  // the list of source instance ids that have been processed (including failed ones)
+
+    std::set<std::string>               parentResources_;
     
-    size_t                              processedInstancesCount_;
-    SharedMessageQueue                  instancesToProcess_;
+    SharedMessageQueue                  instancesToProcessQueue_;
     std::vector<boost::shared_ptr<boost::thread> >         instancesWorkers_;
-    std::set<std::string>               instances_;
 
     bool                    hasPostProcessing_;  // final step before "KeepSource" cleanup
     bool                    started_;
@@ -86,7 +87,7 @@
   protected:
     virtual bool HandleInstance(const std::string& instance) = 0;
 
-    virtual void PostProcessInstances() {}
+    virtual void PostProcessInstances();
 
     void InitWorkers(size_t workersCount);