changeset 2600:140a539b4eba jobs

SequenceOfOperationsJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 17 May 2018 16:22:40 +0200
parents 593d6b0f4cba
children 5b6c3d77a2a1
files Core/DicomNetworking/DicomUserConnection.cpp Core/DicomNetworking/DicomUserConnection.h Core/JobsEngine/JobsRegistry.cpp UnitTestsSources/MultiThreadingTests.cpp
diffstat 4 files changed, 531 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/Core/DicomNetworking/DicomUserConnection.cpp	Wed May 16 19:00:43 2018 +0200
+++ b/Core/DicomNetworking/DicomUserConnection.cpp	Thu May 17 16:22:40 2018 +0200
@@ -1238,10 +1238,21 @@
   }
 
   
-  void DicomUserConnection::SetDefaultTimeout(uint32_t seconds)
+  void SetDefaultTimeout(uint32_t seconds)
   {
     LOG(INFO) << "Default timeout for DICOM connections if Orthanc acts as SCU (client): " 
               << seconds << " seconds (0 = no timeout)";
     defaultTimeout_ = seconds;
   }  
+
+
+  bool DicomUserConnection::IsSameAssociation(const std::string& localAet,
+                                              const RemoteModalityParameters& remote) const
+  {
+    return (localAet_ == localAet &&
+            remoteAet_ == remote.GetApplicationEntityTitle() &&
+            remoteHost_ == remote.GetHost() &&
+            remotePort_ == remote.GetPort() &&
+            manufacturer_ == remote.GetManufacturer());
+  }
 }
--- a/Core/DicomNetworking/DicomUserConnection.h	Wed May 16 19:00:43 2018 +0200
+++ b/Core/DicomNetworking/DicomUserConnection.h	Thu May 17 16:22:40 2018 +0200
@@ -208,5 +208,8 @@
                       ParsedDicomFile& query);
 
     static void SetDefaultTimeout(uint32_t seconds);
+
+    bool IsSameAssociation(const std::string& localAet,
+                           const RemoteModalityParameters& remote) const;
   };
 }
--- a/Core/JobsEngine/JobsRegistry.cpp	Wed May 16 19:00:43 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Thu May 17 16:22:40 2018 +0200
@@ -768,6 +768,7 @@
     }
     else if (found->second->GetState() != JobState_Failure)
     {
+      printf("%s\n", EnumerationToString(found->second->GetState()));
       LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
       return false;
     }
--- a/UnitTestsSources/MultiThreadingTests.cpp	Wed May 16 19:00:43 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Thu May 17 16:22:40 2018 +0200
@@ -753,7 +753,8 @@
     enum Type
     {
       Type_DicomInstance,
-      Type_Null
+      Type_Null,
+      Type_String
     };
 
   private:
@@ -779,10 +780,49 @@
   };
 
 
-  class IDicomConnectionProvider : public boost::noncopyable
+  class NullOperationValue : public JobOperationValue
   {
   public:
-    virtual ~IDicomConnectionProvider()
+    NullOperationValue() :
+      JobOperationValue(Type_Null)
+    {
+    }
+
+    virtual JobOperationValue* Clone() const
+    {
+      return new NullOperationValue;
+    }
+  };
+
+
+  class StringOperationValue : public JobOperationValue
+  {
+  private:
+    std::string  content_;
+
+  public:
+    StringOperationValue(const std::string& content) :
+      JobOperationValue(JobOperationValue::Type_String),
+      content_(content)
+    {
+    }
+
+    virtual JobOperationValue* Clone() const
+    {
+      return new StringOperationValue(content_);
+    }
+
+    const std::string& GetContent() const
+    {
+      return content_;
+    }
+  };
+
+
+  class IDicomConnectionManager : public boost::noncopyable
+  {
+  public:
+    virtual ~IDicomConnectionManager()
     {
     }
 
@@ -796,8 +836,8 @@
       virtual DicomUserConnection& GetConnection() = 0;
     };
 
