OrthancServer/Scheduler/ServerScheduler.cpp @ changeset 1020:1fc112c4b832

integration lua-scripting->mainline
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 10 Jul 2014 11:38:46 +0200
parents 26642cecd36d
children 6e7e5ed91c2d
/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
 * Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * In addition, as a special exception, the copyright holders of this
 * program give permission to link the code of its release with the
 * OpenSSL project's "OpenSSL" library (or with modified versions of it
 * that use the same license as the "OpenSSL" library), and distribute
 * the linked executables. You must obey the GNU General Public License
 * in all respects for all of the code used other than "OpenSSL". If you
 * modify file(s) with this exception, you may extend this exception to
 * your version of the file(s), but you are not obligated to do so. If
 * you do not wish to do so, delete this exception statement from your
 * version. If you delete this exception statement from all source files
 * in the program, then also delete it here.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "ServerScheduler.h"

#include "../../Core/OrthancException.h"

#include <glog/logging.h>

namespace Orthanc
{
  namespace
  {
    // Anonymous namespace to avoid clashes between compilation modules
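    // "Sink" is a terminal command that simply accumulates all of its
    // inputs into a target list of strings. SubmitAndWait() uses it to
    // gather the outputs of the filters that have no successor.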
    class Sink : public IServerCommand
    {
    private:
      ListOfStrings& target_;

    public:
      explicit Sink(ListOfStrings& target) : target_(target)
      {
      }

      virtual bool Apply(ListOfStrings& outputs,
                         const ListOfStrings& inputs)
      {
        for (ListOfStrings::const_iterator
               it = inputs.begin(); it != inputs.end(); ++it)
        {
          target_.push_back(*it);
        }

        return true;
      }
    };
  }


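  // Looks up the bookkeeping information of a job. The caller must
  // hold "mutex_"; an OrthancException is thrown if the job is unknown.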
  ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId)
  {
    Jobs::iterator info = jobs_.find(jobId);

    if (info == jobs_.end())
    {
      throw OrthancException(ErrorCode_InternalError);
    }

    return info->second;
  }


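  // Invoked by ServerCommandInstance::Execute() whenever one command
  // of a job has succeeded. Once all the commands of the job have
  // succeeded, the job is unregistered and its slot is released.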
  void ServerScheduler::SignalSuccess(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    JobInfo& info = GetJobInfo(jobId);
    info.success_++;

    assert(info.failures_ == 0);

    if (info.success_ >= info.size_)
    {
      // The job is complete: Wake up any thread blocked in SubmitAndWait()
      if (info.watched_)
      {
        watchedJobStatus_[jobId] = JobStatus_Success;
        watchedJobFinished_.notify_all();
      }

      LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
      jobs_.erase(jobId);

      // Give back one job slot to the scheduler
      availableJob_.Release();
    }
  }


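  // Invoked by ServerCommandInstance::Execute() whenever one command
  // of a job has failed. The job is unregistered as soon as all of its
  // commands have been accounted for (succeeded or failed).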
  void ServerScheduler::SignalFailure(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    JobInfo& info = GetJobInfo(jobId);
    info.failures_++;

    if (info.success_ + info.failures_ >= info.size_)
    {
      if (info.watched_)
      {
        watchedJobStatus_[jobId] = JobStatus_Failure;
        watchedJobFinished_.notify_all();
      }

      LOG(ERROR) << "Job has failed (" << info.description_ << ")";
      jobs_.erase(jobId);

      availableJob_.Release();
    }
  }


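  // Body of the scheduler thread: Dequeues the pending commands one by
  // one and executes them, skipping the commands whose parent job has
  // already failed or has been canceled.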
  void ServerScheduler::Worker(ServerScheduler* that)
  {
    static const int32_t TIMEOUT = 100;

    LOG(WARNING) << "The server scheduler has started";

    while (!that->finish_)
    {
      std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
      if (object.get() != NULL)
      {
        ServerCommandInstance& filter = dynamic_cast<ServerCommandInstance&>(*object);

        // Skip the execution of this filter if its parent job has
        // previously failed.
        bool jobHasFailed;
        {
          boost::mutex::scoped_lock lock(that->mutex_);
          JobInfo& info = that->GetJobInfo(filter.GetJobId());
          jobHasFailed = (info.failures_ > 0 || info.cancel_);
        }

        if (jobHasFailed)
        {
          that->SignalFailure(filter.GetJobId());
        }
        else
        {
          filter.Execute(*that);
        }
      }
    }
  }


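  // Registers a new job and queues all of its commands. This blocks
  // until a job slot becomes available, which bounds the number of
  // simultaneous jobs to the "maxJobs" given to the constructor.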
  void ServerScheduler::SubmitInternal(ServerJob& job,
                                       bool watched)
  {
    availableJob_.Acquire();

    boost::mutex::scoped_lock lock(mutex_);

    JobInfo info;
    info.size_ = job.Submit(queue_, *this);
    info.cancel_ = false;
    info.success_ = 0;
    info.failures_ = 0;
    info.description_ = job.GetDescription();
    info.watched_ = watched;

    assert(info.size_ > 0);

    if (watched)
    {
      watchedJobStatus_[job.GetId()] = JobStatus_Running;
    }

    jobs_[job.GetId()] = info;

    LOG(INFO) << "New job submitted (" << job.GetDescription() << ")";
  }


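  // The "availableJob_" counting semaphore is initialized with
  // "maxJobs" slots, which caps the number of registered jobs.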
  ServerScheduler::ServerScheduler(unsigned int maxJobs) : availableJob_(maxJobs)
  {
    finish_ = false;
    worker_ = boost::thread(Worker, this);
  }


  ServerScheduler::~ServerScheduler()
  {
    finish_ = true;
    worker_.join();
  }


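  // Fire-and-forget submission: Empty jobs are silently ignored.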
  void ServerScheduler::Submit(ServerJob& job)
  {
    if (job.filters_.empty())
    {
      return;
    }

    SubmitInternal(job, false);
  }


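  // Synchronous submission: The job is submitted as "watched", and the
  // calling thread blocks until all of its commands have completed.
  // The outputs of the terminal filters are collected into "outputs"
  // through an internal Sink command. Returns "true" on success.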
  bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs,
                                      ServerJob& job)
  {
    std::string jobId = job.GetId();

    outputs.clear();

    if (job.filters_.empty())
    {
      return true;
    }

    // Add a sink filter to collect all the results of the filters
    // that have no next filter.
    ServerCommandInstance& sink = job.AddCommand(new Sink(outputs));

    for (std::list<ServerCommandInstance*>::iterator
           it = job.filters_.begin(); it != job.filters_.end(); ++it)
    {
      if ((*it) != &sink &&
          (*it)->IsConnectedToSink())
      {
        (*it)->ConnectOutput(sink);
      }
    }

    // Submit the job
    SubmitInternal(job, true);

    // Wait for the job to complete (either success or failure)
    JobStatus status;

    {
      boost::mutex::scoped_lock lock(mutex_);

      assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());

      while (watchedJobStatus_[jobId] == JobStatus_Running)
      {
        watchedJobFinished_.wait(lock);
      }

      status = watchedJobStatus_[jobId];
      watchedJobStatus_.erase(jobId);
    }

    return (status == JobStatus_Success);
  }


  bool ServerScheduler::SubmitAndWait(ServerJob& job)
  {
    ListOfStrings ignoredSink;
    return SubmitAndWait(ignoredSink, job);
  }


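  // Tells whether the given job is still registered in the scheduler.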
  bool ServerScheduler::IsRunning(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);
    return jobs_.find(jobId) != jobs_.end();
  }


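  // Cooperative cancellation: The flag is only recorded here; the
  // worker thread checks it before executing each queued command and
  // reports the remaining commands of the job as failures.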
  void ServerScheduler::Cancel(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    Jobs::iterator job = jobs_.find(jobId);

    if (job != jobs_.end())
    {
      job->second.cancel_ = true;
      LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
    }
  }


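  // Returns the progress of a job as a value between 0 and 1. Note
  // that the denominator is "size_ - 1", presumably so that a watched
  // job reaches 100% once all of its commands but the final sink
  // (added by SubmitAndWait()) have succeeded.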
  float ServerScheduler::GetProgress(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    Jobs::iterator job = jobs_.find(jobId);

    if (job == jobs_.end() ||
        job->second.size_ == 0 /* should never happen */)
    {
      // This job is not running
      return 1;
    }

    if (job->second.failures_ != 0)
    {
      return 1;
    }

    if (job->second.size_ == 1)
    {
      return job->second.success_;
    }

    return (static_cast<float>(job->second.success_) /
            static_cast<float>(job->second.size_ - 1));
  }


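  // Copies the identifiers of all the currently registered jobs.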
  void ServerScheduler::GetListOfJobs(ListOfStrings& jobs)
  {
    boost::mutex::scoped_lock lock(mutex_);

    jobs.clear();

    for (Jobs::const_iterator
           it = jobs_.begin(); it != jobs_.end(); ++it)
    {
      jobs.push_back(it->first);
    }
  }
}