diff UnitTestsSources/MultiThreadingTests.cpp @ 2603:988936118354 jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 18 May 2018 17:02:25 +0200
parents 5b6c3d77a2a1
children 76ef12fa136c
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri May 18 15:34:11 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri May 18 17:02:25 2018 +0200
@@ -742,697 +742,10 @@
 
 
 
-#include "../OrthancServer/ServerContext.h"
-#include "../Core/Logging.h"
-
-#include "../Core/DicomNetworking/IDicomConnectionManager.h"
-#include "../Core/JobsEngine/Operations/JobOperationValues.h"
+#include "../OrthancServer/ServerJobs/LuaJobManager.h"
 #include "../Core/JobsEngine/Operations/StringOperationValue.h"
 #include "../Core/JobsEngine/Operations/LogJobOperation.h"
 
-namespace Orthanc
-{
-  class DicomInstanceValue : public JobOperationValue
-  {
-  private:
-    ServerContext&   context_;
-    std::string      id_;
-
-  public:
-    DicomInstanceValue(ServerContext& context,
-                       const std::string& id) :
-      JobOperationValue(Type_DicomInstance),
-      context_(context),
-      id_(id)
-    {
-    }
-
-    const std::string& GetId() const
-    {
-      return id_;
-    }
-
-    void ReadContent(std::string& dicom) const
-    {
-      context_.ReadDicom(dicom, id_);
-    }
-
-    virtual JobOperationValue* Clone() const
-    {
-      return new DicomInstanceValue(context_, id_);
-    }
-  };
-
-
-  class StoreScuOperation : public IJobOperation
-  {
-  private:
-    std::string              localAet_;
-    RemoteModalityParameters modality_;
-
-  public:
-    StoreScuOperation(const std::string& localAet,
-                      const RemoteModalityParameters& modality) :
-      localAet_(localAet),
-      modality_(modality)
-    {
-    }
-
-    virtual void Apply(JobOperationValues& outputs,
-                       const JobOperationValue& input,
-                       IDicomConnectionManager& manager)
-    {
-      std::auto_ptr<IDicomConnectionManager::IResource> resource
-        (manager.AcquireConnection(localAet_, modality_));
-
-      if (resource.get() == NULL)
-      {
-        LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle();
-        return;
-      }
-
-      if (input.GetType() != JobOperationValue::Type_DicomInstance)
-      {
-        throw OrthancException(ErrorCode_BadParameterType);
-      }
-
-      const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
-
-      LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" 
-                << modality_.GetApplicationEntityTitle() << "\"";
-
-      try
-      {
-        std::string dicom;
-        instance.ReadContent(dicom);
-        resource->GetConnection().Store(dicom);
-        outputs.Append(instance.Clone());
-      }
-      catch (OrthancException& e)
-      {
-        LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" 
-                   << modality_.GetApplicationEntityTitle() << "\": " << e.What();
-      }
-    }
-  };
-
-
-  class DeleteResourceOperation : public IJobOperation
-  {
-  private:
-    ServerContext&  context_;
-
-  public:
-    DeleteResourceOperation(ServerContext& context) :
-    context_(context)
-    {
-    }
-
-    virtual void Apply(JobOperationValues& outputs,
-                       const JobOperationValue& input,
-                       IDicomConnectionManager& manager)
-    {
-      switch (input.GetType())
-      {
-        case JobOperationValue::Type_DicomInstance:
-        {
-          const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
-          LOG(INFO) << "Deleting instance: " << instance.GetId();
-
-          try
-          {
-            Json::Value tmp;
-            context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance);
-          }
-          catch (OrthancException& e)
-          {
-            LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What();
-          }
-
-          break;
-        }
-
-        default:
-          break;
-      }
-    }
-  };
-
-
-
-  class TimeoutDicomConnectionManager : public IDicomConnectionManager
-  {
-  private:
-    class Resource : public IDicomConnectionManager::IResource
-    {
-    private:
-      TimeoutDicomConnectionManager&  that_;
-      boost::mutex::scoped_lock        lock_;
-
-    public:
-      Resource(TimeoutDicomConnectionManager& that) : 
-      that_(that),
-      lock_(that.mutex_)
-      {
-      }
-
-      virtual ~Resource()
-      {
-        that_.Touch();
-      }
-
-      virtual DicomUserConnection& GetConnection()
-      {
-        if (that_.connection_.get() == NULL)
-        {
-          throw OrthancException(ErrorCode_InternalError);
-        }
-
-        return *that_.connection_;
-      }
-    };
-
-    boost::mutex                         mutex_;
-    std::auto_ptr<DicomUserConnection>   connection_;
-    boost::posix_time::ptime             lastUse_;
-    boost::posix_time::time_duration     timeout_;
-
-    static boost::posix_time::ptime GetNow()
-    {
-      return boost::posix_time::microsec_clock::universal_time();
-    }
-
-    void Touch()
-    {
-      lastUse_ = GetNow();
-    }
-
-    void CheckTimeoutInternal()
-    {
-      if (connection_.get() != NULL &&
-          (GetNow() - lastUse_) >= timeout_)
-      {
-        connection_.reset(NULL);
-      }
-    }
-
-  public:
-    TimeoutDicomConnectionManager() :
-      timeout_(boost::posix_time::milliseconds(1000))
-    {
-    }
-
-    void SetTimeout(unsigned int timeout)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      timeout_ = boost::posix_time::milliseconds(timeout);
-      CheckTimeoutInternal();
-    }
-
-    unsigned int GetTimeout()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      return timeout_.total_milliseconds();
-    }
-
-    void Close()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      connection_.reset(NULL);
-    }
-
-    void CheckTimeout()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      CheckTimeoutInternal();
-    }
-
-    virtual IResource* AcquireConnection(const std::string& localAet,
-                                         const RemoteModalityParameters& remote)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      if (connection_.get() == NULL ||
-          !connection_->IsSameAssociation(localAet, remote))
-      {
-        connection_.reset(new DicomUserConnection(localAet, remote));
-      }
-
-      return new Resource(*this);
-    }
-  };
-
-
-
-  class SequenceOfOperationsJob : public IJob
-  {
-  public:
-    class IObserver : public boost::noncopyable
-    {
-    public:
-      virtual ~IObserver()
-      {
-      }
-
-      virtual void SignalDone(const SequenceOfOperationsJob& job) = 0;
-    };
-
-  private:
-    class Operation : public boost::noncopyable
-    {
-    private:
-      JobOperationValues            originalInputs_;
-      JobOperationValues            workInputs_;
-      std::auto_ptr<IJobOperation>  operation_;
-      std::list<Operation*>         nextOperations_;
-      size_t                        currentInput_;
-
-    public:
-      Operation(IJobOperation* operation) :
-      operation_(operation),
-      currentInput_(0)
-      {
-        if (operation == NULL)
-        {
-          throw OrthancException(ErrorCode_NullPointer);
-        }
-      }
-
-      void AddOriginalInput(const JobOperationValue& value)
-      {
-        if (currentInput_ != 0)
-        {
-          // Cannot add input after processing has started
-          throw OrthancException(ErrorCode_BadSequenceOfCalls);
-        }
-        else
-        {
-          originalInputs_.Append(value.Clone());
-        }
-      }
-
-      const JobOperationValues& GetOriginalInputs() const
-      {
-        return originalInputs_;
-      }
-
-      void Reset()
-      {
-        workInputs_.Clear();
-        currentInput_ = 0;
-      }
-
-      void AddNextOperation(Operation& other)
-      {
-        if (currentInput_ != 0)
-        {
-          // Cannot add input after processing has started
-          throw OrthancException(ErrorCode_BadSequenceOfCalls);
-        }
-        else
-        {
-          nextOperations_.push_back(&other);
-        }
-      }
-
-      bool IsDone() const
-      {
-        return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
-      }
-
-      void Step()
-      {
-        if (IsDone())
-        {
-          throw OrthancException(ErrorCode_BadSequenceOfCalls);
-        }
-
-        const JobOperationValue* input;
-
-        if (currentInput_ < originalInputs_.GetSize())
-        {
-          input = &originalInputs_.GetValue(currentInput_);
-        }
-        else
-        {
-          input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
-        }
-
-        JobOperationValues outputs;
-        operation_->Apply(outputs, *input);
-
-        if (!nextOperations_.empty())
-        {
-          std::list<Operation*>::iterator first = nextOperations_.begin();
-          outputs.Move((*first)->workInputs_);
-
-          std::list<Operation*>::iterator current = first;
-          ++current;
-
-          while (current != nextOperations_.end())
-          {
-            (*first)->workInputs_.Copy((*current)->workInputs_);
-            ++current;
-          }
-        }
-
-        currentInput_ += 1;
-      }
-    };
-
-
-    std::string                       jobType_;
-    bool                              done_;
-    boost::mutex                      mutex_;
-    std::vector<Operation*>           operations_;
-    size_t                            current_;
-    boost::condition_variable         operationAdded_;
-    boost::posix_time::time_duration  trailingTimeout_;
-    std::list<IObserver*>             observers_;
-
-    // Invoked from constructors
-    void Setup()
-    {
-      done_ = false;
-      current_ = 0;
-      trailingTimeout_ = boost::posix_time::milliseconds(1000); 
-    }
-
-  public:
-    SequenceOfOperationsJob() :
-      jobType_("SequenceOfOperations")
-    {
-      Setup();
-    }    
-
-    SequenceOfOperationsJob(const std::string& jobType) :
-      jobType_(jobType)
-    {
-      Setup();
-    }    
-
-    virtual ~SequenceOfOperationsJob()
-    {
-      for (size_t i = 0; i < operations_.size(); i++)
-      {
-        if (operations_[i] != NULL)
-        {
-          delete operations_[i];
-        }
-      }
-    }
-
-    void Register(IObserver& observer)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      observers_.push_back(&observer);
-    }
-
-    // This lock allows adding new operations to the end of the job,
-    // from another thread than the worker thread, after the job has
-    // been submitted for processing
-    class Lock : public boost::noncopyable
-    {
-    private:
-      SequenceOfOperationsJob&   that_;
-      boost::mutex::scoped_lock  lock_;
-
-    public:
-      Lock(SequenceOfOperationsJob& that) :
-      that_(that),
-      lock_(that.mutex_)
-      {
-      }
-
-      bool IsDone() const
-      {
-        return that_.done_;
-      }
-
-      void SetTrailingOperationTimeout(unsigned int timeout)
-      {
-        that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
-      }
-
-      size_t AddOperation(IJobOperation* operation)
-      {
-        if (IsDone())
-        {
-          throw OrthancException(ErrorCode_BadSequenceOfCalls);
-        }
-
-        that_.operations_.push_back(new Operation(operation));
-        that_.operationAdded_.notify_one();
-
-        return that_.operations_.size() - 1;
-      }
-
-      size_t GetOperationsCount() const
-      {
-        return that_.operations_.size();
-      }
-
-      void AddInput(size_t index,
-                    const JobOperationValue& value)
-      {
-        if (IsDone())
-        {
-          throw OrthancException(ErrorCode_BadSequenceOfCalls);
-        }
-        else if (index >= that_.operations_.size() ||
-                 index < that_.current_)
-        {
-          throw OrthancException(ErrorCode_ParameterOutOfRange);
-        }
-        else
-        {
-          that_.operations_[index]->AddOriginalInput(value);
-        }
-      }
-      
-      void Connect(size_t input,
-                   size_t output)
-      {
-        if (IsDone())
-        {
-          throw OrthancException(ErrorCode_BadSequenceOfCalls);
-        }
-        else if (input >= output ||
-                 input >= that_.operations_.size() ||
-                 output >= that_.operations_.size() ||
-                 input < that_.current_ ||
-                 output < that_.current_)
-        {
-          throw OrthancException(ErrorCode_ParameterOutOfRange);
-        }
-        else
-        {
-          Operation& a = *that_.operations_[input];
-          Operation& b = *that_.operations_[output];
-          a.AddNextOperation(b);
-        }
-      }
-    };
-
-    virtual void Start()
-    {
-    }
-
-    virtual JobStepResult ExecuteStep()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      if (current_ == operations_.size())
-      {
-        LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
-        operationAdded_.timed_wait(lock, trailingTimeout_);
-            
-        if (current_ == operations_.size())
-        {
-          // No operation was added during the trailing timeout: The
-          // job is over
-          LOG(INFO) << "The sequence of operations is over";
-          done_ = true;
-
-          for (std::list<IObserver*>::iterator it = observers_.begin(); 
-               it != observers_.end(); ++it)
-          {
-            (*it)->SignalDone(*this);
-          }
-
-          return JobStepResult::Success();
-        }
-        else
-        {
-          LOG(INFO) << "New operation added to the sequence of operations";
-        }
-      }
-
-      assert(current_ < operations_.size());
-
-      while (current_ < operations_.size() &&
-             operations_[current_]->IsDone())
-      {
-        current_++;
-      }
-
-      if (current_ < operations_.size())
-      {
-        operations_[current_]->Step();
-      }
-
-      return JobStepResult::Continue();
-    }
-
-    virtual void SignalResubmit()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      
-      current_ = 0;
-      done_ = false;
-
-      for (size_t i = 0; i < operations_.size(); i++)
-      {
-        operations_[i]->Reset();
-      }
-    }
-
-    virtual void ReleaseResources()
-    {
-    }
-
-    virtual float GetProgress()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      
-      return (static_cast<float>(current_) / 
-              static_cast<float>(operations_.size() + 1));
-    }
-
-    virtual void GetJobType(std::string& target)
-    {
-      target = jobType_;
-    }
-
-    virtual void GetPublicContent(Json::Value& value)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      value["CountOperations"] = static_cast<unsigned int>(operations_.size());
-    }
-
-    virtual void GetInternalContent(Json::Value& value)
-    {
-      // TODO
-    }
-  };
-
-
-  class LuaJobManager : private SequenceOfOperationsJob::IObserver
-  {
-  public:
-    typedef SequenceOfOperationsJob::Lock  Lock;
-
-  private:
-    boost::mutex                   mutex_;
-    JobsEngine&                    engine_;
-    TimeoutDicomConnectionManager  connectionManager_;
-    std::string                    currentId_;
-    SequenceOfOperationsJob*       currentJob_;
-    size_t                         maxOperations_;
-    int                            priority_;
-    unsigned int                   trailingTimeout_;
-    bool                           continue_;
-    boost::thread                  connectionTimeoutThread_;
-
-    static void ConnectionTimeoutThread(LuaJobManager* manager)
-    {
-      while (manager->continue_)
-      {
-        manager->connectionManager_.CheckTimeout();
-        boost::this_thread::sleep(boost::posix_time::milliseconds(100));
-      }
-    }
-    
-    virtual void SignalDone(const SequenceOfOperationsJob& job)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      if (&job == currentJob_)
-      {
-        currentId_.clear();
-        currentJob_ = NULL;
-      }
-    }
-
-  public:
-    LuaJobManager(JobsEngine&  engine) :
-    engine_(engine),
-    currentJob_(NULL),
-    maxOperations_(1000),
-    priority_(0),
-    continue_(true)
-    {
-      connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this);
-    }
-
-    ~LuaJobManager()
-    {
-      continue_ = false;
-
-      if (connectionTimeoutThread_.joinable())
-      {
-        connectionTimeoutThread_.join();
-      }
-    }
-
-    void SetMaxOperationsPerJob(size_t count)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      maxOperations_ = count;
-    }
-
-    void SetPriority(int priority)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      priority_ = priority;
-    }
-
-    void SetTrailingOperationTimeout(unsigned int timeout)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      trailingTimeout_ = timeout;
-    }
-
-    Lock* Modify()
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      if (currentJob_ != NULL)
-      {
-        std::auto_ptr<Lock> result(new Lock(*currentJob_));
-
-        if (!result->IsDone() &&
-            result->GetOperationsCount() < maxOperations_)
-        {
-          return result.release();
-        }
-      }
-
-      // Need to create a new job, as the previous one is either
-      // finished, or is getting too long
-      currentJob_ = new SequenceOfOperationsJob;
-      engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
-
-      std::auto_ptr<Lock> result(new Lock(*currentJob_));
-      result->SetTrailingOperationTimeout(trailingTimeout_);
-
-      return result.release();
-    }
-  };
-}
-
 
 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
 {