comparison Core/MultiThreading/BagOfTasksProcessor.cpp @ 1920:b97aa579e85b

bag of tasks
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 19 Feb 2016 17:01:50 +0100
parents
children 6ac7f31fc543
comparison
equal deleted inserted replaced
1919:d9e33b165112 1920:b97aa579e85b
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation, either version 3 of the
9 * License, or (at your option) any later version.
10 *
11 * In addition, as a special exception, the copyright holders of this
12 * program give permission to link the code of its release with the
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it
14 * that use the same license as the "OpenSSL" library), and distribute
15 * the linked executables. You must obey the GNU General Public License
16 * in all respects for all of the code used other than "OpenSSL". If you
17 * modify file(s) with this exception, you may extend this exception to
18 * your version of the file(s), but you are not obligated to do so. If
19 * you do not wish to do so, delete this exception statement from your
20 * version. If you delete this exception statement from all source files
21 * in the program, then also delete it here.
22 *
23 * This program is distributed in the hope that it will be useful, but
24 * WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 * General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License
29 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 **/
31
32
33 #include "../PrecompiledHeaders.h"
34 #include "BagOfTasksProcessor.h"
35
36 #include "../OrthancException.h"
37
38
39 namespace Orthanc
40 {
41 class BagOfTasksProcessor::Task : public IDynamicObject
42 {
43 private:
44 uint64_t bag_;
45 std::auto_ptr<ICommand> command_;
46
47 public:
48 Task(uint64_t bag,
49 ICommand* command) :
50 bag_(bag),
51 command_(command)
52 {
53 }
54
55 bool Execute()
56 {
57 try
58 {
59 return command_->Execute();
60 }
61 catch (OrthancException&)
62 {
63 return false;
64 }
65 catch (std::runtime_error&)
66 {
67 return false;
68 }
69 }
70
71 uint64_t GetBag()
72 {
73 return bag_;
74 }
75 };
76
77
78 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that)
79 {
80 while (that->continue_)
81 {
82 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100));
83 if (obj.get() != NULL)
84 {
85 Task& task = *dynamic_cast<Task*>(obj.get());
86
87 {
88 boost::mutex::scoped_lock lock(that->mutex_);
89
90 Bags::iterator bag = that->bags_.find(task.GetBag());
91 assert(bag != that->bags_.end());
92 assert(bag->second.done_ < bag->second.size_);
93
94 if (bag->second.status_ != BagStatus_Running)
95 {
96 // This bag of task has failed or is tagged as canceled, do nothing
97 bag->second.done_ += 1;
98 continue;
99 }
100 }
101
102 bool success = task.Execute();
103
104 {
105 boost::mutex::scoped_lock lock(that->mutex_);
106
107 Bags::iterator bag = that->bags_.find(task.GetBag());
108 assert(bag != that->bags_.end());
109
110 if (!success)
111 {
112 bag->second.status_ = BagStatus_Failed;
113 }
114
115 assert(bag->second.done_ < bag->second.size_);
116 bag->second.done_ += 1;
117
118 if (bag->second.done_ == bag->second.size_)
119 {
120 that->exitStatus_[task.GetBag()] = (bag->second.status_ == BagStatus_Running);
121 that->bagFinished_.notify_all();
122 }
123 }
124 }
125 }
126 }
127
128
129 void BagOfTasksProcessor::Cancel(int64_t bag)
130 {
131 boost::mutex::scoped_lock lock(mutex_);
132
133 Bags::iterator it = bags_.find(bag);
134 if (it != bags_.end())
135 {
136 it->second.status_ = BagStatus_Canceled;
137 }
138 }
139
140
141 bool BagOfTasksProcessor::Join(int64_t bag)
142 {
143 boost::mutex::scoped_lock lock(mutex_);
144
145 while (continue_)
146 {
147 ExitStatus::iterator it = exitStatus_.find(bag);
148 if (it == exitStatus_.end()) // The bag is still running
149 {
150 bagFinished_.wait(lock);
151 }
152 else
153 {
154 bool status = it->second;
155 exitStatus_.erase(it);
156 return status;
157 }
158 }
159
160 return false; // The processor is stopping
161 }
162
163
164 float BagOfTasksProcessor::GetProgress(int64_t bag)
165 {
166 boost::mutex::scoped_lock lock(mutex_);
167
168 Bags::const_iterator it = bags_.find(bag);
169 if (it == bags_.end())
170 {
171 // The bag of tasks has finished
172 return 1.0f;
173 }
174 else
175 {
176 return (static_cast<float>(it->second.done_) /
177 static_cast<float>(it->second.size_));
178 }
179 }
180
181
182 bool BagOfTasksProcessor::Handle::Join()
183 {
184 if (hasJoined_)
185 {
186 return status_;
187 }
188 else
189 {
190 status_ = that_.Join(bag_);
191 hasJoined_ = true;
192 return status_;
193 }
194 }
195
196
197 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) :
198 countBags_(0),
199 continue_(true)
200 {
201 if (countThreads <= 0)
202 {
203 throw OrthancException(ErrorCode_ParameterOutOfRange);
204 }
205
206 threads_.resize(countThreads);
207
208 for (size_t i = 0; i < threads_.size(); i++)
209 {
210 threads_[i] = new boost::thread(Worker, this);
211 }
212 }
213
214
215 BagOfTasksProcessor::~BagOfTasksProcessor()
216 {
217 continue_ = false;
218
219 bagFinished_.notify_all(); // Wakes up all the pending "Join()"
220
221 for (size_t i = 0; i < threads_.size(); i++)
222 {
223 if (threads_[i])
224 {
225 if (threads_[i]->joinable())
226 {
227 threads_[i]->join();
228 }
229
230 delete threads_[i];
231 threads_[i] = NULL;
232 }
233 }
234 }
235
236
237 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks)
238 {
239 boost::mutex::scoped_lock lock(mutex_);
240
241 uint64_t id = countBags_;
242 countBags_ += 1;
243
244 Bag bag(tasks.GetSize());
245 bags_[id] = bag;
246
247 while (!tasks.IsEmpty())
248 {
249 queue_.Enqueue(new Task(id, tasks.Pop()));
250 }
251
252 return new Handle(*this, id);
253 }
254 }