changeset 781:f0ac3a53ccf2 lua-scripting

scheduler
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 30 Apr 2014 18:30:05 +0200
parents 76eb563f08f0
children c9cdd53a6b31
files CMakeLists.txt OrthancServer/Scheduler/IServerFilter.h OrthancServer/Scheduler/ServerFilterInstance.cpp OrthancServer/Scheduler/ServerFilterInstance.h OrthancServer/Scheduler/ServerJob.cpp OrthancServer/Scheduler/ServerJob.h OrthancServer/Scheduler/ServerScheduler.cpp OrthancServer/Scheduler/ServerScheduler.h UnitTestsSources/MultiThreading.cpp
diffstat 9 files changed, 884 insertions(+), 561 deletions(-) [+]
line wrap: on
line diff
--- a/CMakeLists.txt	Wed Apr 30 18:10:16 2014 +0200
+++ b/CMakeLists.txt	Wed Apr 30 18:30:05 2014 +0200
@@ -220,6 +220,9 @@
   OrthancServer/ServerIndex.cpp
   OrthancServer/ToDcmtkBridge.cpp
   OrthancServer/DatabaseWrapper.cpp
+  OrthancServer/Scheduler/ServerFilterInstance.cpp
+  OrthancServer/Scheduler/ServerJob.cpp
+  OrthancServer/Scheduler/ServerScheduler.cpp
   OrthancServer/ServerContext.cpp
   OrthancServer/ServerEnumerations.cpp
   OrthancServer/ServerToolbox.cpp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/IServerFilter.h	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,54 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include <list>
