Mercurial > hg > orthanc
comparison Core/MultiThreading/BagOfRunnablesBySteps.cpp @ 1678:1a3c20cd1b53 db-changes
renames in BagOfRunnablesBySteps to make things clearer
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 06 Oct 2015 11:12:13 +0200 |
parents | f967bdf8534e |
children |
comparison
equal
deleted
inserted
replaced
1677:a903d57d9f0c | 1678:1a3c20cd1b53 |
---|---|
40 | 40 |
41 namespace Orthanc | 41 namespace Orthanc |
42 { | 42 { |
43 struct BagOfRunnablesBySteps::PImpl | 43 struct BagOfRunnablesBySteps::PImpl |
44 { | 44 { |
45 bool continue_; | 45 boost::mutex mutex_; |
46 bool stopFinishListener_; | |
47 | 46 |
48 boost::mutex mutex_; | 47 // 1. The set of active runnables, i.e. the runnables that have |
49 boost::condition_variable oneThreadIsStopped_; | 48 // not finished their job yet, plus the runnables that have not |
50 boost::condition_variable oneThreadIsJoined_; | 49 // been joined yet. |
50 typedef std::map<IRunnableBySteps*, boost::thread*> ActiveRunnables; | |
51 ActiveRunnables activeRunnables_; | |
52 bool continueActiveRunnables_; | |
51 | 53 |
52 // The list of threads that are waiting to be joined. | 54 // 2. Condition variable that is notified when one active runnable |
53 typedef std::stack<IRunnableBySteps*> StoppedThreads; | 55 // stops. |
54 StoppedThreads stoppedThreads_; | 56 boost::condition_variable oneRunnableHasStopped_; |
55 | 57 |
56 // The set of active runnables, i.e. the runnables that have not | 58 // 3. The list of runnables that have stopped but are waiting to be |
57 // finished their job yet, plus the runnables that have not been | 59 // joined by the collector. |
58 // joined yet. | 60 typedef std::stack<IRunnableBySteps*> StoppedRunnables; |
59 typedef std::map<IRunnableBySteps*, boost::thread*> ActiveThreads; | 61 StoppedRunnables stoppedRunnables_; |
60 ActiveThreads activeThreads_; | 62 |
63 // 4. Condition variable that is notified when one stopped | |
64 // runnable has been joined. | |
65 boost::condition_variable oneRunnableIsJoined_; | |
61 | 66 |
62 // The thread that joins the runnables after they stop | 67 // The thread that joins the runnables after they stop |
63 std::auto_ptr<boost::thread> finishListener_; | 68 bool continueCollector_; |
69 std::auto_ptr<boost::thread> collector_; | |
64 }; | 70 }; |
65 | 71 |
66 | 72 |
67 | 73 |
68 void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag, | 74 void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag, |
69 IRunnableBySteps* runnable) | 75 IRunnableBySteps* runnable) |
70 { | 76 { |
71 while (bag->pimpl_->continue_) | 77 while (bag->pimpl_->continueActiveRunnables_) |
72 { | 78 { |
73 if (!runnable->Step()) | 79 if (!runnable->Step()) |
74 { | 80 { |
75 break; | 81 break; |
76 } | 82 } |
77 } | 83 } |
78 | 84 |
79 { | 85 { |
80 // Register this runnable as having stopped | 86 // Register this runnable as having stopped |
81 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); | 87 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); |
82 bag->pimpl_->stoppedThreads_.push(runnable); | 88 bag->pimpl_->stoppedRunnables_.push(runnable); |
83 bag->pimpl_->oneThreadIsStopped_.notify_one(); | 89 bag->pimpl_->oneRunnableHasStopped_.notify_one(); |
84 } | 90 } |
85 } | 91 } |
86 | 92 |
87 | 93 |
88 void BagOfRunnablesBySteps::FinishListener(BagOfRunnablesBySteps* bag) | 94 void BagOfRunnablesBySteps::CollectorThread(BagOfRunnablesBySteps* bag) |
89 { | 95 { |
90 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); | 96 boost::mutex::scoped_lock lock(bag->pimpl_->mutex_); |
91 | 97 |
92 while (!bag->pimpl_->stopFinishListener_) | 98 while (bag->pimpl_->continueCollector_) |
93 { | 99 { |
94 while (!bag->pimpl_->stoppedThreads_.empty()) | 100 while (!bag->pimpl_->stoppedRunnables_.empty()) |
95 { | 101 { |
96 std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedThreads_.top()); | 102 std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedRunnables_.top()); |
97 bag->pimpl_->stoppedThreads_.pop(); | 103 bag->pimpl_->stoppedRunnables_.pop(); |
98 | 104 |
99 assert(r.get() != NULL); | 105 assert(r.get() != NULL); |
100 assert(bag->pimpl_->activeThreads_.find(r.get()) != bag->pimpl_->activeThreads_.end()); | 106 assert(bag->pimpl_->activeRunnables_.find(r.get()) != bag->pimpl_->activeRunnables_.end()); |
101 | 107 |
102 std::auto_ptr<boost::thread> t(bag->pimpl_->activeThreads_[r.get()]); | 108 std::auto_ptr<boost::thread> t(bag->pimpl_->activeRunnables_[r.get()]); |
103 bag->pimpl_->activeThreads_.erase(r.get()); | 109 bag->pimpl_->activeRunnables_.erase(r.get()); |
104 | 110 |
105 assert(t.get() != NULL); | 111 assert(t.get() != NULL); |
106 assert(bag->pimpl_->activeThreads_.find(r.get()) == bag->pimpl_->activeThreads_.end()); | 112 assert(bag->pimpl_->activeRunnables_.find(r.get()) == bag->pimpl_->activeRunnables_.end()); |
107 | 113 |
108 if (t->joinable()) | 114 if (t->joinable()) |
109 { | 115 { |
110 t->join(); | 116 t->join(); |
111 } | 117 } |
112 | 118 |
113 bag->pimpl_->oneThreadIsJoined_.notify_one(); | 119 bag->pimpl_->oneRunnableIsJoined_.notify_one(); |
114 } | 120 } |
115 | 121 |
116 bag->pimpl_->oneThreadIsStopped_.wait(lock); | 122 bag->pimpl_->oneRunnableHasStopped_.wait(lock); |
117 } | 123 } |
118 } | 124 } |
119 | 125 |
120 | 126 |
121 BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl) | 127 BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl) |
122 { | 128 { |
123 pimpl_->continue_ = true; | 129 pimpl_->continueActiveRunnables_ = true; |
124 pimpl_->stopFinishListener_ = false; | 130 pimpl_->continueCollector_ = true; |
125 | 131 |
126 // Everyting is set up, the finish listener can be started | 132 // Everyting is set up, the finish listener can be started |
127 pimpl_->finishListener_.reset(new boost::thread(FinishListener, this)); | 133 pimpl_->collector_.reset(new boost::thread(CollectorThread, this)); |
128 } | 134 } |
129 | 135 |
130 | 136 |
131 BagOfRunnablesBySteps::~BagOfRunnablesBySteps() | 137 BagOfRunnablesBySteps::~BagOfRunnablesBySteps() |
132 { | 138 { |
133 if (!pimpl_->stopFinishListener_) | 139 if (pimpl_->continueCollector_) |
134 { | 140 { |
135 LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!"; | 141 LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!"; |
136 Finalize(); | 142 Finalize(); |
137 } | 143 } |
138 } | 144 } |
144 std::auto_ptr<IRunnableBySteps> runnableRabi(runnable); | 150 std::auto_ptr<IRunnableBySteps> runnableRabi(runnable); |
145 | 151 |
146 boost::mutex::scoped_lock lock(pimpl_->mutex_); | 152 boost::mutex::scoped_lock lock(pimpl_->mutex_); |
147 boost::thread* t(new boost::thread(RunnableThread, this, runnable)); | 153 boost::thread* t(new boost::thread(RunnableThread, this, runnable)); |
148 | 154 |
149 pimpl_->activeThreads_.insert(std::make_pair(runnableRabi.release(), t)); | 155 pimpl_->activeRunnables_.insert(std::make_pair(runnableRabi.release(), t)); |
150 } | 156 } |
151 | 157 |
152 | 158 |
153 void BagOfRunnablesBySteps::StopAll() | 159 void BagOfRunnablesBySteps::StopAll() |
154 { | 160 { |
155 boost::mutex::scoped_lock lock(pimpl_->mutex_); | 161 boost::mutex::scoped_lock lock(pimpl_->mutex_); |
156 pimpl_->continue_ = false; | 162 pimpl_->continueActiveRunnables_ = false; |
157 | 163 |
158 while (pimpl_->activeThreads_.size() > 0) | 164 while (pimpl_->activeRunnables_.size() > 0) |
159 { | 165 { |
160 pimpl_->oneThreadIsJoined_.wait(lock); | 166 pimpl_->oneRunnableIsJoined_.wait(lock); |
161 } | 167 } |
162 | 168 |
163 pimpl_->continue_ = true; | 169 pimpl_->continueActiveRunnables_ = true; |
164 } | 170 } |
165 | 171 |
166 | 172 |
167 | 173 |
168 void BagOfRunnablesBySteps::Finalize() | 174 void BagOfRunnablesBySteps::Finalize() |
169 { | 175 { |
170 if (!pimpl_->stopFinishListener_) | 176 if (pimpl_->continueCollector_) |
171 { | 177 { |
172 StopAll(); | 178 StopAll(); |
173 | 179 |
174 // Stop the finish listener | 180 // Stop the finish listener |
175 pimpl_->stopFinishListener_ = true; | 181 pimpl_->continueCollector_ = false; |
176 pimpl_->oneThreadIsStopped_.notify_one(); // Awakens the listener | 182 pimpl_->oneRunnableHasStopped_.notify_one(); // Awakens the listener |
177 | 183 |
178 if (pimpl_->finishListener_->joinable()) | 184 if (pimpl_->collector_->joinable()) |
179 { | 185 { |
180 pimpl_->finishListener_->join(); | 186 pimpl_->collector_->join(); |
181 } | 187 } |
182 } | 188 } |
183 } | 189 } |
184 | |
185 } | 190 } |