Mercurial > hg > orthanc
diff UnitTestsSources/MultiThreading.cpp @ 781:f0ac3a53ccf2 lua-scripting
scheduler
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 30 Apr 2014 18:30:05 +0200 |
parents | 76eb563f08f0 |
children | 394a19d44f9d |
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreading.cpp Wed Apr 30 18:10:16 2014 +0200 +++ b/UnitTestsSources/MultiThreading.cpp Wed Apr 30 18:30:05 2014 +0200 @@ -1,6 +1,8 @@ #include "gtest/gtest.h" #include <glog/logging.h> +#include "../OrthancServer/Scheduler/ServerScheduler.h" + #include "../Core/OrthancException.h" #include "../Core/Toolbox.h" #include "../Core/MultiThreading/ArrayFilledByThreads.h" @@ -242,567 +244,6 @@ -#include "../Core/ICommand.h" -#include "../Core/Toolbox.h" -#include "../Core/Uuid.h" -#include "../Core/MultiThreading/SharedMessageQueue.h" -#include <boost/lexical_cast.hpp> - - -namespace Orthanc -{ - class IServerFilter - { - public: - typedef std::list<std::string> ListOfStrings; - - virtual ~IServerFilter() - { - } - - virtual bool Apply(ListOfStrings& outputs, - const ListOfStrings& inputs) = 0; - - virtual bool SendOutputsToSink() const = 0; - }; - - - class Sink : public IServerFilter - { - private: - ListOfStrings& target_; - - public: - Sink(ListOfStrings& target) : target_(target) - { - } - - virtual bool SendOutputsToSink() const - { - return false; - } - - virtual bool Apply(ListOfStrings& outputs, - const ListOfStrings& inputs) - { - for (ListOfStrings::const_iterator - it = inputs.begin(); it != inputs.end(); it++) - { - target_.push_back(*it); - } - - return true; - } - }; - - - - class IServerFilterListener - { - public: - virtual ~IServerFilterListener() - { - } - - virtual void SignalSuccess(const std::string& jobId) = 0; - - virtual void SignalFailure(const std::string& jobId) = 0; - }; - - - class ServerFilterInstance : public IDynamicObject - { - friend class ServerScheduler; - - private: - typedef IServerFilter::ListOfStrings ListOfStrings; - - IServerFilter *filter_; - std::string jobId_; - ListOfStrings inputs_; - std::list<ServerFilterInstance*> next_; - - bool Execute(IServerFilterListener& listener) - { - ListOfStrings outputs; - if (!filter_->Apply(outputs, inputs_)) - { - listener.SignalFailure(jobId_); - return true; - } - - for (std::list<ServerFilterInstance*>::iterator - it = next_.begin(); it != next_.end(); it++) - { - for (ListOfStrings::const_iterator - output = outputs.begin(); output != outputs.end(); output++) - { - (*it)->AddInput(*output); - } - } - - listener.SignalSuccess(jobId_); - return true; - } - - - public: - ServerFilterInstance(IServerFilter *filter, - const std::string& jobId) : - filter_(filter), - jobId_(jobId) - { - if (filter_ == NULL) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - } - - virtual ~ServerFilterInstance() - { - if (filter_ != NULL) - { - delete filter_; - } - } - - const std::string& GetJobId() const - { - return jobId_; - } - - void AddInput(const std::string& input) - { - inputs_.push_back(input); - } - - void ConnectNext(ServerFilterInstance& filter) - { - next_.push_back(&filter); - } - - const std::list<ServerFilterInstance*>& GetNextFilters() const - { - return next_; - } - - IServerFilter& GetFilter() const - { - return *filter_; - } - }; - - - class ServerJob - { - friend class ServerScheduler; - - private: - std::list<ServerFilterInstance*> filters_; - std::string jobId_; - bool submitted_; - std::string description_; - - - void CheckOrdering() - { - std::map<ServerFilterInstance*, unsigned int> index; - - unsigned int count = 0; - for (std::list<ServerFilterInstance*>::const_iterator - it = filters_.begin(); it != filters_.end(); it++) - { - index[*it] = count++; - } - - for (std::list<ServerFilterInstance*>::const_iterator - it = filters_.begin(); it != filters_.end(); it++) - { - const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters(); - - for (std::list<ServerFilterInstance*>::const_iterator - next = nextFilters.begin(); next != nextFilters.end(); next++) - { - if (index.find(*next) == index.end() || - index[*next] <= index[*it]) - { - // You must reorder your calls to "ServerJob::AddFilter" - throw OrthancException("Bad ordering of filters in a job"); - } - } - } - } - - - size_t Submit(SharedMessageQueue& target, - IServerFilterListener& listener) - { - if (submitted_) - { - // This job has already been submitted - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - CheckOrdering(); - - size_t size = filters_.size(); - - for (std::list<ServerFilterInstance*>::iterator - it = filters_.begin(); it != filters_.end(); it++) - { - target.Enqueue(*it); - } - - filters_.clear(); - submitted_ = true; - - return size; - } - - public: - ServerJob() - { - jobId_ = Toolbox::GenerateUuid(); - submitted_ = false; - description_ = "no description"; - } - - ~ServerJob() - { - for (std::list<ServerFilterInstance*>::iterator - it = filters_.begin(); it != filters_.end(); it++) - { - delete *it; - } - } - - const std::string& GetId() const - { - return jobId_; - } - - void SetDescription(const char* description) - { - description_ = description; - } - - const std::string& GetDescription() const - { - return description_; - } - - ServerFilterInstance& AddFilter(IServerFilter* filter) - { - if (submitted_) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - filters_.push_back(new ServerFilterInstance(filter, jobId_)); - - return *filters_.back(); - } - }; - - - class ServerScheduler : public IServerFilterListener - { - private: - struct JobInfo - { - bool watched_; - bool cancel_; - size_t size_; - size_t success_; - size_t failures_; - std::string description_; - }; - - enum JobStatus - { - JobStatus_Running = 1, - JobStatus_Success = 2, - JobStatus_Failure = 3 - }; - - typedef IServerFilter::ListOfStrings ListOfStrings; - typedef std::map<std::string, JobInfo> Jobs; - - boost::mutex mutex_; - boost::condition_variable jobFinished_; - Jobs jobs_; - SharedMessageQueue queue_; - bool finish_; - boost::thread worker_; - std::map<std::string, JobStatus> watchedJobStatus_; - - JobInfo& GetJobInfo(const std::string& jobId) - { - Jobs::iterator info = jobs_.find(jobId); - - if (info == jobs_.end()) - { - throw OrthancException(ErrorCode_InternalError); - } - - return info->second; - } - - virtual void SignalSuccess(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - JobInfo& info = GetJobInfo(jobId); - info.success_++; - - assert(info.failures_ == 0); - - if (info.success_ >= info.size_) - { - if (info.watched_) - { - watchedJobStatus_[jobId] = JobStatus_Success; - jobFinished_.notify_all(); - } - - LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; - jobs_.erase(jobId); - } - } - - virtual void SignalFailure(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - JobInfo& info = GetJobInfo(jobId); - info.failures_++; - - if (info.success_ + info.failures_ >= info.size_) - { - if (info.watched_) - { - watchedJobStatus_[jobId] = JobStatus_Failure; - jobFinished_.notify_all(); - } - - LOG(ERROR) << "Job has failed (" << info.description_ << ")"; - jobs_.erase(jobId); - } - } - - static void Worker(ServerScheduler* that) - { - static const int32_t TIMEOUT = 100; - - while (!that->finish_) - { - std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); - if (object.get() != NULL) - { - ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object); - - // Skip the execution of this filter if its parent job has - // previously failed. - bool jobHasFailed; - { - boost::mutex::scoped_lock lock(that->mutex_); - JobInfo& info = that->GetJobInfo(filter.GetJobId()); - jobHasFailed = (info.failures_ > 0 || info.cancel_); - } - - if (jobHasFailed) - { - that->SignalFailure(filter.GetJobId()); - } - else - { - filter.Execute(*that); - } - } - } - } - - void SubmitInternal(ServerJob& job, - bool watched) - { - boost::mutex::scoped_lock lock(mutex_); - - JobInfo info; - info.size_ = job.Submit(queue_, *this); - info.cancel_ = false; - info.success_ = 0; - info.failures_ = 0; - info.description_ = job.GetDescription(); - info.watched_ = watched; - - assert(info.size_ > 0); - - if (watched) - { - watchedJobStatus_[job.GetId()] = JobStatus_Running; - } - - jobs_[job.GetId()] = info; - - LOG(INFO) << "New job submitted (" << job.description_ << ")"; - } - - public: - ServerScheduler() - { - finish_ = false; - worker_ = boost::thread(Worker, this); - } - - ~ServerScheduler() - { - finish_ = true; - worker_.join(); - } - - void Submit(ServerJob& job) - { - if (job.filters_.empty()) - { - return; - } - - SubmitInternal(job, false); - } - - bool SubmitAndWait(ListOfStrings& outputs, - ServerJob& job) - { - std::string jobId = job.GetId(); - - outputs.clear(); - - if (job.filters_.empty()) - { - return true; - } - - // Add a sink filter to collect all the results of the filters - // that have no next filter. - ServerFilterInstance& sink = job.AddFilter(new Sink(outputs)); - - for (std::list<ServerFilterInstance*>::iterator - it = job.filters_.begin(); it != job.filters_.end(); it++) - { - if ((*it) != &sink && - (*it)->GetNextFilters().size() == 0 && - (*it)->GetFilter().SendOutputsToSink()) - { - (*it)->ConnectNext(sink); - } - } - - // Submit the job - SubmitInternal(job, true); - - // Wait for the job to complete (either success or failure) - JobStatus status; - - { - boost::mutex::scoped_lock lock(mutex_); - - assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); - - while (watchedJobStatus_[jobId] == JobStatus_Running) - { - jobFinished_.wait(lock); - } - - status = watchedJobStatus_[jobId]; - watchedJobStatus_.erase(jobId); - } - - return (status == JobStatus_Success); - } - - - bool IsRunning(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - return jobs_.find(jobId) != jobs_.end(); - } - - - void Cancel(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - Jobs::iterator job = jobs_.find(jobId); - - if (job != jobs_.end()) - { - job->second.cancel_ = true; - LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; - } - } - - - // Returns a number between 0 and 1 - float GetProgress(const std::string& jobId) - { - boost::mutex::scoped_lock lock(mutex_); - - Jobs::iterator job = jobs_.find(jobId); - - if (job == jobs_.end() || - job->second.size_ == 0 /* should never happen */) - { - // This job is not running - return 1; - } - - if (job->second.failures_ != 0) - { - return 1; - } - - if (job->second.size_ == 1) - { - return job->second.success_; - } - - return (static_cast<float>(job->second.success_) / - static_cast<float>(job->second.size_ - 1)); - } - - bool IsRunning(const ServerJob& job) - { - return IsRunning(job.GetId()); - } - - void Cancel(const ServerJob& job) - { - Cancel(job.GetId()); - } - - float GetProgress(const ServerJob& job) - { - return GetProgress(job); - } - - void GetListOfJobs(ListOfStrings& jobs) - { - boost::mutex::scoped_lock lock(mutex_); - - jobs.clear(); - - for (Jobs::const_iterator - it = jobs_.begin(); it != jobs_.end(); it++) - { - jobs.push_back(it->first); - } - } - }; - -} - - - class Tutu : public IServerFilter { private: