Mercurial > hg > orthanc
annotate OrthancFramework/Resources/Graveyard/Multithreading/BagOfTasksProcessor.cpp @ 4361:98f55e7df5ab
rollback incorrect removal for msvc2008
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 08 Dec 2020 19:10:46 +0100 |
parents | bf7b9edf6b81 |
children | d9473bd5ed43 |
rev | line source |
---|---|
1920 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
3640
94f4a18a79cc
upgrade to year 2020
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
3060
diff
changeset
|
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium |
1920 | 6 * |
7 * This program is free software: you can redistribute it and/or | |
4119
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
8 * modify it under the terms of the GNU Lesser General Public License |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
9 * as published by the Free Software Foundation, either version 3 of |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
10 * the License, or (at your option) any later version. |
1920 | 11 * |
12 * This program is distributed in the hope that it will be useful, but | |
13 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
4119
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
15 * Lesser General Public License for more details. |
1920 | 16 * |
4119
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
17 * You should have received a copy of the GNU Lesser General Public |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
18 * License along with this program. If not, see |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
19 * <http://www.gnu.org/licenses/>. |
1920 | 20 **/ |
21 | |
22 | |
23 #include "../PrecompiledHeaders.h" | |
24 #include "BagOfTasksProcessor.h" | |
25 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
26 #include "../Logging.h" |
1920 | 27 #include "../OrthancException.h" |
28 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
29 #include <stdio.h> |
1920 | 30 |
31 namespace Orthanc | |
32 { | |
33 class BagOfTasksProcessor::Task : public IDynamicObject | |
34 { | |
35 private: | |
36 uint64_t bag_; | |
37 std::auto_ptr<ICommand> command_; | |
38 | |
39 public: | |
40 Task(uint64_t bag, | |
41 ICommand* command) : | |
42 bag_(bag), | |
43 command_(command) | |
44 { | |
45 } | |
46 | |
47 bool Execute() | |
48 { | |
49 try | |
50 { | |
51 return command_->Execute(); | |
52 } | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
53 catch (OrthancException& e) |
1920 | 54 { |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
55 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What(); |
1920 | 56 return false; |
57 } | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
58 catch (std::runtime_error& e) |
1920 | 59 { |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
60 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what(); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
61 return false; |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
62 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
63 catch (...) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
64 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
65 LOG(ERROR) << "Native exception while processing a bag of tasks"; |
1920 | 66 return false; |
67 } | |
68 } | |
69 | |
70 uint64_t GetBag() | |
71 { | |
72 return bag_; | |
73 } | |
74 }; | |
75 | |
76 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
77 void BagOfTasksProcessor::SignalProgress(Task& task, |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
78 Bag& bag) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
79 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
80 assert(bag.done_ < bag.size_); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
81 |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
82 bag.done_ += 1; |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
83 |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
84 if (bag.done_ == bag.size_) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
85 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
86 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
87 bagFinished_.notify_all(); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
88 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
89 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
90 |
1920 | 91 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) |
92 { | |
93 while (that->continue_) | |
94 { | |
95 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100)); | |
96 if (obj.get() != NULL) | |
97 { | |
98 Task& task = *dynamic_cast<Task*>(obj.get()); | |
99 | |
100 { | |
101 boost::mutex::scoped_lock lock(that->mutex_); | |
102 | |
103 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
104 assert(bag != that->bags_.end()); | |
105 assert(bag->second.done_ < bag->second.size_); | |
106 | |
107 if (bag->second.status_ != BagStatus_Running) | |
108 { | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
109 // Do not execute this task, as its parent bag of tasks |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
110 // has failed or is tagged as canceled |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
111 that->SignalProgress(task, bag->second); |
1920 | 112 continue; |
113 } | |
114 } | |
115 | |
116 bool success = task.Execute(); | |
117 | |
118 { | |
119 boost::mutex::scoped_lock lock(that->mutex_); | |
120 | |
121 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
122 assert(bag != that->bags_.end()); | |
123 | |
124 if (!success) | |
125 { | |
126 bag->second.status_ = BagStatus_Failed; | |
127 } | |
128 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
129 that->SignalProgress(task, bag->second); |
1920 | 130 } |
131 } | |
132 } | |
133 } | |
134 | |
135 | |
136 void BagOfTasksProcessor::Cancel(int64_t bag) | |
137 { | |
138 boost::mutex::scoped_lock lock(mutex_); | |
139 | |
140 Bags::iterator it = bags_.find(bag); | |
141 if (it != bags_.end()) | |
142 { | |
143 it->second.status_ = BagStatus_Canceled; | |
144 } | |
145 } | |
146 | |
147 | |
148 bool BagOfTasksProcessor::Join(int64_t bag) | |
149 { | |
150 boost::mutex::scoped_lock lock(mutex_); | |
151 | |
152 while (continue_) | |
153 { | |
154 ExitStatus::iterator it = exitStatus_.find(bag); | |
155 if (it == exitStatus_.end()) // The bag is still running | |
156 { | |
157 bagFinished_.wait(lock); | |
158 } | |
159 else | |
160 { | |
161 bool status = it->second; | |
162 exitStatus_.erase(it); | |
163 return status; | |
164 } | |
165 } | |
166 | |
167 return false; // The processor is stopping | |
168 } | |
169 | |
170 | |
171 float BagOfTasksProcessor::GetProgress(int64_t bag) | |
172 { | |
173 boost::mutex::scoped_lock lock(mutex_); | |
174 | |
175 Bags::const_iterator it = bags_.find(bag); | |
176 if (it == bags_.end()) | |
177 { | |
178 // The bag of tasks has finished | |
179 return 1.0f; | |
180 } | |
181 else | |
182 { | |
183 return (static_cast<float>(it->second.done_) / | |
184 static_cast<float>(it->second.size_)); | |
185 } | |
186 } | |
187 | |
188 | |
189 bool BagOfTasksProcessor::Handle::Join() | |
190 { | |
191 if (hasJoined_) | |
192 { | |
193 return status_; | |
194 } | |
195 else | |
196 { | |
197 status_ = that_.Join(bag_); | |
198 hasJoined_ = true; | |
199 return status_; | |
200 } | |
201 } | |
202 | |
203 | |
204 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : | |
205 countBags_(0), | |
206 continue_(true) | |
207 { | |
2016 | 208 if (countThreads == 0) |
1920 | 209 { |
210 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
211 } | |
212 | |
213 threads_.resize(countThreads); | |
214 | |
215 for (size_t i = 0; i < threads_.size(); i++) | |
216 { | |
217 threads_[i] = new boost::thread(Worker, this); | |
218 } | |
219 } | |
220 | |
221 | |
222 BagOfTasksProcessor::~BagOfTasksProcessor() | |
223 { | |
224 continue_ = false; | |
225 | |
226 bagFinished_.notify_all(); // Wakes up all the pending "Join()" | |
227 | |
228 for (size_t i = 0; i < threads_.size(); i++) | |
229 { | |
230 if (threads_[i]) | |
231 { | |
232 if (threads_[i]->joinable()) | |
233 { | |
234 threads_[i]->join(); | |
235 } | |
236 | |
237 delete threads_[i]; | |
238 threads_[i] = NULL; | |
239 } | |
240 } | |
241 } | |
242 | |
243 | |
244 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) | |
245 { | |
1923
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
246 if (tasks.GetSize() == 0) |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
247 { |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
248 return new Handle(*this, 0, true); |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
249 } |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
250 |
1920 | 251 boost::mutex::scoped_lock lock(mutex_); |
252 | |
253 uint64_t id = countBags_; | |
254 countBags_ += 1; | |
255 | |
256 Bag bag(tasks.GetSize()); | |
257 bags_[id] = bag; | |
258 | |
259 while (!tasks.IsEmpty()) | |
260 { | |
261 queue_.Enqueue(new Task(id, tasks.Pop())); | |
262 } | |
263 | |
1923
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
264 return new Handle(*this, id, false); |
1920 | 265 } |
266 } |