changeset 765:fc97f762834c lua-scripting

scheduler
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 23 Apr 2014 16:53:59 +0200
parents 45b16f67259c
children 476febedc516
files UnitTestsSources/MultiThreading.cpp
diffstat 1 files changed, 604 insertions(+), 0 deletions(-) [+]
--- a/UnitTestsSources/MultiThreading.cpp	Tue Apr 22 16:47:21 2014 +0200
+++ b/UnitTestsSources/MultiThreading.cpp	Wed Apr 23 16:53:59 2014 +0200
@@ -1,4 +1,5 @@
 #include "gtest/gtest.h"
+#include <glog/logging.h>
 
 #include "../Core/OrthancException.h"
 #include "../Core/Toolbox.h"
@@ -210,3 +211,606 @@
     Locker locker3(lock.ForWriter());
   }
 }
+
+
+#include "../Core/ICommand.h"
+#include "../Core/Toolbox.h"
+#include "../Core/Uuid.h"
+#include "../Core/MultiThreading/SharedMessageQueue.h"
+#include <boost/lexical_cast.hpp>
+
+
+namespace Orthanc
+{
+  typedef std::list<std::string>  ListOfStrings;
+
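+  // A filter is the unit of work of this scheduler: it transforms a
+  // list of input strings into a list of output strings, and returns
+  // "false" to report a failure.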
+  class IServerFilter
+  {
+  public:
+    virtual ~IServerFilter()
+    {
+    }
+
+    virtual bool Apply(ListOfStrings& outputs,
+                       const ListOfStrings& inputs) = 0;
+  };
+
+
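+  // Terminal filter that copies all its inputs to an external list.
+  // It is used by "ServerScheduler::SubmitAndWait()" to collect the
+  // outputs of the filters that have no successor.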
+  class Sink : public IServerFilter
+  {
+  private:
+    ListOfStrings& target_;
+
+  public:
+    Sink(ListOfStrings& target) : target_(target)
+    {
+    }
+
+    virtual bool Apply(ListOfStrings& outputs,
+                       const ListOfStrings& inputs)
+    {
+      for (ListOfStrings::const_iterator 
+             it = inputs.begin(); it != inputs.end(); it++)
+      {
+        target_.push_back(*it);
+      }
+
+      return true;
+    }    
+  };
+
+
+
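+  // Callback interface through which the scheduler is notified of
+  // the success or failure of each filter of a job.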
+  class IServerFilterListener
+  {
+  public:
+    virtual ~IServerFilterListener()
+    {
+    }
+
+    virtual void SignalSuccess(const std::string& jobId) = 0;
+
+    virtual void SignalFailure(const std::string& jobId) = 0;
+  };
+
+
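+  // Wraps an "IServerFilter" so that it can be enqueued in a
+  // "SharedMessageQueue": the wrapper takes the ownership of the
+  // filter, stores its inputs, forwards its outputs to the next
+  // filters of the job, and reports the outcome to the listener.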
+  class FilterWrapper : public IDynamicObject
+  {
+  private:
+    IServerFilterListener *listener_;
+    IServerFilter *filter_;
+    std::string jobId_;
+    ListOfStrings inputs_;
+    std::list<FilterWrapper*> next_;
+
+  public:
+    FilterWrapper(IServerFilter *filter,
+                  const std::string& jobId) : 
+      listener_(NULL),
+      filter_(filter), 
+      jobId_(jobId)
+    {
+      if (filter_ == NULL)
+      {
+        throw OrthancException(ErrorCode_ParameterOutOfRange);
+      }
+    }
+
+    virtual ~FilterWrapper()
+    {
+      if (filter_ != NULL)
+      {
+        delete filter_;
+      }
+    }
+
+    void SetListener(IServerFilterListener& listener)
+    {
+      listener_ = &listener;
+    }
+
+    const std::string& GetJobId() const
+    {
+      return jobId_;
+    }
+
+    void AddInput(const std::string& input)
+    {
+      inputs_.push_back(input);
+    }
+
+    bool Execute()
+    {
+      ListOfStrings outputs;
+      if (!filter_->Apply(outputs, inputs_))
+      {
+        if (listener_)
+        {
+          listener_->SignalFailure(jobId_);
+        }
+
+        // Tell the caller that this filter has failed
+        return false;
+      }
+
+      for (std::list<FilterWrapper*>::iterator
+             it = next_.begin(); it != next_.end(); it++)
+      {
+        for (ListOfStrings::const_iterator
+               output = outputs.begin(); output != outputs.end(); output++)
+        {
+          (*it)->AddInput(*output);
+        }
+      }
+
+      if (listener_)
+      {
+        listener_->SignalSuccess(jobId_);
+      }
+
+      return true;
+    }
+
+    void ConnectNext(FilterWrapper& filter)
+    {
+      next_.push_back(&filter);
+    }
+
+    const std::list<FilterWrapper*>& GetNextFilters() const
+    {
+      return next_;
+    }
+  };
+
+
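+  // Status of a job that is watched by "SubmitAndWait()".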
+  enum ServerJobStatus
+  {
+    ServerJobStatus_Running = 1,
+    ServerJobStatus_Success = 2,
+    ServerJobStatus_Failure = 3
+  };
+
+
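+  // A job is an ordered list of filters connected into a directed
+  // acyclic graph. The job owns its filters until it is submitted,
+  // at which point the ownership is transferred to the message queue.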
+  class ServerJob
+  {
+    friend class ServerScheduler;
+
+  private:
+    std::list<FilterWrapper*> filters_;
+    std::string jobId_;
+    bool submitted_;
+    std::string description_;
+
+    
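+    // Check that the filters are topologically sorted, i.e. that
+    // each filter only feeds filters that come later in the list of
+    // filters: the worker thread executes them in insertion order.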
+    void CheckOrdering()
+    {
+      std::map<FilterWrapper*, unsigned int> index;
+
+      unsigned int count = 0;
+      for (std::list<FilterWrapper*>::const_iterator
+             it = filters_.begin(); it != filters_.end(); it++)
+      {
+        index[*it] = count++;
+      }
+
+      for (std::list<FilterWrapper*>::const_iterator
+             it = filters_.begin(); it != filters_.end(); it++)
+      {
+        const std::list<FilterWrapper*>& nextFilters = (*it)->GetNextFilters();
+
+        for (std::list<FilterWrapper*>::const_iterator
+               next = nextFilters.begin(); next != nextFilters.end(); next++)
+        {
+          if (index.find(*next) == index.end() ||
+              index[*next] <= index[*it])
+          {
+            // You must reorder your calls to "ServerJob::AddFilter"
+            throw OrthancException("Bad ordering of filters in a job");
+          }
+        }
+      }
+    }
+
+
+    size_t Submit(SharedMessageQueue& target,
+                  IServerFilterListener& listener)
+    {
+      if (submitted_)
+      {
+        // This job has already been submitted
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+
+      CheckOrdering();
+
+      size_t size = filters_.size();
+
+      for (std::list<FilterWrapper*>::iterator 
+             it = filters_.begin(); it != filters_.end(); it++)
+      {
+        (*it)->SetListener(listener);
+        target.Enqueue(*it);
+      }
+
+      filters_.clear();
+      submitted_ = true;
+
+      return size;
+    }
+
+  public:
+    ServerJob()
+    {
+      jobId_ = Toolbox::GenerateUuid();      
+      submitted_ = false;
+      description_ = "no description";
+    }
+
+    ~ServerJob()
+    {
+      for (std::list<FilterWrapper*>::iterator
+             it = filters_.begin(); it != filters_.end(); it++)
+      {
+        delete *it;
+      }
+    }
+
+    const std::string& GetId() const
+    {
+      return jobId_;
+    }
+
+    void SetDescription(const char* description)
+    {
+      description_ = description;
+    }
+
+    const std::string& GetDescription() const
+    {
+      return description_;
+    }
+
+    FilterWrapper& AddFilter(IServerFilter* filter)
+    {
+      if (submitted_)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+
+      filters_.push_back(new FilterWrapper(filter, jobId_));
+      
+      return *filters_.back();
+    }
+  };
+
+
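+  // Scheduler with a single worker thread: the filters of the
+  // submitted jobs are sequentially executed from a shared message
+  // queue, while per-job counters of successes and failures are
+  // updated through the "IServerFilterListener" callbacks.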
+  class ServerScheduler : public IServerFilterListener
+  {
+  private:
+    struct JobInfo
+    {
+      bool watched_;
+      bool cancel_;
+      size_t size_;
+      size_t success_;
+      size_t failures_;
+      std::string description_;
+    };
+
+    typedef std::map<std::string, JobInfo> Jobs;
+
+    boost::mutex mutex_;
+    boost::condition_variable jobFinished_;
+    Jobs jobs_;
+    SharedMessageQueue queue_;
+    bool finish_;
+    boost::thread worker_;
+    std::map<std::string, ServerJobStatus> watchedJobStatus_;
+
+    JobInfo& GetJobInfo(const std::string& jobId)
+    {
+      Jobs::iterator info = jobs_.find(jobId);
+
+      if (info == jobs_.end())
+      {
+        throw OrthancException(ErrorCode_InternalError);
+      }
+
+      return info->second;
+    }
+
+    virtual void SignalSuccess(const std::string& jobId)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      JobInfo& info = GetJobInfo(jobId);
+      info.success_++;
+
+      assert(info.failures_ == 0);
+
+      if (info.success_ >= info.size_)
+      {
+        if (info.watched_)
+        {
+          watchedJobStatus_[jobId] = ServerJobStatus_Success;
+          jobFinished_.notify_all();
+        }
+
+        LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
+        jobs_.erase(jobId);
+      }
+    }
+
+    virtual void SignalFailure(const std::string& jobId)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      JobInfo& info = GetJobInfo(jobId);
+      info.failures_++;
+
+      if (info.success_ + info.failures_ >= info.size_)
+      {
+        if (info.watched_)
+        {
+          watchedJobStatus_[jobId] = ServerJobStatus_Failure;
+          jobFinished_.notify_all();
+        }
+
+        LOG(ERROR) << "Job has failed (" << info.description_ << ")";
+        jobs_.erase(jobId);
+      }
+    }
+
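+    // Loop of the worker thread: dequeue one filter at a time, with
+    // a timeout (expressed in milliseconds) so that the thread
+    // periodically rechecks the "finish_" flag and can exit cleanly.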
+    static void Worker(ServerScheduler* that)
+    {
+      static const int32_t TIMEOUT = 100;
+
+      while (!that->finish_)
+      {
+        std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
+        if (object.get() != NULL)
+        {
+          FilterWrapper& filter = dynamic_cast<FilterWrapper&>(*object);
+
+          // Skip the execution of this filter if its parent job has
+          // previously failed.
+          bool jobHasFailed;
+          {
+            boost::mutex::scoped_lock lock(that->mutex_);
+            JobInfo& info = that->GetJobInfo(filter.GetJobId());
+            jobHasFailed = (info.failures_ > 0 || info.cancel_); 
+          }
+
+          if (jobHasFailed)
+          {
+            that->SignalFailure(filter.GetJobId());
+          }
+          else
+          {
+            filter.Execute();
+          }
+        }
+      }
+    }
+
+    void SubmitInternal(ServerJob& job,
+                        bool watched)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      JobInfo info;
+      info.size_ = job.Submit(queue_, *this);
+      info.cancel_ = false;
+      info.success_ = 0;
+      info.failures_ = 0;
+      info.description_ = job.GetDescription();
+      info.watched_ = watched;
+
+      assert(info.size_ > 0);
+
+      if (watched)
+      {
+        watchedJobStatus_[job.GetId()] = ServerJobStatus_Running;
+      }
+
+      jobs_[job.GetId()] = info;
+
+      LOG(INFO) << "New job submitted (" << job.description_ << ")";
+    }
+
+  public:
+    ServerScheduler()
+    {
+      finish_ = false;
+      worker_ = boost::thread(Worker, this);
+    }
+
+    ~ServerScheduler()
+    {
+      finish_ = true;
+      worker_.join();
+    }
+
+    void Submit(ServerJob& job)
+    {
+      if (job.filters_.empty())
+      {
+        return;
+      }
+
+      SubmitInternal(job, false);
+    }
+
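+    // Submit the job, then block the calling thread until the job
+    // has entirely succeeded or has failed. The outputs of the
+    // terminal filters are gathered into "outputs" through a "Sink"
+    // filter; this is safe, as the caller keeps waiting while the
+    // worker thread fills the list.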
+    bool SubmitAndWait(ListOfStrings& outputs,
+                       ServerJob& job)
+    {
+      std::string jobId = job.GetId();
+
+      outputs.clear();
+
+      if (job.filters_.empty())
+      {
+        return true;
+      }
+
+      // Add a sink filter to collect all the results of the filters
+      // that have no next filter.
+      FilterWrapper& sink = job.AddFilter(new Sink(outputs));
+
+      for (std::list<FilterWrapper*>::iterator
+             it = job.filters_.begin(); it != job.filters_.end(); it++)
+      {
+        if ((*it) != &sink &&
+            (*it)->GetNextFilters().size() == 0)
+        {
+          (*it)->ConnectNext(sink);
+        }
+      }
+
+      // Submit the job
+      SubmitInternal(job, true);
+
+      // Wait for the job to complete (either success or failure)
+      ServerJobStatus status;
+
+      {
+        boost::mutex::scoped_lock lock(mutex_);
+        
+        while (watchedJobStatus_[jobId] == ServerJobStatus_Running)
+        {
+          jobFinished_.wait(lock);
+        }
+
+        status = watchedJobStatus_[jobId];
+        watchedJobStatus_.erase(jobId);
+      }
+
+      return (status == ServerJobStatus_Success);
+    }
+
+
+    bool IsRunning(const std::string& jobId)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      return jobs_.find(jobId) != jobs_.end();
+    }
+
+
+    void Cancel(const std::string& jobId)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      Jobs::iterator job = jobs_.find(jobId);
+
+      if (job != jobs_.end())
+      {
+        job->second.cancel_ = true;
+        LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
+      }
+    }
+
+
+    // Returns a number between 0 and 1
+    float GetProgress(const std::string& jobId) 
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      Jobs::iterator job = jobs_.find(jobId);
+
+      if (job == jobs_.end() || 
+          job->second.size_ == 0  /* should never happen */)
+      {
+        // This job is not running
+        return 1;
+      }
+
+      float n = static_cast<float>(job->second.failures_ + job->second.success_);
+      float d = static_cast<float>(job->second.size_);
+      return n / d;
+    }
+
+    bool IsRunning(const ServerJob& job)
+    {
+      return IsRunning(job.GetId());
+    }
+
+    void Cancel(const ServerJob& job) 
+    {
+      Cancel(job.GetId());
+    }
+
+    float GetProgress(const ServerJob& job) 
+    {
+      return GetProgress(job);
+    }
+  };
+
+}
+
+
+
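+// Sample filter for the unit test below: it multiplies each of its
+// integer inputs by a constant factor, then sleeps for one second to
+// simulate a lengthy computation.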
+class Tutu : public IServerFilter
+{
+private:
+  int factor_;
+
+public:
+  Tutu(int f) : factor_(f)
+  {
+  }
+
+  virtual bool Apply(ListOfStrings& outputs,
+                     const ListOfStrings& inputs)
+  {
+    for (ListOfStrings::const_iterator 
+           it = inputs.begin(); it != inputs.end(); it++)
+    {
+      int a = boost::lexical_cast<int>(*it);
+      int b = factor_ * a;
+
+      printf("%d * %d = %d\n", a, factor_, b);
+
+      //if (a == 84) { printf("BREAK\n"); return false; }
+
+      outputs.push_back(boost::lexical_cast<std::string>(b));
+    }
+
+    Toolbox::USleep(1000000);
+
+    return true;
+  }
+};
+
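+// Chain three multiplications (by 2, 3 and 4) on the input "42": the
+// output collected by "SubmitAndWait()" must be 42 * 24 = 1008.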
+TEST(Toto, Toto)
+{
+  ServerScheduler scheduler;
+
+  ServerJob job;
+  FilterWrapper& f2 = job.AddFilter(new Tutu(2));
+  FilterWrapper& f3 = job.AddFilter(new Tutu(3));
+  FilterWrapper& f4 = job.AddFilter(new Tutu(4));
+  f2.AddInput(boost::lexical_cast<std::string>(42));
+  //f3.AddInput(boost::lexical_cast<std::string>(42));
+  //f4.AddInput(boost::lexical_cast<std::string>(42));
+  f2.ConnectNext(f3);
+  f3.ConnectNext(f4);
+
+  job.SetDescription("tutu");
+
+  //scheduler.Submit(job);
+
+  ListOfStrings l;
+  scheduler.SubmitAndWait(l, job);
+
+  for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+  {
+    printf("** %s\n", i->c_str());
+  }
+
+  //Toolbox::ServerBarrier();
+  Toolbox::USleep(30000);
+}