changeset 2599:593d6b0f4cba jobs

IJobOperation
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 16 May 2018 19:00:43 +0200
parents 34dc57f4a7d2
children 140a539b4eba
files UnitTestsSources/MultiThreadingTests.cpp
diffstat 1 files changed, 492 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Wed May 16 16:23:20 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Wed May 16 19:00:43 2018 +0200
@@ -272,8 +272,6 @@
     
   virtual JobStepResult ExecuteStep()
   {
-    boost::this_thread::sleep(boost::posix_time::milliseconds(10));
-
     if (fails_)
     {
       return JobStepResult::Failure(ErrorCode_ParameterOutOfRange);
@@ -728,77 +726,525 @@
 
 
 
-
-TEST(JobsEngine, Basic)
+TEST(JobsEngine, SubmitAndWait)
 {
   JobsEngine engine;
-
-  std::string s;
-
-  for (size_t i = 0; i < 20; i++)
-    engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10);
-
   engine.SetWorkersCount(3);
   engine.Start();
 
-  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+  ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10));
+  ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10));
+
+  engine.Stop();
+}
+
+
 
+
+
+#include "../OrthancServer/ServerContext.h"
+#include "../Core/Logging.h"
+
+namespace
+{
+  class JobOperationValue : public boost::noncopyable
   {
-    typedef std::set<std::string> Jobs;
+  public:
+    enum Type
+    {
+      Type_DicomInstance,
+      Type_Null
+    };
+
+  private:
+    Type  type_;
+
+  protected:
+    JobOperationValue(Type type) :
+      type_(type)
+    {
+    }
+
+  public:
+    virtual ~JobOperationValue()
+    {
+    }
 
-    Jobs jobs;
-    engine.GetRegistry().ListJobs(jobs);
+    Type GetType() const
+    {
+      return type_;
+    }
+
+    virtual JobOperationValue* Clone() const = 0;
+  };
+
+
+  class IDicomConnectionProvider : public boost::noncopyable
+  {
+  public:
+    virtual ~IDicomConnectionProvider()
+    {
+    }
+
+    class IResource : public boost::noncopyable
+    {
+    public:
+      virtual ~IResource()
+      {
+      }
+
+      virtual DicomUserConnection& GetConnection() = 0;
+    };
 
-    Json::Value v = Json::arrayValue;
-    for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
+    virtual IResource* Acquire(const std::string& localAet,
+                               const RemoteModalityParameters& remote) = 0;
+  };
+
+
+  class JobOperationValues : public boost::noncopyable
+  {
+  private:
+    std::vector<JobOperationValue*>   values_;
+
+  public:
+    ~JobOperationValues()
     {
-      JobInfo info;
+      Clear();
+    }
+
+    void Append(JobOperationValues& target,
+                bool clear)
+    {
+      target.Reserve(target.GetSize() + GetSize());
+
+      for (size_t i = 0; i < values_.size(); i++)
+      {
+        if (clear)
+        {
+          target.Append(values_[i]);
+          values_[i] = NULL;
+        }
+        else
+        {
+          target.Append(GetValue(i).Clone());
+        }
+      }
 
-      if (engine.GetRegistry().GetJobInfo(info, *it))
+      if (clear)
+      {
+        Clear();
+      }
+    }
+
+    void Clear()
+    {
+      for (size_t i = 0; i < values_.size(); i++)
       {
-        Json::Value vv;
-        info.Serialize(vv, true);
-        v.append(vv);
+        if (values_[i] != NULL)
+        {
+          delete values_[i];
+        }
+      }
+
+      values_.clear();
+    }
+
+    void Reserve(size_t count)
+    {
+      values_.reserve(count);
+    }
+
+    void Append(JobOperationValue* value)  // Takes ownership
+    {
+      if (value == NULL)
+      {
+        throw OrthancException(ErrorCode_NullPointer);
+      }
+      else
+      {
+        values_.push_back(value);
       }
     }
 
-    std::cout << v << std::endl;
-  }
-  std::cout << "====================================================" << std::endl;
+    size_t GetSize() const
+    {
+      return values_.size();
+    }
+
+    JobOperationValue& GetValue(size_t index) const
+    {
+      if (index >= values_.size())
+      {
+        throw OrthancException(ErrorCode_ParameterOutOfRange);
+      }
+      else
+      {
+        assert(values_[index] != NULL);
+        return *values_[index];
+      }
+    }
+  };
+
+
+
+  class IJobOperation : public boost::noncopyable
+  {
+  public:
+    virtual ~IJobOperation()
+    {
+    }
+
+    virtual void Apply(JobOperationValues& outputs,
+                       const JobOperationValue& input,
+                       IDicomConnectionProvider& provider);
+  };
+
 