+#include <string>
+
+namespace Orthanc
+{
+  class IServerFilter
+  {
+  public:
+    typedef std::list<std::string>  ListOfStrings;
+
+    virtual ~IServerFilter()
+    {
+    }
+
+    virtual bool Apply(ListOfStrings& outputs,
+                       const ListOfStrings& inputs) = 0;
+
+    virtual bool SendOutputsToSink() const = 0;
+  };
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerFilterInstance.cpp	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,82 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ServerFilterInstance.h"
+
+#include "../../Core/OrthancException.h"
+
+namespace Orthanc
+{
+  bool ServerFilterInstance::Execute(IListener& listener)
+  {
+    ListOfStrings outputs;
+    if (!filter_->Apply(outputs, inputs_))
+    {
+      listener.SignalFailure(jobId_);
+      return true;
+    }
+
+    for (std::list<ServerFilterInstance*>::iterator
+           it = next_.begin(); it != next_.end(); it++)
+    {
+      for (ListOfStrings::const_iterator
+             output = outputs.begin(); output != outputs.end(); output++)
+      {
+        (*it)->AddInput(*output);
+      }
+    }
+
+    listener.SignalSuccess(jobId_);
+    return true;
+  }
+
+
+  ServerFilterInstance::ServerFilterInstance(IServerFilter *filter,
+                                             const std::string& jobId) : 
+    filter_(filter), 
+    jobId_(jobId)
+  {
+    if (filter_ == NULL)
+    {
+      throw OrthancException(ErrorCode_ParameterOutOfRange);
+    }
+  }
+
+
+  ServerFilterInstance::~ServerFilterInstance()
+  {
+    if (filter_ != NULL)
+    {
+      delete filter_;
+    }
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerFilterInstance.h	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,98 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "../../Core/IDynamicObject.h"
+#include "IServerFilter.h"
+
+namespace Orthanc
+{
+  class ServerFilterInstance : public IDynamicObject
+  {
+    friend class ServerScheduler;
+
+  public:
+    class IListener
+    {
+    public:
+      virtual ~IListener()
+      {
+      }
+
+      virtual void SignalSuccess(const std::string& jobId) = 0;
+
+      virtual void SignalFailure(const std::string& jobId) = 0;
+    };
+
+  private:
+    typedef IServerFilter::ListOfStrings  ListOfStrings;
+
+    IServerFilter *filter_;
+    std::string jobId_;
+    ListOfStrings inputs_;
+    std::list<ServerFilterInstance*> next_;
+
+    bool Execute(IListener& listener);
+
+  public:
+    ServerFilterInstance(IServerFilter *filter,
+                         const std::string& jobId);
+
+    virtual ~ServerFilterInstance();
+
+    const std::string& GetJobId() const
+    {
+      return jobId_;
+    }
+
+    void AddInput(const std::string& input)
+    {
+      inputs_.push_back(input);
+    }
+
+    void ConnectNext(ServerFilterInstance& filter)
+    {
+      next_.push_back(&filter);
+    }
+
+    const std::list<ServerFilterInstance*>& GetNextFilters() const
+    {
+      return next_;
+    }
+
+    IServerFilter& GetFilter() const
+    {
+      return *filter_;
+    }
+  };
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerJob.cpp	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,126 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ServerJob.h"
+
+#include "../../Core/OrthancException.h"
+#include "../../Core/Toolbox.h"
+#include "../../Core/Uuid.h"
+
+namespace Orthanc
+{
+  void ServerJob::CheckOrdering()
+  {
+    std::map<ServerFilterInstance*, unsigned int> index;
+
+    unsigned int count = 0;
+    for (std::list<ServerFilterInstance*>::const_iterator
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      index[*it] = count++;
+    }
+
+    for (std::list<ServerFilterInstance*>::const_iterator
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters();
+
+      for (std::list<ServerFilterInstance*>::const_iterator
+             next = nextFilters.begin(); next != nextFilters.end(); next++)
+      {
+        if (index.find(*next) == index.end() ||
+            index[*next] <= index[*it])
+        {
+          // You must reorder your calls to "ServerJob::AddFilter"
+          throw OrthancException("Bad ordering of filters in a job");
+        }
+      }
+    }
+  }
+
+
+  size_t ServerJob::Submit(SharedMessageQueue& target,
+                           ServerFilterInstance::IListener& listener)
+  {
+    if (submitted_)
+    {
+      // This job has already been submitted
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    CheckOrdering();
+
+    size_t size = filters_.size();
+
+    for (std::list<ServerFilterInstance*>::iterator 
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      target.Enqueue(*it);
+    }
+
+    filters_.clear();
+    submitted_ = true;
+
+    return size;
+  }
+
+
+  ServerJob::ServerJob()
+  {
+    jobId_ = Toolbox::GenerateUuid();      
+    submitted_ = false;
+    description_ = "no description";
+  }
+
+
+  ServerJob::~ServerJob()
+  {
+    for (std::list<ServerFilterInstance*>::iterator
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      delete *it;
+    }
+  }
+
+
+  ServerFilterInstance& ServerJob::AddFilter(IServerFilter* filter)
+  {
+    if (submitted_)
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    filters_.push_back(new ServerFilterInstance(filter, jobId_));
+      
+    return *filters_.back();
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerJob.h	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,77 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "ServerFilterInstance.h"
+#include "../../Core/MultiThreading/SharedMessageQueue.h"
+
+namespace Orthanc
+{
+  class ServerJob
+  {
+    friend class ServerScheduler;
+
+  private:
+    std::list<ServerFilterInstance*> filters_;
+    std::string jobId_;
+    bool submitted_;
+    std::string description_;
+
+    void CheckOrdering();
+
+    size_t Submit(SharedMessageQueue& target,
+                  ServerFilterInstance::IListener& listener);
+
+  public:
+    ServerJob();
+
+    ~ServerJob();
+
+    const std::string& GetId() const
+    {
+      return jobId_;
+    }
+
+    void SetDescription(const char* description)
+    {
+      description_ = description;
+    }
+
+    const std::string& GetDescription() const
+    {
+      return description_;
+    }
+
+    ServerFilterInstance& AddFilter(IServerFilter* filter);
+  };
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerScheduler.cpp	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,327 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ServerScheduler.h"
+
+#include "../../Core/OrthancException.h"
+
+#include <glog/logging.h>
+
+namespace Orthanc
+{
+  namespace
+  {
+    // Anonymous namespace to avoid clashes between compilation modules
+    class Sink : public IServerFilter
+    {
+    private:
+      ListOfStrings& target_;
+
+    public:
+      Sink(ListOfStrings& target) : target_(target)
+      {
+      }
+
+      virtual bool SendOutputsToSink() const
+      {
+        return false;
+      }
+
+      virtual bool Apply(ListOfStrings& outputs,
+                         const ListOfStrings& inputs)
+      {
+        for (ListOfStrings::const_iterator 
+               it = inputs.begin(); it != inputs.end(); it++)
+        {
+          target_.push_back(*it);
+        }
+
+        return true;
+      }    
+    };
+  }
+
+
+  ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId)
+  {
+    Jobs::iterator info = jobs_.find(jobId);
+
+    if (info == jobs_.end())
+    {
+      throw OrthancException(ErrorCode_InternalError);
+    }
+
+    return info->second;
+  }
+
+
+  void ServerScheduler::SignalSuccess(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    JobInfo& info = GetJobInfo(jobId);
+    info.success_++;
+
+    assert(info.failures_ == 0);
+
+    if (info.success_ >= info.size_)
+    {
+      if (info.watched_)
+      {
+        watchedJobStatus_[jobId] = JobStatus_Success;
+        jobFinished_.notify_all();
+      }
+
+      LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
+      jobs_.erase(jobId);
+    }
+  }
+
+
+  void ServerScheduler::SignalFailure(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    JobInfo& info = GetJobInfo(jobId);
+    info.failures_++;
+
+    if (info.success_ + info.failures_ >= info.size_)
+    {
+      if (info.watched_)
+      {
+        watchedJobStatus_[jobId] = JobStatus_Failure;
+        jobFinished_.notify_all();
+      }
+
+      LOG(ERROR) << "Job has failed (" << info.description_ << ")";
+      jobs_.erase(jobId);
+    }
+  }
+
+
+  void ServerScheduler::Worker(ServerScheduler* that)
+  {
+    static const int32_t TIMEOUT = 100;
+
+    while (!that->finish_)
+    {
+      std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
+      if (object.get() != NULL)
+      {
+        ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object);
+
+        // Skip the execution of this filter if its parent job has
+        // previously failed.
+        bool jobHasFailed;
+        {
+          boost::mutex::scoped_lock lock(that->mutex_);
+          JobInfo& info = that->GetJobInfo(filter.GetJobId());
+          jobHasFailed = (info.failures_ > 0 || info.cancel_); 
+        }
+
+        if (jobHasFailed)
+        {
+          that->SignalFailure(filter.GetJobId());
+        }
+        else
+        {
+          filter.Execute(*that);
+        }
+      }
+    }
+  }
+
+
+  void ServerScheduler::SubmitInternal(ServerJob& job,
+                                       bool watched)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    JobInfo info;
+    info.size_ = job.Submit(queue_, *this);
+    info.cancel_ = false;
+    info.success_ = 0;
+    info.failures_ = 0;
+    info.description_ = job.GetDescription();
+    info.watched_ = watched;
+
+    assert(info.size_ > 0);
+
+    if (watched)
+    {
+      watchedJobStatus_[job.GetId()] = JobStatus_Running;
+    }
+
+    jobs_[job.GetId()] = info;
+
+    LOG(INFO) << "New job submitted (" << job.description_ << ")";
+  }
+
+
+  ServerScheduler::ServerScheduler()
+  {
+    finish_ = false;
+    worker_ = boost::thread(Worker, this);
+  }
+
+
+  ServerScheduler::~ServerScheduler()
+  {
+    finish_ = true;
+    worker_.join();
+  }
+
+
+  void ServerScheduler::Submit(ServerJob& job)
+  {
+    if (job.filters_.empty())
+    {
+      return;
+    }
+
+    SubmitInternal(job, false);
+  }
+
+
+  bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs,
+                                      ServerJob& job)
+  {
+    std::string jobId = job.GetId();
+
+    outputs.clear();
+
+    if (job.filters_.empty())
+    {
+      return true;
+    }
+
+    // Add a sink filter to collect all the results of the filters
+    // that have no next filter.
+    ServerFilterInstance& sink = job.AddFilter(new Sink(outputs));
+
+    for (std::list<ServerFilterInstance*>::iterator
+           it = job.filters_.begin(); it != job.filters_.end(); it++)
+    {
+      if ((*it) != &sink &&
+          (*it)->GetNextFilters().size() == 0 &&
+          (*it)->GetFilter().SendOutputsToSink())
+      {
+        (*it)->ConnectNext(sink);
+      }
+    }
+
+    // Submit the job
+    SubmitInternal(job, true);
+
+    // Wait for the job to complete (either success or failure)
+    JobStatus status;
+
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
+        
+      while (watchedJobStatus_[jobId] == JobStatus_Running)
+      {
+        jobFinished_.wait(lock);
+      }
+
+      status = watchedJobStatus_[jobId];
+      watchedJobStatus_.erase(jobId);
+    }
+
+    return (status == JobStatus_Success);
+  }
+
+
+  bool ServerScheduler::IsRunning(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    return jobs_.find(jobId) != jobs_.end();
+  }
+
+
+  void ServerScheduler::Cancel(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    Jobs::iterator job = jobs_.find(jobId);
+
+    if (job != jobs_.end())
+    {
+      job->second.cancel_ = true;
+      LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
+    }
+  }
+
+
+  float ServerScheduler::GetProgress(const std::string& jobId) 
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    Jobs::iterator job = jobs_.find(jobId);
+
+    if (job == jobs_.end() || 
+        job->second.size_ == 0  /* should never happen */)
+    {
+      // This job is not running
+      return 1;
+    }
+
+    if (job->second.failures_ != 0)
+    {
+      return 1;
+    }
+
+    if (job->second.size_ == 1)
+    {
+      return job->second.success_;
+    }
+
+    return (static_cast<float>(job->second.success_) / 
+            static_cast<float>(job->second.size_ - 1));
+  }
+
+
+  void ServerScheduler::GetListOfJobs(ListOfStrings& jobs)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    jobs.clear();
+
+    for (Jobs::const_iterator 
+           it = jobs_.begin(); it != jobs_.end(); it++)
+    {
+      jobs.push_back(it->first);
+    }
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerScheduler.h	Wed Apr 30 18:30:05 2014 +0200
@@ -0,0 +1,115 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "ServerJob.h"
+
+namespace Orthanc
+{
+  class ServerScheduler : public ServerFilterInstance::IListener
+  {
+  private:
+    struct JobInfo
+    {
+      bool watched_;
+      bool cancel_;
+      size_t size_;
+      size_t success_;
+      size_t failures_;
+      std::string description_;
+    };
+
+    enum JobStatus
+    {
+      JobStatus_Running = 1,
+      JobStatus_Success = 2,
+      JobStatus_Failure = 3
+    };
+
+    typedef IServerFilter::ListOfStrings  ListOfStrings;
+    typedef std::map<std::string, JobInfo> Jobs;
+
+    boost::mutex mutex_;
+    boost::condition_variable jobFinished_;
+    Jobs jobs_;
+    SharedMessageQueue queue_;
+    bool finish_;
+    boost::thread worker_;
+    std::map<std::string, JobStatus> watchedJobStatus_;
+
+    JobInfo& GetJobInfo(const std::string& jobId);
+
+    virtual void SignalSuccess(const std::string& jobId);
+
+    virtual void SignalFailure(const std::string& jobId);
+
+    static void Worker(ServerScheduler* that);
+
+    void SubmitInternal(ServerJob& job,
+                        bool watched);
+
+  public:
+    ServerScheduler();
+
+    ~ServerScheduler();
+
+    void Submit(ServerJob& job);
+
+    bool SubmitAndWait(ListOfStrings& outputs,
+                       ServerJob& job);
+
+    bool IsRunning(const std::string& jobId);
+
+    void Cancel(const std::string& jobId);
+
+    // Returns a number between 0 and 1
+    float GetProgress(const std::string& jobId);
+
+    bool IsRunning(const ServerJob& job)
+    {
+      return IsRunning(job.GetId());
+    }
+
+    void Cancel(const ServerJob& job) 
+    {
+      Cancel(job.GetId());
+    }
+
+    float GetProgress(const ServerJob& job) 
+    {
+      return GetProgress(job.GetId());
+    }
+
+    void GetListOfJobs(ListOfStrings& jobs);
+  };
+}
--- a/UnitTestsSources/MultiThreading.cpp	Wed Apr 30 18:10:16 2014 +0200
+++ b/UnitTestsSources/MultiThreading.cpp	Wed Apr 30 18:30:05 2014 +0200
@@ -1,6 +1,8 @@
 #include "gtest/gtest.h"
 #include <glog/logging.h>
 
+#include "../OrthancServer/Scheduler/ServerScheduler.h"
+
 #include "../Core/OrthancException.h"
 #include "../Core/Toolbox.h"
 #include "../Core/MultiThreading/ArrayFilledByThreads.h"
@@ -242,567 +244,6 @@
 
 
 
-#include "../Core/ICommand.h"
-#include "../Core/Toolbox.h"
-#include "../Core/Uuid.h"
-#include "../Core/MultiThreading/SharedMessageQueue.h"
-#include <boost/lexical_cast.hpp>
-
-
-namespace Orthanc
-{
-  class IServerFilter
-  {
-  public:
-    typedef std::list<std::string>  ListOfStrings;
-
-    virtual ~IServerFilter()
-    {
-    }
-
-    virtual bool Apply(ListOfStrings& outputs,
-                       const ListOfStrings& inputs) = 0;
-
-    virtual bool SendOutputsToSink() const = 0;
-  };
-
-
-  class Sink : public IServerFilter
-  {
-  private:
-    ListOfStrings& target_;
-
-  public:
-    Sink(ListOfStrings& target) : target_(target)
-    {
-    }
-
-    virtual bool SendOutputsToSink() const
-    {
-      return false;
-    }
-
-    virtual bool Apply(ListOfStrings& outputs,
-                       const ListOfStrings& inputs)
-    {
-      for (ListOfStrings::const_iterator 
-             it = inputs.begin(); it != inputs.end(); it++)
-      {
-        target_.push_back(*it);
-      }
-
-      return true;
-    }    
-  };
-
-
-
-  class IServerFilterListener
-  {
-  public:
-    virtual ~IServerFilterListener()
-    {
-    }
-
-    virtual void SignalSuccess(const std::string& jobId) = 0;
-
-    virtual void SignalFailure(const std::string& jobId) = 0;
-  };
-
-
-  class ServerFilterInstance : public IDynamicObject
-  {
-    friend class ServerScheduler;
-
-  private:
-    typedef IServerFilter::ListOfStrings  ListOfStrings;
-
-    IServerFilter *filter_;
-    std::string jobId_;
-    ListOfStrings inputs_;
-    std::list<ServerFilterInstance*> next_;
-
-    bool Execute(IServerFilterListener& listener)
-    {
-      ListOfStrings outputs;
-      if (!filter_->Apply(outputs, inputs_))
-      {
-        listener.SignalFailure(jobId_);
-        return true;
-      }
-
-      for (std::list<ServerFilterInstance*>::iterator
-             it = next_.begin(); it != next_.end(); it++)
-      {
-        for (ListOfStrings::const_iterator
-               output = outputs.begin(); output != outputs.end(); output++)
-        {
-          (*it)->AddInput(*output);
-        }
-      }
-
-      listener.SignalSuccess(jobId_);
-      return true;
-    }
-
-
-  public:
-    ServerFilterInstance(IServerFilter *filter,
-                         const std::string& jobId) : 
-      filter_(filter), 
-      jobId_(jobId)
-    {
-      if (filter_ == NULL)
-      {
-        throw OrthancException(ErrorCode_ParameterOutOfRange);
-      }
-    }
-
-    virtual ~ServerFilterInstance()
-    {
-      if (filter_ != NULL)
-      {
-        delete filter_;
-      }
-    }
-
-    const std::string& GetJobId() const
-    {
-      return jobId_;
-    }
-
-    void AddInput(const std::string& input)
-    {
-      inputs_.push_back(input);
-    }
-
-    void ConnectNext(ServerFilterInstance& filter)
-    {
-      next_.push_back(&filter);
-    }
-
-    const std::list<ServerFilterInstance*>& GetNextFilters() const
-    {
-      return next_;
-    }
-
-    IServerFilter& GetFilter() const
-    {
-      return *filter_;
-    }
-  };
-
-
-  class ServerJob
-  {
-    friend class ServerScheduler;
-
-  private:
-    std::list<ServerFilterInstance*> filters_;
-    std::string jobId_;
-    bool submitted_;
-    std::string description_;
-
-    
-    void CheckOrdering()
-    {
-      std::map<ServerFilterInstance*, unsigned int> index;
-
-      unsigned int count = 0;
-      for (std::list<ServerFilterInstance*>::const_iterator
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        index[*it] = count++;
-      }
-
-      for (std::list<ServerFilterInstance*>::const_iterator
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters();
-
-        for (std::list<ServerFilterInstance*>::const_iterator
-               next = nextFilters.begin(); next != nextFilters.end(); next++)
-        {
-          if (index.find(*next) == index.end() ||
-              index[*next] <= index[*it])
-          {
-            // You must reorder your calls to "ServerJob::AddFilter"
-            throw OrthancException("Bad ordering of filters in a job");
-          }
-        }
-      }
-    }
-
-
-    size_t Submit(SharedMessageQueue& target,
-                  IServerFilterListener& listener)
-    {
-      if (submitted_)
-      {
-        // This job has already been submitted
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-
-      CheckOrdering();
-
-      size_t size = filters_.size();
-
-      for (std::list<ServerFilterInstance*>::iterator 
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        target.Enqueue(*it);
-      }
-
-      filters_.clear();
-      submitted_ = true;
-
-      return size;
-    }
-
-  public:
-    ServerJob()
-    {
-      jobId_ = Toolbox::GenerateUuid();      
-      submitted_ = false;
-      description_ = "no description";
-    }
-
-    ~ServerJob()
-    {
-      for (std::list<ServerFilterInstance*>::iterator
-             it = filters_.begin(); it != filters_.end(); it++)
-      {
-        delete *it;
-      }
-    }
-
-    const std::string& GetId() const
-    {
-      return jobId_;
-    }
-
-    void SetDescription(const char* description)
-    {
-      description_ = description;
-    }
-
-    const std::string& GetDescription() const
-    {
-      return description_;
-    }
-
-    ServerFilterInstance& AddFilter(IServerFilter* filter)
-    {
-      if (submitted_)
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-
-      filters_.push_back(new ServerFilterInstance(filter, jobId_));
-      
-      return *filters_.back();
-    }
-  };
-
-
-  class ServerScheduler : public IServerFilterListener
-  {
-  private:
-    struct JobInfo
-    {
-      bool watched_;
-      bool cancel_;
-      size_t size_;
-      size_t success_;
-      size_t failures_;
-      std::string description_;
-    };
-
-    enum JobStatus
-    {
-      JobStatus_Running = 1,
-      JobStatus_Success = 2,
-      JobStatus_Failure = 3
-    };
-
-    typedef IServerFilter::ListOfStrings  ListOfStrings;
-    typedef std::map<std::string, JobInfo> Jobs;
-
-    boost::mutex mutex_;
-    boost::condition_variable jobFinished_;
-    Jobs jobs_;
-    SharedMessageQueue queue_;
-    bool finish_;
-    boost::thread worker_;
-    std::map<std::string, JobStatus> watchedJobStatus_;
-
-    JobInfo& GetJobInfo(const std::string& jobId)
-    {
-      Jobs::iterator info = jobs_.find(jobId);
-
-      if (info == jobs_.end())
-      {
-        throw OrthancException(ErrorCode_InternalError);
-      }
-
-      return info->second;
-    }
-
-    virtual void SignalSuccess(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      JobInfo& info = GetJobInfo(jobId);
-      info.success_++;
-
-      assert(info.failures_ == 0);
-
-      if (info.success_ >= info.size_)
-      {
-        if (info.watched_)
-        {
-          watchedJobStatus_[jobId] = JobStatus_Success;
-          jobFinished_.notify_all();
-        }
-
-        LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
-        jobs_.erase(jobId);
-      }
-    }
-
-    virtual void SignalFailure(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      JobInfo& info = GetJobInfo(jobId);
-      info.failures_++;
-
-      if (info.success_ + info.failures_ >= info.size_)
-      {
-        if (info.watched_)
-        {
-          watchedJobStatus_[jobId] = JobStatus_Failure;
-          jobFinished_.notify_all();
-        }
-
-        LOG(ERROR) << "Job has failed (" << info.description_ << ")";
-        jobs_.erase(jobId);
-      }
-    }
-
-    static void Worker(ServerScheduler* that)
-    {
-      static const int32_t TIMEOUT = 100;
-
-      while (!that->finish_)
-      {
-        std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
-        if (object.get() != NULL)
-        {
-          ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object);
-
-          // Skip the execution of this filter if its parent job has
-          // previously failed.
-          bool jobHasFailed;
-          {
-            boost::mutex::scoped_lock lock(that->mutex_);
-            JobInfo& info = that->GetJobInfo(filter.GetJobId());
-            jobHasFailed = (info.failures_ > 0 || info.cancel_); 
-          }
-
-          if (jobHasFailed)
-          {
-            that->SignalFailure(filter.GetJobId());
-          }
-          else
-          {
-            filter.Execute(*that);
-          }
-        }
-      }
-    }
-
-    void SubmitInternal(ServerJob& job,
-                        bool watched)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      JobInfo info;
-      info.size_ = job.Submit(queue_, *this);
-      info.cancel_ = false;
-      info.success_ = 0;
-      info.failures_ = 0;
-      info.description_ = job.GetDescription();
-      info.watched_ = watched;
-
-      assert(info.size_ > 0);
-
-      if (watched)
-      {
-        watchedJobStatus_[job.GetId()] = JobStatus_Running;
-      }
-
-      jobs_[job.GetId()] = info;
-
-      LOG(INFO) << "New job submitted (" << job.description_ << ")";
-    }
-
-  public:
-    ServerScheduler()
-    {
-      finish_ = false;
-      worker_ = boost::thread(Worker, this);
-    }
-
-    ~ServerScheduler()
-    {
-      finish_ = true;
-      worker_.join();
-    }
-
-    void Submit(ServerJob& job)
-    {
-      if (job.filters_.empty())
-      {
-        return;
-      }
-
-      SubmitInternal(job, false);
-    }
-
-    bool SubmitAndWait(ListOfStrings& outputs,
-                       ServerJob& job)
-    {
-      std::string jobId = job.GetId();
-
-      outputs.clear();
-
-      if (job.filters_.empty())
-      {
-        return true;
-      }
-
-      // Add a sink filter to collect all the results of the filters
-      // that have no next filter.
-      ServerFilterInstance& sink = job.AddFilter(new Sink(outputs));
-
-      for (std::list<ServerFilterInstance*>::iterator
-             it = job.filters_.begin(); it != job.filters_.end(); it++)
-      {
-        if ((*it) != &sink &&
-            (*it)->GetNextFilters().size() == 0 &&
-            (*it)->GetFilter().SendOutputsToSink())
-        {
-          (*it)->ConnectNext(sink);
-        }
-      }
-
-      // Submit the job
-      SubmitInternal(job, true);
-
-      // Wait for the job to complete (either success or failure)
-      JobStatus status;
-
-      {
-        boost::mutex::scoped_lock lock(mutex_);
-
-        assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
-        
-        while (watchedJobStatus_[jobId] == JobStatus_Running)
-        {
-          jobFinished_.wait(lock);
-        }
-
-        status = watchedJobStatus_[jobId];
-        watchedJobStatus_.erase(jobId);
-      }
-
-      return (status == JobStatus_Success);
-    }
-
-
-    bool IsRunning(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      return jobs_.find(jobId) != jobs_.end();
-    }
-
-
-    void Cancel(const std::string& jobId)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      Jobs::iterator job = jobs_.find(jobId);
-
-      if (job != jobs_.end())
-      {
-        job->second.cancel_ = true;
-        LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
-      }
-    }
-
-
-    // Returns a number between 0 and 1
-    float GetProgress(const std::string& jobId) 
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      Jobs::iterator job = jobs_.find(jobId);
-
-      if (job == jobs_.end() || 
-          job->second.size_ == 0  /* should never happen */)
-      {
-        // This job is not running
-        return 1;
-      }
-
-      if (job->second.failures_ != 0)
-      {
-        return 1;
-      }
-
-      if (job->second.size_ == 1)
-      {
-        return job->second.success_;
-      }
-
-      return (static_cast<float>(job->second.success_) / 
-              static_cast<float>(job->second.size_ - 1));
-    }
-
-    bool IsRunning(const ServerJob& job)
-    {
-      return IsRunning(job.GetId());
-    }
-
-    void Cancel(const ServerJob& job) 
-    {
-      Cancel(job.GetId());
-    }
-
-    float GetProgress(const ServerJob& job) 
-    {
-      return GetProgress(job);
-    }
-
-    void GetListOfJobs(ListOfStrings& jobs)
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      jobs.clear();
-
-      for (Jobs::const_iterator 
-             it = jobs_.begin(); it != jobs_.end(); it++)
-      {
-        jobs.push_back(it->first);
-      }
-    }
-  };
-
-}
-
-
-
 class Tutu : public IServerFilter
 {
 private: