changeset 2570:2e879c796ec7 jobs

JobsRegistry::SubmitAndWait(), StoreScuJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 21:42:04 +0200
parents 2af17cd5eb1f
children 3372c5255333
files Core/JobsEngine/IJob.h Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h Resources/Configuration.json UnitTestsSources/MultiThreadingTests.cpp
diffstat 8 files changed, 420 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/IJob.h	Mon May 07 15:37:20 2018 +0200
+++ b/Core/JobsEngine/IJob.h	Mon May 07 21:42:04 2018 +0200
@@ -47,6 +47,9 @@
     {
     }
 
+    // Method called once the job enters the jobs engine
+    virtual void Start() = 0;
+    
     virtual JobStepResult* ExecuteStep() = 0;
 
     virtual void ReleaseResources() = 0;   // For pausing jobs
--- a/Core/JobsEngine/JobsEngine.cpp	Mon May 07 15:37:20 2018 +0200
+++ b/Core/JobsEngine/JobsEngine.cpp	Mon May 07 21:42:04 2018 +0200
@@ -102,6 +102,7 @@
         return false;
 
       case JobStepCode_Retry:
+        running.GetJob().ReleaseResources();
         running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout());
         return false;
 
@@ -190,11 +191,6 @@
     
   void JobsEngine::SetWorkersCount(size_t count)
   {
-    if (count == 0)
-    {
-      throw OrthancException(ErrorCode_ParameterOutOfRange);
-    }
-      
     boost::mutex::scoped_lock lock(stateMutex_);
       
     if (state_ != State_Setup)
@@ -218,6 +214,19 @@
 
     retryHandler_ = boost::thread(RetryHandler, this);
 
+    if (workers_.size() == 0)
+    {
+      // Use all the available CPUs
+      size_t n = boost::thread::hardware_concurrency();
+      
+      if (n == 0)
+      {
+        n = 1;
+      }
+
+      workers_.resize(n);
+    }      
+
     for (size_t i = 0; i < workers_.size(); i++)
     {
       workers_[i] = boost::thread(Worker, this, i);
@@ -225,7 +234,7 @@
 
     state_ = State_Running;
 
-    LOG(WARNING) << "The jobs engine has started";
+    LOG(WARNING) << "The jobs engine has started with " << workers_.size() << " threads";
   }
 
 
--- a/Core/JobsEngine/JobsRegistry.cpp	Mon May 07 15:37:20 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Mon May 07 21:42:04 2018 +0200
@@ -92,6 +92,7 @@
       }
 
       lastStatus_ = JobStatus(ErrorCode_Success, *job);
+      job->Start();
     }
 
     const std::string& GetId() const
@@ -348,6 +349,8 @@
     completedJobs_.push_back(&job);
     ForgetOldCompletedJobs();
 
+    someJobComplete_.notify_all();
+
     CheckInvariants();
   }
 
@@ -382,6 +385,24 @@
   }
 
 
+  bool JobsRegistry::GetStateInternal(JobState& state,
+                                      const std::string& id)
+  {
+    CheckInvariants();
+
+    JobsIndex::const_iterator it = jobsIndex_.find(id);
+    if (it == jobsIndex_.end())
+    {
+      return false;
+    }
+    else
+    {
+      state = it->second->GetState();
+      return true;
+    }
+  }
+
+  
   JobsRegistry::~JobsRegistry()
   {
     for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
@@ -474,6 +495,31 @@
   }
 
 
+  bool JobsRegistry::SubmitAndWait(IJob* job,        // Takes ownership
+                                   int priority)
+  {
+    std::string id;
+    Submit(id, job, priority);
+
+    printf(">> %s\n", id.c_str()); fflush(stdout);
+
+    JobState state;
+
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      while (GetStateInternal(state, id) &&
+             state != JobState_Success &&
+             state != JobState_Failure)
+      {
+        someJobComplete_.wait(lock);
+      }
+    }
+
+    return (state == JobState_Success);
+  }
+
+
   void JobsRegistry::SetPriority(const std::string& id,
                                  int priority)
   {
@@ -687,18 +733,7 @@
                               const std::string& id)
   {
     boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    JobsIndex::const_iterator it = jobsIndex_.find(id);
-    if (it == jobsIndex_.end())
-    {
-      return false;
-    }
-    else
-    {
-      state = it->second->GetState();
-      return true;
-    }
+    return GetStateInternal(state, id);
   }
 
   
--- a/Core/JobsEngine/JobsRegistry.h	Mon May 07 15:37:20 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.h	Mon May 07 21:42:04 2018 +0200
@@ -77,6 +77,7 @@
     RetryJobs                  retryJobs_;
 
     boost::condition_variable  pendingJobAvailable_;
+    boost::condition_variable  someJobComplete_;
     size_t                     maxCompletedJobs_;
 
 
