changeset 994:b3d4f8a30324 lua-scripting

integration mainline->lua-scripting
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 02 Jul 2014 14:42:49 +0200
parents a91e7b4080d1 (diff) 2f76b92addd4 (current diff)
children 8c67382f44a7
files CMakeLists.txt UnitTestsSources/DicomMap.cpp UnitTestsSources/FileStorage.cpp UnitTestsSources/FromDcmtk.cpp UnitTestsSources/JpegLossless.cpp UnitTestsSources/Lua.cpp UnitTestsSources/MemoryCache.cpp UnitTestsSources/MultiThreading.cpp UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/Png.cpp UnitTestsSources/RestApi.cpp UnitTestsSources/SQLite.cpp UnitTestsSources/SQLiteChromium.cpp UnitTestsSources/Versions.cpp UnitTestsSources/Zip.cpp
diffstat 10 files changed, 1003 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/CMakeLists.txt	Wed Jul 02 11:56:08 2014 +0200
+++ b/CMakeLists.txt	Wed Jul 02 14:42:49 2014 +0200
@@ -151,6 +151,11 @@
   OrthancServer/ServerToolbox.cpp
   OrthancServer/OrthancFindRequestHandler.cpp
   OrthancServer/OrthancMoveRequestHandler.cpp
+
+  # From "lua-scripting" branch
+  OrthancServer/Scheduler/ServerFilterInstance.cpp
+  OrthancServer/Scheduler/ServerJob.cpp
+  OrthancServer/Scheduler/ServerScheduler.cpp
   )
 
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/IServerFilter.h	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,54 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include <list>
+#include <string>
+
+namespace Orthanc
+{
+  class IServerFilter
+  {
+  public:
+    typedef std::list<std::string>  ListOfStrings;
+
+    virtual ~IServerFilter()
+    {
+    }
+
+    virtual bool Apply(ListOfStrings& outputs,
+                       const ListOfStrings& inputs) = 0;
+
+    virtual bool SendOutputsToSink() const = 0;
+  };
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerFilterInstance.cpp	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,82 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ServerFilterInstance.h"
+
+#include "../../Core/OrthancException.h"
+
+namespace Orthanc
+{
+  bool ServerFilterInstance::Execute(IListener& listener)
+  {
+    ListOfStrings outputs;
+    if (!filter_->Apply(outputs, inputs_))
+    {
+      listener.SignalFailure(jobId_);
+      return true;
+    }
+
+    for (std::list<ServerFilterInstance*>::iterator
+           it = next_.begin(); it != next_.end(); it++)
+    {
+      for (ListOfStrings::const_iterator
+             output = outputs.begin(); output != outputs.end(); output++)
+      {
+        (*it)->AddInput(*output);
+      }
+    }
+
+    listener.SignalSuccess(jobId_);
+    return true;
+  }
+
+
+  ServerFilterInstance::ServerFilterInstance(IServerFilter *filter,
+                                             const std::string& jobId) : 
+    filter_(filter), 
+    jobId_(jobId)
+  {
+    if (filter_ == NULL)
+    {
+      throw OrthancException(ErrorCode_ParameterOutOfRange);
+    }
+  }
+
+
+  ServerFilterInstance::~ServerFilterInstance()
+  {
+    if (filter_ != NULL)
+    {
+      delete filter_;
+    }
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerFilterInstance.h	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,98 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "../../Core/IDynamicObject.h"
+#include "IServerFilter.h"
+
+namespace Orthanc
+{
+  class ServerFilterInstance : public IDynamicObject
+  {
+    friend class ServerScheduler;
+
+  public:
+    class IListener
+    {
+    public:
+      virtual ~IListener()
+      {
+      }
+
+      virtual void SignalSuccess(const std::string& jobId) = 0;
+
+      virtual void SignalFailure(const std::string& jobId) = 0;
+    };
+
+  private:
+    typedef IServerFilter::ListOfStrings  ListOfStrings;
+
+    IServerFilter *filter_;
+    std::string jobId_;
+    ListOfStrings inputs_;
+    std::list<ServerFilterInstance*> next_;
+
+    bool Execute(IListener& listener);
+
+  public:
+    ServerFilterInstance(IServerFilter *filter,
+                         const std::string& jobId);
+
+    virtual ~ServerFilterInstance();
+
+    const std::string& GetJobId() const
+    {
+      return jobId_;
+    }
+
+    void AddInput(const std::string& input)
+    {
+      inputs_.push_back(input);
+    }
+
+    void ConnectNext(ServerFilterInstance& filter)
+    {
+      next_.push_back(&filter);
+    }
+
+    const std::list<ServerFilterInstance*>& GetNextFilters() const
+    {
+      return next_;
+    }
+
+    IServerFilter& GetFilter() const
+    {
+      return *filter_;
+    }
+  };
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerJob.cpp	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,126 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ServerJob.h"
+
+#include "../../Core/OrthancException.h"
+#include "../../Core/Toolbox.h"
+#include "../../Core/Uuid.h"
+
+namespace Orthanc
+{
+  void ServerJob::CheckOrdering()
+  {
+    std::map<ServerFilterInstance*, unsigned int> index;
+
+    unsigned int count = 0;
+    for (std::list<ServerFilterInstance*>::const_iterator
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      index[*it] = count++;
+    }
+
+    for (std::list<ServerFilterInstance*>::const_iterator
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters();
+
+      for (std::list<ServerFilterInstance*>::const_iterator
+             next = nextFilters.begin(); next != nextFilters.end(); next++)
+      {
+        if (index.find(*next) == index.end() ||
+            index[*next] <= index[*it])
+        {
+          // You must reorder your calls to "ServerJob::AddFilter"
+          throw OrthancException("Bad ordering of filters in a job");
+        }
+      }
+    }
+  }
+
+
+  size_t ServerJob::Submit(SharedMessageQueue& target,
+                           ServerFilterInstance::IListener& listener)
+  {
+    if (submitted_)
+    {
+      // This job has already been submitted
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    CheckOrdering();
+
+    size_t size = filters_.size();
+
+    for (std::list<ServerFilterInstance*>::iterator 
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      target.Enqueue(*it);
+    }
+
+    filters_.clear();
+    submitted_ = true;
+
+    return size;
+  }
+
+
+  ServerJob::ServerJob()
+  {
+    jobId_ = Toolbox::GenerateUuid();      
+    submitted_ = false;
+    description_ = "no description";
+  }
+
+
+  ServerJob::~ServerJob()
+  {
+    for (std::list<ServerFilterInstance*>::iterator
+           it = filters_.begin(); it != filters_.end(); it++)
+    {
+      delete *it;
+    }
+  }
+
+
+  ServerFilterInstance& ServerJob::AddFilter(IServerFilter* filter)
+  {
+    if (submitted_)
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    filters_.push_back(new ServerFilterInstance(filter, jobId_));
+      
+    return *filters_.back();
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerJob.h	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,77 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "ServerFilterInstance.h"
+#include "../../Core/MultiThreading/SharedMessageQueue.h"
+
+namespace Orthanc
+{
+  class ServerJob
+  {
+    friend class ServerScheduler;
+
+  private:
+    std::list<ServerFilterInstance*> filters_;
+    std::string jobId_;
+    bool submitted_;
+    std::string description_;
+
+    void CheckOrdering();
+
+    size_t Submit(SharedMessageQueue& target,
+                  ServerFilterInstance::IListener& listener);
+
+  public:
+    ServerJob();
+
+    ~ServerJob();
+
+    const std::string& GetId() const
+    {
+      return jobId_;
+    }
+
+    void SetDescription(const char* description)
+    {
+      description_ = description;
+    }
+
+    const std::string& GetDescription() const
+    {
+      return description_;
+    }
+
+    ServerFilterInstance& AddFilter(IServerFilter* filter);
+  };
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerScheduler.cpp	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,329 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ServerScheduler.h"
+
+#include "../../Core/OrthancException.h"
+
+#include <glog/logging.h>
+
+namespace Orthanc
+{
+  namespace
+  {
+    // Anonymous namespace to avoid clashes between compilation modules
+    class Sink : public IServerFilter
+    {
+    private:
+      ListOfStrings& target_;
+
+    public:
+      Sink(ListOfStrings& target) : target_(target)
+      {
+      }
+
+      virtual bool SendOutputsToSink() const
+      {
+        return false;
+      }
+
+      virtual bool Apply(ListOfStrings& outputs,
+                         const ListOfStrings& inputs)
+      {
+        for (ListOfStrings::const_iterator 
+               it = inputs.begin(); it != inputs.end(); it++)
+        {
+          target_.push_back(*it);
+        }
+
+        return true;
+      }    
+    };
+  }
+
+
+  ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId)
+  {
+    Jobs::iterator info = jobs_.find(jobId);
+
+    if (info == jobs_.end())
+    {
+      throw OrthancException(ErrorCode_InternalError);
+    }
+
+    return info->second;
+  }
+
+
+  void ServerScheduler::SignalSuccess(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    JobInfo& info = GetJobInfo(jobId);
+    info.success_++;
+
+    assert(info.failures_ == 0);
+
+    if (info.success_ >= info.size_)
+    {
+      if (info.watched_)
+      {
+        watchedJobStatus_[jobId] = JobStatus_Success;
+        jobFinished_.notify_all();
+      }
+
+      LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
+      jobs_.erase(jobId);
+    }
+  }
+
+
+  void ServerScheduler::SignalFailure(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    JobInfo& info = GetJobInfo(jobId);
+    info.failures_++;
+
+    if (info.success_ + info.failures_ >= info.size_)
+    {
+      if (info.watched_)
+      {
+        watchedJobStatus_[jobId] = JobStatus_Failure;
+        jobFinished_.notify_all();
+      }
+
+      LOG(ERROR) << "Job has failed (" << info.description_ << ")";
+      jobs_.erase(jobId);
+    }
+  }
+
+
+  void ServerScheduler::Worker(ServerScheduler* that)
+  {
+    static const int32_t TIMEOUT = 100;
+
+    LOG(WARNING) << "The server scheduler has started";
+
+    while (!that->finish_)
+    {
+      std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
+      if (object.get() != NULL)
+      {
+        ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object);
+
+        // Skip the execution of this filter if its parent job has
+        // previously failed.
+        bool jobHasFailed;
+        {
+          boost::mutex::scoped_lock lock(that->mutex_);
+          JobInfo& info = that->GetJobInfo(filter.GetJobId());
+          jobHasFailed = (info.failures_ > 0 || info.cancel_); 
+        }
+
+        if (jobHasFailed)
+        {
+          that->SignalFailure(filter.GetJobId());
+        }
+        else
+        {
+          filter.Execute(*that);
+        }
+      }
+    }
+  }
+
+
+  void ServerScheduler::SubmitInternal(ServerJob& job,
+                                       bool watched)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    JobInfo info;
+    info.size_ = job.Submit(queue_, *this);
+    info.cancel_ = false;
+    info.success_ = 0;
+    info.failures_ = 0;
+    info.description_ = job.GetDescription();
+    info.watched_ = watched;
+
+    assert(info.size_ > 0);
+
+    if (watched)
+    {
+      watchedJobStatus_[job.GetId()] = JobStatus_Running;
+    }
+
+    jobs_[job.GetId()] = info;
+
+    LOG(INFO) << "New job submitted (" << job.description_ << ")";
+  }
+
+
+  ServerScheduler::ServerScheduler()
+  {
+    finish_ = false;
+    worker_ = boost::thread(Worker, this);
+  }
+
+
+  ServerScheduler::~ServerScheduler()
+  {
+    finish_ = true;
+    worker_.join();
+  }
+
+
+  void ServerScheduler::Submit(ServerJob& job)
+  {
+    if (job.filters_.empty())
+    {
+      return;
+    }
+
+    SubmitInternal(job, false);
+  }
+
+
+  bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs,
+                                      ServerJob& job)
+  {
+    std::string jobId = job.GetId();
+
+    outputs.clear();
+
+    if (job.filters_.empty())
+    {
+      return true;
+    }
+
+    // Add a sink filter to collect all the results of the filters
+    // that have no next filter.
+    ServerFilterInstance& sink = job.AddFilter(new Sink(outputs));
+
+    for (std::list<ServerFilterInstance*>::iterator
+           it = job.filters_.begin(); it != job.filters_.end(); it++)
+    {
+      if ((*it) != &sink &&
+          (*it)->GetNextFilters().size() == 0 &&
+          (*it)->GetFilter().SendOutputsToSink())
+      {
+        (*it)->ConnectNext(sink);
+      }
+    }
+
+    // Submit the job
+    SubmitInternal(job, true);
+
+    // Wait for the job to complete (either success or failure)
+    JobStatus status;
+
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
+        
+      while (watchedJobStatus_[jobId] == JobStatus_Running)
+      {
+        jobFinished_.wait(lock);
+      }
+
+      status = watchedJobStatus_[jobId];
+      watchedJobStatus_.erase(jobId);
+    }
+
+    return (status == JobStatus_Success);
+  }
+
+
+  bool ServerScheduler::IsRunning(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    return jobs_.find(jobId) != jobs_.end();
+  }
+
+
+  void ServerScheduler::Cancel(const std::string& jobId)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    Jobs::iterator job = jobs_.find(jobId);
+
+    if (job != jobs_.end())
+    {
+      job->second.cancel_ = true;
+      LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
+    }
+  }
+
+
+  float ServerScheduler::GetProgress(const std::string& jobId) 
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    Jobs::iterator job = jobs_.find(jobId);
+
+    if (job == jobs_.end() || 
+        job->second.size_ == 0  /* should never happen */)
+    {
+      // This job is not running
+      return 1;
+    }
+
+    if (job->second.failures_ != 0)
+    {
+      return 1;
+    }
+
+    if (job->second.size_ == 1)
+    {
+      return job->second.success_;
+    }
+
+    return (static_cast<float>(job->second.success_) / 
+            static_cast<float>(job->second.size_ - 1));
+  }
+
+
+  void ServerScheduler::GetListOfJobs(ListOfStrings& jobs)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    jobs.clear();
+
+    for (Jobs::const_iterator 
+           it = jobs_.begin(); it != jobs_.end(); it++)
+    {
+      jobs.push_back(it->first);
+    }
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Scheduler/ServerScheduler.h	Wed Jul 02 14:42:49 2014 +0200
@@ -0,0 +1,115 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "ServerJob.h"
+
+namespace Orthanc
+{
+  class ServerScheduler : public ServerFilterInstance::IListener
+  {
+  private:
+    struct JobInfo
+    {
+      bool watched_;
+      bool cancel_;
+      size_t size_;
+      size_t success_;
+      size_t failures_;
+      std::string description_;
+    };
+
+    enum JobStatus
+    {
+      JobStatus_Running = 1,
+      JobStatus_Success = 2,
+      JobStatus_Failure = 3
+    };
+
+    typedef IServerFilter::ListOfStrings  ListOfStrings;
+    typedef std::map<std::string, JobInfo> Jobs;
+
+    boost::mutex mutex_;
+    boost::condition_variable jobFinished_;
+    Jobs jobs_;
+    SharedMessageQueue queue_;
+    bool finish_;
+    boost::thread worker_;
+    std::map<std::string, JobStatus> watchedJobStatus_;
+
+    JobInfo& GetJobInfo(const std::string& jobId);
+
+    virtual void SignalSuccess(const std::string& jobId);
+
+    virtual void SignalFailure(const std::string& jobId);
+
+    static void Worker(ServerScheduler* that);
+
+    void SubmitInternal(ServerJob& job,
+                        bool watched);
+
+  public:
+    ServerScheduler();
+
+    ~ServerScheduler();
+
+    void Submit(ServerJob& job);
+
+    bool SubmitAndWait(ListOfStrings& outputs,
+                       ServerJob& job);
+
+    bool IsRunning(const std::string& jobId);
+
+    void Cancel(const std::string& jobId);
+
+    // Returns a number between 0 and 1
+    float GetProgress(const std::string& jobId);
+
+    bool IsRunning(const ServerJob& job)
+    {
+      return IsRunning(job.GetId());
+    }
+
+    void Cancel(const ServerJob& job) 
+    {
+      Cancel(job.GetId());
+    }
+
+    float GetProgress(const ServerJob& job) 
+    {
+      return GetProgress(job.GetId());
+    }
+
+    void GetListOfJobs(ListOfStrings& jobs);
+  };
+}
--- a/OrthancServer/ServerContext.h	Wed Jul 02 11:56:08 2014 +0200
+++ b/OrthancServer/ServerContext.h	Wed Jul 02 14:42:49 2014 +0200
@@ -40,6 +40,7 @@
 #include "ServerIndex.h"
 #include "ParsedDicomFile.h"
 #include "DicomProtocol/ReusableDicomUserConnection.h"
+#include "Scheduler/ServerScheduler.h"
 
 namespace Orthanc
 {
@@ -73,6 +74,7 @@
     boost::mutex dicomCacheMutex_;
     MemoryCache dicomCache_;
     ReusableDicomUserConnection scu_;
+    ServerScheduler scheduler_;
 
     LuaContext lua_;
 
@@ -167,5 +169,10 @@
     {
       return scu_;
     }
+
+    ServerScheduler& GetScheduler()
+    {
+      return scheduler_;
+    }
   };
 }
--- a/UnitTestsSources/MultiThreadingTests.cpp	Wed Jul 02 11:56:08 2014 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Wed Jul 02 14:42:49 2014 +0200
@@ -32,8 +32,9 @@
 
 #include "PrecompiledHeadersUnitTests.h"
 #include "gtest/gtest.h"
+#include <glog/logging.h>
 
-#include <glog/logging.h>
+#include "../OrthancServer/Scheduler/ServerScheduler.h"
 
 #include "../Core/OrthancException.h"
 #include "../Core/Toolbox.h"
@@ -248,8 +249,6 @@
 
 
 
-
-
 #include "../OrthancServer/DicomProtocol/ReusableDicomUserConnection.h"
 
 TEST(ReusableDicomUserConnection, DISABLED_Basic)
@@ -257,6 +256,7 @@
   ReusableDicomUserConnection c;
   c.SetMillisecondsBeforeClose(200);
   printf("START\n"); fflush(stdout);
+
   {
     ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic);
     lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281");
@@ -274,3 +274,110 @@
   Toolbox::ServerBarrier();
   printf("DONE\n"); fflush(stdout);
 }
