Mercurial > hg > orthanc
annotate Core/MultiThreading/BagOfTasksProcessor.cpp @ 2182:085a41c66133
fix
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 25 Nov 2016 19:08:10 +0100 |
parents | 5b818f677dd6 |
children | a3a65de1840f |
rev | line source |
---|---|
1920 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5 * | |
6 * This program is free software: you can redistribute it and/or | |
7 * modify it under the terms of the GNU General Public License as | |
8 * published by the Free Software Foundation, either version 3 of the | |
9 * License, or (at your option) any later version. | |
10 * | |
11 * In addition, as a special exception, the copyright holders of this | |
12 * program give permission to link the code of its release with the | |
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it | |
14 * that use the same license as the "OpenSSL" library), and distribute | |
15 * the linked executables. You must obey the GNU General Public License | |
16 * in all respects for all of the code used other than "OpenSSL". If you | |
17 * modify file(s) with this exception, you may extend this exception to | |
18 * your version of the file(s), but you are not obligated to do so. If | |
19 * you do not wish to do so, delete this exception statement from your | |
20 * version. If you delete this exception statement from all source files | |
21 * in the program, then also delete it here. | |
22 * | |
23 * This program is distributed in the hope that it will be useful, but | |
24 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
26 * General Public License for more details. | |
27 * | |
28 * You should have received a copy of the GNU General Public License | |
29 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
30 **/ | |
31 | |
32 | |
33 #include "../PrecompiledHeaders.h" | |
34 #include "BagOfTasksProcessor.h" | |
35 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
36 #include "../Logging.h" |
1920 | 37 #include "../OrthancException.h" |
38 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
39 #include <stdio.h> |
1920 | 40 |
41 namespace Orthanc | |
42 { | |
43 class BagOfTasksProcessor::Task : public IDynamicObject | |
44 { | |
45 private: | |
46 uint64_t bag_; | |
47 std::auto_ptr<ICommand> command_; | |
48 | |
49 public: | |
50 Task(uint64_t bag, | |
51 ICommand* command) : | |
52 bag_(bag), | |
53 command_(command) | |
54 { | |
55 } | |
56 | |
57 bool Execute() | |
58 { | |
59 try | |
60 { | |
61 return command_->Execute(); | |
62 } | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
63 catch (OrthancException& e) |
1920 | 64 { |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
65 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What(); |
1920 | 66 return false; |
67 } | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
68 catch (std::runtime_error& e) |
1920 | 69 { |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
70 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what(); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
71 return false; |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
72 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
73 catch (...) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
74 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
75 LOG(ERROR) << "Native exception while processing a bag of tasks"; |
1920 | 76 return false; |
77 } | |
78 } | |
79 | |
80 uint64_t GetBag() | |
81 { | |
82 return bag_; | |
83 } | |
84 }; | |
85 | |
86 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
87 void BagOfTasksProcessor::SignalProgress(Task& task, |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
88 Bag& bag) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
89 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
90 assert(bag.done_ < bag.size_); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
91 |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
92 bag.done_ += 1; |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
93 |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
94 if (bag.done_ == bag.size_) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
95 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
96 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
97 bagFinished_.notify_all(); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
98 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
99 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
100 |
1920 | 101 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) |
102 { | |
103 while (that->continue_) | |
104 { | |
105 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100)); | |
106 if (obj.get() != NULL) | |
107 { | |
108 Task& task = *dynamic_cast<Task*>(obj.get()); | |
109 | |
110 { | |
111 boost::mutex::scoped_lock lock(that->mutex_); | |
112 | |
113 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
114 assert(bag != that->bags_.end()); | |
115 assert(bag->second.done_ < bag->second.size_); | |
116 | |
117 if (bag->second.status_ != BagStatus_Running) | |
118 { | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
119 // Do not execute this task, as its parent bag of tasks |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
120 // has failed or is tagged as canceled |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
121 that->SignalProgress(task, bag->second); |
1920 | 122 continue; |
123 } | |
124 } | |
125 | |
126 bool success = task.Execute(); | |
127 | |
128 { | |
129 boost::mutex::scoped_lock lock(that->mutex_); | |
130 | |
131 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
132 assert(bag != that->bags_.end()); | |
133 | |
134 if (!success) | |
135 { | |
136 bag->second.status_ = BagStatus_Failed; | |
137 } | |
138 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
139 that->SignalProgress(task, bag->second); |
1920 | 140 } |
141 } | |
142 } | |
143 } | |
144 | |
145 | |
146 void BagOfTasksProcessor::Cancel(int64_t bag) | |
147 { | |
148 boost::mutex::scoped_lock lock(mutex_); | |
149 | |
150 Bags::iterator it = bags_.find(bag); | |
151 if (it != bags_.end()) | |
152 { | |
153 it->second.status_ = BagStatus_Canceled; | |
154 } | |
155 } | |
156 | |
157 | |
158 bool BagOfTasksProcessor::Join(int64_t bag) | |
159 { | |
160 boost::mutex::scoped_lock lock(mutex_); | |
161 | |
162 while (continue_) | |
163 { | |
164 ExitStatus::iterator it = exitStatus_.find(bag); | |
165 if (it == exitStatus_.end()) // The bag is still running | |
166 { | |
167 bagFinished_.wait(lock); | |
168 } | |
169 else | |
170 { | |
171 bool status = it->second; | |
172 exitStatus_.erase(it); | |
173 return status; | |
174 } | |
175 } | |
176 | |
177 return false; // The processor is stopping | |
178 } | |
179 | |
180 | |
181 float BagOfTasksProcessor::GetProgress(int64_t bag) | |
182 { | |
183 boost::mutex::scoped_lock lock(mutex_); | |
184 | |
185 Bags::const_iterator it = bags_.find(bag); | |
186 if (it == bags_.end()) | |
187 { | |
188 // The bag of tasks has finished | |
189 return 1.0f; | |
190 } | |
191 else | |
192 { | |
193 return (static_cast<float>(it->second.done_) / | |
194 static_cast<float>(it->second.size_)); | |
195 } | |
196 } | |
197 | |
198 | |
199 bool BagOfTasksProcessor::Handle::Join() | |
200 { | |
201 if (hasJoined_) | |
202 { | |
203 return status_; | |
204 } | |
205 else | |
206 { | |
207 status_ = that_.Join(bag_); | |
208 hasJoined_ = true; | |
209 return status_; | |
210 } | |
211 } | |
212 | |
213 | |
214 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : | |
215 countBags_(0), | |
216 continue_(true) | |
217 { | |
2016 | 218 if (countThreads == 0) |
1920 | 219 { |
220 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
221 } | |
222 | |
223 threads_.resize(countThreads); | |
224 | |
225 for (size_t i = 0; i < threads_.size(); i++) | |
226 { | |
227 threads_[i] = new boost::thread(Worker, this); | |
228 } | |
229 } | |
230 | |
231 | |
232 BagOfTasksProcessor::~BagOfTasksProcessor() | |
233 { | |
234 continue_ = false; | |
235 | |
236 bagFinished_.notify_all(); // Wakes up all the pending "Join()" | |
237 | |
238 for (size_t i = 0; i < threads_.size(); i++) | |
239 { | |
240 if (threads_[i]) | |
241 { | |
242 if (threads_[i]->joinable()) | |
243 { | |
244 threads_[i]->join(); | |
245 } | |
246 | |
247 delete threads_[i]; | |
248 threads_[i] = NULL; | |
249 } | |
250 } | |
251 } | |
252 | |
253 | |
254 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) | |
255 { | |
1923
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
256 if (tasks.GetSize() == 0) |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
257 { |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
258 return new Handle(*this, 0, true); |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
259 } |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
260 |
1920 | 261 boost::mutex::scoped_lock lock(mutex_); |
262 | |
263 uint64_t id = countBags_; | |
264 countBags_ += 1; | |
265 | |
266 Bag bag(tasks.GetSize()); | |
267 bags_[id] = bag; | |
268 | |
269 while (!tasks.IsEmpty()) | |
270 { | |
271 queue_.Enqueue(new Task(id, tasks.Pop())); | |
272 } | |
273 | |
1923
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
274 return new Handle(*this, id, false); |
1920 | 275 } |
276 } |