comparison Resources/Graveyard/OldScheduler/ServerScheduler.cpp @ 2610:3ff4c50647ea jobs

moving the old scheduler to the graveyard
author Sebastien Jodogne <s.jodogne@gmail.com>
date Sat, 19 May 2018 16:40:26 +0200
parents OrthancServer/Scheduler/ServerScheduler.cpp@878b59270859
children 4e43e67f8ecf
/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2018 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * In addition, as a special exception, the copyright holders of this
 * program give permission to link the code of its release with the
 * OpenSSL project's "OpenSSL" library (or with modified versions of it
 * that use the same license as the "OpenSSL" library), and distribute
 * the linked executables. You must obey the GNU General Public License
 * in all respects for all of the code used other than "OpenSSL". If you
 * modify file(s) with this exception, you may extend this exception to
 * your version of the file(s), but you are not obligated to do so. If
 * you do not wish to do so, delete this exception statement from your
 * version. If you delete this exception statement from all source files
 * in the program, then also delete it here.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "../PrecompiledHeadersServer.h"
#include "ServerScheduler.h"

#include "../../Core/OrthancException.h"
#include "../../Core/Logging.h"

namespace Orthanc
{
  namespace
  {
    // Anonymous namespace to avoid clashes between compilation modules
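    //
    // The "Sink" command is used by SubmitAndWait() below: it is
    // appended to the job and wired as the output of the commands
    // flagged by IsConnectedToSink(), so that the final results of
    // the job are gathered into the caller-provided "target_" list.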
    class Sink : public IServerCommand
    {
    private:
      ListOfStrings& target_;

    public:
      explicit Sink(ListOfStrings& target) : target_(target)
      {
      }

      virtual bool Apply(ListOfStrings& outputs,
                         const ListOfStrings& inputs)
      {
        for (ListOfStrings::const_iterator
               it = inputs.begin(); it != inputs.end(); ++it)
        {
          target_.push_back(*it);
        }

        return true;
      }
    };
  }


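  // Returns the bookkeeping information about one job. This method
  // assumes that "mutex_" is already locked by the caller.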
  ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId)
  {
    Jobs::iterator info = jobs_.find(jobId);

    if (info == jobs_.end())
    {
      throw OrthancException(ErrorCode_InternalError);
    }

    return info->second;
  }


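  // Invoked each time one command instance of a job has succeeded.
  // Once all the commands of the job have succeeded, the job is
  // removed from the table of active jobs and its slot in the
  // "availableJob_" semaphore is released.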
  void ServerScheduler::SignalSuccess(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    JobInfo& info = GetJobInfo(jobId);
    info.success_++;

    assert(info.failures_ == 0);

    if (info.success_ >= info.size_)
    {
      if (info.watched_)
      {
        watchedJobStatus_[jobId] = JobStatus_Success;
        watchedJobFinished_.notify_all();
      }

      LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
      jobs_.erase(jobId);

      availableJob_.Release();
    }
  }


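  // Symmetric to SignalSuccess(). A single failure is enough to mark
  // the whole job as failed, because the worker thread skips the
  // remaining commands of a failed job (see Worker() below).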
  void ServerScheduler::SignalFailure(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    JobInfo& info = GetJobInfo(jobId);
    info.failures_++;

    if (info.success_ + info.failures_ >= info.size_)
    {
      if (info.watched_)
      {
        watchedJobStatus_[jobId] = JobStatus_Failure;
        watchedJobFinished_.notify_all();
      }

      LOG(ERROR) << "Job has failed (" << info.description_ << ")";
      jobs_.erase(jobId);

      availableJob_.Release();
    }
  }


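  // Body of the worker thread. The queue is polled with a short
  // timeout rather than with a blocking wait, so that the loop
  // periodically rechecks the "finish_" flag. Note that "finish_" is
  // read without taking "mutex_"; a stricter implementation would use
  // an atomic flag.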
  void ServerScheduler::Worker(ServerScheduler* that)
  {
    static const int32_t TIMEOUT = 100;

    LOG(WARNING) << "The server scheduler has started";

    while (!that->finish_)
    {
      std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
      if (object.get() != NULL)
      {
        ServerCommandInstance& filter = dynamic_cast<ServerCommandInstance&>(*object);

        // Skip the execution of this filter if its parent job has
        // previously failed or been canceled.
        bool jobHasFailed;
        {
          boost::mutex::scoped_lock lock(that->mutex_);
          JobInfo& info = that->GetJobInfo(filter.GetJobId());
          jobHasFailed = (info.failures_ > 0 || info.cancel_);
        }

        if (jobHasFailed)
        {
          that->SignalFailure(filter.GetJobId());
        }
        else
        {
          filter.Execute(*that);
        }
      }
    }
  }


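  // Registers the commands of a job into the queue. "availableJob_"
  // is a counting semaphore initialized with "maxJobs" slots:
  // acquiring it before taking "mutex_" throttles the number of
  // simultaneously active jobs, and the matching Release() is done by
  // SignalSuccess() or SignalFailure() once the job completes.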
  void ServerScheduler::SubmitInternal(ServerJob& job,
                                       bool watched)
  {
    availableJob_.Acquire();

    boost::mutex::scoped_lock lock(mutex_);

    JobInfo info;
    info.size_ = job.Submit(queue_, *this);
    info.cancel_ = false;
    info.success_ = 0;
    info.failures_ = 0;
    info.description_ = job.GetDescription();
    info.watched_ = watched;

    assert(info.size_ > 0);

    if (watched)
    {
      watchedJobStatus_[job.GetId()] = JobStatus_Running;
    }

    jobs_[job.GetId()] = info;

    LOG(INFO) << "New job submitted (" << job.GetDescription() << ")";
  }


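  // The constructor pre-loads the "availableJob_" semaphore with
  // "maxJobs" slots and starts the worker thread. A typical use,
  // sketched against the public API of this file (the concrete
  // command "MyCommand" is hypothetical):
  //
  //   ServerScheduler scheduler(10 /* maxJobs */);
  //   ServerJob job;
  //   job.AddCommand(new MyCommand);   // hypothetical IServerCommand
  //   ListOfStrings outputs;
  //   bool ok = scheduler.SubmitAndWait(outputs, job);
  //   scheduler.Stop();   // must be called before destruction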
  ServerScheduler::ServerScheduler(unsigned int maxJobs) : availableJob_(maxJobs)
  {
    if (maxJobs == 0)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }

    finish_ = false;
    worker_ = boost::thread(Worker, this);
  }


  ServerScheduler::~ServerScheduler()
  {
    if (!finish_)
    {
      LOG(ERROR) << "INTERNAL ERROR: ServerScheduler::Stop() should be invoked manually to avoid a mess in the destruction order!";
      Stop();
    }
  }


  void ServerScheduler::Stop()
  {
    if (!finish_)
    {
      finish_ = true;

      if (worker_.joinable())
      {
        worker_.join();
      }
    }
  }


  void ServerScheduler::Submit(ServerJob& job)
  {
    if (job.filters_.empty())
    {
      return;
    }

    SubmitInternal(job, false);
  }


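  // Submits the job, then blocks until it has completed. The caller
  // is woken up through the "watchedJobFinished_" condition variable,
  // which is notified by SignalSuccess() and SignalFailure().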
  bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs,
                                      ServerJob& job)
  {
    std::string jobId = job.GetId();

    outputs.clear();

    if (job.filters_.empty())
    {
      return true;
    }

    // Add a sink filter to collect all the results of the filters
    // that have no next filter.
    ServerCommandInstance& sink = job.AddCommand(new Sink(outputs));

    for (std::list<ServerCommandInstance*>::iterator
           it = job.filters_.begin(); it != job.filters_.end(); ++it)
    {
      if ((*it) != &sink &&
          (*it)->IsConnectedToSink())
      {
        (*it)->ConnectOutput(sink);
      }
    }

    // Submit the job
    SubmitInternal(job, true);

    // Wait for the job to complete (either success or failure)
    JobStatus status;

    {
      boost::mutex::scoped_lock lock(mutex_);

      assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());

      while (watchedJobStatus_[jobId] == JobStatus_Running)
      {
        watchedJobFinished_.wait(lock);
      }

      status = watchedJobStatus_[jobId];
      watchedJobStatus_.erase(jobId);
    }

    return (status == JobStatus_Success);
  }


  bool ServerScheduler::SubmitAndWait(ServerJob& job)
  {
    ListOfStrings ignoredSink;
    return SubmitAndWait(ignoredSink, job);
  }


  bool ServerScheduler::IsRunning(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);
    return jobs_.find(jobId) != jobs_.end();
  }


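  // Cancellation is cooperative: this method only raises a flag, and
  // the pending commands of the job are dropped by the worker thread
  // the next time they are dequeued (see Worker() above).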
  void ServerScheduler::Cancel(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    Jobs::iterator job = jobs_.find(jobId);

    if (job != jobs_.end())
    {
      job->second.cancel_ = true;
      LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
    }
  }


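  // Returns the progress of a job as a number between 0 and 1. A job
  // that is unknown, finished, or failed reports a progress of 1. The
  // special case for a job of size 1 avoids the division by zero in
  // the general formula below.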
  float ServerScheduler::GetProgress(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    Jobs::iterator job = jobs_.find(jobId);

    if (job == jobs_.end() ||
        job->second.size_ == 0 /* should never happen */)
    {
      // This job is not running
      return 1;
    }

    if (job->second.failures_ != 0)
    {
      return 1;
    }

    if (job->second.size_ == 1)
    {
      return static_cast<float>(job->second.success_);
    }

    return (static_cast<float>(job->second.success_) /
            static_cast<float>(job->second.size_ - 1));
  }


  void ServerScheduler::GetListOfJobs(ListOfStrings& jobs)
  {
    boost::mutex::scoped_lock lock(mutex_);

    jobs.clear();

    for (Jobs::const_iterator
           it = jobs_.begin(); it != jobs_.end(); ++it)
    {
      jobs.push_back(it->first);
    }
  }
}