-    virtual IResource* Acquire(const std::string& localAet,
-                               const RemoteModalityParameters& remote) = 0;
+    virtual IResource* AcquireConnection(const std::string& localAet,
+                                         const RemoteModalityParameters& remote) = 0;
   };
 
 
@@ -806,12 +846,6 @@
   private:
     std::vector<JobOperationValue*>   values_;
 
-  public:
-    ~JobOperationValues()
-    {
-      Clear();
-    }
-
     void Append(JobOperationValues& target,
                 bool clear)
     {
@@ -836,6 +870,22 @@
       }
     }
 
+  public:
+    ~JobOperationValues()
+    {
+      Clear();
+    }
+
+    void Move(JobOperationValues& target)
+    {
+      return Append(target, true);
+    }
+
+    void Copy(JobOperationValues& target)
+    {
+      return Append(target, false);
+    }
+
     void Clear()
     {
       for (size_t i = 0; i < values_.size(); i++)
@@ -896,7 +946,34 @@
 
     virtual void Apply(JobOperationValues& outputs,
                        const JobOperationValue& input,
-                       IDicomConnectionProvider& provider);
+                       IDicomConnectionManager& manager) = 0;
+  };
+
+
+  class LogJobOperation : public IJobOperation
+  {
+  public:
+    virtual void Apply(JobOperationValues& outputs,
+                       const JobOperationValue& input,
+                       IDicomConnectionManager& manager)
+    {
+      switch (input.GetType())
+      {
+        case JobOperationValue::Type_String:
+          LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent();
+          break;
+
+        case JobOperationValue::Type_Null:
+          LOG(INFO) << "Job value: (null)";
+          break;
+
+        default:
+          LOG(INFO) << "Job value: (unsupport)";
+          break;
+      }
+
+      outputs.Append(input.Clone());
+    }
   };
 
 
@@ -948,9 +1025,10 @@
 
     virtual void Apply(JobOperationValues& outputs,
                        const JobOperationValue& input,
-                       IDicomConnectionProvider& provider)
+                       IDicomConnectionManager& manager)
     {
-      std::auto_ptr<IDicomConnectionProvider::IResource> resource(provider.Acquire(localAet_, modality_));
+      std::auto_ptr<IDicomConnectionManager::IResource> resource
+        (manager.AcquireConnection(localAet_, modality_));
 
       if (resource.get() == NULL)
       {
@@ -997,7 +1075,7 @@
 
     virtual void Apply(JobOperationValues& outputs,
                        const JobOperationValue& input,
-                       IDicomConnectionProvider& provider)
+                       IDicomConnectionManager& manager)
     {
       switch (input.GetType())
       {
@@ -1026,30 +1104,127 @@
   };
 
 
-  class SequenceOfOperationsJob : 
-    public IJob, 
-    private IDicomConnectionProvider
+
+  class TimeoutDicomConnectionManager : public IDicomConnectionManager
   {
   private:
-    /*class DicomConnection
+    class Resource : public IDicomConnectionManager::IResource
     {
     private:
-      boost::posix_time::ptime   lastUse_;
-
-      void Touch()
-      {
-        lastUse_ = boost::posix_time::microsec_clock::universal_time();
-      }
+      TimeoutDicomConnectionManager&  that_;
+      boost::mutex::scoped_lock        lock_;
 
     public:
-      class Resource : public IDicomConnectionProvider::IResource
+      Resource(TimeoutDicomConnectionManager& that) : 
+      that_(that),
+      lock_(that.mutex_)
+      {
+      }
+
+      virtual ~Resource()
+      {
+        that_.Touch();
+      }
+
+      virtual DicomUserConnection& GetConnection()
+      {
+        if (that_.connection_.get() == NULL)
+        {
+          throw OrthancException(ErrorCode_InternalError);
+        }
+
+        return *that_.connection_;
+      }
+    };
+
+    boost::mutex                         mutex_;
+    std::auto_ptr<DicomUserConnection>   connection_;
+    boost::posix_time::ptime             lastUse_;
+    boost::posix_time::time_duration     timeout_;
+
+    static boost::posix_time::ptime GetNow()
+    {
+      return boost::posix_time::microsec_clock::universal_time();
+    }
+
+    void Touch()
+    {
+      lastUse_ = GetNow();
+    }
+
+    void CheckTimeoutInternal()
+    {
+      if (connection_.get() != NULL &&
+          (GetNow() - lastUse_) >= timeout_)
       {
-      private:
-        DicomConnection()
-      };
-      };*/
+        connection_.reset(NULL);
+      }
+    }
+
+  public:
+    TimeoutDicomConnectionManager() :
+      timeout_(boost::posix_time::milliseconds(1000))
+    {
+    }
+
+    void SetTimeout(unsigned int timeout)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      timeout_ = boost::posix_time::milliseconds(timeout);
+      CheckTimeout();
+    }
+
+    unsigned int GetTimeout()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      return timeout_.total_milliseconds();
+    }
+
+    void Close()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      connection_.reset(NULL);
+    }
 
