diff OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5130:f2dcdbe05884

ResourceModification jobs can now use multiple threads
author Alain Mazy <am@osimis.io>
date Thu, 05 Jan 2023 17:24:43 +0100
parents
children 6aa41d86b948
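
The new base class delegates the per-instance work to its subclasses: a job such as ResourceModificationJob essentially overrides HandleInstance(), which the worker threads invoke concurrently on the instance IDs queued by Start(). As a rough usage sketch (illustrative only; the exact virtual interface lives in ThreadedSetOfInstancesJob.h, and the signature below is inferred from the call site in InstanceWorkerThread()):

    class SampleThreadedJob : public ThreadedSetOfInstancesJob
    {
    public:
      explicit SampleThreadedJob(ServerContext& context) :
        ThreadedSetOfInstancesJob(context,
                                  false /* hasPostProcessing */,
                                  true  /* keepSource */,
                                  4     /* workersCount */)
      {
      }

    protected:
      // Invoked concurrently by the worker threads, so it must be thread-safe;
      // returning "false" marks the instance as failed
      virtual bool HandleInstance(const std::string& instance)
      {
        // ... process one instance of the set ...
        return true;
      }
    };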
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp	Thu Jan 05 17:24:43 2023 +0100
@@ -0,0 +1,529 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2022 Osimis S.A., Belgium
+ * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ThreadedSetOfInstancesJob.h"
+
+#include "../../../OrthancFramework/Sources/Logging.h"
+#include "../../../OrthancFramework/Sources/OrthancException.h"
+#include "../../../OrthancFramework/Sources/SerializationToolbox.h"
+#include "../ServerContext.h"
+
+#include <boost/lexical_cast.hpp>
+#include <cassert>
+
+namespace Orthanc
+{
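+  // Sentinel message that is posted to the shared queue in order to wake
+  // up the worker threads and make them exit their loop (see
+  // WaitWorkersComplete() and InstanceWorkerThread())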
+  static const char* EXIT_WORKER_MESSAGE = "exit";
+
+  ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
+                                                       bool hasPostProcessing,
+                                                       bool keepSource,
+                                                       size_t workersCount) :
+    processedInstancesCount_(0),
+    hasPostProcessing_(hasPostProcessing),
+    started_(false),
+    stopRequested_(false),
+    permissive_(false),
+    currentStep_(ThreadedJobStep_BeforeStart),
+    workersCount_(workersCount),
+    context_(context),
+    keepSource_(keepSource)
+  {
+  }
+
+
+  ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob()
+  {
+    // no need to lock mutex here since we access variables used only by the "master" thread
+
+    StopWorkers();
+  }
+
+
+  void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount)
+  {
+    // no need to lock mutex here since we access variables used only by the "master" thread
+
+    for (size_t i = 0; i < workersCount; i++)
+    {
+      instancesWorkers_.push_back(boost::shared_ptr<boost::thread>(new boost::thread(InstanceWorkerThread, this)));
+    }
+  }
+
+
+  void ThreadedSetOfInstancesJob::WaitWorkersComplete()
+  {
+    // no need to lock mutex here since we access variables used only by the "master" thread
+
+    // send a dummy "exit" message to each worker so that it stops waiting for messages on the queue
+    for (size_t i = 0; i < instancesWorkers_.size(); i++)
+    {
+      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE));
+    }
+
+    for (size_t i = 0; i < instancesWorkers_.size(); i++)
+    {
+      if (instancesWorkers_[i]->joinable())
+      {
+        instancesWorkers_[i]->join();
+      }
+    }
+
+    instancesWorkers_.clear();
+  }
+
+
+  void ThreadedSetOfInstancesJob::StopWorkers()
+  {
+    // no need to lock mutex here since we access variables used or set only by the "master" thread
+
+    stopRequested_ = true;
+    WaitWorkersComplete();
+  }
+
+
+  void ThreadedSetOfInstancesJob::Stop(JobStopReason reason)
+  {
+    // no need to lock mutex here since we access variables used or set only by the "master" thread
+
+    if (reason == JobStopReason_Canceled ||
+        reason == JobStopReason_Failure ||
+        reason == JobStopReason_Retry)
+    {
+      // deallocate resources
+      StopWorkers();
+    }
+    else if (reason == JobStopReason_Paused)
+    {
+      // keep resources allocated.
+      // note that, right now, since all the instances are queued from the start, this kind of job is never paused while in the ProcessingInstances state
+    }
+  }
+
+
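+  // State machine driven by the job engine: BeforeStart ->
+  // ProcessingInstances (wait for the workers to drain the queue) ->
+  // PostProcessingInstances -> Cleanup (delete the source instances,
+  // unless they must be kept) -> Done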
+  JobStepResult ThreadedSetOfInstancesJob::Step(const std::string& jobId)
+  {
+    // no need to lock mutex here since we access variables used or set only by the "master" thread
+
+    if (!started_)
+    {
+      throw OrthancException(ErrorCode_InternalError);
+    }
+
+    if (GetInstancesCount() == 0)
+    {
+      // No instances to handle: We're done
+      return JobStepResult::Success();
+    }
+    
+    try
+    {
+      if (currentStep_ == ThreadedJobStep_BeforeStart)
+      {
+        currentStep_ = ThreadedJobStep_ProcessingInstances;
+      }
+
+      if (currentStep_ == ThreadedJobStep_ProcessingInstances)
+      {
+        // wait until all the instances have been processed by the workers (this joins and destroys the worker threads)
+        WaitWorkersComplete();
+        
+        currentStep_ = ThreadedJobStep_PostProcessingInstances;
+        return JobStepResult::Continue();
+      }
+      else if (currentStep_ == ThreadedJobStep_PostProcessingInstances)
+      {
+        if (HasPostProcessingStep())
+        {
+          PostProcessInstances();
+        }
+
+        currentStep_ = ThreadedJobStep_Cleanup;
+        return JobStepResult::Continue();
+      }
+      else if (currentStep_ == ThreadedJobStep_Cleanup)
+      {
+        // clean up after the post-processing step
+        if (HasCleanupStep())
+        {
+          for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+          {
+            Json::Value tmp;
+            context_.DeleteResource(tmp, *it, ResourceType_Instance);
+          }
+        }
+
+        currentStep_ = ThreadedJobStep_Done;
+        return JobStepResult::Success();
+      }
+    }
+    catch (OrthancException& e)
+    {
+      if (permissive_)
+      {
+        LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
+      }
+      else
+      {
+        return JobStepResult::Failure(e);
+      }
+    }
+
+    return JobStepResult::Continue();
+  }
+
+
+  bool ThreadedSetOfInstancesJob::HasPostProcessingStep() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return hasPostProcessing_;
+  }
+
+
+  bool ThreadedSetOfInstancesJob::HasCleanupStep() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return !keepSource_;
+  }
+
+
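+  // Body of the worker threads: repeatedly dequeue one instance ID from
+  // the shared queue and handle it, until the "exit" sentinel is received
+  // or a stop is requested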
+  void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that)
+  {
+    while (true)
+    {
+      std::unique_ptr<SingleValueObject<std::string> > instanceId(
+        dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0)));
+
+      if (that->stopRequested_ ||             // no mutex lock to read this flag; this is safe since we only read a boolean
+          instanceId.get() == NULL ||         // defensive check, protects the dereference below if the queue ever yields an unexpected object
+          instanceId->GetValue() == EXIT_WORKER_MESSAGE)
+      {
+        return;
+      }
+
+      bool processed = that->HandleInstance(instanceId->GetValue());
+
+      {
+        boost::recursive_mutex::scoped_lock lock(that->mutex_);
+
+        that->processedInstancesCount_++;
+        if (!processed)
+        {
+          that->failedInstances_.insert(instanceId->GetValue()); 
+        }
+      }
+    }
+  }
+
+
+  bool ThreadedSetOfInstancesJob::GetOutput(std::string &output,
+                                            MimeType &mime,
+                                            std::string& filename,
+                                            const std::string &key)
+  {
+    return false;
+  }
+
+
+  size_t ThreadedSetOfInstancesJob::GetInstancesCount() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+    
+    return instances_.size();
+  }
+
+
+  void ThreadedSetOfInstancesJob::GetFailedInstances(std::set<std::string>& target) const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target = failedInstances_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::GetInstances(std::set<std::string>& target) const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target = instances_;
+  }
+
+
+  bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return failedInstances_.find(instance) != failedInstances_.end();
+  }
+
+
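+  // Upon starting the job, all the instance IDs are pushed at once to the
+  // shared queue, then the pool of worker threads is created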
+  void ThreadedSetOfInstancesJob::Start()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    started_ = true;
+
+    // create the workers and enqueue all instances
+    for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it)
+    {
+      instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it));
+    }
+
+    InitWorkers(workersCount_);
+  }
+
+
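+  // Clears the processing state so that the job can be restarted from
+  // scratch; only allowed once the job has been started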
+  void ThreadedSetOfInstancesJob::Reset()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (started_)
+    {
+      currentStep_ = ThreadedJobStep_BeforeStart;
+      stopRequested_ = false;
+      processedInstancesCount_ = 0;
+      failedInstances_.clear();
+      instancesToProcess_.Clear();
+    }
+    else
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+  }
+
+
+  static const char* KEY_FAILED_INSTANCES = "FailedInstances";
+  static const char* KEY_PARENT_RESOURCES = "ParentResources";
+  static const char* KEY_DESCRIPTION = "Description";
+  static const char* KEY_PERMISSIVE = "Permissive";
+  static const char* KEY_CURRENT_STEP = "CurrentStep";
+  static const char* KEY_TYPE = "Type";
+  static const char* KEY_INSTANCES = "Instances";
+  static const char* KEY_INSTANCES_COUNT = "InstancesCount";
+  static const char* KEY_FAILED_INSTANCES_COUNT = "FailedInstancesCount";
+  static const char* KEY_KEEP_SOURCE = "KeepSource";
+  static const char* KEY_WORKERS_COUNT = "WorkersCount";
+
+
+  void ThreadedSetOfInstancesJob::GetPublicContent(Json::Value& target)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target[KEY_DESCRIPTION] = GetDescription();
+    target[KEY_INSTANCES_COUNT] = static_cast<uint32_t>(GetInstancesCount());
+    target[KEY_FAILED_INSTANCES_COUNT] = static_cast<uint32_t>(failedInstances_.size());
+
+    if (!parentResources_.empty())
+    {
+      SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
+    }
+  }
+
+
+  bool ThreadedSetOfInstancesJob::Serialize(Json::Value& target)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    target = Json::objectValue;
+
+    std::string type;
+    GetJobType(type);
+    target[KEY_TYPE] = type;
+    
+    target[KEY_PERMISSIVE] = permissive_;
+    target[KEY_CURRENT_STEP] = static_cast<unsigned int>(currentStep_);
+    target[KEY_DESCRIPTION] = description_;
+    target[KEY_KEEP_SOURCE] = keepSource_;
+    target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_);
+    
+    SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES);
+    SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
+    SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
+
+    return true;
+  }
+
+
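+  // Constructor used to unserialize a job from its JSON representation,
+  // for instance when the jobs registry is reloaded at startup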
+  ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
+                                                       const Json::Value& source,
+                                                       bool hasPostProcessing,
+                                                       bool defaultKeepSource) :
+    processedInstancesCount_(0),
+    hasPostProcessing_(hasPostProcessing),
+    started_(false),
+    stopRequested_(false),
+    permissive_(false),
+    currentStep_(ThreadedJobStep_BeforeStart),
+    workersCount_(1),
+    context_(context),
+    keepSource_(defaultKeepSource)
+  {
+    SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
+
+    if (source.isMember(KEY_PARENT_RESOURCES))
+    {
+      // Backward compatibility with Orthanc <= 1.5.6
+      SerializationToolbox::ReadSetOfStrings(parentResources_, source, KEY_PARENT_RESOURCES);
+    }
+    
+    if (source.isMember(KEY_KEEP_SOURCE))
+    {
+      keepSource_ = SerializationToolbox::ReadBoolean(source, KEY_KEEP_SOURCE);
+    }
+
+    if (source.isMember(KEY_WORKERS_COUNT))
+    {
+      workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT);
+    }
+
+    if (source.isMember(KEY_INSTANCES))
+    {
+      SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES);
+    }
+  }
+
+
+  void ThreadedSetOfInstancesJob::SetKeepSource(bool keep)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (IsStarted())
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    keepSource_ = keep;
+  }
+
+
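+  // The progress is the fraction of handled instances, the optional
+  // post-processing and cleanup steps each counting as one additional
+  // unit of work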
+  float ThreadedSetOfInstancesJob::GetProgress()
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (GetInstancesCount() == 0)
+    {
+      return 1;
+    }
+    else
+    {
+      size_t totalProgress = GetInstancesCount();
+      size_t currentProgress = processedInstancesCount_;
+      
+      if (HasPostProcessingStep())
+      {
+        ++totalProgress;
+        if (currentStep_ > ThreadedJobStep_PostProcessingInstances)
+        {
+          ++currentProgress;
+        }
+      }
+
+      if (HasCleanupStep())
+      {
+        ++totalProgress;
+        if (currentStep_ > ThreadedJobStep_Cleanup)
+        {
+          ++currentProgress;
+        }
+      }
+
+      return (static_cast<float>(currentProgress) /
+              static_cast<float>(totalProgress));
+    }
+  }
+
+
+  ThreadedSetOfInstancesJob::ThreadedJobStep ThreadedSetOfInstancesJob::GetCurrentStep() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return currentStep_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::SetDescription(const std::string &description)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    description_ = description;
+  }
+
+
+  const std::string& ThreadedSetOfInstancesJob::GetDescription() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return description_;
+  }
+
+
+  bool ThreadedSetOfInstancesJob::IsPermissive() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return permissive_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::SetPermissive(bool permissive)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    if (started_)
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      permissive_ = permissive;
+    }
+  }
+
+
+  bool ThreadedSetOfInstancesJob::IsStarted() const
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    return started_;
+  }
+
+
+  void ThreadedSetOfInstancesJob::AddInstances(const std::list<std::string>& instances)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    for (std::list<std::string>::const_iterator
+           it = instances.begin(); it != instances.end(); ++it)
+    {
+      instances_.insert(*it);
+    }
+  }
+
+
+  void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource)
+  {
+    boost::recursive_mutex::scoped_lock lock(mutex_);
+
+    parentResources_.insert(resource);
+  }
+
+}
\ No newline at end of file