diff UnitTestsSources/MultiThreadingTests.cpp @ 2601:5b6c3d77a2a1 jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 17 May 2018 17:03:40 +0200
parents 140a539b4eba
children 988936118354
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu May 17 16:22:40 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Thu May 17 17:03:40 2018 +0200
@@ -745,238 +745,13 @@
 #include "../OrthancServer/ServerContext.h"
 #include "../Core/Logging.h"
 
-namespace
-{
-  class JobOperationValue : public boost::noncopyable
-  {
-  public:
-    enum Type
-    {
-      Type_DicomInstance,
-      Type_Null,
-      Type_String
-    };
-
-  private:
-    Type  type_;
-
-  protected:
-    JobOperationValue(Type type) :
-      type_(type)
-    {
-    }
-
-  public:
-    virtual ~JobOperationValue()
-    {
-    }
-
-    Type GetType() const
-    {
-      return type_;
-    }
-
-    virtual JobOperationValue* Clone() const = 0;
-  };
-
-
-  class NullOperationValue : public JobOperationValue
-  {
-  public:
-    NullOperationValue() :
-      JobOperationValue(Type_Null)
-    {
-    }
-
-    virtual JobOperationValue* Clone() const
-    {
-      return new NullOperationValue;
-    }
-  };
-
-
-  class StringOperationValue : public JobOperationValue
-  {
-  private:
-    std::string  content_;
-
-  public:
-    StringOperationValue(const std::string& content) :
-      JobOperationValue(JobOperationValue::Type_String),
-      content_(content)
-    {
-    }
-
-    virtual JobOperationValue* Clone() const
-    {
-      return new StringOperationValue(content_);
-    }
-
-    const std::string& GetContent() const
-    {
-      return content_;
-    }
-  };
-
-
-  class IDicomConnectionManager : public boost::noncopyable
-  {
-  public:
-    virtual ~IDicomConnectionManager()
-    {
-    }
-
-    class IResource : public boost::noncopyable
-    {
-    public:
-      virtual ~IResource()
-      {
-      }
-
-      virtual DicomUserConnection& GetConnection() = 0;
-    };
-
-    virtual IResource* AcquireConnection(const std::string& localAet,
-                                         const RemoteModalityParameters& remote) = 0;
-  };
-
-
-  class JobOperationValues : public boost::noncopyable
-  {
-  private:
-    std::vector<JobOperationValue*>   values_;
-
-    void Append(JobOperationValues& target,
-                bool clear)
-    {
-      target.Reserve(target.GetSize() + GetSize());
+#include "../Core/DicomNetworking/IDicomConnectionManager.h"
+#include "../Core/JobsEngine/Operations/JobOperationValues.h"
+#include "../Core/JobsEngine/Operations/StringOperationValue.h"
+#include "../Core/JobsEngine/Operations/LogJobOperation.h"
 
-      for (size_t i = 0; i < values_.size(); i++)
-      {
-        if (clear)
-        {
-          target.Append(values_[i]);
-          values_[i] = NULL;
-        }
-        else
-        {
-          target.Append(GetValue(i).Clone());
-        }
-      }
-
-      if (clear)
-      {
-        Clear();
-      }
-    }
-
-  public:
-    ~JobOperationValues()
-    {
-      Clear();
-    }
-
-    void Move(JobOperationValues& target)
-    {
-      return Append(target, true);
-    }
-
-    void Copy(JobOperationValues& target)
-    {
-      return Append(target, false);
-    }
-
-    void Clear()
-    {
-      for (size_t i = 0; i < values_.size(); i++)
-      {
-        if (values_[i] != NULL)
-        {
-          delete values_[i];
-        }
-      }
-
-      values_.clear();
-    }
-
-    void Reserve(size_t count)
-    {
-      values_.reserve(count);
-    }
-
-    void Append(JobOperationValue* value)  // Takes ownership
-    {
-      if (value == NULL)
-      {
-        throw OrthancException(ErrorCode_NullPointer);
-      }
-      else
-      {
-        values_.push_back(value);
-      }
-    }
-
-    size_t GetSize() const
-    {
-      return values_.size();
-    }
-
-    JobOperationValue& GetValue(size_t index) const
-    {
-      if (index >= values_.size())
-      {
-        throw OrthancException(ErrorCode_ParameterOutOfRange);
-      }
-      else
-      {
-        assert(values_[index] != NULL);
-        return *values_[index];
-      }
-    }
-  };
-
-
-
-  class IJobOperation : public boost::noncopyable
-  {
-  public:
-    virtual ~IJobOperation()
-    {
-    }
-
-    virtual void Apply(JobOperationValues& outputs,
-                       const JobOperationValue& input,
-                       IDicomConnectionManager& manager) = 0;
-  };
-
-
-  class LogJobOperation : public IJobOperation
-  {
-  public:
-    virtual void Apply(JobOperationValues& outputs,
-                       const JobOperationValue& input,
-                       IDicomConnectionManager& manager)
-    {
-      switch (input.GetType())
-      {
-        case JobOperationValue::Type_String:
-          LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent();
-          break;
-
-        case JobOperationValue::Type_Null:
-          LOG(INFO) << "Job value: (null)";
-          break;
-
-        default:
-          LOG(INFO) << "Job value: (unsupport)";
-          break;
-      }
-
-      outputs.Append(input.Clone());
-    }
-  };
-
-
+namespace Orthanc
+{
   class DicomInstanceValue : public JobOperationValue
   {
   private:
@@ -1172,7 +947,7 @@
       boost::mutex::scoped_lock lock(mutex_);
 
       timeout_ = boost::posix_time::milliseconds(timeout);
-      CheckTimeout();
+      CheckTimeoutInternal();
     }
 
     unsigned int GetTimeout()
@@ -1286,7 +1061,7 @@
         return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
       }
 
-      void Step(IDicomConnectionManager& manager)
+      void Step()
       {
         if (IsDone())
         {
@@ -1305,7 +1080,7 @@
         }
 
         JobOperationValues outputs;
-        operation_->Apply(outputs, *input, manager);
+        operation_->Apply(outputs, *input);
 
         if (!nextOperations_.empty())
         {
@@ -1333,7 +1108,6 @@
     std::vector<Operation*>           operations_;
     size_t                            current_;
     boost::condition_variable         operationAdded_;
-    TimeoutDicomConnectionManager&    connectionManager_;
     boost::posix_time::time_duration  trailingTimeout_;
     std::list<IObserver*>             observers_;
 
@@ -1346,17 +1120,14 @@
     }
 
   public:
-    SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) :
-    jobType_("SequenceOfOperations"),
-    connectionManager_(manager)
+    SequenceOfOperationsJob() :
+      jobType_("SequenceOfOperations")
     {
       Setup();
     }    
 
-    SequenceOfOperationsJob(const std::string& jobType,
-                            TimeoutDicomConnectionManager& manager) :
-      jobType_(jobType),
-    connectionManager_(manager)
+    SequenceOfOperationsJob(const std::string& jobType) :
+      jobType_(jobType)
     {
       Setup();
     }    
@@ -1399,11 +1170,6 @@
         return that_.done_;
       }
 