-   class Operation : public boost::noncopyable
+    void CheckTimeout()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckTimeoutInternal();
+    }
+
+    virtual IResource* AcquireConnection(const std::string& localAet,
+                                         const RemoteModalityParameters& remote)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (connection_.get() == NULL ||
+          !connection_->IsSameAssociation(localAet, remote))
+      {
+        connection_.reset(new DicomUserConnection(localAet, remote));
+      }
+
+      return new Resource(*this);
+    }
+  };
+
+
+
+  class SequenceOfOperationsJob : public IJob
+  {
+  public:
+    class IObserver : public boost::noncopyable
+    {
+    public:
+      virtual ~IObserver()
+      {
+      }
+
+      virtual void SignalDone(const SequenceOfOperationsJob& job) = 0;
+    };
+
+  private:
+    class Operation : public boost::noncopyable
     {
     private:
       JobOperationValues            originalInputs_;
@@ -1111,7 +1286,7 @@
         return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
       }
 
-      void Step(IDicomConnectionProvider& provider)
+      void Step(IDicomConnectionManager& manager)
       {
         if (IsDone())
         {
@@ -1126,15 +1301,25 @@
         }
         else
         {
-          input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
+          input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
         }
 
         JobOperationValues outputs;
-        operation_->Apply(outputs, *input, provider);
+        operation_->Apply(outputs, *input, manager);
 
         if (!nextOperations_.empty())
         {
-          // TODO
+          std::list<Operation*>::iterator first = nextOperations_.begin();
+          outputs.Move((*first)->workInputs_);
+
+          std::list<Operation*>::iterator current = first;
+          ++current;
+
+          while (current != nextOperations_.end())
+          {
+            (*first)->workInputs_.Copy((*current)->workInputs_);
+            ++current;
+          }
         }
 
         currentInput_ += 1;
@@ -1142,16 +1327,39 @@
     };
 
 
-    boost::mutex               mutex_;
-    std::vector<Operation*>    operations_;
-    size_t                     currentOperation_;
-    boost::condition_variable  operationAdded_;
+    std::string                       jobType_;
+    bool                              done_;
+    boost::mutex                      mutex_;
+    std::vector<Operation*>           operations_;
+    size_t                            current_;
+    boost::condition_variable         operationAdded_;
+    TimeoutDicomConnectionManager&    connectionManager_;
+    boost::posix_time::time_duration  trailingTimeout_;
+    std::list<IObserver*>             observers_;
+
+    // Invoked from constructors
+    void Setup()
+    {
+      done_ = false;
+      current_ = 0;
+      trailingTimeout_ = boost::posix_time::milliseconds(1000); 
+    }
 
   public:
-    SequenceOfOperationsJob() :
-      currentOperation_(0)
+    SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) :
+    jobType_("SequenceOfOperations"),
+    connectionManager_(manager)
     {
-    }
+      Setup();
+    }    
+
+    SequenceOfOperationsJob(const std::string& jobType,
+                            TimeoutDicomConnectionManager& manager) :
+      jobType_(jobType),
+    connectionManager_(manager)
+    {
+      Setup();
+    }    
 
     virtual ~SequenceOfOperationsJob()
     {
@@ -1164,6 +1372,15 @@
       }
     }
 
