comparison Resources/Orthanc/Core/MultiThreading/BagOfTasksProcessor.cpp @ 59:7a3853d51c45

Move "Framework/Orthanc/" as "Resources/Orthanc/"
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 25 Nov 2016 16:34:34 +0100
parents Framework/Orthanc/Core/MultiThreading/BagOfTasksProcessor.cpp@bc3ca410b765
children ff0ef01c332c
comparison
equal deleted inserted replaced
58:35468714a38e 59:7a3853d51c45
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation, either version 3 of the
9 * License, or (at your option) any later version.
10 *
11 * In addition, as a special exception, the copyright holders of this
12 * program give permission to link the code of its release with the
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it
14 * that use the same license as the "OpenSSL" library), and distribute
15 * the linked executables. You must obey the GNU General Public License
16 * in all respects for all of the code used other than "OpenSSL". If you
17 * modify file(s) with this exception, you may extend this exception to
18 * your version of the file(s), but you are not obligated to do so. If
19 * you do not wish to do so, delete this exception statement from your
20 * version. If you delete this exception statement from all source files
21 * in the program, then also delete it here.
22 *
23 * This program is distributed in the hope that it will be useful, but
24 * WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 * General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License
29 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 **/
31
32
33 #include "../PrecompiledHeaders.h"
34 #include "BagOfTasksProcessor.h"
35
36 #include "../Logging.h"
37 #include "../OrthancException.h"
38
39 #include <stdio.h>
40
41 namespace Orthanc
42 {
43 class BagOfTasksProcessor::Task : public IDynamicObject
44 {
45 private:
46 uint64_t bag_;
47 std::auto_ptr<ICommand> command_;
48
49 public:
50 Task(uint64_t bag,
51 ICommand* command) :
52 bag_(bag),
53 command_(command)
54 {
55 }
56
57 bool Execute()
58 {
59 try
60 {
61 return command_->Execute();
62 }
63 catch (OrthancException& e)
64 {
65 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What();
66 return false;
67 }
68 catch (std::runtime_error& e)
69 {
70 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what();
71 return false;
72 }
73 catch (...)
74 {
75 LOG(ERROR) << "Native exception while processing a bag of tasks";
76 return false;
77 }
78 }
79
80 uint64_t GetBag()
81 {
82 return bag_;
83 }
84 };
85
86
87 void BagOfTasksProcessor::SignalProgress(Task& task,
88 Bag& bag)
89 {
90 assert(bag.done_ < bag.size_);
91
92 bag.done_ += 1;
93
94 if (bag.done_ == bag.size_)
95 {
96 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running);
97 bagFinished_.notify_all();
98 }
99 }
100
101 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that)
102 {
103 while (that->continue_)
104 {
105 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100));
106 if (obj.get() != NULL)
107 {
108 Task& task = *dynamic_cast<Task*>(obj.get());
109
110 {
111 boost::mutex::scoped_lock lock(that->mutex_);
112
113 Bags::iterator bag = that->bags_.find(task.GetBag());
114 assert(bag != that->bags_.end());
115 assert(bag->second.done_ < bag->second.size_);
116
117 if (bag->second.status_ != BagStatus_Running)
118 {
119 // Do not execute this task, as its parent bag of tasks
120 // has failed or is tagged as canceled
121 that->SignalProgress(task, bag->second);
122 continue;
123 }
124 }
125
126 bool success = task.Execute();
127
128 {
129 boost::mutex::scoped_lock lock(that->mutex_);
130
131 Bags::iterator bag = that->bags_.find(task.GetBag());
132 assert(bag != that->bags_.end());
133
134 if (!success)
135 {
136 bag->second.status_ = BagStatus_Failed;
137 }
138
139 that->SignalProgress(task, bag->second);
140 }
141 }
142 }
143 }
144
145
146 void BagOfTasksProcessor::Cancel(int64_t bag)
147 {
148 boost::mutex::scoped_lock lock(mutex_);
149
150 Bags::iterator it = bags_.find(bag);
151 if (it != bags_.end())
152 {
153 it->second.status_ = BagStatus_Canceled;
154 }
155 }
156
157
158 bool BagOfTasksProcessor::Join(int64_t bag)
159 {
160 boost::mutex::scoped_lock lock(mutex_);
161
162 while (continue_)
163 {
164 ExitStatus::iterator it = exitStatus_.find(bag);
165 if (it == exitStatus_.end()) // The bag is still running
166 {
167 bagFinished_.wait(lock);
168 }
169 else
170 {
171 bool status = it->second;
172 exitStatus_.erase(it);
173 return status;
174 }
175 }
176
177 return false; // The processor is stopping
178 }
179
180
181 float BagOfTasksProcessor::GetProgress(int64_t bag)
182 {
183 boost::mutex::scoped_lock lock(mutex_);
184
185 Bags::const_iterator it = bags_.find(bag);
186 if (it == bags_.end())
187 {
188 // The bag of tasks has finished
189 return 1.0f;
190 }
191 else
192 {
193 return (static_cast<float>(it->second.done_) /
194 static_cast<float>(it->second.size_));
195 }
196 }
197
198
199 bool BagOfTasksProcessor::Handle::Join()
200 {
201 if (hasJoined_)
202 {
203 return status_;
204 }
205 else
206 {
207 status_ = that_.Join(bag_);
208 hasJoined_ = true;
209 return status_;
210 }
211 }
212
213
214 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) :
215 countBags_(0),
216 continue_(true)
217 {
218 if (countThreads == 0)
219 {
220 throw OrthancException(ErrorCode_ParameterOutOfRange);
221 }
222
223 threads_.resize(countThreads);
224
225 for (size_t i = 0; i < threads_.size(); i++)
226 {
227 threads_[i] = new boost::thread(Worker, this);
228 }
229 }
230
231
232 BagOfTasksProcessor::~BagOfTasksProcessor()
233 {
234 continue_ = false;
235
236 bagFinished_.notify_all(); // Wakes up all the pending "Join()"
237
238 for (size_t i = 0; i < threads_.size(); i++)
239 {
240 if (threads_[i])
241 {
242 if (threads_[i]->joinable())
243 {
244 threads_[i]->join();
245 }
246
247 delete threads_[i];
248 threads_[i] = NULL;
249 }
250 }
251 }
252
253
254 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks)
255 {
256 if (tasks.GetSize() == 0)
257 {
258 return new Handle(*this, 0, true);
259 }
260
261 boost::mutex::scoped_lock lock(mutex_);
262
263 uint64_t id = countBags_;
264 countBags_ += 1;
265
266 Bag bag(tasks.GetSize());
267 bags_[id] = bag;
268
269 while (!tasks.IsEmpty())
270 {
271 queue_.Enqueue(new Task(id, tasks.Pop()));
272 }
273
274 return new Handle(*this, id, false);
275 }
276 }