+
+
+
+class Tutu : public IServerFilter
+{
+private:
+  int factor_;
+
+public:
+  Tutu(int f) : factor_(f)
+  {
+  }
+
+  virtual bool Apply(ListOfStrings& outputs,
+                     const ListOfStrings& inputs)
+  {
+    for (ListOfStrings::const_iterator 
+           it = inputs.begin(); it != inputs.end(); it++)
+    {
+      int a = boost::lexical_cast<int>(*it);
+      int b = factor_ * a;
+
+      printf("%d * %d = %d\n", a, factor_, b);
+
+      //if (a == 84) { printf("BREAK\n"); return false; }
+
+      outputs.push_back(boost::lexical_cast<std::string>(b));
+    }
+
+    Toolbox::USleep(1000000);
+
+    return true;
+  }
+
+  virtual bool SendOutputsToSink() const
+  {
+    return true;
+  }
+};
+
+
+static void Tata(ServerScheduler* s, ServerJob* j, bool* done)
+{
+  typedef IServerFilter::ListOfStrings  ListOfStrings;
+
+#if 1
+  while (!(*done))
+  {
+    ListOfStrings l;
+    s->GetListOfJobs(l);
+    for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+      printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i));
+    Toolbox::USleep(100000);
+  }
+#else
+  ListOfStrings l;
+  s->GetListOfJobs(l);
+  for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+    printf(">> %s\n", i->c_str());
+  Toolbox::USleep(1500000);
+  s->Cancel(*j);
+  Toolbox::USleep(1000000);
+  s->GetListOfJobs(l);
+  for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+    printf(">> %s\n", i->c_str());
+#endif
+}
+
+
+TEST(Toto, Toto)
+{
+  ServerScheduler scheduler;
+
+  ServerJob job;
+  ServerFilterInstance& f2 = job.AddFilter(new Tutu(2));
+  ServerFilterInstance& f3 = job.AddFilter(new Tutu(3));
+  ServerFilterInstance& f4 = job.AddFilter(new Tutu(4));
+  ServerFilterInstance& f5 = job.AddFilter(new Tutu(5));
+  f2.AddInput(boost::lexical_cast<std::string>(42));
+  //f3.AddInput(boost::lexical_cast<std::string>(42));
+  //f4.AddInput(boost::lexical_cast<std::string>(42));
+  f2.ConnectNext(f3);
+  f3.ConnectNext(f4);
+  f4.ConnectNext(f5);
+
+  job.SetDescription("tutu");
+
+  bool done = false;
+  boost::thread t(Tata, &scheduler, &job, &done);
+
+
+  //scheduler.Submit(job);
+
+  IServerFilter::ListOfStrings l;
+  scheduler.SubmitAndWait(l, job);
+
+  for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+  {
+    printf("** %s\n", i->c_str());
+  }
+
+  //Toolbox::ServerBarrier();
+  //Toolbox::USleep(3000000);
+
+  done = true;
+  t.join();
+}