-  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+  class DicomInstanceValue : public JobOperationValue
+  {
+  private:
+    ServerContext&   context_;
+    std::string      id_;
+
+  public:
+    DicomInstanceValue(ServerContext& context,
+                       const std::string& id) :
+      JobOperationValue(Type_DicomInstance),
+      context_(context),
+      id_(id)
+    {
+    }
 
-  if (1)
+    const std::string& GetId() const
+    {
+      return id_;
+    }
+
+    void ReadContent(std::string& dicom) const
+    {
+      context_.ReadDicom(dicom, id_);
+    }
+
+    virtual JobOperationValue* Clone() const
+    {
+      return new DicomInstanceValue(context_, id_);
+    }
+  };
+
+
+  class StoreScuOperation : public IJobOperation
   {
-    ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10));
-    ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10));
-  }
+  private:
+    std::string              localAet_;
+    RemoteModalityParameters modality_;
+
+  public:
+    StoreScuOperation(const std::string& localAet,
+                      const RemoteModalityParameters& modality) :
+      localAet_(localAet),
+      modality_(modality)
+    {
+    }
+
+    virtual void Apply(JobOperationValues& outputs,
+                       const JobOperationValue& input,
+                       IDicomConnectionProvider& provider)
+    {
+      std::auto_ptr<IDicomConnectionProvider::IResource> resource(provider.Acquire(localAet_, modality_));
+
+      if (resource.get() == NULL)
+      {
+        LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle();
+        return;
+      }
+
+      if (input.GetType() != JobOperationValue::Type_DicomInstance)
+      {
+        throw OrthancException(ErrorCode_BadParameterType);
+      }
+
+      const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
+
+      LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" 
+                << modality_.GetApplicationEntityTitle() << "\"";
 
-  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+      try
+      {
+        std::string dicom;
+        instance.ReadContent(dicom);
+        resource->GetConnection().Store(dicom);
+        outputs.Append(instance.Clone());
+      }
+      catch (OrthancException& e)
+      {
+        LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" 
+                   << modality_.GetApplicationEntityTitle() << "\": " << e.What();
+      }
+    }
+  };
+
 
