comparison Framework/MultiThreading/BagOfTasksProcessor.cpp @ 151:fb8d4cd2f618

fix applications
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 17 Jul 2018 10:06:39 +0200
parents
children 6b8ccfc02051
comparison
equal deleted inserted replaced
150:442102e14933 151:fb8d4cd2f618
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #include "BagOfTasksProcessor.h"
23
24 #include <Core/Logging.h>
25 #include <Core/OrthancException.h>
26
27 #include <stdio.h>
28
29 namespace Orthanc
30 {
31 class BagOfTasksProcessor::Task : public IDynamicObject
32 {
33 private:
34 uint64_t bag_;
35 std::auto_ptr<ICommand> command_;
36
37 public:
38 Task(uint64_t bag,
39 ICommand* command) :
40 bag_(bag),
41 command_(command)
42 {
43 }
44
45 bool Execute()
46 {
47 try
48 {
49 return command_->Execute();
50 }
51 catch (OrthancException& e)
52 {
53 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What();
54 return false;
55 }
56 catch (std::runtime_error& e)
57 {
58 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what();
59 return false;
60 }
61 catch (...)
62 {
63 LOG(ERROR) << "Native exception while processing a bag of tasks";
64 return false;
65 }
66 }
67
68 uint64_t GetBag()
69 {
70 return bag_;
71 }
72 };
73
74
75 void BagOfTasksProcessor::SignalProgress(Task& task,
76 Bag& bag)
77 {
78 assert(bag.done_ < bag.size_);
79
80 bag.done_ += 1;
81
82 if (bag.done_ == bag.size_)
83 {
84 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running);
85 bagFinished_.notify_all();
86 }
87 }
88
89 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that)
90 {
91 while (that->continue_)
92 {
93 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100));
94 if (obj.get() != NULL)
95 {
96 Task& task = *dynamic_cast<Task*>(obj.get());
97
98 {
99 boost::mutex::scoped_lock lock(that->mutex_);
100
101 Bags::iterator bag = that->bags_.find(task.GetBag());
102 assert(bag != that->bags_.end());
103 assert(bag->second.done_ < bag->second.size_);
104
105 if (bag->second.status_ != BagStatus_Running)
106 {
107 // Do not execute this task, as its parent bag of tasks
108 // has failed or is tagged as canceled
109 that->SignalProgress(task, bag->second);
110 continue;
111 }
112 }
113
114 bool success = task.Execute();
115
116 {
117 boost::mutex::scoped_lock lock(that->mutex_);
118
119 Bags::iterator bag = that->bags_.find(task.GetBag());
120 assert(bag != that->bags_.end());
121
122 if (!success)
123 {
124 bag->second.status_ = BagStatus_Failed;
125 }
126
127 that->SignalProgress(task, bag->second);
128 }
129 }
130 }
131 }
132
133
134 void BagOfTasksProcessor::Cancel(int64_t bag)
135 {
136 boost::mutex::scoped_lock lock(mutex_);
137
138 Bags::iterator it = bags_.find(bag);
139 if (it != bags_.end())
140 {
141 it->second.status_ = BagStatus_Canceled;
142 }
143 }
144
145
146 bool BagOfTasksProcessor::Join(int64_t bag)
147 {
148 boost::mutex::scoped_lock lock(mutex_);
149
150 while (continue_)
151 {
152 ExitStatus::iterator it = exitStatus_.find(bag);
153 if (it == exitStatus_.end()) // The bag is still running
154 {
155 bagFinished_.wait(lock);
156 }
157 else
158 {
159 bool status = it->second;
160 exitStatus_.erase(it);
161 return status;
162 }
163 }
164
165 return false; // The processor is stopping
166 }
167
168
169 float BagOfTasksProcessor::GetProgress(int64_t bag)
170 {
171 boost::mutex::scoped_lock lock(mutex_);
172
173 Bags::const_iterator it = bags_.find(bag);
174 if (it == bags_.end())
175 {
176 // The bag of tasks has finished
177 return 1.0f;
178 }
179 else
180 {
181 return (static_cast<float>(it->second.done_) /
182 static_cast<float>(it->second.size_));
183 }
184 }
185
186
187 bool BagOfTasksProcessor::Handle::Join()
188 {
189 if (hasJoined_)
190 {
191 return status_;
192 }
193 else
194 {
195 status_ = that_.Join(bag_);
196 hasJoined_ = true;
197 return status_;
198 }
199 }
200
201
202 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) :
203 countBags_(0),
204 continue_(true)
205 {
206 if (countThreads == 0)
207 {
208 throw OrthancException(ErrorCode_ParameterOutOfRange);
209 }
210
211 threads_.resize(countThreads);
212
213 for (size_t i = 0; i < threads_.size(); i++)
214 {
215 threads_[i] = new boost::thread(Worker, this);
216 }
217 }
218
219
220 BagOfTasksProcessor::~BagOfTasksProcessor()
221 {
222 continue_ = false;
223
224 bagFinished_.notify_all(); // Wakes up all the pending "Join()"
225
226 for (size_t i = 0; i < threads_.size(); i++)
227 {
228 if (threads_[i])
229 {
230 if (threads_[i]->joinable())
231 {
232 threads_[i]->join();
233 }
234
235 delete threads_[i];
236 threads_[i] = NULL;
237 }
238 }
239 }
240
241
242 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks)
243 {
244 if (tasks.GetSize() == 0)
245 {
246 return new Handle(*this, 0, true);
247 }
248
249 boost::mutex::scoped_lock lock(mutex_);
250
251 uint64_t id = countBags_;
252 countBags_ += 1;
253
254 Bag bag(tasks.GetSize());
255 bags_[id] = bag;
256
257 while (!tasks.IsEmpty())
258 {
259 queue_.Enqueue(new Task(id, tasks.Pop()));
260 }
261
262 return new Handle(*this, id, false);
263 }
264 }