+    void Register(IObserver& observer)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      observers_.push_back(&observer);
+    }
+
+    // This lock allows adding new operations to the end of the job,
+    // from another thread than the worker thread, after the job has
+    // been submitted for processing
     class Lock : public boost::noncopyable
     {
     private:
@@ -1177,19 +1394,48 @@
       {
       }
 
+      bool IsDone() const
+      {
+        return that_.done_;
+      }
+
+      void SetDicomConnectionTimeout(unsigned int timeout)
+      {
+        that_.connectionManager_.SetTimeout(timeout);
+      }
+
+      void SetTrailingOperationTimeout(unsigned int timeout)
+      {
+        that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
+      }
+
       size_t AddOperation(IJobOperation* operation)
       {
+        if (IsDone())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
         that_.operations_.push_back(new Operation(operation));
         that_.operationAdded_.notify_one();
 
         return that_.operations_.size() - 1;
       }
 
+      size_t GetOperationsCount() const
+      {
+        return that_.operations_.size();
+      }
+
       void AddInput(size_t index,
                     const JobOperationValue& value)
       {
-        if (index >= that_.operations_.size() ||
-            index < that_.currentOperation_)
+        if (IsDone())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else if (index >= that_.operations_.size() ||
+                 index < that_.current_)
         {
           throw OrthancException(ErrorCode_ParameterOutOfRange);
         }
@@ -1202,11 +1448,15 @@
       void Connect(size_t input,
                    size_t output)
       {
-        if (input >= output ||
-            input >= that_.operations_.size() ||
-            output >= that_.operations_.size() ||
-            input < that_.currentOperation_ ||
-            output < that_.currentOperation_)
+        if (IsDone())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else if (input >= output ||
+                 input >= that_.operations_.size() ||
+                 output >= that_.operations_.size() ||
+                 input < that_.current_ ||
+                 output < that_.current_)
         {
           throw OrthancException(ErrorCode_ParameterOutOfRange);
         }
@@ -1223,13 +1473,58 @@
     {
     }
 
-    virtual JobStepResult ExecuteStep() = 0;
+    virtual JobStepResult ExecuteStep()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (current_ == operations_.size())
+      {
+        LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
+        operationAdded_.timed_wait(lock, trailingTimeout_);
+            
+        if (current_ == operations_.size())
+        {
+          // No operation was added during the trailing timeout: The
+          // job is over
+          LOG(INFO) << "The sequence of operations is over";
+          done_ = true;
+
+          for (std::list<IObserver*>::iterator it = observers_.begin(); 
+               it != observers_.end(); ++it)
+          {
+            (*it)->SignalDone(*this);
+          }
+
+          return JobStepResult::Success();
+        }
+        else
+        {
+          LOG(INFO) << "New operation added to the sequence of operations";
+        }
+      }
+
+      assert(current_ < operations_.size());
+
+      while (current_ < operations_.size() &&
+             operations_[current_]->IsDone())
+      {
+        current_++;
+      }
+
+      if (current_ < operations_.size())
+      {
+        operations_[current_]->Step(connectionManager_);
+      }
+
+      return JobStepResult::Continue();
+    }
 
     virtual void SignalResubmit()
     {
       boost::mutex::scoped_lock lock(mutex_);
       
-      currentOperation_ = 0;
+      current_ = 0;
+      done_ = false;
 
       for (size_t i = 0; i < operations_.size(); i++)
       {
@@ -1237,14 +1532,177 @@
       }
     }
 
-    virtual void ReleaseResources() = 0;   // For pausing/canceling jobs
+    virtual void ReleaseResources()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      connectionManager_.Close();
+    }
+
+    virtual float GetProgress()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      
+      return (static_cast<float>(current_) / 
+              static_cast<float>(operations_.size() + 1));
+    }
+
+    virtual void GetJobType(std::string& target)
+    {
+      target = jobType_;
+    }
+
+    virtual void GetPublicContent(Json::Value& value)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      value["CountOperations"] = static_cast<unsigned int>(operations_.size());
+    }
 
