changeset 995:8c67382f44a7 lua-scripting

limit number of jobs in the scheduler
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 Jul 2014 15:58:53 +0200
parents b3d4f8a30324
children cf52f3bcb2b3
files CMakeLists.txt Core/MultiThreading/Semaphore.cpp Core/MultiThreading/Semaphore.h Core/MultiThreading/SharedMessageQueue.h OrthancServer/Scheduler/ServerScheduler.cpp OrthancServer/Scheduler/ServerScheduler.h OrthancServer/ServerContext.cpp Resources/Configuration.json UnitTestsSources/MultiThreadingTests.cpp
diffstat 9 files changed, 149 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/CMakeLists.txt	Wed Jul 02 14:42:49 2014 +0200
+++ b/CMakeLists.txt	Thu Jul 03 15:58:53 2014 +0200
@@ -94,6 +94,7 @@
   Core/MultiThreading/BagOfRunnablesBySteps.cpp
   Core/MultiThreading/Mutex.cpp
   Core/MultiThreading/ReaderWriterLock.cpp
+  Core/MultiThreading/Semaphore.cpp
   Core/MultiThreading/SharedMessageQueue.cpp
   Core/MultiThreading/ThreadedCommandProcessor.cpp
   Core/ImageFormats/ImageAccessor.cpp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Core/MultiThreading/Semaphore.cpp	Thu Jul 03 15:58:53 2014 +0200
@@ -0,0 +1,67 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "Semaphore.h"
+
+#include "../OrthancException.h"
+
+
+namespace Orthanc
+{
+  Semaphore::Semaphore(unsigned int count) : count_(count)
+  {
+    if (count == 0)
+    {
+      throw OrthancException(ErrorCode_ParameterOutOfRange);
+    }
+  }
+
+  void Semaphore::Release()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    count_++;
+    condition_.notify_one(); 
+  }
+
+  void Semaphore::Acquire()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    while (count_ == 0)
+    {
+      condition_.wait(lock);
+    }
+
+    count_++;
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Core/MultiThreading/Semaphore.h	Thu Jul 03 15:58:53 2014 +0200
@@ -0,0 +1,54 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include <boost/noncopyable.hpp>
+#include <boost/thread.hpp>
+
+namespace Orthanc
+{
+  class Semaphore : public boost::noncopyable
+  {
+  private:
+    unsigned int count_;
+    boost::mutex mutex_;
+    boost::condition_variable condition_;
+
+  public:
+    explicit Semaphore(unsigned int count);
+
+    void Release();
+
+    void Acquire();
+  };
+}
--- a/Core/MultiThreading/SharedMessageQueue.h	Wed Jul 02 14:42:49 2014 +0200
+++ b/Core/MultiThreading/SharedMessageQueue.h	Thu Jul 03 15:58:53 2014 +0200
@@ -40,7 +40,7 @@
 
 namespace Orthanc
 {
-  class SharedMessageQueue
+  class SharedMessageQueue : public boost::noncopyable
   {
   private:
     typedef std::list<IDynamicObject*>  Queue;
@@ -52,8 +52,8 @@
     boost::condition_variable emptied_;
 
   public:
-    SharedMessageQueue(unsigned int maxSize = 0);
-
+    explicit SharedMessageQueue(unsigned int maxSize = 0);
+    
     ~SharedMessageQueue();
 
     // This transfers the ownership of the message
--- a/OrthancServer/Scheduler/ServerScheduler.cpp	Wed Jul 02 14:42:49 2014 +0200
+++ b/OrthancServer/Scheduler/ServerScheduler.cpp	Thu Jul 03 15:58:53 2014 +0200
@@ -98,11 +98,13 @@
       if (info.watched_)
       {
         watchedJobStatus_[jobId] = JobStatus_Success;
-        jobFinished_.notify_all();
+        watchedJobFinished_.notify_all();
       }
 
       LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
       jobs_.erase(jobId);
+
+      availableJob_.Release();
     }
   }
 