@@ -99,6 +100,9 @@
                             unsigned int timeout);
     
     void MarkRunningAsPaused(JobHandler& job);
+    
+    bool GetStateInternal(JobState& state,
+                          const std::string& id);
 
   public:
     JobsRegistry() :
@@ -122,6 +126,9 @@
     
     void Submit(IJob* job,        // Takes ownership
                 int priority);
+
+    bool SubmitAndWait(IJob* job,        // Takes ownership
+                       int priority);
     
     void SetPriority(const std::string& id,
                      int priority);
--- a/OrthancServer/ServerContext.cpp	Mon May 07 15:37:20 2018 +0200
+++ b/OrthancServer/ServerContext.cpp	Mon May 07 21:42:04 2018 +0200
@@ -134,6 +134,10 @@
 
     listeners_.push_back(ServerListener(lua_, "Lua"));
 
+    jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
+    //jobsEngine_.SetMaxCompleted   // TODO
+    jobsEngine_.Start();
+
     changeThread_ = boost::thread(ChangeThread, this);
   }
 
@@ -168,6 +172,7 @@
       scu_.Finalize();
 
       // Do not change the order below!
+      jobsEngine_.Stop();
       scheduler_.Stop();
       index_.Stop();
     }
--- a/OrthancServer/ServerContext.h	Mon May 07 15:37:20 2018 +0200
+++ b/OrthancServer/ServerContext.h	Mon May 07 21:42:04 2018 +0200
@@ -48,6 +48,7 @@
 #include "Scheduler/ServerScheduler.h"
 #include "ServerIndex.h"
 #include "OrthancHttpHandler.h"
+#include "../Core/JobsEngine/JobsEngine.h"
 
 #include <boost/filesystem.hpp>
 #include <boost/thread.hpp>
@@ -120,6 +121,7 @@
     MemoryCache dicomCache_;
     ReusableDicomUserConnection scu_;
     ServerScheduler scheduler_;
+    JobsEngine jobsEngine_;
 
     LuaScripting lua_;
 
@@ -248,6 +250,11 @@
       return scheduler_;
     }
 
+    JobsEngine& GetJobsEngine()
+    {
+      return jobsEngine_;
+    }
+
     bool DeleteResource(Json::Value& target,
                         const std::string& uuid,
                         ResourceType expectedType);
--- a/Resources/Configuration.json	Mon May 07 15:37:20 2018 +0200
+++ b/Resources/Configuration.json	Mon May 07 21:42:04 2018 +0200
@@ -43,6 +43,11 @@
   "Plugins" : [
   ],
 
