# HG changeset patch # User Sebastien Jodogne # Date 1398264839 -7200 # Node ID fc97f762834c17d9632ee784641ad019ef54b804 # Parent 45b16f67259c4483cb2d22132da6a7045db51462 scheduler diff -r 45b16f67259c -r fc97f762834c UnitTestsSources/MultiThreading.cpp --- a/UnitTestsSources/MultiThreading.cpp Tue Apr 22 16:47:21 2014 +0200 +++ b/UnitTestsSources/MultiThreading.cpp Wed Apr 23 16:53:59 2014 +0200 @@ -1,4 +1,5 @@ #include "gtest/gtest.h" +#include #include "../Core/OrthancException.h" #include "../Core/Toolbox.h" @@ -210,3 +211,606 @@ Locker locker3(lock.ForWriter()); } } + + + + + + + +#include "../Core/ICommand.h" +#include "../Core/Toolbox.h" +#include "../Core/Uuid.h" +#include "../Core/MultiThreading/SharedMessageQueue.h" +#include + + +namespace Orthanc +{ + typedef std::list ListOfStrings; + + class IServerFilter + { + public: + virtual ~IServerFilter() + { + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) = 0; + }; + + + class Sink : public IServerFilter + { + private: + ListOfStrings& target_; + + public: + Sink(ListOfStrings& target) : target_(target) + { + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) + { + for (ListOfStrings::const_iterator + it = inputs.begin(); it != inputs.end(); it++) + { + target_.push_back(*it); + } + + return true; + } + }; + + + + class IServerFilterListener + { + public: + virtual ~IServerFilterListener() + { + } + + virtual void SignalSuccess(const std::string& jobId) = 0; + + virtual void SignalFailure(const std::string& jobId) = 0; + }; + + + class FilterWrapper : public IDynamicObject + { + private: + IServerFilterListener *listener_; + IServerFilter *filter_; + std::string jobId_; + ListOfStrings inputs_; + std::list next_; + + public: + FilterWrapper(IServerFilter *filter, + const std::string& jobId) : + listener_(NULL), + filter_(filter), + jobId_(jobId) + { + if (filter_ == NULL) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } + + virtual ~FilterWrapper() + { + if (filter_ != NULL) + { + delete filter_; + } + } + + void SetListener(IServerFilterListener& listener) + { + listener_ = &listener; + } + + const std::string& GetJobId() const + { + return jobId_; + } + + void AddInput(const std::string& input) + { + inputs_.push_back(input); + } + + bool Execute() + { + ListOfStrings outputs; + if (!filter_->Apply(outputs, inputs_)) + { + if (listener_) + { + listener_->SignalFailure(jobId_); + } + + return true; + } + + for (std::list::iterator + it = next_.begin(); it != next_.end(); it++) + { + for (ListOfStrings::const_iterator + output = outputs.begin(); output != outputs.end(); output++) + { + (*it)->AddInput(*output); + } + } + + if (listener_) + { + listener_->SignalSuccess(jobId_); + } + + return true; + } + + void ConnectNext(FilterWrapper& filter) + { + next_.push_back(&filter); + } + + const std::list& GetNextFilters() const + { + return next_; + } + }; + + + enum ServerJobStatus + { + ServerJobStatus_Running = 1, + ServerJobStatus_Success = 2, + ServerJobStatus_Failure = 3 + }; + + + class ServerJob + { + friend class ServerScheduler; + + private: + std::list filters_; + std::string jobId_; + bool submitted_; + std::string description_; + + + void CheckOrdering() + { + std::map index; + + unsigned int count = 0; + for (std::list::const_iterator + it = filters_.begin(); it != filters_.end(); it++) + { + index[*it] = count++; + } + + for (std::list::const_iterator + it = filters_.begin(); it != filters_.end(); it++) + { + const std::list& nextFilters = (*it)->GetNextFilters(); + + for (std::list::const_iterator + next = nextFilters.begin(); next != nextFilters.end(); next++) + { + if (index.find(*next) == index.end() || + index[*next] <= index[*it]) + { + // You must reorder your calls to "ServerJob::AddFilter" + throw OrthancException("Bad ordering of filters in a job"); + } + } + } + } + + + size_t Submit(SharedMessageQueue& target, + IServerFilterListener& listener) + { + if (submitted_) + { + // This job has already been submitted + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + CheckOrdering(); + + size_t size = filters_.size(); + + for (std::list::iterator + it = filters_.begin(); it != filters_.end(); it++) + { + (*it)->SetListener(listener); + target.Enqueue(*it); + } + + filters_.clear(); + submitted_ = true; + + return size; + } + + public: + ServerJob() + { + jobId_ = Toolbox::GenerateUuid(); + submitted_ = false; + description_ = "no description"; + } + + ~ServerJob() + { + for (std::list::iterator + it = filters_.begin(); it != filters_.end(); it++) + { + delete *it; + } + } + + const std::string& GetId() const + { + return jobId_; + } + + void SetDescription(const char* description) + { + description_ = description; + } + + const std::string& GetDescription() const + { + return description_; + } + + FilterWrapper& AddFilter(IServerFilter* filter) + { + if (submitted_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + filters_.push_back(new FilterWrapper(filter, jobId_)); + + return *filters_.back(); + } + }; + + + class ServerScheduler : public IServerFilterListener + { + private: + struct JobInfo + { + bool watched_; + bool cancel_; + size_t size_; + size_t success_; + size_t failures_; + std::string description_; + }; + + typedef std::map Jobs; + + boost::mutex mutex_; + boost::condition_variable jobFinished_; + Jobs jobs_; + SharedMessageQueue queue_; + bool finish_; + boost::thread worker_; + std::map watchedJobStatus_; + + JobInfo& GetJobInfo(const std::string& jobId) + { + Jobs::iterator info = jobs_.find(jobId); + + if (info == jobs_.end()) + { + throw OrthancException(ErrorCode_InternalError); + } + + return info->second; + } + + virtual void SignalSuccess(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo& info = GetJobInfo(jobId); + info.success_++; + + assert(info.failures_ == 0); + + if (info.success_ >= info.size_) + { + if (info.watched_) + { + watchedJobStatus_[jobId] = ServerJobStatus_Success; + jobFinished_.notify_all(); + } + + LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; + jobs_.erase(jobId); + } + } + + virtual void SignalFailure(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo& info = GetJobInfo(jobId); + info.failures_++; + + if (info.success_ + info.failures_ >= info.size_) + { + if (info.watched_) + { + watchedJobStatus_[jobId] = ServerJobStatus_Failure; + jobFinished_.notify_all(); + } + + LOG(ERROR) << "Job has failed (" << info.description_ << ")"; + jobs_.erase(jobId); + } + } + + static void Worker(ServerScheduler* that) + { + static const int32_t TIMEOUT = 100; + + while (!that->finish_) + { + std::auto_ptr object(that->queue_.Dequeue(TIMEOUT)); + if (object.get() != NULL) + { + FilterWrapper& filter = dynamic_cast(*object); + + // Skip the execution of this filter if its parent job has + // previously failed. + bool jobHasFailed; + { + boost::mutex::scoped_lock lock(that->mutex_); + JobInfo& info = that->GetJobInfo(filter.GetJobId()); + jobHasFailed = (info.failures_ > 0 || info.cancel_); + } + + if (jobHasFailed) + { + that->SignalFailure(filter.GetJobId()); + } + else + { + filter.Execute(); + } + } + } + } + + void SubmitInternal(ServerJob& job, + bool watched) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo info; + info.size_ = job.Submit(queue_, *this); + info.cancel_ = false; + info.success_ = 0; + info.failures_ = 0; + info.description_ = job.GetDescription(); + info.watched_ = watched; + + assert(info.size_ > 0); + + if (watched) + { + watchedJobStatus_[job.GetId()] = ServerJobStatus_Running; + } + + jobs_[job.GetId()] = info; + + LOG(INFO) << "New job submitted (" << job.description_ << ")"; + } + + public: + ServerScheduler() + { + finish_ = false; + worker_ = boost::thread(Worker, this); + } + + ~ServerScheduler() + { + finish_ = true; + worker_.join(); + } + + void Submit(ServerJob& job) + { + if (job.filters_.empty()) + { + return; + } + + SubmitInternal(job, false); + } + + bool SubmitAndWait(ListOfStrings& outputs, + ServerJob& job) + { + std::string jobId = job.GetId(); + + outputs.clear(); + + if (job.filters_.empty()) + { + return true; + } + + // Add a sink filter to collect all the results of the filters + // that have no next filter. + FilterWrapper& sink = job.AddFilter(new Sink(outputs)); + + for (std::list::iterator + it = job.filters_.begin(); it != job.filters_.end(); it++) + { + if ((*it) != &sink && + (*it)->GetNextFilters().size() == 0) + { + (*it)->ConnectNext(sink); + } + } + + // Submit the job + SubmitInternal(job, true); + + // Wait for the job to complete (either success or failure) + ServerJobStatus status; + + { + boost::mutex::scoped_lock lock(mutex_); + + while (watchedJobStatus_[jobId] == ServerJobStatus_Running) + { + jobFinished_.wait(lock); + } + + status = watchedJobStatus_[jobId]; + watchedJobStatus_.erase(jobId); + } + + return (status == ServerJobStatus_Success); + } + + + bool IsRunning(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + return jobs_.find(jobId) != jobs_.end(); + } + + + void Cancel(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + Jobs::iterator job = jobs_.find(jobId); + + if (job != jobs_.end()) + { + job->second.cancel_ = true; + LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; + } + } + + + // Returns a number between 0 and 1 + float GetProgress(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + Jobs::iterator job = jobs_.find(jobId); + + if (job == jobs_.end() || + job->second.size_ == 0 /* should never happen */) + { + // This job is not running + return 1; + } + + float n = static_cast(job->second.failures_ + job->second.success_); + float d = static_cast(job->second.size_); + return n / d; + } + + bool IsRunning(const ServerJob& job) + { + return IsRunning(job.GetId()); + } + + void Cancel(const ServerJob& job) + { + Cancel(job.GetId()); + } + + float GetProgress(const ServerJob& job) + { + return GetProgress(job); + } + }; + +} + + + +class Tutu : public IServerFilter +{ +private: + int factor_; + +public: + Tutu(int f) : factor_(f) + { + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) + { + for (ListOfStrings::const_iterator + it = inputs.begin(); it != inputs.end(); it++) + { + int a = boost::lexical_cast(*it); + int b = factor_ * a; + + printf("%d * %d = %d\n", a, factor_, b); + + //if (a == 84) { printf("BREAK\n"); return false; } + + outputs.push_back(boost::lexical_cast(b)); + } + + Toolbox::USleep(1000000); + + return true; + } +}; + +TEST(Toto, Toto) +{ + ServerScheduler scheduler; + + ServerJob job; + FilterWrapper& f2 = job.AddFilter(new Tutu(2)); + FilterWrapper& f3 = job.AddFilter(new Tutu(3)); + FilterWrapper& f4 = job.AddFilter(new Tutu(4)); + f2.AddInput(boost::lexical_cast(42)); + //f3.AddInput(boost::lexical_cast(42)); + //f4.AddInput(boost::lexical_cast(42)); + f2.ConnectNext(f3); + f3.ConnectNext(f4); + + job.SetDescription("tutu"); + + //scheduler.Submit(job); + + ListOfStrings l; + scheduler.SubmitAndWait(l, job); + + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + { + printf("** %s\n", i->c_str()); + } + + //Toolbox::ServerBarrier(); + Toolbox::USleep(30000); +}