-    virtual float GetProgress() = 0;
+    virtual void GetInternalContent(Json::Value& value)
+    {
+      // TODO
+    }
+  };
+
+
+  class LuaJobManager : private SequenceOfOperationsJob::IObserver
+  {
+  public:
+    typedef SequenceOfOperationsJob::Lock  Lock;
+
+  private:
+    boost::mutex                   mutex_;
+    JobsEngine&                    engine_;
+    TimeoutDicomConnectionManager manager_;
+    std::string                    currentId_;
+    SequenceOfOperationsJob*       currentJob_;
+    size_t                         maxOperations_;
+    int                            priority_;
+    unsigned int                   trailingTimeout_;
+
+    virtual void SignalDone(const SequenceOfOperationsJob& job)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
 
-    virtual void GetJobType(std::string& target) = 0;
-    
-    virtual void GetPublicContent(Json::Value& value) = 0;
+      if (&job == currentJob_)
+      {
+        currentId_.clear();
+        currentJob_ = NULL;
+      }
+    }
+
+  public:
+    LuaJobManager(JobsEngine&  engine) :
+    engine_(engine),
+    currentJob_(NULL),
+    maxOperations_(1000),
+    priority_(0)
+    {
+    }
+
+    void SetMaxOperationsPerJob(size_t count)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      maxOperations_ = count;
+    }
+
+    void SetPriority(int priority)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      priority_ = priority;
+    }
 
-    virtual void GetInternalContent(Json::Value& value) = 0;
+    void SetTrailingOperationTimeout(unsigned int timeout)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      trailingTimeout_ = timeout;
+    }
+
+    Lock* Modify()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (currentJob_ != NULL)
+      {
+        std::auto_ptr<Lock> result(new Lock(*currentJob_));
+
+        if (!result->IsDone() &&
+            result->GetOperationsCount() < maxOperations_)
+        {
+          return result.release();
+        }
+      }
+
+      // Need to create a new job, as the previous one is either
+      // finished, or is getting too long
+      currentJob_ = new SequenceOfOperationsJob(manager_);
+      engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
+
+      std::auto_ptr<Lock> result(new Lock(*currentJob_));
+      result->SetTrailingOperationTimeout(trailingTimeout_);
+
+      return result.release();
+    }
   };
 }
+
+
+TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
+{
+  TimeoutDicomConnectionManager manager;
+  JobsEngine engine;
+  engine.SetWorkersCount(3);
+  engine.Start();
+
+  std::string id;
+  SequenceOfOperationsJob* job = NULL;
+
+  {
+    std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager));
+    job = a.get();
+    engine.GetRegistry().Submit(id, a.release(), 0);
+  }
+
+  boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+
+  {
+    SequenceOfOperationsJob::Lock lock(*job);
+    size_t i = lock.AddOperation(new LogJobOperation);
+    size_t j = lock.AddOperation(new LogJobOperation);
+    size_t k = lock.AddOperation(new LogJobOperation);
+    lock.AddInput(i, StringOperationValue("Hello"));
+    lock.AddInput(i, StringOperationValue("World"));
+    lock.Connect(i, j);
+    lock.Connect(j, k);
+  }
+
+  boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
+
+  engine.Stop();
+
+}
+
+
+TEST(JobsEngine, Lua)
+{
+  JobsEngine engine;
+  engine.SetWorkersCount(2);
+  engine.Start();
+
+  LuaJobManager lua(engine);
+  lua.SetMaxOperationsPerJob(5);
+  lua.SetTrailingOperationTimeout(200);
+
+  for (size_t i = 0; i < 30; i++)
+  {
+    boost::this_thread::sleep(boost::posix_time::milliseconds(150));
+    std::auto_ptr<LuaJobManager::Lock> lock(lua.Modify());
+    size_t a = lock->AddOperation(new LogJobOperation);
+    lock->AddInput(a, StringOperationValue(boost::lexical_cast<std::string>(i)));
+  }
+
+  boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
+
+  engine.Stop();
+
+}