# HG changeset patch # User Sebastien Jodogne # Date 1444122733 -7200 # Node ID 1a3c20cd1b5387575e8454f4521b9a90ec28040d # Parent a903d57d9f0cba17c92f8a7b0545951cf2770817 renames in BagOfRunnablesBySteps to make things clearer diff -r a903d57d9f0c -r 1a3c20cd1b53 Core/MultiThreading/BagOfRunnablesBySteps.cpp --- a/Core/MultiThreading/BagOfRunnablesBySteps.cpp Mon Oct 05 16:40:14 2015 +0200 +++ b/Core/MultiThreading/BagOfRunnablesBySteps.cpp Tue Oct 06 11:12:13 2015 +0200 @@ -42,25 +42,31 @@ { struct BagOfRunnablesBySteps::PImpl { - bool continue_; - bool stopFinishListener_; + boost::mutex mutex_; - boost::mutex mutex_; - boost::condition_variable oneThreadIsStopped_; - boost::condition_variable oneThreadIsJoined_; + // 1. The set of active runnables, i.e. the runnables that have + // not finished their job yet, plus the runnables that have not + // been joined yet. + typedef std::map ActiveRunnables; + ActiveRunnables activeRunnables_; + bool continueActiveRunnables_; - // The list of threads that are waiting to be joined. - typedef std::stack StoppedThreads; - StoppedThreads stoppedThreads_; + // 2. Condition variable that is notified when one active runnable + // stops. + boost::condition_variable oneRunnableHasStopped_; - // The set of active runnables, i.e. the runnables that have not - // finished their job yet, plus the runnables that have not been - // joined yet. - typedef std::map ActiveThreads; - ActiveThreads activeThreads_; + // 3. The list of runnables that have stopped but are waiting to be + // joined by the collector. + typedef std::stack StoppedRunnables; + StoppedRunnables stoppedRunnables_; + + // 4. Condition variable that is notified when one stopped + // runnable has been joined. + boost::condition_variable oneRunnableIsJoined_; // The thread that joins the runnables after they stop - std::auto_ptr finishListener_; + bool continueCollector_; + std::auto_ptr collector_; }; @@ -68,7 +74,7 @@ void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag, IRunnableBySteps* runnable) { - while (bag->pimpl_->continue_) + while (bag->pimpl_->continueActiveRunnables_) { if (!runnable->Step()) { @@ -79,58 +85,58 @@ { // Register this runnable as having stopped boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); - bag->pimpl_->stoppedThreads_.push(runnable); - bag->pimpl_->oneThreadIsStopped_.notify_one(); + bag->pimpl_->stoppedRunnables_.push(runnable); + bag->pimpl_->oneRunnableHasStopped_.notify_one(); } } - void BagOfRunnablesBySteps::FinishListener(BagOfRunnablesBySteps* bag) + void BagOfRunnablesBySteps::CollectorThread(BagOfRunnablesBySteps* bag) { boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); - while (!bag->pimpl_->stopFinishListener_) + while (bag->pimpl_->continueCollector_) { - while (!bag->pimpl_->stoppedThreads_.empty()) + while (!bag->pimpl_->stoppedRunnables_.empty()) { - std::auto_ptr r(bag->pimpl_->stoppedThreads_.top()); - bag->pimpl_->stoppedThreads_.pop(); + std::auto_ptr r(bag->pimpl_->stoppedRunnables_.top()); + bag->pimpl_->stoppedRunnables_.pop(); assert(r.get() != NULL); - assert(bag->pimpl_->activeThreads_.find(r.get()) != bag->pimpl_->activeThreads_.end()); + assert(bag->pimpl_->activeRunnables_.find(r.get()) != bag->pimpl_->activeRunnables_.end()); - std::auto_ptr t(bag->pimpl_->activeThreads_[r.get()]); - bag->pimpl_->activeThreads_.erase(r.get()); + std::auto_ptr t(bag->pimpl_->activeRunnables_[r.get()]); + bag->pimpl_->activeRunnables_.erase(r.get()); assert(t.get() != NULL); - assert(bag->pimpl_->activeThreads_.find(r.get()) == bag->pimpl_->activeThreads_.end()); + assert(bag->pimpl_->activeRunnables_.find(r.get()) == bag->pimpl_->activeRunnables_.end()); if (t->joinable()) { t->join(); } - bag->pimpl_->oneThreadIsJoined_.notify_one(); + bag->pimpl_->oneRunnableIsJoined_.notify_one(); } - bag->pimpl_->oneThreadIsStopped_.wait(lock); + bag->pimpl_->oneRunnableHasStopped_.wait(lock); } } BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl) { - pimpl_->continue_ = true; - pimpl_->stopFinishListener_ = false; + pimpl_->continueActiveRunnables_ = true; + pimpl_->continueCollector_ = true; // Everyting is set up, the finish listener can be started - pimpl_->finishListener_.reset(new boost::thread(FinishListener, this)); + pimpl_->collector_.reset(new boost::thread(CollectorThread, this)); } BagOfRunnablesBySteps::~BagOfRunnablesBySteps() { - if (!pimpl_->stopFinishListener_) + if (pimpl_->continueCollector_) { LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!"; Finalize(); @@ -146,40 +152,39 @@ boost::mutex::scoped_lock lock(pimpl_->mutex_); boost::thread* t(new boost::thread(RunnableThread, this, runnable)); - pimpl_->activeThreads_.insert(std::make_pair(runnableRabi.release(), t)); + pimpl_->activeRunnables_.insert(std::make_pair(runnableRabi.release(), t)); } void BagOfRunnablesBySteps::StopAll() { boost::mutex::scoped_lock lock(pimpl_->mutex_); - pimpl_->continue_ = false; + pimpl_->continueActiveRunnables_ = false; - while (pimpl_->activeThreads_.size() > 0) + while (pimpl_->activeRunnables_.size() > 0) { - pimpl_->oneThreadIsJoined_.wait(lock); + pimpl_->oneRunnableIsJoined_.wait(lock); } - pimpl_->continue_ = true; + pimpl_->continueActiveRunnables_ = true; } void BagOfRunnablesBySteps::Finalize() { - if (!pimpl_->stopFinishListener_) + if (pimpl_->continueCollector_) { StopAll(); // Stop the finish listener - pimpl_->stopFinishListener_ = true; - pimpl_->oneThreadIsStopped_.notify_one(); // Awakens the listener + pimpl_->continueCollector_ = false; + pimpl_->oneRunnableHasStopped_.notify_one(); // Awakens the listener - if (pimpl_->finishListener_->joinable()) + if (pimpl_->collector_->joinable()) { - pimpl_->finishListener_->join(); + pimpl_->collector_->join(); } } } - } diff -r a903d57d9f0c -r 1a3c20cd1b53 Core/MultiThreading/BagOfRunnablesBySteps.h --- a/Core/MultiThreading/BagOfRunnablesBySteps.h Mon Oct 05 16:40:14 2015 +0200 +++ b/Core/MultiThreading/BagOfRunnablesBySteps.h Tue Oct 06 11:12:13 2015 +0200 @@ -48,7 +48,7 @@ static void RunnableThread(BagOfRunnablesBySteps* bag, IRunnableBySteps* runnable); - static void FinishListener(BagOfRunnablesBySteps* bag); + static void CollectorThread(BagOfRunnablesBySteps* bag); public: BagOfRunnablesBySteps();