Mercurial > hg > orthanc
annotate OrthancFramework/Resources/Graveyard/Multithreading/BagOfTasksProcessor.cpp @ 5834:79a497908b04 attach-custom-data tip
merged find-refactoring -> attach-custom-data
author | Alain Mazy <am@orthanc.team> |
---|---|
date | Wed, 09 Oct 2024 11:06:20 +0200 |
parents | f7adfb22e20e |
children |
rev | line source |
---|---|
1920 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5640
f7adfb22e20e
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5485
diff
changeset
|
5 * Copyright (C) 2017-2023 Osimis S.A., Belgium |
f7adfb22e20e
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5485
diff
changeset
|
6 * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium |
5485
48b8dae6dc77
upgrade to year 2024
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5185
diff
changeset
|
7 * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium |
1920 | 8 * |
9 * This program is free software: you can redistribute it and/or | |
4119
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
10 * modify it under the terms of the GNU Lesser General Public License |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
11 * as published by the Free Software Foundation, either version 3 of |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
12 * the License, or (at your option) any later version. |
1920 | 13 * |
14 * This program is distributed in the hope that it will be useful, but | |
15 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
4119
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
17 * Lesser General Public License for more details. |
1920 | 18 * |
4119
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
19 * You should have received a copy of the GNU Lesser General Public |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
20 * License along with this program. If not, see |
bf7b9edf6b81
re-licensing the OrthancFramework to LGPL, in order to license Stone of Orthanc under LGPL
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
4044
diff
changeset
|
21 * <http://www.gnu.org/licenses/>. |
1920 | 22 **/ |
23 | |
24 | |
25 #include "../PrecompiledHeaders.h" | |
26 #include "BagOfTasksProcessor.h" | |
27 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
28 #include "../Logging.h" |
1920 | 29 #include "../OrthancException.h" |
30 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
31 #include <stdio.h> |
1920 | 32 |
33 namespace Orthanc | |
34 { | |
35 class BagOfTasksProcessor::Task : public IDynamicObject | |
36 { | |
37 private: | |
38 uint64_t bag_; | |
39 std::auto_ptr<ICommand> command_; | |
40 | |
41 public: | |
42 Task(uint64_t bag, | |
43 ICommand* command) : | |
44 bag_(bag), | |
45 command_(command) | |
46 { | |
47 } | |
48 | |
49 bool Execute() | |
50 { | |
51 try | |
52 { | |
53 return command_->Execute(); | |
54 } | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
55 catch (OrthancException& e) |
1920 | 56 { |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
57 LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What(); |
1920 | 58 return false; |
59 } | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
60 catch (std::runtime_error& e) |
1920 | 61 { |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
62 LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what(); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
63 return false; |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
64 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
65 catch (...) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
66 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
67 LOG(ERROR) << "Native exception while processing a bag of tasks"; |
1920 | 68 return false; |
69 } | |
70 } | |
71 | |
72 uint64_t GetBag() | |
73 { | |
74 return bag_; | |
75 } | |
76 }; | |
77 | |
78 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
79 void BagOfTasksProcessor::SignalProgress(Task& task, |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
80 Bag& bag) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
81 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
82 assert(bag.done_ < bag.size_); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
83 |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
84 bag.done_ += 1; |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
85 |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
86 if (bag.done_ == bag.size_) |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
87 { |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
88 exitStatus_[task.GetBag()] = (bag.status_ == BagStatus_Running); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
89 bagFinished_.notify_all(); |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
90 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
91 } |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
92 |
1920 | 93 void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) |
94 { | |
95 while (that->continue_) | |
96 { | |
97 std::auto_ptr<IDynamicObject> obj(that->queue_.Dequeue(100)); | |
98 if (obj.get() != NULL) | |
99 { | |
100 Task& task = *dynamic_cast<Task*>(obj.get()); | |
101 | |
102 { | |
103 boost::mutex::scoped_lock lock(that->mutex_); | |
104 | |
105 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
106 assert(bag != that->bags_.end()); | |
107 assert(bag->second.done_ < bag->second.size_); | |
108 | |
109 if (bag->second.status_ != BagStatus_Running) | |
110 { | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
111 // Do not execute this task, as its parent bag of tasks |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
112 // has failed or is tagged as canceled |
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
113 that->SignalProgress(task, bag->second); |
1920 | 114 continue; |
115 } | |
116 } | |
117 | |
118 bool success = task.Execute(); | |
119 | |
120 { | |
121 boost::mutex::scoped_lock lock(that->mutex_); | |
122 | |
123 Bags::iterator bag = that->bags_.find(task.GetBag()); | |
124 assert(bag != that->bags_.end()); | |
125 | |
126 if (!success) | |
127 { | |
128 bag->second.status_ = BagStatus_Failed; | |
129 } | |
130 | |
2110
5b818f677dd6
fix in BagOfTasksProcessor
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2016
diff
changeset
|
131 that->SignalProgress(task, bag->second); |
1920 | 132 } |
133 } | |
134 } | |
135 } | |
136 | |
137 | |
138 void BagOfTasksProcessor::Cancel(int64_t bag) | |
139 { | |
140 boost::mutex::scoped_lock lock(mutex_); | |
141 | |
142 Bags::iterator it = bags_.find(bag); | |
143 if (it != bags_.end()) | |
144 { | |
145 it->second.status_ = BagStatus_Canceled; | |
146 } | |
147 } | |
148 | |
149 | |
150 bool BagOfTasksProcessor::Join(int64_t bag) | |
151 { | |
152 boost::mutex::scoped_lock lock(mutex_); | |
153 | |
154 while (continue_) | |
155 { | |
156 ExitStatus::iterator it = exitStatus_.find(bag); | |
157 if (it == exitStatus_.end()) // The bag is still running | |
158 { | |
159 bagFinished_.wait(lock); | |
160 } | |
161 else | |
162 { | |
163 bool status = it->second; | |
164 exitStatus_.erase(it); | |
165 return status; | |
166 } | |
167 } | |
168 | |
169 return false; // The processor is stopping | |
170 } | |
171 | |
172 | |
173 float BagOfTasksProcessor::GetProgress(int64_t bag) | |
174 { | |
175 boost::mutex::scoped_lock lock(mutex_); | |
176 | |
177 Bags::const_iterator it = bags_.find(bag); | |
178 if (it == bags_.end()) | |
179 { | |
180 // The bag of tasks has finished | |
181 return 1.0f; | |
182 } | |
183 else | |
184 { | |
185 return (static_cast<float>(it->second.done_) / | |
186 static_cast<float>(it->second.size_)); | |
187 } | |
188 } | |
189 | |
190 | |
191 bool BagOfTasksProcessor::Handle::Join() | |
192 { | |
193 if (hasJoined_) | |
194 { | |
195 return status_; | |
196 } | |
197 else | |
198 { | |
199 status_ = that_.Join(bag_); | |
200 hasJoined_ = true; | |
201 return status_; | |
202 } | |
203 } | |
204 | |
205 | |
206 BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : | |
207 countBags_(0), | |
208 continue_(true) | |
209 { | |
2016 | 210 if (countThreads == 0) |
1920 | 211 { |
212 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
213 } | |
214 | |
215 threads_.resize(countThreads); | |
216 | |
217 for (size_t i = 0; i < threads_.size(); i++) | |
218 { | |
219 threads_[i] = new boost::thread(Worker, this); | |
220 } | |
221 } | |
222 | |
223 | |
224 BagOfTasksProcessor::~BagOfTasksProcessor() | |
225 { | |
226 continue_ = false; | |
227 | |
228 bagFinished_.notify_all(); // Wakes up all the pending "Join()" | |
229 | |
230 for (size_t i = 0; i < threads_.size(); i++) | |
231 { | |
232 if (threads_[i]) | |
233 { | |
234 if (threads_[i]->joinable()) | |
235 { | |
236 threads_[i]->join(); | |
237 } | |
238 | |
239 delete threads_[i]; | |
240 threads_[i] = NULL; | |
241 } | |
242 } | |
243 } | |
244 | |
245 | |
246 BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) | |
247 { | |
1923
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
248 if (tasks.GetSize() == 0) |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
249 { |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
250 return new Handle(*this, 0, true); |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
251 } |
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
252 |
1920 | 253 boost::mutex::scoped_lock lock(mutex_); |
254 | |
255 uint64_t id = countBags_; | |
256 countBags_ += 1; | |
257 | |
258 Bag bag(tasks.GetSize()); | |
259 bags_[id] = bag; | |
260 | |
261 while (!tasks.IsEmpty()) | |
262 { | |
263 queue_.Enqueue(new Task(id, tasks.Pop())); | |
264 } | |
265 | |
1923
6ac7f31fc543
fix freeze if empty bag of tasks
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1920
diff
changeset
|
266 return new Handle(*this, id, false); |
1920 | 267 } |
268 } |