+  // Maximum number of processing jobs that are simultanously running
+  // at any given time. A value of "0" indicates to use all the
+  // available CPU logical cores. To emulate Orthanc <= 1.3.2, set
+  // this value to "1".
+  "ConcurrentJobs" : 2,
 
 
   /**
--- a/UnitTestsSources/MultiThreadingTests.cpp	Mon May 07 15:37:20 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Mon May 07 21:42:04 2018 +0200
@@ -263,9 +263,13 @@
   {
   }
 
+  virtual void Start()
+  {
+  }
+    
   virtual JobStepResult* ExecuteStep()
   {
-    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    boost::this_thread::sleep(boost::posix_time::milliseconds(10));
 
     if (count_ == steps_ - 1)
     {
@@ -598,6 +602,323 @@
 }
 
 
+
+#include "../OrthancServer/ServerContext.h"
+
+namespace Orthanc
+{
+  class InstancesIteratorJob : public IJob
+  {
+  private:
+    bool                      started_;
+    std::vector<std::string>  instances_;
+    size_t                    position_;
+
+  public:
+    InstancesIteratorJob() :
+      started_(false),
+      position_(0)
+    {
+    }
+
+    void Reserve(size_t size)
+    {
+      if (started_)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        instances_.reserve(size);
+      }
+    }
+    
+    void AddInstance(const std::string& instance)
+    {
+      if (started_)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        instances_.push_back(instance);
+      }
+    }
+    
+    virtual void Start()
+    {
+      started_ = true;
+    }
+    
+    virtual float GetProgress()
+    {
+      if (instances_.size() == 0)
+      {
+        return 0;
+      }
+      else
+      {
+        return (static_cast<float>(position_) /
+                static_cast<float>(instances_.size()));
+      }
+    }
+
+    bool IsStarted() const
+    {
+      return started_;
+    }
+
+    bool IsDone() const
+    {
+      if (instances_.size() == 0)
+      {
+        return true;
+      }
+      else
+      {
+        return (position_ == instances_.size() - 1);
+      }
+    }
+
+    void Next()
+    {
+      if (IsDone())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        position_ += 1;
+      }
+    }
+
+    const std::string& GetCurrentInstance() const
+    {
+      if (IsDone())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        return instances_[position_];
+      }      
+    }
+  };
+
+
+  class StoreScuJob : public InstancesIteratorJob
+  {
+  private:
+    ServerContext&                      context_;
+    std::string                         localAet_;
+    RemoteModalityParameters            remote_;
+    bool                                permissive_;
+    std::string                         moveOriginatorAet_;
+    uint16_t                            moveOriginatorId_;
+    std::auto_ptr<DicomUserConnection>  connection_;
+    std::set<std::string>               failedInstances_;
+
+    void Open()
+    {
+      if (connection_.get() == NULL)
+      {
+        connection_.reset(new DicomUserConnection);
+        connection_->SetLocalApplicationEntityTitle(localAet_);
+        connection_->SetRemoteModality(remote_);
+        connection_->Open();
+      }
+    }
+    
+  public:
+    StoreScuJob(ServerContext& context) :
+      context_(context),
+      localAet_("ORTHANC"),
+      permissive_(false),
+      moveOriginatorId_(0)  // By default, not a C-MOVE
+    {
+    }
+
+    const std::string& GetLocalAet() const
+    {
+      return localAet_;
+    }
+
+    void SetLocalAet(const std::string& aet)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        localAet_ = aet;
+      }
+    }
+
+    const RemoteModalityParameters& GetRemoteModality() const
+    {
+      return remote_;
+    }
+
+    void SetRemoteModality(const RemoteModalityParameters& remote)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        remote_ = remote;
+      }
+    }
+
+    bool IsPermissive() const
+    {
+      return permissive_;
+    }
+
+    void SetPermissive(bool permissive)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        permissive_ = permissive;
+      }
+    }
+
+    bool HasMoveOriginator() const
+    {
+      return moveOriginatorId_ != 0;
+    }
+    
+    const std::string& GetMoveOriginatorAet() const
+    {
+      if (HasMoveOriginator())
+      {
+        return moveOriginatorAet_;
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+    
+    uint16_t GetMoveOriginatorId() const
+    {
+      if (HasMoveOriginator())
+      {
+        return moveOriginatorId_;
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
+    void SetMoveOriginator(const std::string& aet,
+                           int id)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else if (id < 0 || 
+               id >= 65536)
+      {
+        throw OrthancException(ErrorCode_ParameterOutOfRange);
+      }
+      else
+      {
+        moveOriginatorId_ = static_cast<uint16_t>(id);
+        moveOriginatorAet_ = aet;
+      }
+    }
+
+    virtual JobStepResult* ExecuteStep()
+    {
+      if (IsDone())
+      {
+        return new JobStepResult(JobStepCode_Success);
+      }
+
+      Open();
+
+      bool ok = false;
+      
+      try
+      {
+        std::string dicom;
+        context_.ReadDicom(dicom, GetCurrentInstance());
+
+        if (HasMoveOriginator())
+        {
+          connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_);
+        }
+        else
+        {
+          connection_->Store(dicom);
+        }
+
+        ok = true;
+      }
+      catch (OrthancException& e)
+      {
+      }
+
+      if (!ok)
+      {
+        if (permissive_)
+        {
+          failedInstances_.insert(GetCurrentInstance());
+        }
+        else
+        {
+          return new JobStepResult(JobStepCode_Failure);
+        }
+      }
+
+      Next();
+      
+      return new JobStepResult(IsDone() ? JobStepCode_Success : JobStepCode_Continue);
+    }
+
+    virtual void ReleaseResources()   // For pausing jobs
+    {
+      connection_.release();
+    }
+
+    virtual void GetDescription(Json::Value& value)
+    {
+      value["Type"] = "C-STORE";
+      value["LocalAet"] = localAet_;
+      
+      Json::Value v;
+      remote_.ToJson(v);
+      value["Target"] = v;
+
+      if (HasMoveOriginator())
+      {
+        value["MoveOriginatorAET"] = GetMoveOriginatorAet();
+        value["MoveOriginatorID"] = GetMoveOriginatorId();
+      }
+
+      v = Json::arrayValue;
+      for (std::set<std::string>::const_iterator it = failedInstances_.begin();
+           it != failedInstances_.end(); ++it)
+      {
+        v.append(*it);
+      }
+
+      value["FailedInstances"] = v;
+    }
+  };
+}
+
+
+
 TEST(JobsEngine, Basic)
 {
   JobsEngine engine;
@@ -636,10 +957,18 @@
   std::cout << "====================================================" << std::endl;
 
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+
+  if (1)
+  {
+    printf(">> %d\n", engine.GetRegistry().SubmitAndWait(new DummyJob(JobStepResult(Orthanc::JobStepCode_Failure)), rand() % 10));
+  }
+
+  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+
   
   engine.Stop();
 
-
+  if (0)
   {
     typedef std::set<std::string> Jobs;