Mercurial > hg > orthanc-wsi
annotate Framework/MultiThreading/BagOfTasksProcessor.cpp @ 254:20a730889ae2
upgrade to 2023
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 07 Jul 2023 17:38:07 +0200 |
parents | 49f647ed1b4c |
children | 7020852a8fa9 |
rev | line source |
---|---|
151 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
254 | 5 * Copyright (C) 2017-2023 Osimis S.A., Belgium |
6 * Copyright (C) 2021-2023 Sebastien Jodogne, ICTEAM UCLouvain, Belgium | |
151 | 7 * |
8 * This program is free software: you can redistribute it and/or | |
9 * modify it under the terms of the GNU Affero General Public License | |
10 * as published by the Free Software Foundation, either version 3 of | |
11 * the License, or (at your option) any later version. | |
12 * | |
13 * This program is distributed in the hope that it will be useful, but | |
14 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 * Affero General Public License for more details. | |
17 * | |
18 * You should have received a copy of the GNU Affero General Public License | |
19 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 **/ | |
21 | |
22 | |
23 #include "BagOfTasksProcessor.h" | |
24 | |
199
a1c265cb2174
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
192
diff
changeset
|
25 #include <Compatibility.h> // For std::unique_ptr |
192 | 26 #include <Logging.h> |
27 #include <OrthancException.h> | |
151 | 28 |
29 #include <stdio.h> | |
30 | |
236
b0ee417b667a
migrating new definitions in namespace Orthanc to namespace OrthancWSI
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
214
diff
changeset
|
31 namespace OrthancWSI |
151 | 32 { |
236
b0ee417b667a
migrating new definitions in namespace Orthanc to namespace OrthancWSI
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
214
diff
changeset
|
33 class BagOfTasksProcessor::Task : public Orthanc::IDynamicObject |
151 | 34 { |
35 private: | |
36 uint64_t bag_; | |
199
a1c265cb2174
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
192
diff
changeset
|
37 std::unique_ptr<ICommand> command_; |
151 | 38 |
39 public: | |
40 Task(uint64_t bag, | |
41 ICommand* command) : | |
42 bag_(bag), | |
43 command_(command) | |
44 { | |
45 } | |
46 | |
47 bool Execute() | |
48 { | |
49 try | |
50 { | |
51 return command_->Execute(); | |
52 } | |
236
b0ee417b667a
migrating new definitions in namespace Orthanc to namespace OrthancWSI
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
214
diff
changeset
|
53 catch (Orthanc::OrthancException& e) |
151 | 54 { |
55 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What(); | |
56 return false; | |
57 } | |
58 catch (std::runtime_error& e) | |
59 { | |
60 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what(); | |
61 return false; | |
62 } | |
63 catch (...) | |
64 { | |
65 LOG(ERROR) << "Native exception while processing a bag of tasks"; | |
66 return false; | |
67 } | |
68 } | |
69 | |
70 uint64_t GetBag() | |
71 { | |
72 return bag_; | |
73 } | |
74 }; | |
75 | |
76 | |
77 void BagOfTasksProcessor::SignalProgress(Task& task, | |
78 Bag& bag) | |
79 { | |
80 assert(bag.done_ < bag.size_); | |
81 | |
82 bag.done_ += 1; | |
83 | |
84 if (bag.done_ == bag.size_) | |
85 { | |
86 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running); | |
87 bagFinished_.notify_all(); | |
88 } | |
89 } | |
90 | |
91 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) | |
92 { | |
93 while (that->continue_) | |
94 { | |
236
b0ee417b667a
migrating new definitions in namespace Orthanc to namespace OrthancWSI
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
214
diff
changeset
|
95 std::unique_ptr<Orthanc::IDynamicObject> obj(that->queue_.Dequeue(100)); |
151 | 96 if (obj.get() != NULL) |
97 { | |
98 Task& task = *dynamic_cast<Task*>(obj.get()); | |
99 | |
100 { | |
101 boost::mutex::scoped_lock lock(that->mutex_); | |
102 | |
103 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
104 assert(bag != that->bags_.end()); | |
105 assert(bag->second.done_ < bag->second.size_); | |
106 | |
107 if (bag->second.status_ != BagStatus_Running) | |
108 { | |
109 // Do not execute this task, as its parent bag of tasks | |
110 // has failed or is tagged as canceled | |
111 that->SignalProgress(task, bag->second); | |
112 continue; | |
113 } | |
114 } | |
115 | |
116 bool success = task.Execute(); | |
117 | |
118 { | |
119 boost::mutex::scoped_lock lock(that->mutex_); | |
120 | |
121 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
122 assert(bag != that->bags_.end()); | |
123 | |
124 if (!success) | |
125 { | |
126 bag->second.status_ = BagStatus_Failed; | |
127 } | |
128 | |
129 that->SignalProgress(task, bag->second); | |
130 } | |
131 } | |
132 } | |
133 } | |
134 | |
135 | |
136 void BagOfTasksProcessor::Cancel(int64_t bag) | |
137 { | |
138 boost::mutex::scoped_lock lock(mutex_); | |
139 | |
140 Bags::iterator it = bags_.find(bag); | |
141 if (it != bags_.end()) | |
142 { | |
143 it->second.status_ = BagStatus_Canceled; | |
144 } | |
145 } | |
146 | |
147 | |
148 bool BagOfTasksProcessor::Join(int64_t bag) | |
149 { | |
150 boost::mutex::scoped_lock lock(mutex_); | |
151 | |
152 while (continue_) | |
153 { | |
154 ExitStatus::iterator it = exitStatus_.find(bag); | |
155 if (it == exitStatus_.end()) // The bag is still running | |
156 { | |
157 bagFinished_.wait(lock); | |
158 } | |
159 else | |
160 { | |
161 bool status = it->second; | |
162 exitStatus_.erase(it); | |
163 return status; | |
164 } | |
165 } | |
166 | |
167 return false; // The processor is stopping | |
168 } | |
169 | |
170 | |
171 float BagOfTasksProcessor::GetProgress(int64_t bag) | |
172 { | |
173 boost::mutex::scoped_lock lock(mutex_); | |
174 | |
175 Bags::const_iterator it = bags_.find(bag); | |
176 if (it == bags_.end()) | |
177 { | |
178 // The bag of tasks has finished | |
179 return 1.0f; | |
180 } | |
181 else | |
182 { | |
183 return (static_cast<float>(it->second.done_) / | |
184 static_cast<float>(it->second.size_)); | |
185 } | |
186 } | |
187 | |
188 | |
189 bool BagOfTasksProcessor::Handle::Join() | |
190 { | |
191 if (hasJoined_) | |
192 { | |
193 return status_; | |
194 } | |
195 else | |
196 { | |
197 status_ = that_.Join(bag_); | |
198 hasJoined_ = true; | |
199 return status_; | |
200 } | |
201 } | |
202 | |
203 | |
204 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : | |
205 countBags_(0), | |
206 continue_(true) | |
207 { | |
208 if (countThreads == 0) | |
209 { | |
236
b0ee417b667a
migrating new definitions in namespace Orthanc to namespace OrthancWSI
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
214
diff
changeset
|
210 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); |
151 | 211 } |
212 | |
213 threads_.resize(countThreads); | |
214 | |
215 for (size_t i = 0; i < threads_.size(); i++) | |
216 { | |
217 threads_[i] = new boost::thread(Worker, this); | |
218 } | |
219 } | |
220 | |
221 | |
222 BagOfTasksProcessor::~BagOfTasksProcessor() | |
223 { | |
224 continue_ = false; | |
225 | |
226 bagFinished_.notify_all(); // Wakes up all the pending "Join()" | |
227 | |
228 for (size_t i = 0; i < threads_.size(); i++) | |
229 { | |
230 if (threads_[i]) | |
231 { | |
232 if (threads_[i]->joinable()) | |
233 { | |
234 threads_[i]->join(); | |
235 } | |
236 | |
237 delete threads_[i]; | |
238 threads_[i] = NULL; | |
239 } | |
240 } | |
241 } | |
242 | |
243 | |
244 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) | |
245 { | |
246 if (tasks.GetSize() == 0) | |
247 { | |
248 return new Handle(*this, 0, true); | |
249 } | |
250 | |
251 boost::mutex::scoped_lock lock(mutex_); | |
252 | |
253 uint64_t id = countBags_; | |
254 countBags_ += 1; | |
255 | |
256 Bag bag(tasks.GetSize()); | |
257 bags_[id] = bag; | |
258 | |
259 while (!tasks.IsEmpty()) | |
260 { | |
261 queue_.Enqueue(new Task(id, tasks.Pop())); | |
262 } | |
263 | |
264 return new Handle(*this, id, false); | |
265 } | |
266 } |