-      void SetDicomConnectionTimeout(unsigned int timeout)
-      {
-        that_.connectionManager_.SetTimeout(timeout);
-      }
-
       void SetTrailingOperationTimeout(unsigned int timeout)
       {
         that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
@@ -1513,7 +1279,7 @@
 
       if (current_ < operations_.size())
       {
-        operations_[current_]->Step(connectionManager_);
+        operations_[current_]->Step();
       }
 
       return JobStepResult::Continue();
@@ -1534,8 +1300,6 @@
 
     virtual void ReleaseResources()
     {
-      boost::mutex::scoped_lock lock(mutex_);
-      connectionManager_.Close();
     }
 
     virtual float GetProgress()
@@ -1573,13 +1337,24 @@
   private:
     boost::mutex                   mutex_;
     JobsEngine&                    engine_;
-    TimeoutDicomConnectionManager manager_;
+    TimeoutDicomConnectionManager  connectionManager_;
     std::string                    currentId_;
     SequenceOfOperationsJob*       currentJob_;
     size_t                         maxOperations_;
     int                            priority_;
     unsigned int                   trailingTimeout_;
+    bool                           continue_;
+    boost::thread                  connectionTimeoutThread_;
 
+    static void ConnectionTimeoutThread(LuaJobManager* manager)
+    {
+      while (manager->continue_)
+      {
+        manager->connectionManager_.CheckTimeout();
+        boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+      }
+    }
+    
     virtual void SignalDone(const SequenceOfOperationsJob& job)
     {
       boost::mutex::scoped_lock lock(mutex_);
@@ -1596,8 +1371,20 @@
     engine_(engine),
     currentJob_(NULL),
     maxOperations_(1000),
-    priority_(0)
+    priority_(0),
+    continue_(true)
     {
+      connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this);
+    }
+
+    ~LuaJobManager()
+    {
+      continue_ = false;
+
+      if (connectionTimeoutThread_.joinable())
+      {
+        connectionTimeoutThread_.join();
+      }
     }
 
     void SetMaxOperationsPerJob(size_t count)
@@ -1635,7 +1422,7 @@
 
       // Need to create a new job, as the previous one is either
       // finished, or is getting too long
-      currentJob_ = new SequenceOfOperationsJob(manager_);
+      currentJob_ = new SequenceOfOperationsJob;
       engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
 
       std::auto_ptr<Lock> result(new Lock(*currentJob_));
@@ -1649,7 +1436,6 @@
 
 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
 {
-  TimeoutDicomConnectionManager manager;
   JobsEngine engine;
   engine.SetWorkersCount(3);
   engine.Start();
@@ -1658,7 +1444,7 @@
   SequenceOfOperationsJob* job = NULL;
 
   {
-    std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager));
+    std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob);
     job = a.get();
     engine.GetRegistry().Submit(id, a.release(), 0);
   }