comparison Core/MultiThreading/BagOfRunnablesBySteps.cpp @ 1678:1a3c20cd1b53 db-changes

renames in BagOfRunnablesBySteps to make things clearer
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 06 Oct 2015 11:12:13 +0200
parents f967bdf8534e
children
comparison
equal deleted inserted replaced
1677:a903d57d9f0c 1678:1a3c20cd1b53
40 40
41 namespace Orthanc 41 namespace Orthanc
42 { 42 {
43 struct BagOfRunnablesBySteps::PImpl 43 struct BagOfRunnablesBySteps::PImpl
44 { 44 {
45 bool continue_; 45 boost::mutex mutex_;
46 bool stopFinishListener_;
47 46
48 boost::mutex mutex_; 47 // 1. The set of active runnables, i.e. the runnables that have
49 boost::condition_variable oneThreadIsStopped_; 48 // not finished their job yet, plus the runnables that have not
50 boost::condition_variable oneThreadIsJoined_; 49 // been joined yet.
50 typedef std::map<IRunnableBySteps*, boost::thread*> ActiveRunnables;
51 ActiveRunnables activeRunnables_;
52 bool continueActiveRunnables_;
51 53
52 // The list of threads that are waiting to be joined. 54 // 2. Condition variable that is notified when one active runnable
53 typedef std::stack<IRunnableBySteps*> StoppedThreads; 55 // stops.
54 StoppedThreads stoppedThreads_; 56 boost::condition_variable oneRunnableHasStopped_;
55 57
56 // The set of active runnables, i.e. the runnables that have not 58 // 3. The list of runnables that have stopped but are waiting to be
57 // finished their job yet, plus the runnables that have not been 59 // joined by the collector.
58 // joined yet. 60 typedef std::stack<IRunnableBySteps*> StoppedRunnables;
59 typedef std::map<IRunnableBySteps*, boost::thread*> ActiveThreads; 61 StoppedRunnables stoppedRunnables_;
60 ActiveThreads activeThreads_; 62
63 // 4. Condition variable that is notified when one stopped
64 // runnable has been joined.
65 boost::condition_variable oneRunnableIsJoined_;
61 66
62 // The thread that joins the runnables after they stop 67 // The thread that joins the runnables after they stop
63 std::auto_ptr<boost::thread> finishListener_; 68 bool continueCollector_;
69 std::auto_ptr<boost::thread> collector_;
64 }; 70 };
65 71
66 72
67 73
68 void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag, 74 void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag,
69 IRunnableBySteps* runnable) 75 IRunnableBySteps* runnable)
70 { 76 {
71 while (bag->pimpl_->continue_) 77 while (bag->pimpl_->continueActiveRunnables_)
72 { 78 {
73 if (!runnable->Step()) 79 if (!runnable->Step())
74 { 80 {
75 break; 81 break;
76 } 82 }
77 } 83 }
78 84
79 { 85 {
80 // Register this runnable as having stopped 86 // Register this runnable as having stopped
81 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); 87 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_);
82 bag->pimpl_->stoppedThreads_.push(runnable); 88 bag->pimpl_->stoppedRunnables_.push(runnable);
83 bag->pimpl_->oneThreadIsStopped_.notify_one(); 89 bag->pimpl_->oneRunnableHasStopped_.notify_one();
84 } 90 }
85 } 91 }
86 92
87 93
88 void BagOfRunnablesBySteps::FinishListener(BagOfRunnablesBySteps* bag) 94 void BagOfRunnablesBySteps::CollectorThread(BagOfRunnablesBySteps* bag)
89 { 95 {
90 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); 96 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_);
91 97
92 while (!bag->pimpl_->stopFinishListener_) 98 while (bag->pimpl_->continueCollector_)
93 { 99 {
94 while (!bag->pimpl_->stoppedThreads_.empty()) 100 while (!bag->pimpl_->stoppedRunnables_.empty())
95 { 101 {
96 std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedThreads_.top()); 102 std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedRunnables_.top());
97 bag->pimpl_->stoppedThreads_.pop(); 103 bag->pimpl_->stoppedRunnables_.pop();
98 104
99 assert(r.get() != NULL); 105 assert(r.get() != NULL);
100 assert(bag->pimpl_->activeThreads_.find(r.get()) != bag->pimpl_->activeThreads_.end()); 106 assert(bag->pimpl_->activeRunnables_.find(r.get()) != bag->pimpl_->activeRunnables_.end());
101 107
102 std::auto_ptr<boost::thread> t(bag->pimpl_->activeThreads_[r.get()]); 108 std::auto_ptr<boost::thread> t(bag->pimpl_->activeRunnables_[r.get()]);
103 bag->pimpl_->activeThreads_.erase(r.get()); 109 bag->pimpl_->activeRunnables_.erase(r.get());
104 110
105 assert(t.get() != NULL); 111 assert(t.get() != NULL);
106 assert(bag->pimpl_->activeThreads_.find(r.get()) == bag->pimpl_->activeThreads_.end()); 112 assert(bag->pimpl_->activeRunnables_.find(r.get()) == bag->pimpl_->activeRunnables_.end());
107 113
108 if (t->joinable()) 114 if (t->joinable())
109 { 115 {
110 t->join(); 116 t->join();
111 } 117 }
112 118
113 bag->pimpl_->oneThreadIsJoined_.notify_one(); 119 bag->pimpl_->oneRunnableIsJoined_.notify_one();
114 } 120 }
115 121
116 bag->pimpl_->oneThreadIsStopped_.wait(lock); 122 bag->pimpl_->oneRunnableHasStopped_.wait(lock);
117 } 123 }
118 } 124 }
119 125
120 126
121 BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl) 127 BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl)
122 { 128 {
123 pimpl_->continue_ = true; 129 pimpl_->continueActiveRunnables_ = true;
124 pimpl_->stopFinishListener_ = false; 130 pimpl_->continueCollector_ = true;
125 131
126 // Everyting is set up, the finish listener can be started 132 // Everyting is set up, the finish listener can be started
127 pimpl_->finishListener_.reset(new boost::thread(FinishListener, this)); 133 pimpl_->collector_.reset(new boost::thread(CollectorThread, this));
128 } 134 }
129 135
130 136
131 BagOfRunnablesBySteps::~BagOfRunnablesBySteps() 137 BagOfRunnablesBySteps::~BagOfRunnablesBySteps()
132 { 138 {
133 if (!pimpl_->stopFinishListener_) 139 if (pimpl_->continueCollector_)
134 { 140 {
135 LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!"; 141 LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!";
136 Finalize(); 142 Finalize();
137 } 143 }
138 } 144 }
144 std::auto_ptr<IRunnableBySteps> runnableRabi(runnable); 150 std::auto_ptr<IRunnableBySteps> runnableRabi(runnable);
145 151
146 boost::mutex::scoped_lock lock(pimpl_->mutex_); 152 boost::mutex::scoped_lock lock(pimpl_->mutex_);
147 boost::thread* t(new boost::thread(RunnableThread, this, runnable)); 153 boost::thread* t(new boost::thread(RunnableThread, this, runnable));
148 154
149 pimpl_->activeThreads_.insert(std::make_pair(runnableRabi.release(), t)); 155 pimpl_->activeRunnables_.insert(std::make_pair(runnableRabi.release(), t));
150 } 156 }
151 157
152 158
153 void BagOfRunnablesBySteps::StopAll() 159 void BagOfRunnablesBySteps::StopAll()
154 { 160 {
155 boost::mutex::scoped_lock lock(pimpl_->mutex_); 161 boost::mutex::scoped_lock lock(pimpl_->mutex_);
156 pimpl_->continue_ = false; 162 pimpl_->continueActiveRunnables_ = false;
157 163
158 while (pimpl_->activeThreads_.size() > 0) 164 while (pimpl_->activeRunnables_.size() > 0)
159 { 165 {
160 pimpl_->oneThreadIsJoined_.wait(lock); 166 pimpl_->oneRunnableIsJoined_.wait(lock);
161 } 167 }
162 168
163 pimpl_->continue_ = true; 169 pimpl_->continueActiveRunnables_ = true;
164 } 170 }
165 171
166 172
167 173
168 void BagOfRunnablesBySteps::Finalize() 174 void BagOfRunnablesBySteps::Finalize()
169 { 175 {
170 if (!pimpl_->stopFinishListener_) 176 if (pimpl_->continueCollector_)
171 { 177 {
172 StopAll(); 178 StopAll();
173 179
174 // Stop the finish listener 180 // Stop the finish listener
175 pimpl_->stopFinishListener_ = true; 181 pimpl_->continueCollector_ = false;
176 pimpl_->oneThreadIsStopped_.notify_one(); // Awakens the listener 182 pimpl_->oneRunnableHasStopped_.notify_one(); // Awakens the listener
177 183
178 if (pimpl_->finishListener_->joinable()) 184 if (pimpl_->collector_->joinable())
179 { 185 {
180 pimpl_->finishListener_->join(); 186 pimpl_->collector_->join();
181 } 187 }
182 } 188 }
183 } 189 }
184
185 } 190 }