# HG changeset patch # User Sebastien Jodogne # Date 1398266598 -7200 # Node ID 476febedc5161d2d5bd53f888f50be346ac299e9 # Parent fc97f762834c17d9632ee784641ad019ef54b804 improvements diff -r fc97f762834c -r 476febedc516 UnitTestsSources/MultiThreading.cpp --- a/UnitTestsSources/MultiThreading.cpp Wed Apr 23 16:53:59 2014 +0200 +++ b/UnitTestsSources/MultiThreading.cpp Wed Apr 23 17:23:18 2014 +0200 @@ -281,17 +281,41 @@ class FilterWrapper : public IDynamicObject { + friend class ServerScheduler; + private: - IServerFilterListener *listener_; IServerFilter *filter_; std::string jobId_; ListOfStrings inputs_; std::list next_; + bool Execute(IServerFilterListener& listener) + { + ListOfStrings outputs; + if (!filter_->Apply(outputs, inputs_)) + { + listener.SignalFailure(jobId_); + return true; + } + + for (std::list::iterator + it = next_.begin(); it != next_.end(); it++) + { + for (ListOfStrings::const_iterator + output = outputs.begin(); output != outputs.end(); output++) + { + (*it)->AddInput(*output); + } + } + + listener.SignalSuccess(jobId_); + return true; + } + + public: FilterWrapper(IServerFilter *filter, const std::string& jobId) : - listener_(NULL), filter_(filter), jobId_(jobId) { @@ -309,11 +333,6 @@ } } - void SetListener(IServerFilterListener& listener) - { - listener_ = &listener; - } - const std::string& GetJobId() const { return jobId_; @@ -324,37 +343,6 @@ inputs_.push_back(input); } - bool Execute() - { - ListOfStrings outputs; - if (!filter_->Apply(outputs, inputs_)) - { - if (listener_) - { - listener_->SignalFailure(jobId_); - } - - return true; - } - - for (std::list::iterator - it = next_.begin(); it != next_.end(); it++) - { - for (ListOfStrings::const_iterator - output = outputs.begin(); output != outputs.end(); output++) - { - (*it)->AddInput(*output); - } - } - - if (listener_) - { - listener_->SignalSuccess(jobId_); - } - - return true; - } - void ConnectNext(FilterWrapper& filter) { next_.push_back(&filter); @@ -367,14 +355,6 @@ }; - enum ServerJobStatus - { - ServerJobStatus_Running = 1, - ServerJobStatus_Success = 2, - ServerJobStatus_Failure = 3 - }; - - class ServerJob { friend class ServerScheduler; @@ -432,7 +412,6 @@ for (std::list::iterator it = filters_.begin(); it != filters_.end(); it++) { - (*it)->SetListener(listener); target.Enqueue(*it); } @@ -501,6 +480,13 @@ std::string description_; }; + enum JobStatus + { + JobStatus_Running = 1, + JobStatus_Success = 2, + JobStatus_Failure = 3 + }; + typedef std::map Jobs; boost::mutex mutex_; @@ -509,7 +495,7 @@ SharedMessageQueue queue_; bool finish_; boost::thread worker_; - std::map watchedJobStatus_; + std::map watchedJobStatus_; JobInfo& GetJobInfo(const std::string& jobId) { @@ -536,7 +522,7 @@ { if (info.watched_) { - watchedJobStatus_[jobId] = ServerJobStatus_Success; + watchedJobStatus_[jobId] = JobStatus_Success; jobFinished_.notify_all(); } @@ -556,7 +542,7 @@ { if (info.watched_) { - watchedJobStatus_[jobId] = ServerJobStatus_Failure; + watchedJobStatus_[jobId] = JobStatus_Failure; jobFinished_.notify_all(); } @@ -591,7 +577,7 @@ } else { - filter.Execute(); + filter.Execute(*that); } } } @@ -614,7 +600,7 @@ if (watched) { - watchedJobStatus_[job.GetId()] = ServerJobStatus_Running; + watchedJobStatus_[job.GetId()] = JobStatus_Running; } jobs_[job.GetId()] = info; @@ -675,12 +661,14 @@ SubmitInternal(job, true); // Wait for the job to complete (either success or failure) - ServerJobStatus status; + JobStatus status; { boost::mutex::scoped_lock lock(mutex_); + + assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); - while (watchedJobStatus_[jobId] == ServerJobStatus_Running) + while (watchedJobStatus_[jobId] == JobStatus_Running) { jobFinished_.wait(lock); } @@ -689,7 +677,7 @@ watchedJobStatus_.erase(jobId); } - return (status == ServerJobStatus_Success); + return (status == JobStatus_Success); } @@ -728,9 +716,18 @@ return 1; } - float n = static_cast(job->second.failures_ + job->second.success_); - float d = static_cast(job->second.size_); - return n / d; + if (job->second.failures_ != 0) + { + return 1; + } + + if (job->second.size_ == 1) + { + return job->second.success_; + } + + return (static_cast(job->second.success_) / + static_cast(job->second.size_ - 1)); } bool IsRunning(const ServerJob& job) @@ -747,6 +744,19 @@ { return GetProgress(job); } + + void GetListOfJobs(ListOfStrings& jobs) + { + boost::mutex::scoped_lock lock(mutex_); + + jobs.clear(); + + for (Jobs::const_iterator + it = jobs_.begin(); it != jobs_.end(); it++) + { + jobs.push_back(it->first); + } + } }; } @@ -785,6 +795,33 @@ } }; + +static void Tata(ServerScheduler* s, ServerJob* j) +{ +#if 1 + for (;;) + { + ListOfStrings l; + s->GetListOfJobs(l); + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i)); + Toolbox::USleep(100000); + } +#else + ListOfStrings l; + s->GetListOfJobs(l); + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + printf(">> %s\n", i->c_str()); + Toolbox::USleep(1500000); + s->Cancel(*j); + Toolbox::USleep(1000000); + s->GetListOfJobs(l); + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + printf(">> %s\n", i->c_str()); +#endif +} + + TEST(Toto, Toto) { ServerScheduler scheduler; @@ -793,14 +830,19 @@ FilterWrapper& f2 = job.AddFilter(new Tutu(2)); FilterWrapper& f3 = job.AddFilter(new Tutu(3)); FilterWrapper& f4 = job.AddFilter(new Tutu(4)); + FilterWrapper& f5 = job.AddFilter(new Tutu(5)); f2.AddInput(boost::lexical_cast(42)); //f3.AddInput(boost::lexical_cast(42)); //f4.AddInput(boost::lexical_cast(42)); f2.ConnectNext(f3); f3.ConnectNext(f4); + f4.ConnectNext(f5); job.SetDescription("tutu"); + boost::thread t(Tata, &scheduler, &job); + + //scheduler.Submit(job); ListOfStrings l; @@ -812,5 +854,7 @@ } //Toolbox::ServerBarrier(); - Toolbox::USleep(30000); + //Toolbox::USleep(3000000); + + //t.join(); }