@@ -119,11 +121,13 @@
       if (info.watched_)
       {
         watchedJobStatus_[jobId] = JobStatus_Failure;
-        jobFinished_.notify_all();
+        watchedJobFinished_.notify_all();
       }
 
       LOG(ERROR) << "Job has failed (" << info.description_ << ")";
       jobs_.erase(jobId);
+
+      availableJob_.Release();
     }
   }
 
@@ -166,6 +170,8 @@
   void ServerScheduler::SubmitInternal(ServerJob& job,
                                        bool watched)
   {
+    availableJob_.Acquire();
+
     boost::mutex::scoped_lock lock(mutex_);
 
     JobInfo info;
@@ -189,7 +195,7 @@
   }
 
 
-  ServerScheduler::ServerScheduler()
+  ServerScheduler::ServerScheduler(unsigned int maxJobs) : availableJob_(maxJobs)
   {
     finish_ = false;
     worker_ = boost::thread(Worker, this);
@@ -254,7 +260,7 @@
         
       while (watchedJobStatus_[jobId] == JobStatus_Running)
       {
-        jobFinished_.wait(lock);
+        watchedJobFinished_.wait(lock);
       }
 
       status = watchedJobStatus_[jobId];
--- a/OrthancServer/Scheduler/ServerScheduler.h	Wed Jul 02 14:42:49 2014 +0200
+++ b/OrthancServer/Scheduler/ServerScheduler.h	Thu Jul 03 15:58:53 2014 +0200
@@ -34,6 +34,8 @@
 
 #include "ServerJob.h"
 
+#include "../../Core/MultiThreading/Semaphore.h"
+
 namespace Orthanc
 {
   class ServerScheduler : public ServerFilterInstance::IListener
@@ -60,12 +62,13 @@
     typedef std::map<std::string, JobInfo> Jobs;
 
     boost::mutex mutex_;
-    boost::condition_variable jobFinished_;
+    boost::condition_variable watchedJobFinished_;
     Jobs jobs_;
     SharedMessageQueue queue_;
     bool finish_;
     boost::thread worker_;
     std::map<std::string, JobStatus> watchedJobStatus_;
+    Semaphore availableJob_;
 
     JobInfo& GetJobInfo(const std::string& jobId);
 
@@ -79,7 +82,7 @@
                         bool watched);
 
   public:
-    ServerScheduler();
+    ServerScheduler(unsigned int maxjobs);
 
     ~ServerScheduler();
 
--- a/OrthancServer/ServerContext.cpp	Wed Jul 02 14:42:49 2014 +0200
+++ b/OrthancServer/ServerContext.cpp	Thu Jul 03 15:58:53 2014 +0200
@@ -67,7 +67,8 @@
     accessor_(storage_),
     compressionEnabled_(false),
     provider_(*this),
-    dicomCache_(provider_, DICOM_CACHE_SIZE)
+    dicomCache_(provider_, DICOM_CACHE_SIZE),
+    scheduler_(Configuration::GetGlobalIntegerParameter("LimitJobs", 10))
   {
     scu_.SetLocalApplicationEntityTitle(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC"));
     //scu_.SetMillisecondsBeforeClose(1);  // The connection is always released
--- a/Resources/Configuration.json	Wed Jul 02 14:42:49 2014 +0200
+++ b/Resources/Configuration.json	Thu Jul 03 15:58:53 2014 +0200
@@ -169,5 +169,10 @@
 
   // The maximum number of results for a single C-FIND request at the
   // Instance level. Setting this option to "0" means no limit.
-  "LimitFindInstances" : 0
+  "LimitFindInstances" : 0,
+
+  // The maximum number of active jobs in the Orthanc scheduler. When
+  // this limit is reached, the addition of new jobs is blocked until
+  // some job finishes.
+  "LimitJobs" : 10
 }
--- a/UnitTestsSources/MultiThreadingTests.cpp	Wed Jul 02 14:42:49 2014 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Thu Jul 03 15:58:53 2014 +0200
@@ -345,7 +345,7 @@
 
 TEST(Toto, Toto)
 {
-  ServerScheduler scheduler;
+  ServerScheduler scheduler(10);
 
   ServerJob job;
   ServerFilterInstance& f2 = job.AddFilter(new Tutu(2));