-  
-  engine.Stop();
+  class DeleteResourceOperation : public IJobOperation
+  {
+  private:
+    ServerContext&  context_;
+
+  public:
+    DeleteResourceOperation(ServerContext& context) :
+    context_(context)
+    {
+    }
+
+    virtual void Apply(JobOperationValues& outputs,
+                       const JobOperationValue& input,
+                       IDicomConnectionProvider& provider)
+    {
+      switch (input.GetType())
+      {
+        case JobOperationValue::Type_DicomInstance:
+        {
+          const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
+          LOG(INFO) << "Deleting instance: " << instance.GetId();
 
-  if (0)
+          try
+          {
+            Json::Value tmp;
+            context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance);
+          }
+          catch (OrthancException& e)
+          {
+            LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What();
+          }
+
+          break;
+        }
+
+        default:
+          break;
+      }
+    }
+  };
+
+
+  class SequenceOfOperationsJob : 
+    public IJob, 
+    private IDicomConnectionProvider
   {
-    typedef std::set<std::string> Jobs;
+  private:
+    /*class DicomConnection
+    {
+    private:
+      boost::posix_time::ptime   lastUse_;
+
+      void Touch()
+      {
+        lastUse_ = boost::posix_time::microsec_clock::universal_time();
+      }
+
+    public:
+      class Resource : public IDicomConnectionProvider::IResource
+      {
+      private:
+        DicomConnection()
+      };
+      };*/
 
-    Jobs jobs;
-    engine.GetRegistry().ListJobs(jobs);
+   class Operation : public boost::noncopyable
+    {
+    private:
+      JobOperationValues            originalInputs_;
+      JobOperationValues            workInputs_;
+      std::auto_ptr<IJobOperation>  operation_;
+      std::list<Operation*>         nextOperations_;
+      size_t                        currentInput_;
+
+    public:
+      Operation(IJobOperation* operation) :
+      operation_(operation),
+      currentInput_(0)
+      {
+        if (operation == NULL)
+        {
+          throw OrthancException(ErrorCode_NullPointer);
+        }
+      }
+
+      void AddOriginalInput(const JobOperationValue& value)
+      {
+        if (currentInput_ != 0)
+        {
+          // Cannot add input after processing has started
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else
+        {
+          originalInputs_.Append(value.Clone());
+        }
+      }
 
-    Json::Value v = Json::arrayValue;
-    for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
-    {
-      JobInfo info;
+      const JobOperationValues& GetOriginalInputs() const
+      {
+        return originalInputs_;
+      }
+
+      void Reset()
+      {
+        workInputs_.Clear();
+        currentInput_ = 0;
+      }
+
+      void AddNextOperation(Operation& other)
+      {
+        if (currentInput_ != 0)
+        {
+          // Cannot add input after processing has started
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else
+        {
+          nextOperations_.push_back(&other);
+        }
+      }
+
+      bool IsDone() const
+      {
+        return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
+      }
+
+      void Step(IDicomConnectionProvider& provider)
+      {
+        if (IsDone())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        const JobOperationValue* input;
 
-      if (engine.GetRegistry().GetJobInfo(info, *it))
+        if (currentInput_ < originalInputs_.GetSize())
+        {
+          input = &originalInputs_.GetValue(currentInput_);
+        }
+        else
+        {
+          input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
+        }
+
+        JobOperationValues outputs;
+        operation_->Apply(outputs, *input, provider);
+
+        if (!nextOperations_.empty())
+        {
+          // TODO
+        }
+
+        currentInput_ += 1;
+      }
+    };
+
+
+    boost::mutex               mutex_;
+    std::vector<Operation*>    operations_;
+    size_t                     currentOperation_;
+    boost::condition_variable  operationAdded_;
+
+  public:
+    SequenceOfOperationsJob() :
+      currentOperation_(0)
+    {
+    }
+
+    virtual ~SequenceOfOperationsJob()
+    {
+      for (size_t i = 0; i < operations_.size(); i++)
       {
-        Json::Value vv;
-        info.Serialize(vv, true);
-        v.append(vv);
+        if (operations_[i] != NULL)
+        {
+          delete operations_[i];
+        }
       }
     }
 
-    std::cout << v << std::endl;
-  }
+    class Lock : public boost::noncopyable
+    {
+    private:
+      SequenceOfOperationsJob&   that_;
+      boost::mutex::scoped_lock  lock_;
+
+    public:
+      Lock(SequenceOfOperationsJob& that) :
+      that_(that),
+      lock_(that.mutex_)
+      {
+      }
+
+      size_t AddOperation(IJobOperation* operation)
+      {
+        that_.operations_.push_back(new Operation(operation));
+        that_.operationAdded_.notify_one();
+
+        return that_.operations_.size() - 1;
+      }
+
+      void AddInput(size_t index,
+                    const JobOperationValue& value)
+      {
+        if (index >= that_.operations_.size() ||
+            index < that_.currentOperation_)
+        {
+          throw OrthancException(ErrorCode_ParameterOutOfRange);
+        }
+        else
+        {
+          that_.operations_[index]->AddOriginalInput(value);
+        }
+      }
+      
+      void Connect(size_t input,
+                   size_t output)
+      {
+        if (input >= output ||
+            input >= that_.operations_.size() ||
+            output >= that_.operations_.size() ||
+            input < that_.currentOperation_ ||
+            output < that_.currentOperation_)
+        {
+          throw OrthancException(ErrorCode_ParameterOutOfRange);
+        }
+        else
+        {
+          Operation& a = *that_.operations_[input];
+          Operation& b = *that_.operations_[output];
+          a.AddNextOperation(b);
+        }
+      }
+    };
+
+    virtual void Start()
+    {
+    }
+
+    virtual JobStepResult ExecuteStep() = 0;
+
+    virtual void SignalResubmit()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      
+      currentOperation_ = 0;
+
+      for (size_t i = 0; i < operations_.size(); i++)
+      {
+        operations_[i]->Reset();
+      }
+    }
+
+    virtual void ReleaseResources() = 0;   // For pausing/canceling jobs
+
+    virtual float GetProgress() = 0;
+
+    virtual void GetJobType(std::string& target) = 0;
+    
+    virtual void GetPublicContent(Json::Value& value) = 0;
+
+    virtual void GetInternalContent(Json::Value& value) = 0;
+  };
 }