Mercurial > hg > orthanc-wsi
annotate Framework/MultiThreading/BagOfTasksProcessor.cpp @ 234:a464169306ce
integration 1.0->mainline
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 14 Jan 2021 08:50:27 +0100 |
parents | 1e864138f0da |
children | b0ee417b667a |
rev | line source |
---|---|
151 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
214
1e864138f0da
upgrade to year 2021
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
199
diff
changeset
|
5 * Copyright (C) 2017-2021 Osimis S.A., Belgium |
151 | 6 * |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU Affero General Public License | |
9 * as published by the Free Software Foundation, either version 3 of | |
10 * the License, or (at your option) any later version. | |
11 * | |
12 * This program is distributed in the hope that it will be useful, but | |
13 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
15 * Affero General Public License for more details. | |
16 * | |
17 * You should have received a copy of the GNU Affero General Public License | |
18 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 **/ | |
20 | |
21 | |
22 #include "BagOfTasksProcessor.h" | |
23 | |
199
a1c265cb2174
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
192
diff
changeset
|
24 #include <Compatibility.h> // For std::unique_ptr |
192 | 25 #include <Logging.h> |
26 #include <OrthancException.h> | |
151 | 27 |
28 #include <stdio.h> | |
29 | |
30 namespace Orthanc | |
31 { | |
32 class BagOfTasksProcessor::Task : public IDynamicObject | |
33 { | |
34 private: | |
35 uint64_t bag_; | |
199
a1c265cb2174
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
192
diff
changeset
|
36 std::unique_ptr<ICommand> command_; |
151 | 37 |
38 public: | |
39 Task(uint64_t bag, | |
40 ICommand* command) : | |
41 bag_(bag), | |
42 command_(command) | |
43 { | |
44 } | |
45 | |
46 bool Execute() | |
47 { | |
48 try | |
49 { | |
50 return command_->Execute(); | |
51 } | |
52 catch (OrthancException& e) | |
53 { | |
54 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What(); | |
55 return false; | |
56 } | |
57 catch (std::runtime_error& e) | |
58 { | |
59 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what(); | |
60 return false; | |
61 } | |
62 catch (...) | |
63 { | |
64 LOG(ERROR) << "Native exception while processing a bag of tasks"; | |
65 return false; | |
66 } | |
67 } | |
68 | |
69 uint64_t GetBag() | |
70 { | |
71 return bag_; | |
72 } | |
73 }; | |
74 | |
75 | |
76 void BagOfTasksProcessor::SignalProgress(Task& task, | |
77 Bag& bag) | |
78 { | |
79 assert(bag.done_ < bag.size_); | |
80 | |
81 bag.done_ += 1; | |
82 | |
83 if (bag.done_ == bag.size_) | |
84 { | |
85 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running); | |
86 bagFinished_.notify_all(); | |
87 } | |
88 } | |
89 | |
90 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) | |
91 { | |
92 while (that->continue_) | |
93 { | |
199
a1c265cb2174
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
192
diff
changeset
|
94 std::unique_ptr<IDynamicObject> obj(that->queue_.Dequeue(100)); |
151 | 95 if (obj.get() != NULL) |
96 { | |
97 Task& task = *dynamic_cast<Task*>(obj.get()); | |
98 | |
99 { | |
100 boost::mutex::scoped_lock lock(that->mutex_); | |
101 | |
102 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
103 assert(bag != that->bags_.end()); | |
104 assert(bag->second.done_ < bag->second.size_); | |
105 | |
106 if (bag->second.status_ != BagStatus_Running) | |
107 { | |
108 // Do not execute this task, as its parent bag of tasks | |
109 // has failed or is tagged as canceled | |
110 that->SignalProgress(task, bag->second); | |
111 continue; | |
112 } | |
113 } | |
114 | |
115 bool success = task.Execute(); | |
116 | |
117 { | |
118 boost::mutex::scoped_lock lock(that->mutex_); | |
119 | |
120 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
121 assert(bag != that->bags_.end()); | |
122 | |
123 if (!success) | |
124 { | |
125 bag->second.status_ = BagStatus_Failed; | |
126 } | |
127 | |
128 that->SignalProgress(task, bag->second); | |
129 } | |
130 } | |
131 } | |
132 } | |
133 | |
134 | |
135 void BagOfTasksProcessor::Cancel(int64_t bag) | |
136 { | |
137 boost::mutex::scoped_lock lock(mutex_); | |
138 | |
139 Bags::iterator it = bags_.find(bag); | |
140 if (it != bags_.end()) | |
141 { | |
142 it->second.status_ = BagStatus_Canceled; | |
143 } | |
144 } | |
145 | |
146 | |
147 bool BagOfTasksProcessor::Join(int64_t bag) | |
148 { | |
149 boost::mutex::scoped_lock lock(mutex_); | |
150 | |
151 while (continue_) | |
152 { | |
153 ExitStatus::iterator it = exitStatus_.find(bag); | |
154 if (it == exitStatus_.end()) // The bag is still running | |
155 { | |
156 bagFinished_.wait(lock); | |
157 } | |
158 else | |
159 { | |
160 bool status = it->second; | |
161 exitStatus_.erase(it); | |
162 return status; | |
163 } | |
164 } | |
165 | |
166 return false; // The processor is stopping | |
167 } | |
168 | |
169 | |
170 float BagOfTasksProcessor::GetProgress(int64_t bag) | |
171 { | |
172 boost::mutex::scoped_lock lock(mutex_); | |
173 | |
174 Bags::const_iterator it = bags_.find(bag); | |
175 if (it == bags_.end()) | |
176 { | |
177 // The bag of tasks has finished | |
178 return 1.0f; | |
179 } | |
180 else | |
181 { | |
182 return (static_cast<float>(it->second.done_) / | |
183 static_cast<float>(it->second.size_)); | |
184 } | |
185 } | |
186 | |
187 | |
188 bool BagOfTasksProcessor::Handle::Join() | |
189 { | |
190 if (hasJoined_) | |
191 { | |
192 return status_; | |
193 } | |
194 else | |
195 { | |
196 status_ = that_.Join(bag_); | |
197 hasJoined_ = true; | |
198 return status_; | |
199 } | |
200 } | |
201 | |
202 | |
203 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : | |
204 countBags_(0), | |
205 continue_(true) | |
206 { | |
207 if (countThreads == 0) | |
208 { | |
209 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
210 } | |
211 | |
212 threads_.resize(countThreads); | |
213 | |
214 for (size_t i = 0; i < threads_.size(); i++) | |
215 { | |
216 threads_[i] = new boost::thread(Worker, this); | |
217 } | |
218 } | |
219 | |
220 | |
221 BagOfTasksProcessor::~BagOfTasksProcessor() | |
222 { | |
223 continue_ = false; | |
224 | |
225 bagFinished_.notify_all(); // Wakes up all the pending "Join()" | |
226 | |
227 for (size_t i = 0; i < threads_.size(); i++) | |
228 { | |
229 if (threads_[i]) | |
230 { | |
231 if (threads_[i]->joinable()) | |
232 { | |
233 threads_[i]->join(); | |
234 } | |
235 | |
236 delete threads_[i]; | |
237 threads_[i] = NULL; | |
238 } | |
239 } | |
240 } | |
241 | |
242 | |
243 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) | |
244 { | |
245 if (tasks.GetSize() == 0) | |
246 { | |
247 return new Handle(*this, 0, true); | |
248 } | |
249 | |
250 boost::mutex::scoped_lock lock(mutex_); | |
251 | |
252 uint64_t id = countBags_; | |
253 countBags_ += 1; | |
254 | |
255 Bag bag(tasks.GetSize()); | |
256 bags_[id] = bag; | |
257 | |
258 while (!tasks.IsEmpty()) | |
259 { | |
260 queue_.Enqueue(new Task(id, tasks.Pop())); | |
261 } | |
262 | |
263 return new Handle(*this, id, false); | |
264 } | |
265 } |