comparison OrthancServer/Scheduler/ServerScheduler.cpp @ 781:f0ac3a53ccf2 lua-scripting

scheduler
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 30 Apr 2014 18:30:05 +0200
parents
children c9cdd53a6b31
comparison
equal deleted inserted replaced
779:76eb563f08f0 781:f0ac3a53ccf2
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
4 * Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation, either version 3 of the
9 * License, or (at your option) any later version.
10 *
11 * In addition, as a special exception, the copyright holders of this
12 * program give permission to link the code of its release with the
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it
14 * that use the same license as the "OpenSSL" library), and distribute
15 * the linked executables. You must obey the GNU General Public License
16 * in all respects for all of the code used other than "OpenSSL". If you
17 * modify file(s) with this exception, you may extend this exception to
18 * your version of the file(s), but you are not obligated to do so. If
19 * you do not wish to do so, delete this exception statement from your
20 * version. If you delete this exception statement from all source files
21 * in the program, then also delete it here.
22 *
23 * This program is distributed in the hope that it will be useful, but
24 * WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 * General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License
29 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 **/
31
32
33 #include "ServerScheduler.h"
34
35 #include "../../Core/OrthancException.h"
36
37 #include <glog/logging.h>
38
39 namespace Orthanc
40 {
41 namespace
42 {
43 // Anonymous namespace to avoid clashes between compilation modules
44 class Sink : public IServerFilter
45 {
46 private:
47 ListOfStrings& target_;
48
49 public:
50 Sink(ListOfStrings& target) : target_(target)
51 {
52 }
53
54 virtual bool SendOutputsToSink() const
55 {
56 return false;
57 }
58
59 virtual bool Apply(ListOfStrings& outputs,
60 const ListOfStrings& inputs)
61 {
62 for (ListOfStrings::const_iterator
63 it = inputs.begin(); it != inputs.end(); it++)
64 {
65 target_.push_back(*it);
66 }
67
68 return true;
69 }
70 };
71 }
72
73
74 ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId)
75 {
76 Jobs::iterator info = jobs_.find(jobId);
77
78 if (info == jobs_.end())
79 {
80 throw OrthancException(ErrorCode_InternalError);
81 }
82
83 return info->second;
84 }
85
86
87 void ServerScheduler::SignalSuccess(const std::string& jobId)
88 {
89 boost::mutex::scoped_lock lock(mutex_);
90
91 JobInfo& info = GetJobInfo(jobId);
92 info.success_++;
93
94 assert(info.failures_ == 0);
95
96 if (info.success_ >= info.size_)
97 {
98 if (info.watched_)
99 {
100 watchedJobStatus_[jobId] = JobStatus_Success;
101 jobFinished_.notify_all();
102 }
103
104 LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
105 jobs_.erase(jobId);
106 }
107 }
108
109
110 void ServerScheduler::SignalFailure(const std::string& jobId)
111 {
112 boost::mutex::scoped_lock lock(mutex_);
113
114 JobInfo& info = GetJobInfo(jobId);
115 info.failures_++;
116
117 if (info.success_ + info.failures_ >= info.size_)
118 {
119 if (info.watched_)
120 {
121 watchedJobStatus_[jobId] = JobStatus_Failure;
122 jobFinished_.notify_all();
123 }
124
125 LOG(ERROR) << "Job has failed (" << info.description_ << ")";
126 jobs_.erase(jobId);
127 }
128 }
129
130
131 void ServerScheduler::Worker(ServerScheduler* that)
132 {
133 static const int32_t TIMEOUT = 100;
134
135 while (!that->finish_)
136 {
137 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
138 if (object.get() != NULL)
139 {
140 ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object);
141
142 // Skip the execution of this filter if its parent job has
143 // previously failed.
144 bool jobHasFailed;
145 {
146 boost::mutex::scoped_lock lock(that->mutex_);
147 JobInfo& info = that->GetJobInfo(filter.GetJobId());
148 jobHasFailed = (info.failures_ > 0 || info.cancel_);
149 }
150
151 if (jobHasFailed)
152 {
153 that->SignalFailure(filter.GetJobId());
154 }
155 else
156 {
157 filter.Execute(*that);
158 }
159 }
160 }
161 }
162
163
164 void ServerScheduler::SubmitInternal(ServerJob& job,
165 bool watched)
166 {
167 boost::mutex::scoped_lock lock(mutex_);
168
169 JobInfo info;
170 info.size_ = job.Submit(queue_, *this);
171 info.cancel_ = false;
172 info.success_ = 0;
173 info.failures_ = 0;
174 info.description_ = job.GetDescription();
175 info.watched_ = watched;
176
177 assert(info.size_ > 0);
178
179 if (watched)
180 {
181 watchedJobStatus_[job.GetId()] = JobStatus_Running;
182 }
183
184 jobs_[job.GetId()] = info;
185
186 LOG(INFO) << "New job submitted (" << job.description_ << ")";
187 }
188
189
190 ServerScheduler::ServerScheduler()
191 {
192 finish_ = false;
193 worker_ = boost::thread(Worker, this);
194 }
195
196
197 ServerScheduler::~ServerScheduler()
198 {
199 finish_ = true;
200 worker_.join();
201 }
202
203
204 void ServerScheduler::Submit(ServerJob& job)
205 {
206 if (job.filters_.empty())
207 {
208 return;
209 }
210
211 SubmitInternal(job, false);
212 }
213
214
215 bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs,
216 ServerJob& job)
217 {
218 std::string jobId = job.GetId();
219
220 outputs.clear();
221
222 if (job.filters_.empty())
223 {
224 return true;
225 }
226
227 // Add a sink filter to collect all the results of the filters
228 // that have no next filter.
229 ServerFilterInstance& sink = job.AddFilter(new Sink(outputs));
230
231 for (std::list<ServerFilterInstance*>::iterator
232 it = job.filters_.begin(); it != job.filters_.end(); it++)
233 {
234 if ((*it) != &sink &&
235 (*it)->GetNextFilters().size() == 0 &&
236 (*it)->GetFilter().SendOutputsToSink())
237 {
238 (*it)->ConnectNext(sink);
239 }
240 }
241
242 // Submit the job
243 SubmitInternal(job, true);
244
245 // Wait for the job to complete (either success or failure)
246 JobStatus status;
247
248 {
249 boost::mutex::scoped_lock lock(mutex_);
250
251 assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
252
253 while (watchedJobStatus_[jobId] == JobStatus_Running)
254 {
255 jobFinished_.wait(lock);
256 }
257
258 status = watchedJobStatus_[jobId];
259 watchedJobStatus_.erase(jobId);
260 }
261
262 return (status == JobStatus_Success);
263 }
264
265
266 bool ServerScheduler::IsRunning(const std::string& jobId)
267 {
268 boost::mutex::scoped_lock lock(mutex_);
269 return jobs_.find(jobId) != jobs_.end();
270 }
271
272
273 void ServerScheduler::Cancel(const std::string& jobId)
274 {
275 boost::mutex::scoped_lock lock(mutex_);
276
277 Jobs::iterator job = jobs_.find(jobId);
278
279 if (job != jobs_.end())
280 {
281 job->second.cancel_ = true;
282 LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
283 }
284 }
285
286
287 float ServerScheduler::GetProgress(const std::string& jobId)
288 {
289 boost::mutex::scoped_lock lock(mutex_);
290
291 Jobs::iterator job = jobs_.find(jobId);
292
293 if (job == jobs_.end() ||
294 job->second.size_ == 0 /* should never happen */)
295 {
296 // This job is not running
297 return 1;
298 }
299
300 if (job->second.failures_ != 0)
301 {
302 return 1;
303 }
304
305 if (job->second.size_ == 1)
306 {
307 return job->second.success_;
308 }
309
310 return (static_cast<float>(job->second.success_) /
311 static_cast<float>(job->second.size_ - 1));
312 }
313
314
315 void ServerScheduler::GetListOfJobs(ListOfStrings& jobs)
316 {
317 boost::mutex::scoped_lock lock(mutex_);
318
319 jobs.clear();
320
321 for (Jobs::const_iterator
322 it = jobs_.begin(); it != jobs_.end(); it++)
323 {
324 jobs.push_back(it->first);
325 }
326 }
327 }