changeset 1678:1a3c20cd1b53 db-changes

renames in BagOfRunnablesBySteps to make things clearer
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 06 Oct 2015 11:12:13 +0200
parents a903d57d9f0c
children 6414043df7d8
files Core/MultiThreading/BagOfRunnablesBySteps.cpp Core/MultiThreading/BagOfRunnablesBySteps.h
diffstat 2 files changed, 49 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/Core/MultiThreading/BagOfRunnablesBySteps.cpp	Mon Oct 05 16:40:14 2015 +0200
+++ b/Core/MultiThreading/BagOfRunnablesBySteps.cpp	Tue Oct 06 11:12:13 2015 +0200
@@ -42,25 +42,31 @@
 {
   struct BagOfRunnablesBySteps::PImpl
   {
-    bool continue_;
-    bool stopFinishListener_;
+    boost::mutex mutex_;
 
-    boost::mutex mutex_;
-    boost::condition_variable oneThreadIsStopped_;
-    boost::condition_variable oneThreadIsJoined_;
+    // 1. The set of active runnables, i.e. the runnables that have
+    // not finished their job yet, plus the runnables that have not
+    // been joined yet.
+    typedef std::map<IRunnableBySteps*, boost::thread*>  ActiveRunnables;
+    ActiveRunnables  activeRunnables_;
+    bool             continueActiveRunnables_;
 
-    // The list of threads that are waiting to be joined.
-    typedef std::stack<IRunnableBySteps*>  StoppedThreads;
-    StoppedThreads  stoppedThreads_;
+    // 2. Condition variable that is notified when one active runnable
+    // stops.
+    boost::condition_variable oneRunnableHasStopped_;
 
-    // The set of active runnables, i.e. the runnables that have not
-    // finished their job yet, plus the runnables that have not been
-    // joined yet.
-    typedef std::map<IRunnableBySteps*, boost::thread*>  ActiveThreads;
-    ActiveThreads  activeThreads_;
+    // 3. The list of runnables that have stopped but are waiting to be
+    // joined by the collector.
+    typedef std::stack<IRunnableBySteps*>  StoppedRunnables;
+    StoppedRunnables  stoppedRunnables_;
+
+    // 4. Condition variable that is notified when one stopped
+    // runnable has been joined.
+    boost::condition_variable oneRunnableIsJoined_;
 
     // The thread that joins the runnables after they stop
-    std::auto_ptr<boost::thread> finishListener_;
+    bool continueCollector_;
+    std::auto_ptr<boost::thread> collector_;
   };
 
 
@@ -68,7 +74,7 @@
   void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag,
                                              IRunnableBySteps* runnable)
   {
-    while (bag->pimpl_->continue_)
+    while (bag->pimpl_->continueActiveRunnables_)
     {
       if (!runnable->Step())
       {
@@ -79,58 +85,58 @@
     {
       // Register this runnable as having stopped
       boost::mutex::scoped_lock lock(bag->pimpl_->mutex_);
-      bag->pimpl_->stoppedThreads_.push(runnable);
-      bag->pimpl_->oneThreadIsStopped_.notify_one();
+      bag->pimpl_->stoppedRunnables_.push(runnable);
+      bag->pimpl_->oneRunnableHasStopped_.notify_one();
     }
   }
 
   
-  void BagOfRunnablesBySteps::FinishListener(BagOfRunnablesBySteps* bag)
+  void BagOfRunnablesBySteps::CollectorThread(BagOfRunnablesBySteps* bag)
   {
     boost::mutex::scoped_lock lock(bag->pimpl_->mutex_);
 
-    while (!bag->pimpl_->stopFinishListener_)
+    while (bag->pimpl_->continueCollector_)
     {
-      while (!bag->pimpl_->stoppedThreads_.empty())
+      while (!bag->pimpl_->stoppedRunnables_.empty())
       {
-        std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedThreads_.top());
-        bag->pimpl_->stoppedThreads_.pop();
+        std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedRunnables_.top());
+        bag->pimpl_->stoppedRunnables_.pop();
 
         assert(r.get() != NULL);
-        assert(bag->pimpl_->activeThreads_.find(r.get()) != bag->pimpl_->activeThreads_.end());
+        assert(bag->pimpl_->activeRunnables_.find(r.get()) != bag->pimpl_->activeRunnables_.end());
 
-        std::auto_ptr<boost::thread> t(bag->pimpl_->activeThreads_[r.get()]);
-        bag->pimpl_->activeThreads_.erase(r.get());
+        std::auto_ptr<boost::thread> t(bag->pimpl_->activeRunnables_[r.get()]);
+        bag->pimpl_->activeRunnables_.erase(r.get());
 
         assert(t.get() != NULL);
-        assert(bag->pimpl_->activeThreads_.find(r.get()) == bag->pimpl_->activeThreads_.end());
+        assert(bag->pimpl_->activeRunnables_.find(r.get()) == bag->pimpl_->activeRunnables_.end());
 
         if (t->joinable())
         {
           t->join();
         }
 
-        bag->pimpl_->oneThreadIsJoined_.notify_one();
+        bag->pimpl_->oneRunnableIsJoined_.notify_one();
       }
 
-      bag->pimpl_->oneThreadIsStopped_.wait(lock);
+      bag->pimpl_->oneRunnableHasStopped_.wait(lock);
     }
   }
 
 
   BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl)
   {
-    pimpl_->continue_ = true;
-    pimpl_->stopFinishListener_ = false;
+    pimpl_->continueActiveRunnables_ = true;
+    pimpl_->continueCollector_ = true;
 
     // Everyting is set up, the finish listener can be started
-    pimpl_->finishListener_.reset(new boost::thread(FinishListener, this));
+    pimpl_->collector_.reset(new boost::thread(CollectorThread, this));
   }
 
 
   BagOfRunnablesBySteps::~BagOfRunnablesBySteps()
   {
-    if (!pimpl_->stopFinishListener_)
+    if (pimpl_->continueCollector_)
     {
       LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!";
       Finalize();
@@ -146,40 +152,39 @@
     boost::mutex::scoped_lock lock(pimpl_->mutex_);
     boost::thread* t(new boost::thread(RunnableThread, this, runnable));
 
-    pimpl_->activeThreads_.insert(std::make_pair(runnableRabi.release(), t));
+    pimpl_->activeRunnables_.insert(std::make_pair(runnableRabi.release(), t));
   }
 
 
   void BagOfRunnablesBySteps::StopAll()
   {
     boost::mutex::scoped_lock lock(pimpl_->mutex_);
-    pimpl_->continue_ = false;
+    pimpl_->continueActiveRunnables_ = false;
 
-    while (pimpl_->activeThreads_.size() > 0)
+    while (pimpl_->activeRunnables_.size() > 0)
     {
-      pimpl_->oneThreadIsJoined_.wait(lock);
+      pimpl_->oneRunnableIsJoined_.wait(lock);
     }
 
-    pimpl_->continue_ = true;
+    pimpl_->continueActiveRunnables_ = true;
   }
 
 
 
   void BagOfRunnablesBySteps::Finalize()
   {
-    if (!pimpl_->stopFinishListener_)
+    if (pimpl_->continueCollector_)
     {
       StopAll();
 
       // Stop the finish listener
-      pimpl_->stopFinishListener_ = true;
-      pimpl_->oneThreadIsStopped_.notify_one();  // Awakens the listener
+      pimpl_->continueCollector_ = false;
+      pimpl_->oneRunnableHasStopped_.notify_one();  // Awakens the listener
 
-      if (pimpl_->finishListener_->joinable())
+      if (pimpl_->collector_->joinable())
       {
-        pimpl_->finishListener_->join();
+        pimpl_->collector_->join();
       }
     }
   }
-
 }
--- a/Core/MultiThreading/BagOfRunnablesBySteps.h	Mon Oct 05 16:40:14 2015 +0200
+++ b/Core/MultiThreading/BagOfRunnablesBySteps.h	Tue Oct 06 11:12:13 2015 +0200
@@ -48,7 +48,7 @@
     static void RunnableThread(BagOfRunnablesBySteps* bag,
                                IRunnableBySteps* runnable);
 
-    static void FinishListener(BagOfRunnablesBySteps* bag);
+    static void CollectorThread(BagOfRunnablesBySteps* bag);
 
   public:
     BagOfRunnablesBySteps();