diff UnitTestsSources/MultiThreadingTests.cpp @ 2570:2e879c796ec7 jobs

JobsRegistry::SubmitAndWait(), StoreScuJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 21:42:04 +0200
parents 2af17cd5eb1f
children 3372c5255333
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Mon May 07 15:37:20 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Mon May 07 21:42:04 2018 +0200
@@ -263,9 +263,13 @@
   {
   }
 
+  virtual void Start()
+  {
+  }
+    
   virtual JobStepResult* ExecuteStep()
   {
-    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    boost::this_thread::sleep(boost::posix_time::milliseconds(10));
 
     if (count_ == steps_ - 1)
     {
@@ -598,6 +602,323 @@
 }
 
 
+
+#include "../OrthancServer/ServerContext.h"
+
+namespace Orthanc
+{
+  class InstancesIteratorJob : public IJob
+  {
+  private:
+    bool                      started_;
+    std::vector<std::string>  instances_;
+    size_t                    position_;
+
+  public:
+    InstancesIteratorJob() :
+      started_(false),
+      position_(0)
+    {
+    }
+
+    void Reserve(size_t size)
+    {
+      if (started_)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        instances_.reserve(size);
+      }
+    }
+    
+    void AddInstance(const std::string& instance)
+    {
+      if (started_)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        instances_.push_back(instance);
+      }
+    }
+    
+    virtual void Start()
+    {
+      started_ = true;
+    }
+    
+    virtual float GetProgress()
+    {
+      if (instances_.size() == 0)
+      {
+        return 0;
+      }
+      else
+      {
+        return (static_cast<float>(position_) /
+                static_cast<float>(instances_.size()));
+      }
+    }
+
+    bool IsStarted() const
+    {
+      return started_;
+    }
+
+    bool IsDone() const
+    {
+      if (instances_.size() == 0)
+      {
+        return true;
+      }
+      else
+      {
+        return (position_ == instances_.size() - 1);
+      }
+    }
+
+    void Next()
+    {
+      if (IsDone())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        position_ += 1;
+      }
+    }
+
+    const std::string& GetCurrentInstance() const
+    {
+      if (IsDone())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        return instances_[position_];
+      }      
+    }
+  };
+
+
+  class StoreScuJob : public InstancesIteratorJob
+  {
+  private:
+    ServerContext&                      context_;
+    std::string                         localAet_;
+    RemoteModalityParameters            remote_;
+    bool                                permissive_;
+    std::string                         moveOriginatorAet_;
+    uint16_t                            moveOriginatorId_;
+    std::auto_ptr<DicomUserConnection>  connection_;
+    std::set<std::string>               failedInstances_;
+
+    void Open()
+    {
+      if (connection_.get() == NULL)
+      {
+        connection_.reset(new DicomUserConnection);
+        connection_->SetLocalApplicationEntityTitle(localAet_);
+        connection_->SetRemoteModality(remote_);
+        connection_->Open();
+      }
+    }
+    
+  public:
+    StoreScuJob(ServerContext& context) :
+      context_(context),
+      localAet_("ORTHANC"),
+      permissive_(false),
+      moveOriginatorId_(0)  // By default, not a C-MOVE
+    {
+    }
+
+    const std::string& GetLocalAet() const
+    {
+      return localAet_;
+    }
+
+    void SetLocalAet(const std::string& aet)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        localAet_ = aet;
+      }
+    }
+
+    const RemoteModalityParameters& GetRemoteModality() const
+    {
+      return remote_;
+    }
+
+    void SetRemoteModality(const RemoteModalityParameters& remote)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        remote_ = remote;
+      }
+    }
+
+    bool IsPermissive() const
+    {
+      return permissive_;
+    }
+
+    void SetPermissive(bool permissive)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        permissive_ = permissive;
+      }
+    }
+
+    bool HasMoveOriginator() const
+    {
+      return moveOriginatorId_ != 0;
+    }
+    
+    const std::string& GetMoveOriginatorAet() const
+    {
+      if (HasMoveOriginator())
+      {
+        return moveOriginatorAet_;
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+    
+    uint16_t GetMoveOriginatorId() const
+    {
+      if (HasMoveOriginator())
+      {
+        return moveOriginatorId_;
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
+    void SetMoveOriginator(const std::string& aet,
+                           int id)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else if (id < 0 || 
+               id >= 65536)
+      {
+        throw OrthancException(ErrorCode_ParameterOutOfRange);
+      }
+      else
+      {
+        moveOriginatorId_ = static_cast<uint16_t>(id);
+        moveOriginatorAet_ = aet;
+      }
+    }
+
+    virtual JobStepResult* ExecuteStep()
+    {
+      if (IsDone())
+      {
+        return new JobStepResult(JobStepCode_Success);
+      }
+
+      Open();
+
+      bool ok = false;
+      
+      try
+      {
+        std::string dicom;
+        context_.ReadDicom(dicom, GetCurrentInstance());
+
+        if (HasMoveOriginator())
+        {
+          connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_);
+        }
+        else
+        {
+          connection_->Store(dicom);
+        }
+
+        ok = true;
+      }
+      catch (OrthancException& e)
+      {
+      }
+
+      if (!ok)
+      {
+        if (permissive_)
+        {
+          failedInstances_.insert(GetCurrentInstance());
+        }
+        else
+        {
+          return new JobStepResult(JobStepCode_Failure);
+        }
+      }
+
+      Next();
+      
+      return new JobStepResult(IsDone() ? JobStepCode_Success : JobStepCode_Continue);
+    }
+
+    virtual void ReleaseResources()   // For pausing jobs
+    {
+      connection_.release();
+    }
+
+    virtual void GetDescription(Json::Value& value)
+    {
+      value["Type"] = "C-STORE";
+      value["LocalAet"] = localAet_;
+      
+      Json::Value v;
+      remote_.ToJson(v);
+      value["Target"] = v;
+
+      if (HasMoveOriginator())
+      {
+        value["MoveOriginatorAET"] = GetMoveOriginatorAet();
+        value["MoveOriginatorID"] = GetMoveOriginatorId();
+      }
+
+      v = Json::arrayValue;
+      for (std::set<std::string>::const_iterator it = failedInstances_.begin();
+           it != failedInstances_.end(); ++it)
+      {
+        v.append(*it);
+      }
+
+      value["FailedInstances"] = v;
+    }
+  };
+}
+
+
+
 TEST(JobsEngine, Basic)
 {
   JobsEngine engine;
@@ -636,10 +957,18 @@
   std::cout << "====================================================" << std::endl;
 
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+
+  if (1)
+  {
+    printf(">> %d\n", engine.GetRegistry().SubmitAndWait(new DummyJob(JobStepResult(Orthanc::JobStepCode_Failure)), rand() % 10));
+  }
+
+  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+
   
   engine.Stop();
 
-
+  if (0)
   {
     typedef std::set<std::string> Jobs;