# HG changeset patch # User Sebastien Jodogne # Date 1398875405 -7200 # Node ID f0ac3a53ccf25d5db9866296c813f728c34c96d7 # Parent 76eb563f08f0cf03e414ec277d47fb80ca309b2e scheduler diff -r 76eb563f08f0 -r f0ac3a53ccf2 CMakeLists.txt --- a/CMakeLists.txt Wed Apr 30 18:10:16 2014 +0200 +++ b/CMakeLists.txt Wed Apr 30 18:30:05 2014 +0200 @@ -220,6 +220,9 @@ OrthancServer/ServerIndex.cpp OrthancServer/ToDcmtkBridge.cpp OrthancServer/DatabaseWrapper.cpp + OrthancServer/Scheduler/ServerFilterInstance.cpp + OrthancServer/Scheduler/ServerJob.cpp + OrthancServer/Scheduler/ServerScheduler.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerEnumerations.cpp OrthancServer/ServerToolbox.cpp diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/IServerFilter.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/IServerFilter.h Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,54 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include +#include + +namespace Orthanc +{ + class IServerFilter + { + public: + typedef std::list ListOfStrings; + + virtual ~IServerFilter() + { + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) = 0; + + virtual bool SendOutputsToSink() const = 0; + }; +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/ServerFilterInstance.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerFilterInstance.cpp Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,82 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "ServerFilterInstance.h" + +#include "../../Core/OrthancException.h" + +namespace Orthanc +{ + bool ServerFilterInstance::Execute(IListener& listener) + { + ListOfStrings outputs; + if (!filter_->Apply(outputs, inputs_)) + { + listener.SignalFailure(jobId_); + return true; + } + + for (std::list::iterator + it = next_.begin(); it != next_.end(); it++) + { + for (ListOfStrings::const_iterator + output = outputs.begin(); output != outputs.end(); output++) + { + (*it)->AddInput(*output); + } + } + + listener.SignalSuccess(jobId_); + return true; + } + + + ServerFilterInstance::ServerFilterInstance(IServerFilter *filter, + const std::string& jobId) : + filter_(filter), + jobId_(jobId) + { + if (filter_ == NULL) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } + + + ServerFilterInstance::~ServerFilterInstance() + { + if (filter_ != NULL) + { + delete filter_; + } + } +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/ServerFilterInstance.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerFilterInstance.h Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,98 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "../../Core/IDynamicObject.h" +#include "IServerFilter.h" + +namespace Orthanc +{ + class ServerFilterInstance : public IDynamicObject + { + friend class ServerScheduler; + + public: + class IListener + { + public: + virtual ~IListener() + { + } + + virtual void SignalSuccess(const std::string& jobId) = 0; + + virtual void SignalFailure(const std::string& jobId) = 0; + }; + + private: + typedef IServerFilter::ListOfStrings ListOfStrings; + + IServerFilter *filter_; + std::string jobId_; + ListOfStrings inputs_; + std::list next_; + + bool Execute(IListener& listener); + + public: + ServerFilterInstance(IServerFilter *filter, + const std::string& jobId); + + virtual ~ServerFilterInstance(); + + const std::string& GetJobId() const + { + return jobId_; + } + + void AddInput(const std::string& input) + { + inputs_.push_back(input); + } + + void ConnectNext(ServerFilterInstance& filter) + { + next_.push_back(&filter); + } + + const std::list& GetNextFilters() const + { + return next_; + } + + IServerFilter& GetFilter() const + { + return *filter_; + } + }; +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/ServerJob.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerJob.cpp Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,126 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "ServerJob.h" + +#include "../../Core/OrthancException.h" +#include "../../Core/Toolbox.h" +#include "../../Core/Uuid.h" + +namespace Orthanc +{ + void ServerJob::CheckOrdering() + { + std::map index; + + unsigned int count = 0; + for (std::list::const_iterator + it = filters_.begin(); it != filters_.end(); it++) + { + index[*it] = count++; + } + + for (std::list::const_iterator + it = filters_.begin(); it != filters_.end(); it++) + { + const std::list& nextFilters = (*it)->GetNextFilters(); + + for (std::list::const_iterator + next = nextFilters.begin(); next != nextFilters.end(); next++) + { + if (index.find(*next) == index.end() || + index[*next] <= index[*it]) + { + // You must reorder your calls to "ServerJob::AddFilter" + throw OrthancException("Bad ordering of filters in a job"); + } + } + } + } + + + size_t ServerJob::Submit(SharedMessageQueue& target, + ServerFilterInstance::IListener& listener) + { + if (submitted_) + { + // This job has already been submitted + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + CheckOrdering(); + + size_t size = filters_.size(); + + for (std::list::iterator + it = filters_.begin(); it != filters_.end(); it++) + { + target.Enqueue(*it); + } + + filters_.clear(); + submitted_ = true; + + return size; + } + + + ServerJob::ServerJob() + { + jobId_ = Toolbox::GenerateUuid(); + submitted_ = false; + description_ = "no description"; + } + + + ServerJob::~ServerJob() + { + for (std::list::iterator + it = filters_.begin(); it != filters_.end(); it++) + { + delete *it; + } + } + + + ServerFilterInstance& ServerJob::AddFilter(IServerFilter* filter) + { + if (submitted_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + filters_.push_back(new ServerFilterInstance(filter, jobId_)); + + return *filters_.back(); + } +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/ServerJob.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerJob.h Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,77 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "ServerFilterInstance.h" +#include "../../Core/MultiThreading/SharedMessageQueue.h" + +namespace Orthanc +{ + class ServerJob + { + friend class ServerScheduler; + + private: + std::list filters_; + std::string jobId_; + bool submitted_; + std::string description_; + + void CheckOrdering(); + + size_t Submit(SharedMessageQueue& target, + ServerFilterInstance::IListener& listener); + + public: + ServerJob(); + + ~ServerJob(); + + const std::string& GetId() const + { + return jobId_; + } + + void SetDescription(const char* description) + { + description_ = description; + } + + const std::string& GetDescription() const + { + return description_; + } + + ServerFilterInstance& AddFilter(IServerFilter* filter); + }; +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/ServerScheduler.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerScheduler.cpp Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,327 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "ServerScheduler.h" + +#include "../../Core/OrthancException.h" + +#include + +namespace Orthanc +{ + namespace + { + // Anonymous namespace to avoid clashes between compilation modules + class Sink : public IServerFilter + { + private: + ListOfStrings& target_; + + public: + Sink(ListOfStrings& target) : target_(target) + { + } + + virtual bool SendOutputsToSink() const + { + return false; + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) + { + for (ListOfStrings::const_iterator + it = inputs.begin(); it != inputs.end(); it++) + { + target_.push_back(*it); + } + + return true; + } + }; + } + + + ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId) + { + Jobs::iterator info = jobs_.find(jobId); + + if (info == jobs_.end()) + { + throw OrthancException(ErrorCode_InternalError); + } + + return info->second; + } + + + void ServerScheduler::SignalSuccess(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo& info = GetJobInfo(jobId); + info.success_++; + + assert(info.failures_ == 0); + + if (info.success_ >= info.size_) + { + if (info.watched_) + { + watchedJobStatus_[jobId] = JobStatus_Success; + jobFinished_.notify_all(); + } + + LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; + jobs_.erase(jobId); + } + } + + + void ServerScheduler::SignalFailure(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo& info = GetJobInfo(jobId); + info.failures_++; + + if (info.success_ + info.failures_ >= info.size_) + { + if (info.watched_) + { + watchedJobStatus_[jobId] = JobStatus_Failure; + jobFinished_.notify_all(); + } + + LOG(ERROR) << "Job has failed (" << info.description_ << ")"; + jobs_.erase(jobId); + } + } + + + void ServerScheduler::Worker(ServerScheduler* that) + { + static const int32_t TIMEOUT = 100; + + while (!that->finish_) + { + std::auto_ptr object(that->queue_.Dequeue(TIMEOUT)); + if (object.get() != NULL) + { + ServerFilterInstance& filter = dynamic_cast(*object); + + // Skip the execution of this filter if its parent job has + // previously failed. + bool jobHasFailed; + { + boost::mutex::scoped_lock lock(that->mutex_); + JobInfo& info = that->GetJobInfo(filter.GetJobId()); + jobHasFailed = (info.failures_ > 0 || info.cancel_); + } + + if (jobHasFailed) + { + that->SignalFailure(filter.GetJobId()); + } + else + { + filter.Execute(*that); + } + } + } + } + + + void ServerScheduler::SubmitInternal(ServerJob& job, + bool watched) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo info; + info.size_ = job.Submit(queue_, *this); + info.cancel_ = false; + info.success_ = 0; + info.failures_ = 0; + info.description_ = job.GetDescription(); + info.watched_ = watched; + + assert(info.size_ > 0); + + if (watched) + { + watchedJobStatus_[job.GetId()] = JobStatus_Running; + } + + jobs_[job.GetId()] = info; + + LOG(INFO) << "New job submitted (" << job.description_ << ")"; + } + + + ServerScheduler::ServerScheduler() + { + finish_ = false; + worker_ = boost::thread(Worker, this); + } + + + ServerScheduler::~ServerScheduler() + { + finish_ = true; + worker_.join(); + } + + + void ServerScheduler::Submit(ServerJob& job) + { + if (job.filters_.empty()) + { + return; + } + + SubmitInternal(job, false); + } + + + bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs, + ServerJob& job) + { + std::string jobId = job.GetId(); + + outputs.clear(); + + if (job.filters_.empty()) + { + return true; + } + + // Add a sink filter to collect all the results of the filters + // that have no next filter. + ServerFilterInstance& sink = job.AddFilter(new Sink(outputs)); + + for (std::list::iterator + it = job.filters_.begin(); it != job.filters_.end(); it++) + { + if ((*it) != &sink && + (*it)->GetNextFilters().size() == 0 && + (*it)->GetFilter().SendOutputsToSink()) + { + (*it)->ConnectNext(sink); + } + } + + // Submit the job + SubmitInternal(job, true); + + // Wait for the job to complete (either success or failure) + JobStatus status; + + { + boost::mutex::scoped_lock lock(mutex_); + + assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); + + while (watchedJobStatus_[jobId] == JobStatus_Running) + { + jobFinished_.wait(lock); + } + + status = watchedJobStatus_[jobId]; + watchedJobStatus_.erase(jobId); + } + + return (status == JobStatus_Success); + } + + + bool ServerScheduler::IsRunning(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + return jobs_.find(jobId) != jobs_.end(); + } + + + void ServerScheduler::Cancel(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + Jobs::iterator job = jobs_.find(jobId); + + if (job != jobs_.end()) + { + job->second.cancel_ = true; + LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; + } + } + + + float ServerScheduler::GetProgress(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + Jobs::iterator job = jobs_.find(jobId); + + if (job == jobs_.end() || + job->second.size_ == 0 /* should never happen */) + { + // This job is not running + return 1; + } + + if (job->second.failures_ != 0) + { + return 1; + } + + if (job->second.size_ == 1) + { + return job->second.success_; + } + + return (static_cast(job->second.success_) / + static_cast(job->second.size_ - 1)); + } + + + void ServerScheduler::GetListOfJobs(ListOfStrings& jobs) + { + boost::mutex::scoped_lock lock(mutex_); + + jobs.clear(); + + for (Jobs::const_iterator + it = jobs_.begin(); it != jobs_.end(); it++) + { + jobs.push_back(it->first); + } + } +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 OrthancServer/Scheduler/ServerScheduler.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerScheduler.h Wed Apr 30 18:30:05 2014 +0200 @@ -0,0 +1,115 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "ServerJob.h" + +namespace Orthanc +{ + class ServerScheduler : public ServerFilterInstance::IListener + { + private: + struct JobInfo + { + bool watched_; + bool cancel_; + size_t size_; + size_t success_; + size_t failures_; + std::string description_; + }; + + enum JobStatus + { + JobStatus_Running = 1, + JobStatus_Success = 2, + JobStatus_Failure = 3 + }; + + typedef IServerFilter::ListOfStrings ListOfStrings; + typedef std::map Jobs; + + boost::mutex mutex_; + boost::condition_variable jobFinished_; + Jobs jobs_; + SharedMessageQueue queue_; + bool finish_; + boost::thread worker_; + std::map watchedJobStatus_; + + JobInfo& GetJobInfo(const std::string& jobId); + + virtual void SignalSuccess(const std::string& jobId); + + virtual void SignalFailure(const std::string& jobId); + + static void Worker(ServerScheduler* that); + + void SubmitInternal(ServerJob& job, + bool watched); + + public: + ServerScheduler(); + + ~ServerScheduler(); + + void Submit(ServerJob& job); + + bool SubmitAndWait(ListOfStrings& outputs, + ServerJob& job); + + bool IsRunning(const std::string& jobId); + + void Cancel(const std::string& jobId); + + // Returns a number between 0 and 1 + float GetProgress(const std::string& jobId); + + bool IsRunning(const ServerJob& job) + { + return IsRunning(job.GetId()); + } + + void Cancel(const ServerJob& job) + { + Cancel(job.GetId()); + } + + float GetProgress(const ServerJob& job) + { + return GetProgress(job.GetId()); + } + + void GetListOfJobs(ListOfStrings& jobs); + }; +} diff -r 76eb563f08f0 -r f0ac3a53ccf2 UnitTestsSources/MultiThreading.cpp --- a/UnitTestsSources/MultiThreading.cpp Wed Apr 30 18:10:16 2014 +0200 +++ b/UnitTestsSources/MultiThreading.cpp Wed Apr 30 18:30:05 2014 +0200 @@ -1,6 +1,8 @@ #include "gtest/gtest.h" #include +#include "../OrthancServer/Scheduler/ServerScheduler.h" + #include "../Core/OrthancException.h" #include "../Core/Toolbox.h" #include "../Core/MultiThreading/ArrayFilledByThreads.h" @@ -242,567 +244,6 @@ -#include "../Core/ICommand.h" -#include "../Core/Toolbox.h" -#include "../Core/Uuid.h" -#include "../Core/MultiThreading/SharedMessageQueue.h" -#include - - -namespace Orthanc -{ - class IServerFilter - { - public: - typedef std::list ListOfStrings; - - virtual ~IServerFilter() - { - } - - virtual bool Apply(ListOfStrings& outputs, - const ListOfStrings& inputs) = 0; - - virtual bool SendOutputsToSink() const = 0; - }; - - - class Sink : public IServerFilter - { - private: - ListOfStrings& target_; - - public: - Sink(ListOfStrings& target) : target_(target) - { - } - - virtual bool SendOutputsToSink() const - { - return false; - } - - virtual bool Apply(ListOfStrings& outputs, - const ListOfStrings& inputs) - { - for (ListOfStrings::const_iterator - it = inputs.begin(); it != inputs.end(); it++) - { - target_.push_back(*it); - } - - return true; - } - }; - - - - class IServerFilterListener - { - public: - virtual ~IServerFilterListener() - { - } - - virtual void SignalSuccess(const std::string& jobId) = 0; - - virtual void SignalFailure(const std::string& jobId) = 0; - }; - - - class ServerFilterInstance : public IDynamicObject - { - friend class ServerScheduler; - - private: - typedef IServerFilter::ListOfStrings ListOfStrings; - - IServerFilter *filter_; - std::string jobId_; - ListOfStrings inputs_; - std::list next_; - - bool Execute(IServerFilterListener& listener) - { - ListOfStrings outputs; - if (!filter_->Apply(outputs, inputs_)) - { - listener.SignalFailure(jobId_); - return true; - } - - for (std::list::iterator - it = next_.begin(); it != next_.end(); it++) - { - for (ListOfStrings::const_iterator - output = outputs.begin(); output != outputs.end(); output++) - { - (*it)->AddInput(*output); - } - } - - listener.SignalSuccess(jobId_); - return true; - } - - - public: - ServerFilterInstance(IServerFilter *filter, - const std::string& jobId) : - filter_(filter), - jobId_(jobId) - { - if (filter_ == NULL) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - } - - virtual ~ServerFilterInstance() - { - if (filter_ != NULL) - { - delete filter_; - } - } - - const std::string& GetJobId() const - { - return jobId_; - } - - void AddInput(const std::string& input) - { - inputs_.push_back(input); - } - - void ConnectNext(ServerFilterInstance& filter) - { - next_.push_back(&filter); - } - - const std::list& GetNextFilters() const - { - return next_; - } - - IServerFilter& GetFilter() const - { - return *filter_; - } - }; - - - class ServerJob - { - friend class ServerScheduler; - - private: - std::list filters_; - std::string jobId_; - bool submitted_; - std::string description_; - - - void CheckOrdering() - { - std::map index; - - unsigned int count = 0; - for (std::list::const_iterator - it = filters_.begin(); it != filters_.end(); it++) - { - index[*it] = count++; - } - - for (std::list::const_iterator - it = filters_.begin(); it != filters_.end(); it++) - { - const std::list& nextFilters = (*it)->GetNextFilters(); - - for (std::list::const_iterator - next = nextFilters.begin(); next != nextFilters.end(); next++) - { - if (index.find(*next) == index.end() || - index[*next] <= index[*it]) - { - // You must reorder your calls to "ServerJob::AddFilter" - throw OrthancException("Bad ordering of filters in a job"); - } - } - } - } - - - size_t Submit(SharedMessageQueue& target, - IServerFilterListener& listener) - { - if (submitted_) - { - // This job has already been submitted - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - CheckOrdering(); - - size_t size = filters_.size(); - - for (std::list::iterator - it = filters_.begin(); it != filters_.end(); it++) - { - target.Enqueue(*it); - } - - filters_.clear(); - submitted_ = true; - - return size; - } - - public: - ServerJob() - { - jobId_ = Toolbox::GenerateUuid(); - submitted_ = false; - description_ = "no description"; - } - - ~ServerJob() - { - for (std::list::iterator - it = filters_.begin(); it != filters_.end(); it++) - { - delete *it; - } - } - - const std::string& GetId() const - { - return jobId_; - } - - void SetDescription(const char* description) - { - description_ = description; - } - - const std::string& GetDescription() const - { - return description_; - } - - ServerFilterInstance& AddFilter(IServerFilter* filter) - { - if (submitted_) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - filters_.push_back(new ServerFilterInstance(filter, jobId_)); - - return *filters_.back(); - } - }; - - - class ServerScheduler : public IServerFilterListener - { - private: - struct JobInfo - { - bool watched_; - bool cancel_; - size_t size_; - size_t success_; - size_t failures_; - std::string description_; - }; - - enum JobStatus - { - JobStatus_Running = 1, - JobStatus_Success = 2, - JobStatus_Failure = 3 - }; - - typedef IServerFilter::ListOfStrings ListOfStrings; - typedef std::map Jobs; - - boost::mutex mutex_; - boost::condition_variable jobFinished_; - Jobs jobs_; - SharedMessageQueue queue_; - bool finish_; - boost::thread worker_; - std::map watchedJobStatus_; - - JobInfo& GetJobInfo(const std::string& jobId) - { - Jobs::iterator info = jobs_.find(jobId); - - if (info == jobs_.end()) - { - throw OrthancException(ErrorCode_InternalError); - } - - return info->second; - } - - virtual void SignalSuccess(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - JobInfo& info = GetJobInfo(jobId); - info.success_++; - - assert(info.failures_ == 0); - - if (info.success_ >= info.size_) - { - if (info.watched_) - { - watchedJobStatus_[jobId] = JobStatus_Success; - jobFinished_.notify_all(); - } - - LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; - jobs_.erase(jobId); - } - } - - virtual void SignalFailure(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - JobInfo& info = GetJobInfo(jobId); - info.failures_++; - - if (info.success_ + info.failures_ >= info.size_) - { - if (info.watched_) - { - watchedJobStatus_[jobId] = JobStatus_Failure; - jobFinished_.notify_all(); - } - - LOG(ERROR) << "Job has failed (" << info.description_ << ")"; - jobs_.erase(jobId); - } - } - - static void Worker(ServerScheduler* that) - { - static const int32_t TIMEOUT = 100; - - while (!that->finish_) - { - std::auto_ptr object(that->queue_.Dequeue(TIMEOUT)); - if (object.get() != NULL) - { - ServerFilterInstance& filter = dynamic_cast(*object); - - // Skip the execution of this filter if its parent job has - // previously failed. - bool jobHasFailed; - { - boost::mutex::scoped_lock lock(that->mutex_); - JobInfo& info = that->GetJobInfo(filter.GetJobId()); - jobHasFailed = (info.failures_ > 0 || info.cancel_); - } - - if (jobHasFailed) - { - that->SignalFailure(filter.GetJobId()); - } - else - { - filter.Execute(*that); - } - } - } - } - - void SubmitInternal(ServerJob& job, - bool watched) - { - boost::mutex::scoped_lock lock(mutex_); - - JobInfo info; - info.size_ = job.Submit(queue_, *this); - info.cancel_ = false; - info.success_ = 0; - info.failures_ = 0; - info.description_ = job.GetDescription(); - info.watched_ = watched; - - assert(info.size_ > 0); - - if (watched) - { - watchedJobStatus_[job.GetId()] = JobStatus_Running; - } - - jobs_[job.GetId()] = info; - - LOG(INFO) << "New job submitted (" << job.description_ << ")"; - } - - public: - ServerScheduler() - { - finish_ = false; - worker_ = boost::thread(Worker, this); - } - - ~ServerScheduler() - { - finish_ = true; - worker_.join(); - } - - void Submit(ServerJob& job) - { - if (job.filters_.empty()) - { - return; - } - - SubmitInternal(job, false); - } - - bool SubmitAndWait(ListOfStrings& outputs, - ServerJob& job) - { - std::string jobId = job.GetId(); - - outputs.clear(); - - if (job.filters_.empty()) - { - return true; - } - - // Add a sink filter to collect all the results of the filters - // that have no next filter. - ServerFilterInstance& sink = job.AddFilter(new Sink(outputs)); - - for (std::list::iterator - it = job.filters_.begin(); it != job.filters_.end(); it++) - { - if ((*it) != &sink && - (*it)->GetNextFilters().size() == 0 && - (*it)->GetFilter().SendOutputsToSink()) - { - (*it)->ConnectNext(sink); - } - } - - // Submit the job - SubmitInternal(job, true); - - // Wait for the job to complete (either success or failure) - JobStatus status; - - { - boost::mutex::scoped_lock lock(mutex_); - - assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); - - while (watchedJobStatus_[jobId] == JobStatus_Running) - { - jobFinished_.wait(lock); - } - - status = watchedJobStatus_[jobId]; - watchedJobStatus_.erase(jobId); - } - - return (status == JobStatus_Success); - } - - - bool IsRunning(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - return jobs_.find(jobId) != jobs_.end(); - } - - - void Cancel(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - Jobs::iterator job = jobs_.find(jobId); - - if (job != jobs_.end()) - { - job->second.cancel_ = true; - LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; - } - } - - - // Returns a number between 0 and 1 - float GetProgress(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - Jobs::iterator job = jobs_.find(jobId); - - if (job == jobs_.end() || - job->second.size_ == 0 /* should never happen */) - { - // This job is not running - return 1; - } - - if (job->second.failures_ != 0) - { - return 1; - } - - if (job->second.size_ == 1) - { - return job->second.success_; - } - - return (static_cast(job->second.success_) / - static_cast(job->second.size_ - 1)); - } - - bool IsRunning(const ServerJob& job) - { - return IsRunning(job.GetId()); - } - - void Cancel(const ServerJob& job) - { - Cancel(job.GetId()); - } - - float GetProgress(const ServerJob& job) - { - return GetProgress(job); - } - - void GetListOfJobs(ListOfStrings& jobs) - { - boost::mutex::scoped_lock lock(mutex_); - - jobs.clear(); - - for (Jobs::const_iterator - it = jobs_.begin(); it != jobs_.end(); it++) - { - jobs.push_back(it->first); - } - } - }; - -} - - - class Tutu : public IServerFilter { private: