diff UnitTestsSources/MultiThreading.cpp @ 781:f0ac3a53ccf2 lua-scripting

scheduler
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 30 Apr 2014 18:30:05 +0200
parents 76eb563f08f0
children 394a19d44f9d
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreading.cpp	Wed Apr 30 18:10:16 2014 +0200
+++ b/UnitTestsSources/MultiThreading.cpp	Wed Apr 30 18:30:05 2014 +0200
@@ -1,6 +1,8 @@
 #include "gtest/gtest.h"
 #include <glog/logging.h>
 
+#include "../OrthancServer/Scheduler/ServerScheduler.h"
+
 #include "../Core/OrthancException.h"
 #include "../Core/Toolbox.h"
 #include "../Core/MultiThreading/ArrayFilledByThreads.h"
@@ -242,567 +244,6 @@
 
 
 
-#include "../Core/ICommand.h"
-#include "../Core/Toolbox.h"
-#include "../Core/Uuid.h"
-#include "../Core/MultiThreading/SharedMessageQueue.h"
-#include <boost/lexical_cast.hpp>
-
-
-namespace Orthanc
-{
-  class IServerFilter
-  {
-  public:
-    typedef std::list<std::string>  ListOfStrings;
-
-    virtual ~IServerFilter()
-    {
-    }
-
-    virtual bool Apply(ListOfStrings& outputs,
-                       const ListOfStrings& inputs) = 0;
-
-    virtual bool SendOutputsToSink() const = 0;
-  };
-
-
-  class Sink : public IServerFilter
-  {
-  private:
-    ListOfStrings& target_;
-
-  public:
-    Sink(ListOfStrings& target) : target_(target)
-    {
-    }
-
-    virtual bool SendOutputsToSink() const
-    {
-      return false;
-    }
-
-    virtual bool Apply(ListOfStrings& outputs,
-                       const ListOfStrings& inputs)
-    {
-      for (ListOfStrings::const_iterator 
-             it = inputs.begin(); it != inputs.end(); it++)
-      {
-        target_.push_back(*it);
-      }
-
-      return true;
-    }    
-  };
-
-
-
-  class IServerFilterListener
-  {
-  public:
-    virtual ~IServerFilterListener()
-    {
-    }
-
-    virtual void SignalSuccess(const std::string& jobId) = 0;
-
-    virtual void SignalFailure(const std::string& jobId) = 0;
-  };
-
-
-  class ServerFilterInstance : public IDynamicObject
-  {
-    friend class ServerScheduler;
-
-  private:
-    typedef IServerFilter::ListOfStrings  ListOfStrings;
-
-    IServerFilter *filter_;
-    std::string jobId_;
-    ListOfStrings inputs_;
-    std::list<ServerFilterInstance*> next_;
-
-    bool Execute(IServerFilterListener& listener)
-    {
-      ListOfStrings outputs;
-      if (!filter_->Apply(outputs, inputs_))
-      {
-        listener.SignalFailure(jobId_);
-        return true;
-      }
-
-      for (std::list<ServerFilterInstance*>::iterator
-             it = next_.begin(); it != next_.end(); it++)
-      {
-        for (ListOfStrings::const_iterator
-               output = outputs.begin(); output != outputs.end(); output++)
-        {
-          (*it)->AddInput(*output);
-        }
-      }
-
-      listener.SignalSuccess(jobId_);
-      return true;
-    }
-
-
-  public:
-    ServerFilterInstance(IServerFilter *filter,
-                         const std::string& jobId) : 
-      filter_(filter), 
-      jobId_(jobId)
-    {
-      if (filter_ == NULL)
-      {
-        throw OrthancException(ErrorCode_ParameterOutOfRange);
-      }
-    }
-
-    virtual ~ServerFilterInstance()
-    {
-      if (filter_ != NULL)
-      {
-        delete filter_;
-      }
-    }
-
-    const std::string& GetJobId() const
-    {
-      return jobId_;
-    }
-
-    void AddInput(const std::string& input)
-    {
-      inputs_.push_back(input);
-    }
-
-    void ConnectNext(ServerFilterInstance& filter)
-    {
-      next_.push_back(&filter);
-    }
-
-    const std::list<ServerFilterInstance*>& GetNextFilters() const
-    {
-      return next_;
-    }
-
-    IServerFilter& GetFilter() const
-    {
-      return *filter_;
-    }
-  };
-
-
-  class ServerJob
-  {
-    friend class ServerScheduler;
-
-  private:
-    std::list<ServerFilterInstance*> filters_;
-    std::string jobId_;
-    bool submitted_;
-    std::string description_;
-
-    
-    void CheckOrdering()
-    {
-      std::map<ServerFilterInstance*, unsigned int> index;
-
-      unsigned int count = 0;
-      for (std::list<ServerFilterInstance*>::const_iterator
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        index[*it] = count++;
-      }
-
-      for (std::list<ServerFilterInstance*>::const_iterator
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters();
-
-        for (std::list<ServerFilterInstance*>::const_iterator
-               next = nextFilters.begin(); next != nextFilters.end(); next++)
-        {
-          if (index.find(*next) == index.end() ||
-              index[*next] <= index[*it])
-          {
-            // You must reorder your calls to "ServerJob::AddFilter"
-            throw OrthancException("Bad ordering of filters in a job");
-          }
-        }
-      }
-    }
-
-
-    size_t Submit(SharedMessageQueue& target,
-                  IServerFilterListener& listener)
-    {
-      if (submitted_)
-      {
-        // This job has already been submitted
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-
-      CheckOrdering();
-
-      size_t size = filters_.size();
-
-      for (std::list<ServerFilterInstance*>::iterator 
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        target.Enqueue(*it);
-      }
-
-      filters_.clear();
-      submitted_ = true;
-
-      return size;
-    }
-
-  public:
-    ServerJob()
-    {
-      jobId_ = Toolbox::GenerateUuid();      
-      submitted_ = false;
-      description_ = "no description";
-    }
-
-    ~ServerJob()
-    {
-      for (std::list<ServerFilterInstance*>::iterator
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        delete *it;
-      }
-    }
-
-    const std::string& GetId() const
-    {
-      return jobId_;
-    }
-
-    void SetDescription(const char* description)
-    {
-      description_ = description;
-    }
-
-    const std::string& GetDescription() const
-    {
-      return description_;
-    }
-
-    ServerFilterInstance& AddFilter(IServerFilter* filter)
-    {
-      if (submitted_)
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-
-      filters_.push_back(new ServerFilterInstance(filter, jobId_));
-      
-      return *filters_.back();
-    }
-  };
-
-
-  class ServerScheduler : public IServerFilterListener
-  {
-  private:
-    struct JobInfo
-    {
-      bool watched_;
-      bool cancel_;
-      size_t size_;
-      size_t success_;
-      size_t failures_;
-      std::string description_;
-    };
-
-    enum JobStatus
-    {
-      JobStatus_Running = 1,
-      JobStatus_Success = 2,
-      JobStatus_Failure = 3
-    };
-
-    typedef IServerFilter::ListOfStrings  ListOfStrings;
-    typedef std::map<std::string, JobInfo> Jobs;
-
-    boost::mutex mutex_;
-    boost::condition_variable jobFinished_;
-    Jobs jobs_;
-    SharedMessageQueue queue_;
-    bool finish_;
-    boost::thread worker_;
-    std::map<std::string, JobStatus> watchedJobStatus_;
-
-    JobInfo& GetJobInfo(const std::string& jobId)
-    {
-      Jobs::iterator info = jobs_.find(jobId);
-
-      if (info == jobs_.end())
-      {
-        throw OrthancException(ErrorCode_InternalError);
-      }
-
-      return info->second;
-    }
-
-    virtual void SignalSuccess(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      JobInfo& info = GetJobInfo(jobId);
-      info.success_++;
-
-      assert(info.failures_ == 0);
-
-      if (info.success_ >= info.size_)
-      {
-        if (info.watched_)
-        {
-          watchedJobStatus_[jobId] = JobStatus_Success;
-          jobFinished_.notify_all();
-        }
-
-        LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
-        jobs_.erase(jobId);
-      }
-    }
-
-    virtual void SignalFailure(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      JobInfo& info = GetJobInfo(jobId);
-      info.failures_++;
-
-      if (info.success_ + info.failures_ >= info.size_)
-      {
-        if (info.watched_)
-        {
-          watchedJobStatus_[jobId] = JobStatus_Failure;
-          jobFinished_.notify_all();
-        }
-
-        LOG(ERROR) << "Job has failed (" << info.description_ << ")";
-        jobs_.erase(jobId);
-      }
-    }
-
-    static void Worker(ServerScheduler* that)
-    {
-      static const int32_t TIMEOUT = 100;
-
-      while (!that->finish_)
-      {
-        std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
-        if (object.get() != NULL)
-        {
-          ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object);
-
-          // Skip the execution of this filter if its parent job has
-          // previously failed.
-          bool jobHasFailed;
-          {
-            boost::mutex::scoped_lock lock(that->mutex_);
-            JobInfo& info = that->GetJobInfo(filter.GetJobId());
-            jobHasFailed = (info.failures_ > 0 || info.cancel_); 
-          }
-
-          if (jobHasFailed)
-          {
-            that->SignalFailure(filter.GetJobId());
-          }
-          else
-          {
-            filter.Execute(*that);
-          }
-        }
-      }
-    }
-
-    void SubmitInternal(ServerJob& job,
-                        bool watched)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      JobInfo info;
-      info.size_ = job.Submit(queue_, *this);
-      info.cancel_ = false;
-      info.success_ = 0;
-      info.failures_ = 0;
-      info.description_ = job.GetDescription();
-      info.watched_ = watched;
-
-      assert(info.size_ > 0);
-
-      if (watched)
-      {
-        watchedJobStatus_[job.GetId()] = JobStatus_Running;
-      }
-
-      jobs_[job.GetId()] = info;
-
-      LOG(INFO) << "New job submitted (" << job.description_ << ")";
-    }
-
-  public:
-    ServerScheduler()
-    {
-      finish_ = false;
-      worker_ = boost::thread(Worker, this);
-    }
-
-    ~ServerScheduler()
-    {
-      finish_ = true;
-      worker_.join();
-    }
-
-    void Submit(ServerJob& job)
-    {
-      if (job.filters_.empty())
-      {
-        return;
-      }
-
-      SubmitInternal(job, false);
-    }
-
-    bool SubmitAndWait(ListOfStrings& outputs,
-                       ServerJob& job)
-    {
-      std::string jobId = job.GetId();
-
-      outputs.clear();
-
-      if (job.filters_.empty())
-      {
-        return true;
-      }
-
-      // Add a sink filter to collect all the results of the filters
-      // that have no next filter.
-      ServerFilterInstance& sink = job.AddFilter(new Sink(outputs));
-
-      for (std::list<ServerFilterInstance*>::iterator
-             it = job.filters_.begin(); it != job.filters_.end(); it++)
-      {
-        if ((*it) != &sink &&
-            (*it)->GetNextFilters().size() == 0 &&
-            (*it)->GetFilter().SendOutputsToSink())
-        {
-          (*it)->ConnectNext(sink);
-        }
-      }
-
-      // Submit the job
-      SubmitInternal(job, true);
-
-      // Wait for the job to complete (either success or failure)
-      JobStatus status;
-
-      {
-        boost::mutex::scoped_lock lock(mutex_);
-
-        assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
-        
-        while (watchedJobStatus_[jobId] == JobStatus_Running)
-        {
-          jobFinished_.wait(lock);
-        }
-
-        status = watchedJobStatus_[jobId];
-        watchedJobStatus_.erase(jobId);
-      }
-
-      return (status == JobStatus_Success);
-    }
-
-
-    bool IsRunning(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      return jobs_.find(jobId) != jobs_.end();
-    }
-
-
-    void Cancel(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      Jobs::iterator job = jobs_.find(jobId);
-
-      if (job != jobs_.end())
-      {
-        job->second.cancel_ = true;
-        LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
-      }
-    }
-
-
-    // Returns a number between 0 and 1
-    float GetProgress(const std::string& jobId) 
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      Jobs::iterator job = jobs_.find(jobId);
-
-      if (job == jobs_.end() || 
-          job->second.size_ == 0  /* should never happen */)
-      {
-        // This job is not running
-        return 1;
-      }
-
-      if (job->second.failures_ != 0)
-      {
-        return 1;
-      }
-
-      if (job->second.size_ == 1)
-      {
-        return job->second.success_;
-      }
-
-      return (static_cast<float>(job->second.success_) / 
-              static_cast<float>(job->second.size_ - 1));
-    }
-
-    bool IsRunning(const ServerJob& job)
-    {
-      return IsRunning(job.GetId());
-    }
-
-    void Cancel(const ServerJob& job) 
-    {
-      Cancel(job.GetId());
-    }
-
-    float GetProgress(const ServerJob& job) 
-    {
-      return GetProgress(job);
-    }
-
-    void GetListOfJobs(ListOfStrings& jobs)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      jobs.clear();
-
-      for (Jobs::const_iterator 
-             it = jobs_.begin(); it != jobs_.end(); it++)
-      {
-        jobs.push_back(it->first);
-      }
-    }
-  };
-
-}
-
-
-
 class Tutu : public IServerFilter
 {
 private: