# HG changeset patch # User Sebastien Jodogne # Date 1404395933 -7200 # Node ID 8c67382f44a7cb80703fb2b44abaecd7d07b628c # Parent b3d4f8a3032416726a5b120f4ca89bb0f9478f69 limit number of jobs in the scheduler diff -r b3d4f8a30324 -r 8c67382f44a7 CMakeLists.txt --- a/CMakeLists.txt Wed Jul 02 14:42:49 2014 +0200 +++ b/CMakeLists.txt Thu Jul 03 15:58:53 2014 +0200 @@ -94,6 +94,7 @@ Core/MultiThreading/BagOfRunnablesBySteps.cpp Core/MultiThreading/Mutex.cpp Core/MultiThreading/ReaderWriterLock.cpp + Core/MultiThreading/Semaphore.cpp Core/MultiThreading/SharedMessageQueue.cpp Core/MultiThreading/ThreadedCommandProcessor.cpp Core/ImageFormats/ImageAccessor.cpp diff -r b3d4f8a30324 -r 8c67382f44a7 Core/MultiThreading/Semaphore.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/Semaphore.cpp Thu Jul 03 15:58:53 2014 +0200 @@ -0,0 +1,67 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "Semaphore.h" + +#include "../OrthancException.h" + + +namespace Orthanc +{ + Semaphore::Semaphore(unsigned int count) : count_(count) + { + if (count == 0) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } + + void Semaphore::Release() + { + boost::mutex::scoped_lock lock(mutex_); + + count_++; + condition_.notify_one(); + } + + void Semaphore::Acquire() + { + boost::mutex::scoped_lock lock(mutex_); + + while (count_ == 0) + { + condition_.wait(lock); + } + + count_++; + } +} diff -r b3d4f8a30324 -r 8c67382f44a7 Core/MultiThreading/Semaphore.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/Semaphore.h Thu Jul 03 15:58:53 2014 +0200 @@ -0,0 +1,54 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include +#include + +namespace Orthanc +{ + class Semaphore : public boost::noncopyable + { + private: + unsigned int count_; + boost::mutex mutex_; + boost::condition_variable condition_; + + public: + explicit Semaphore(unsigned int count); + + void Release(); + + void Acquire(); + }; +} diff -r b3d4f8a30324 -r 8c67382f44a7 Core/MultiThreading/SharedMessageQueue.h --- a/Core/MultiThreading/SharedMessageQueue.h Wed Jul 02 14:42:49 2014 +0200 +++ b/Core/MultiThreading/SharedMessageQueue.h Thu Jul 03 15:58:53 2014 +0200 @@ -40,7 +40,7 @@ namespace Orthanc { - class SharedMessageQueue + class SharedMessageQueue : public boost::noncopyable { private: typedef std::list Queue; @@ -52,8 +52,8 @@ boost::condition_variable emptied_; public: - SharedMessageQueue(unsigned int maxSize = 0); - + explicit SharedMessageQueue(unsigned int maxSize = 0); + ~SharedMessageQueue(); // This transfers the ownership of the message diff -r b3d4f8a30324 -r 8c67382f44a7 OrthancServer/Scheduler/ServerScheduler.cpp --- a/OrthancServer/Scheduler/ServerScheduler.cpp Wed Jul 02 14:42:49 2014 +0200 +++ b/OrthancServer/Scheduler/ServerScheduler.cpp Thu Jul 03 15:58:53 2014 +0200 @@ -98,11 +98,13 @@ if (info.watched_) { watchedJobStatus_[jobId] = JobStatus_Success; - jobFinished_.notify_all(); + watchedJobFinished_.notify_all(); } LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; jobs_.erase(jobId); + + availableJob_.Release(); } } @@ -119,11 +121,13 @@ if (info.watched_) { watchedJobStatus_[jobId] = JobStatus_Failure; - jobFinished_.notify_all(); + watchedJobFinished_.notify_all(); } LOG(ERROR) << "Job has failed (" << info.description_ << ")"; jobs_.erase(jobId); + + availableJob_.Release(); } } @@ -166,6 +170,8 @@ void ServerScheduler::SubmitInternal(ServerJob& job, bool watched) { + availableJob_.Acquire(); + boost::mutex::scoped_lock lock(mutex_); JobInfo info; @@ -189,7 +195,7 @@ } - ServerScheduler::ServerScheduler() + ServerScheduler::ServerScheduler(unsigned int maxJobs) : availableJob_(maxJobs) { finish_ = false; worker_ = boost::thread(Worker, this); @@ -254,7 +260,7 @@ while (watchedJobStatus_[jobId] == JobStatus_Running) { - jobFinished_.wait(lock); + watchedJobFinished_.wait(lock); } status = watchedJobStatus_[jobId]; diff -r b3d4f8a30324 -r 8c67382f44a7 OrthancServer/Scheduler/ServerScheduler.h --- a/OrthancServer/Scheduler/ServerScheduler.h Wed Jul 02 14:42:49 2014 +0200 +++ b/OrthancServer/Scheduler/ServerScheduler.h Thu Jul 03 15:58:53 2014 +0200 @@ -34,6 +34,8 @@ #include "ServerJob.h" +#include "../../Core/MultiThreading/Semaphore.h" + namespace Orthanc { class ServerScheduler : public ServerFilterInstance::IListener @@ -60,12 +62,13 @@ typedef std::map Jobs; boost::mutex mutex_; - boost::condition_variable jobFinished_; + boost::condition_variable watchedJobFinished_; Jobs jobs_; SharedMessageQueue queue_; bool finish_; boost::thread worker_; std::map watchedJobStatus_; + Semaphore availableJob_; JobInfo& GetJobInfo(const std::string& jobId); @@ -79,7 +82,7 @@ bool watched); public: - ServerScheduler(); + ServerScheduler(unsigned int maxjobs); ~ServerScheduler(); diff -r b3d4f8a30324 -r 8c67382f44a7 OrthancServer/ServerContext.cpp --- a/OrthancServer/ServerContext.cpp Wed Jul 02 14:42:49 2014 +0200 +++ b/OrthancServer/ServerContext.cpp Thu Jul 03 15:58:53 2014 +0200 @@ -67,7 +67,8 @@ accessor_(storage_), compressionEnabled_(false), provider_(*this), - dicomCache_(provider_, DICOM_CACHE_SIZE) + dicomCache_(provider_, DICOM_CACHE_SIZE), + scheduler_(Configuration::GetGlobalIntegerParameter("LimitJobs", 10)) { scu_.SetLocalApplicationEntityTitle(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC")); //scu_.SetMillisecondsBeforeClose(1); // The connection is always released diff -r b3d4f8a30324 -r 8c67382f44a7 Resources/Configuration.json --- a/Resources/Configuration.json Wed Jul 02 14:42:49 2014 +0200 +++ b/Resources/Configuration.json Thu Jul 03 15:58:53 2014 +0200 @@ -169,5 +169,10 @@ // The maximum number of results for a single C-FIND request at the // Instance level. Setting this option to "0" means no limit. - "LimitFindInstances" : 0 + "LimitFindInstances" : 0, + + // The maximum number of active jobs in the Orthanc scheduler. When + // this limit is reached, the addition of new jobs is blocked until + // some job finishes. + "LimitJobs" : 10 } diff -r b3d4f8a30324 -r 8c67382f44a7 UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Wed Jul 02 14:42:49 2014 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Thu Jul 03 15:58:53 2014 +0200 @@ -345,7 +345,7 @@ TEST(Toto, Toto) { - ServerScheduler scheduler; + ServerScheduler scheduler(10); ServerJob job; ServerFilterInstance& f2 = job.AddFilter(new Tutu(2));