Mercurial > hg > orthanc-wsi
comparison Framework/MultiThreading/BagOfTasksProcessor.cpp @ 151:fb8d4cd2f618
fix applications
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 17 Jul 2018 10:06:39 +0200 |
parents | |
children | 6b8ccfc02051 |
comparison
equal
deleted
inserted
replaced
150:442102e14933 | 151:fb8d4cd2f618 |
---|---|
1 /** | |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium | |
6 * | |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU Affero General Public License | |
9 * as published by the Free Software Foundation, either version 3 of | |
10 * the License, or (at your option) any later version. | |
11 * | |
12 * This program is distributed in the hope that it will be useful, but | |
13 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
15 * Affero General Public License for more details. | |
16 * | |
17 * You should have received a copy of the GNU Affero General Public License | |
18 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 **/ | |
20 | |
21 | |
22 #include "BagOfTasksProcessor.h" | |
23 | |
24 #include <Core/Logging.h> | |
25 #include <Core/OrthancException.h> | |
26 | |
27 #include <stdio.h> | |
28 | |
29 namespace Orthanc | |
30 { | |
31 class BagOfTasksProcessor::Task : public IDynamicObject | |
32 { | |
33 private: | |
34 uint64_t bag_; | |
35 std::auto_ptr<ICommand> command_; | |
36 | |
37 public: | |
38 Task(uint64_t bag, | |
39 ICommand* command) : | |
40 bag_(bag), | |
41 command_(command) | |
42 { | |
43 } | |
44 | |
45 bool Execute() | |
46 { | |
47 try | |
48 { | |
49 return command_->Execute(); | |
50 } | |
51 catch (OrthancException& e) | |
52 { | |
53 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What(); | |
54 return false; | |
55 } | |
56 catch (std::runtime_error& e) | |
57 { | |
58 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what(); | |
59 return false; | |
60 } | |
61 catch (...) | |
62 { | |
63 LOG(ERROR) << "Native exception while processing a bag of tasks"; | |
64 return false; | |
65 } | |
66 } | |
67 | |
68 uint64_t GetBag() | |
69 { | |
70 return bag_; | |
71 } | |
72 }; | |
73 | |
74 | |
75 void BagOfTasksProcessor::SignalProgress(Task& task, | |
76 Bag& bag) | |
77 { | |
78 assert(bag.done_ < bag.size_); | |
79 | |
80 bag.done_ += 1; | |
81 | |
82 if (bag.done_ == bag.size_) | |
83 { | |
84 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running); | |
85 bagFinished_.notify_all(); | |
86 } | |
87 } | |
88 | |
89 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) | |
90 { | |
91 while (that->continue_) | |
92 { | |
93 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100)); | |
94 if (obj.get() != NULL) | |
95 { | |
96 Task& task = *dynamic_cast<Task*>(obj.get()); | |
97 | |
98 { | |
99 boost::mutex::scoped_lock lock(that->mutex_); | |
100 | |
101 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
102 assert(bag != that->bags_.end()); | |
103 assert(bag->second.done_ < bag->second.size_); | |
104 | |
105 if (bag->second.status_ != BagStatus_Running) | |
106 { | |
107 // Do not execute this task, as its parent bag of tasks | |
108 // has failed or is tagged as canceled | |
109 that->SignalProgress(task, bag->second); | |
110 continue; | |
111 } | |
112 } | |
113 | |
114 bool success = task.Execute(); | |
115 | |
116 { | |
117 boost::mutex::scoped_lock lock(that->mutex_); | |
118 | |
119 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
120 assert(bag != that->bags_.end()); | |
121 | |
122 if (!success) | |
123 { | |
124 bag->second.status_ = BagStatus_Failed; | |
125 } | |
126 | |
127 that->SignalProgress(task, bag->second); | |
128 } | |
129 } | |
130 } | |
131 } | |
132 | |
133 | |
134 void BagOfTasksProcessor::Cancel(int64_t bag) | |
135 { | |
136 boost::mutex::scoped_lock lock(mutex_); | |
137 | |
138 Bags::iterator it = bags_.find(bag); | |
139 if (it != bags_.end()) | |
140 { | |
141 it->second.status_ = BagStatus_Canceled; | |
142 } | |
143 } | |
144 | |
145 | |
146 bool BagOfTasksProcessor::Join(int64_t bag) | |
147 { | |
148 boost::mutex::scoped_lock lock(mutex_); | |
149 | |
150 while (continue_) | |
151 { | |
152 ExitStatus::iterator it = exitStatus_.find(bag); | |
153 if (it == exitStatus_.end()) // The bag is still running | |
154 { | |
155 bagFinished_.wait(lock); | |
156 } | |
157 else | |
158 { | |
159 bool status = it->second; | |
160 exitStatus_.erase(it); | |
161 return status; | |
162 } | |
163 } | |
164 | |
165 return false; // The processor is stopping | |
166 } | |
167 | |
168 | |
169 float BagOfTasksProcessor::GetProgress(int64_t bag) | |
170 { | |
171 boost::mutex::scoped_lock lock(mutex_); | |
172 | |
173 Bags::const_iterator it = bags_.find(bag); | |
174 if (it == bags_.end()) | |
175 { | |
176 // The bag of tasks has finished | |
177 return 1.0f; | |
178 } | |
179 else | |
180 { | |
181 return (static_cast<float>(it->second.done_) / | |
182 static_cast<float>(it->second.size_)); | |
183 } | |
184 } | |
185 | |
186 | |
187 bool BagOfTasksProcessor::Handle::Join() | |
188 { | |
189 if (hasJoined_) | |
190 { | |
191 return status_; | |
192 } | |
193 else | |
194 { | |
195 status_ = that_.Join(bag_); | |
196 hasJoined_ = true; | |
197 return status_; | |
198 } | |
199 } | |
200 | |
201 | |
202 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : | |
203 countBags_(0), | |
204 continue_(true) | |
205 { | |
206 if (countThreads == 0) | |
207 { | |
208 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
209 } | |
210 | |
211 threads_.resize(countThreads); | |
212 | |
213 for (size_t i = 0; i < threads_.size(); i++) | |
214 { | |
215 threads_[i] = new boost::thread(Worker, this); | |
216 } | |
217 } | |
218 | |
219 | |
220 BagOfTasksProcessor::~BagOfTasksProcessor() | |
221 { | |
222 continue_ = false; | |
223 | |
224 bagFinished_.notify_all(); // Wakes up all the pending "Join()" | |
225 | |
226 for (size_t i = 0; i < threads_.size(); i++) | |
227 { | |
228 if (threads_[i]) | |
229 { | |
230 if (threads_[i]->joinable()) | |
231 { | |
232 threads_[i]->join(); | |
233 } | |
234 | |
235 delete threads_[i]; | |
236 threads_[i] = NULL; | |
237 } | |
238 } | |
239 } | |
240 | |
241 | |
242 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) | |
243 { | |
244 if (tasks.GetSize() == 0) | |
245 { | |
246 return new Handle(*this, 0, true); | |
247 } | |
248 | |
249 boost::mutex::scoped_lock lock(mutex_); | |
250 | |
251 uint64_t id = countBags_; | |
252 countBags_ += 1; | |
253 | |
254 Bag bag(tasks.GetSize()); | |
255 bags_[id] = bag; | |
256 | |
257 while (!tasks.IsEmpty()) | |
258 { | |
259 queue_.Enqueue(new Task(id, tasks.Pop())); | |
260 } | |
261 | |
262 return new Handle(*this, id, false); | |
263 } | |
264 } |