Mercurial > hg > orthanc
annotate Core/JobsEngine/JobsRegistry.cpp @ 2570:2e879c796ec7 jobs
JobsRegistry::SubmitAndWait(), StoreScuJob
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 07 May 2018 21:42:04 +0200 |
parents | 2af17cd5eb1f |
children | 3372c5255333 |
rev | line source |
---|---|
2569 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium | |
6 * | |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU General Public License as | |
9 * published by the Free Software Foundation, either version 3 of the | |
10 * License, or (at your option) any later version. | |
11 * | |
12 * In addition, as a special exception, the copyright holders of this | |
13 * program give permission to link the code of its release with the | |
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it | |
15 * that use the same license as the "OpenSSL" library), and distribute | |
16 * the linked executables. You must obey the GNU General Public License | |
17 * in all respects for all of the code used other than "OpenSSL". If you | |
18 * modify file(s) with this exception, you may extend this exception to | |
19 * your version of the file(s), but you are not obligated to do so. If | |
20 * you do not wish to do so, delete this exception statement from your | |
21 * version. If you delete this exception statement from all source files | |
22 * in the program, then also delete it here. | |
23 * | |
24 * This program is distributed in the hope that it will be useful, but | |
25 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
27 * General Public License for more details. | |
28 * | |
29 * You should have received a copy of the GNU General Public License | |
30 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
31 **/ | |
32 | |
33 | |
34 #include "../PrecompiledHeaders.h" | |
35 #include "JobsRegistry.h" | |
36 | |
37 #include "../Logging.h" | |
38 #include "../OrthancException.h" | |
39 #include "../Toolbox.h" | |
40 | |
41 namespace Orthanc | |
42 { | |
43 class JobsRegistry::JobHandler : public boost::noncopyable | |
44 { | |
45 private: | |
46 std::string id_; | |
47 JobState state_; | |
48 std::auto_ptr<IJob> job_; | |
49 int priority_; // "+inf()" means highest priority | |
50 boost::posix_time::ptime creationTime_; | |
51 boost::posix_time::ptime lastStateChangeTime_; | |
52 boost::posix_time::time_duration runtime_; | |
53 boost::posix_time::ptime retryTime_; | |
54 bool pauseScheduled_; | |
55 JobStatus lastStatus_; | |
56 | |
57 void Touch() | |
58 { | |
59 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
60 | |
61 if (state_ == JobState_Running) | |
62 { | |
63 runtime_ += (now - lastStateChangeTime_); | |
64 } | |
65 | |
66 lastStateChangeTime_ = now; | |
67 } | |
68 | |
69 void SetStateInternal(JobState state) | |
70 { | |
71 state_ = state; | |
72 pauseScheduled_ = false; | |
73 Touch(); | |
74 } | |
75 | |
76 public: | |
77 JobHandler(IJob* job, | |
78 int priority) : | |
79 id_(Toolbox::GenerateUuid()), | |
80 state_(JobState_Pending), | |
81 job_(job), | |
82 priority_(priority), | |
83 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
84 lastStateChangeTime_(creationTime_), | |
85 runtime_(boost::posix_time::milliseconds(0)), | |
86 retryTime_(creationTime_), | |
87 pauseScheduled_(false) | |
88 { | |
89 if (job == NULL) | |
90 { | |
91 throw OrthancException(ErrorCode_NullPointer); | |
92 } | |
93 | |
94 lastStatus_ = JobStatus(ErrorCode_Success, *job); | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
95 job->Start(); |
2569 | 96 } |
97 | |
98 const std::string& GetId() const | |
99 { | |
100 return id_; | |
101 } | |
102 | |
103 IJob& GetJob() const | |
104 { | |
105 assert(job_.get() != NULL); | |
106 return *job_; | |
107 } | |
108 | |
109 void SetPriority(int priority) | |
110 { | |
111 priority_ = priority; | |
112 } | |
113 | |
114 int GetPriority() const | |
115 { | |
116 return priority_; | |
117 } | |
118 | |
119 JobState GetState() const | |
120 { | |
121 return state_; | |
122 } | |
123 | |
124 void SetState(JobState state) | |
125 { | |
126 if (state == JobState_Retry) | |
127 { | |
128 // Use "SetRetryState()" | |
129 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
130 } | |
131 else | |
132 { | |
133 SetStateInternal(state); | |
134 } | |
135 } | |
136 | |
137 void SetRetryState(unsigned int timeout) | |
138 { | |
139 if (state_ == JobState_Running) | |
140 { | |
141 SetStateInternal(JobState_Retry); | |
142 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + | |
143 boost::posix_time::milliseconds(timeout)); | |
144 } | |
145 else | |
146 { | |
147 // Only valid for running jobs | |
148 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
149 } | |
150 } | |
151 | |
152 void SchedulePause() | |
153 { | |
154 if (state_ == JobState_Running) | |
155 { | |
156 pauseScheduled_ = true; | |
157 } | |
158 else | |
159 { | |
160 // Only valid for running jobs | |
161 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
162 } | |
163 } | |
164 | |
165 bool IsPauseScheduled() | |
166 { | |
167 return pauseScheduled_; | |
168 } | |
169 | |
170 bool IsRetryReady(const boost::posix_time::ptime& now) const | |
171 { | |
172 if (state_ != JobState_Retry) | |
173 { | |
174 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
175 } | |
176 else | |
177 { | |
178 return retryTime_ <= now; | |
179 } | |
180 } | |
181 | |
182 const boost::posix_time::ptime& GetCreationTime() const | |
183 { | |
184 return creationTime_; | |
185 } | |
186 | |
187 const boost::posix_time::ptime& GetLastStateChangeTime() const | |
188 { | |
189 return lastStateChangeTime_; | |
190 } | |
191 | |
192 const boost::posix_time::time_duration& GetRuntime() const | |
193 { | |
194 return runtime_; | |
195 } | |
196 | |
197 const JobStatus& GetLastStatus() const | |
198 { | |
199 return lastStatus_; | |
200 } | |
201 | |
202 void SetLastStatus(const JobStatus& status) | |
203 { | |
204 lastStatus_ = status; | |
205 Touch(); | |
206 } | |
207 }; | |
208 | |
209 | |
210 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, | |
211 JobHandler*& b) const | |
212 { | |
213 return a->GetPriority() < b->GetPriority(); | |
214 } | |
215 | |
216 | |
217 #if defined(NDEBUG) | |
218 void JobsRegistry::CheckInvariants() const | |
219 { | |
220 } | |
221 | |
222 #else | |
223 bool JobsRegistry::IsPendingJob(const JobHandler& job) const | |
224 { | |
225 PendingJobs copy = pendingJobs_; | |
226 while (!copy.empty()) | |
227 { | |
228 if (copy.top() == &job) | |
229 { | |
230 return true; | |
231 } | |
232 | |
233 copy.pop(); | |
234 } | |
235 | |
236 return false; | |
237 } | |
238 | |
239 bool JobsRegistry::IsCompletedJob(JobHandler& job) const | |
240 { | |
241 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
242 it != completedJobs_.end(); ++it) | |
243 { | |
244 if (*it == &job) | |
245 { | |
246 return true; | |
247 } | |
248 } | |
249 | |
250 return false; | |
251 } | |
252 | |
253 bool JobsRegistry::IsRetryJob(JobHandler& job) const | |
254 { | |
255 return retryJobs_.find(&job) != retryJobs_.end(); | |
256 } | |
257 | |
258 void JobsRegistry::CheckInvariants() const | |
259 { | |
260 { | |
261 PendingJobs copy = pendingJobs_; | |
262 while (!copy.empty()) | |
263 { | |
264 assert(copy.top()->GetState() == JobState_Pending); | |
265 copy.pop(); | |
266 } | |
267 } | |
268 | |
269 assert(completedJobs_.size() <= maxCompletedJobs_); | |
270 | |
271 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
272 it != completedJobs_.end(); ++it) | |
273 { | |
274 assert((*it)->GetState() == JobState_Success || | |
275 (*it)->GetState() == JobState_Failure); | |
276 } | |
277 | |
278 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
279 it != retryJobs_.end(); ++it) | |
280 { | |
281 assert((*it)->GetState() == JobState_Retry); | |
282 } | |
283 | |
284 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
285 it != jobsIndex_.end(); ++it) | |
286 { | |
287 JobHandler& job = *it->second; | |
288 | |
289 assert(job.GetId() == it->first); | |
290 | |
291 switch (job.GetState()) | |
292 { | |
293 case JobState_Pending: | |
294 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
295 break; | |
296 | |
297 case JobState_Success: | |
298 case JobState_Failure: | |
299 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
300 break; | |
301 | |
302 case JobState_Retry: | |
303 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
304 break; | |
305 | |
306 case JobState_Running: | |
307 case JobState_Paused: | |
308 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
309 break; | |
310 | |
311 default: | |
312 throw OrthancException(ErrorCode_InternalError); | |
313 } | |
314 } | |
315 } | |
316 #endif | |
317 | |
318 | |
319 void JobsRegistry::ForgetOldCompletedJobs() | |
320 { | |
321 if (maxCompletedJobs_ != 0) | |
322 { | |
323 while (completedJobs_.size() > maxCompletedJobs_) | |
324 { | |
325 assert(completedJobs_.front() != NULL); | |
326 | |
327 std::string id = completedJobs_.front()->GetId(); | |
328 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
329 | |
330 jobsIndex_.erase(id); | |
331 delete(completedJobs_.front()); | |
332 completedJobs_.pop_front(); | |
333 } | |
334 } | |
335 } | |
336 | |
337 | |
338 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, | |
339 bool success) | |
340 { | |
341 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | |
342 << ": " << job.GetId(); | |
343 | |
344 CheckInvariants(); | |
345 assert(job.GetState() == JobState_Running); | |
346 | |
347 job.SetState(success ? JobState_Success : JobState_Failure); | |
348 | |
349 completedJobs_.push_back(&job); | |
350 ForgetOldCompletedJobs(); | |
351 | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
352 someJobComplete_.notify_all(); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
353 |
2569 | 354 CheckInvariants(); |
355 } | |
356 | |
357 | |
358 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, | |
359 unsigned int timeout) | |
360 { | |
361 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); | |
362 | |
363 CheckInvariants(); | |
364 | |
365 assert(job.GetState() == JobState_Running && | |
366 retryJobs_.find(&job) == retryJobs_.end()); | |
367 | |
368 retryJobs_.insert(&job); | |
369 job.SetRetryState(timeout); | |
370 | |
371 CheckInvariants(); | |
372 } | |
373 | |
374 | |
375 void JobsRegistry::MarkRunningAsPaused(JobHandler& job) | |
376 { | |
377 LOG(INFO) << "Job paused: " << job.GetId(); | |
378 | |
379 CheckInvariants(); | |
380 assert(job.GetState() == JobState_Running); | |
381 | |
382 job.SetState(JobState_Paused); | |
383 | |
384 CheckInvariants(); | |
385 } | |
386 | |
387 | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
388 bool JobsRegistry::GetStateInternal(JobState& state, |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
389 const std::string& id) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
390 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
391 CheckInvariants(); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
392 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
393 JobsIndex::const_iterator it = jobsIndex_.find(id); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
394 if (it == jobsIndex_.end()) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
395 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
396 return false; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
397 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
398 else |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
399 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
400 state = it->second->GetState(); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
401 return true; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
402 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
403 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
404 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
405 |
2569 | 406 JobsRegistry::~JobsRegistry() |
407 { | |
408 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
409 { | |
410 assert(it->second != NULL); | |
411 delete it->second; | |
412 } | |
413 } | |
414 | |
415 | |
416 void JobsRegistry::SetMaxCompletedJobs(size_t i) | |
417 { | |
418 boost::mutex::scoped_lock lock(mutex_); | |
419 CheckInvariants(); | |
420 | |
421 maxCompletedJobs_ = i; | |
422 ForgetOldCompletedJobs(); | |
423 | |
424 CheckInvariants(); | |
425 } | |
426 | |
427 | |
428 void JobsRegistry::ListJobs(std::set<std::string>& target) | |
429 { | |
430 boost::mutex::scoped_lock lock(mutex_); | |
431 CheckInvariants(); | |
432 | |
433 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
434 it != jobsIndex_.end(); ++it) | |
435 { | |
436 target.insert(it->first); | |
437 } | |
438 } | |
439 | |
440 | |
441 bool JobsRegistry::GetJobInfo(JobInfo& target, | |
442 const std::string& id) | |
443 { | |
444 boost::mutex::scoped_lock lock(mutex_); | |
445 CheckInvariants(); | |
446 | |
447 JobsIndex::const_iterator found = jobsIndex_.find(id); | |
448 | |
449 if (found == jobsIndex_.end()) | |
450 { | |
451 return false; | |
452 } | |
453 else | |
454 { | |
455 const JobHandler& handler = *found->second; | |
456 target = JobInfo(handler.GetId(), | |
457 handler.GetPriority(), | |
458 handler.GetState(), | |
459 handler.GetLastStatus(), | |
460 handler.GetCreationTime(), | |
461 handler.GetLastStateChangeTime(), | |
462 handler.GetRuntime()); | |
463 return true; | |
464 } | |
465 } | |
466 | |
467 | |
468 void JobsRegistry::Submit(std::string& id, | |
469 IJob* job, // Takes ownership | |
470 int priority) | |
471 { | |
472 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | |
473 | |
474 boost::mutex::scoped_lock lock(mutex_); | |
475 CheckInvariants(); | |
476 | |
477 id = handler->GetId(); | |
478 | |
479 pendingJobs_.push(handler.get()); | |
480 pendingJobAvailable_.notify_one(); | |
481 | |
482 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
483 | |
484 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; | |
485 | |
486 CheckInvariants(); | |
487 } | |
488 | |
489 | |
490 void JobsRegistry::Submit(IJob* job, // Takes ownership | |
491 int priority) | |
492 { | |
493 std::string id; | |
494 Submit(id, job, priority); | |
495 } | |
496 | |
497 | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
498 bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
499 int priority) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
500 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
501 std::string id; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
502 Submit(id, job, priority); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
503 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
504 printf(">> %s\n", id.c_str()); fflush(stdout); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
505 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
506 JobState state; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
507 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
508 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
509 boost::mutex::scoped_lock lock(mutex_); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
510 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
511 while (GetStateInternal(state, id) && |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
512 state != JobState_Success && |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
513 state != JobState_Failure) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
514 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
515 someJobComplete_.wait(lock); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
516 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
517 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
518 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
519 return (state == JobState_Success); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
520 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
521 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
522 |
2569 | 523 void JobsRegistry::SetPriority(const std::string& id, |
524 int priority) | |
525 { | |
526 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; | |
527 | |
528 boost::mutex::scoped_lock lock(mutex_); | |
529 CheckInvariants(); | |
530 | |
531 JobsIndex::iterator found = jobsIndex_.find(id); | |
532 | |
533 if (found == jobsIndex_.end()) | |
534 { | |
535 LOG(WARNING) << "Unknown job: " << id; | |
536 } | |
537 else | |
538 { | |
539 found->second->SetPriority(priority); | |
540 | |
541 if (found->second->GetState() == JobState_Pending) | |
542 { | |
543 // If the job is pending, we need to reconstruct the | |
544 // priority queue, as the heap condition has changed | |
545 | |
546 PendingJobs copy; | |
547 std::swap(copy, pendingJobs_); | |
548 | |
549 assert(pendingJobs_.empty()); | |
550 while (!copy.empty()) | |
551 { | |
552 pendingJobs_.push(copy.top()); | |
553 copy.pop(); | |
554 } | |
555 } | |
556 } | |
557 | |
558 CheckInvariants(); | |
559 } | |
560 | |
561 | |
562 void JobsRegistry::Pause(const std::string& id) | |
563 { | |
564 LOG(INFO) << "Pausing job: " << id; | |
565 | |
566 boost::mutex::scoped_lock lock(mutex_); | |
567 CheckInvariants(); | |
568 | |
569 JobsIndex::iterator found = jobsIndex_.find(id); | |
570 | |
571 if (found == jobsIndex_.end()) | |
572 { | |
573 LOG(WARNING) << "Unknown job: " << id; | |
574 } | |
575 else | |
576 { | |
577 switch (found->second->GetState()) | |
578 { | |
579 case JobState_Pending: | |
580 { | |
581 // If the job is pending, we need to reconstruct the | |
582 // priority queue to remove it | |
583 PendingJobs copy; | |
584 std::swap(copy, pendingJobs_); | |
585 | |
586 assert(pendingJobs_.empty()); | |
587 while (!copy.empty()) | |
588 { | |
589 if (copy.top()->GetId() != id) | |
590 { | |
591 pendingJobs_.push(copy.top()); | |
592 } | |
593 | |
594 copy.pop(); | |
595 } | |
596 | |
597 found->second->SetState(JobState_Paused); | |
598 | |
599 break; | |
600 } | |
601 | |
602 case JobState_Retry: | |
603 { | |
604 RetryJobs::iterator item = retryJobs_.find(found->second); | |
605 assert(item != retryJobs_.end()); | |
606 retryJobs_.erase(item); | |
607 | |
608 found->second->SetState(JobState_Paused); | |
609 | |
610 break; | |
611 } | |
612 | |
613 case JobState_Paused: | |
614 case JobState_Success: | |
615 case JobState_Failure: | |
616 // Nothing to be done | |
617 break; | |
618 | |
619 case JobState_Running: | |
620 found->second->SchedulePause(); | |
621 break; | |
622 | |
623 default: | |
624 throw OrthancException(ErrorCode_InternalError); | |
625 } | |
626 } | |
627 | |
628 CheckInvariants(); | |
629 } | |
630 | |
631 | |
632 void JobsRegistry::Resume(const std::string& id) | |
633 { | |
634 LOG(INFO) << "Resuming job: " << id; | |
635 | |
636 boost::mutex::scoped_lock lock(mutex_); | |
637 CheckInvariants(); | |
638 | |
639 JobsIndex::iterator found = jobsIndex_.find(id); | |
640 | |
641 if (found == jobsIndex_.end()) | |
642 { | |
643 LOG(WARNING) << "Unknown job: " << id; | |
644 } | |
645 else if (found->second->GetState() != JobState_Paused) | |
646 { | |
647 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
648 } | |
649 else | |
650 { | |
651 found->second->SetState(JobState_Pending); | |
652 pendingJobs_.push(found->second); | |
653 pendingJobAvailable_.notify_one(); | |
654 } | |
655 | |
656 CheckInvariants(); | |
657 } | |
658 | |
659 | |
660 void JobsRegistry::Resubmit(const std::string& id) | |
661 { | |
662 LOG(INFO) << "Resubmitting failed job: " << id; | |
663 | |
664 boost::mutex::scoped_lock lock(mutex_); | |
665 CheckInvariants(); | |
666 | |
667 JobsIndex::iterator found = jobsIndex_.find(id); | |
668 | |
669 if (found == jobsIndex_.end()) | |
670 { | |
671 LOG(WARNING) << "Unknown job: " << id; | |
672 } | |
673 else if (found->second->GetState() != JobState_Failure) | |
674 { | |
675 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
676 } | |
677 else | |
678 { | |
679 bool ok = false; | |
680 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
681 it != completedJobs_.end(); ++it) | |
682 { | |
683 if (*it == found->second) | |
684 { | |
685 ok = true; | |
686 completedJobs_.erase(it); | |
687 break; | |
688 } | |
689 } | |
690 | |
691 assert(ok); | |
692 | |
693 found->second->SetState(JobState_Pending); | |
694 pendingJobs_.push(found->second); | |
695 pendingJobAvailable_.notify_one(); | |
696 } | |
697 | |
698 CheckInvariants(); | |
699 } | |
700 | |
701 | |
702 void JobsRegistry::ScheduleRetries() | |
703 { | |
704 boost::mutex::scoped_lock lock(mutex_); | |
705 CheckInvariants(); | |
706 | |
707 RetryJobs copy; | |
708 std::swap(copy, retryJobs_); | |
709 | |
710 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
711 | |
712 assert(retryJobs_.empty()); | |
713 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
714 { | |
715 if ((*it)->IsRetryReady(now)) | |
716 { | |
717 LOG(INFO) << "Retrying job: " << (*it)->GetId(); | |
718 (*it)->SetState(JobState_Pending); | |
719 pendingJobs_.push(*it); | |
720 pendingJobAvailable_.notify_one(); | |
721 } | |
722 else | |
723 { | |
724 retryJobs_.insert(*it); | |
725 } | |
726 } | |
727 | |
728 CheckInvariants(); | |
729 } | |
730 | |
731 | |
732 bool JobsRegistry::GetState(JobState& state, | |
733 const std::string& id) | |
734 { | |
735 boost::mutex::scoped_lock lock(mutex_); | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
736 return GetStateInternal(state, id); |
2569 | 737 } |
738 | |
739 | |
740 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, | |
741 unsigned int timeout) : | |
742 registry_(registry), | |
743 handler_(NULL), | |
744 targetState_(JobState_Failure), | |
745 targetRetryTimeout_(0) | |
746 { | |
747 { | |
748 boost::mutex::scoped_lock lock(registry_.mutex_); | |
749 | |
750 while (registry_.pendingJobs_.empty()) | |
751 { | |
752 if (timeout == 0) | |
753 { | |
754 registry_.pendingJobAvailable_.wait(lock); | |
755 } | |
756 else | |
757 { | |
758 bool success = registry_.pendingJobAvailable_.timed_wait | |
759 (lock, boost::posix_time::milliseconds(timeout)); | |
760 if (!success) | |
761 { | |
762 // No pending job | |
763 return; | |
764 } | |
765 } | |
766 } | |
767 | |
768 handler_ = registry_.pendingJobs_.top(); | |
769 registry_.pendingJobs_.pop(); | |
770 | |
771 assert(handler_->GetState() == JobState_Pending); | |
772 handler_->SetState(JobState_Running); | |
773 | |
774 job_ = &handler_->GetJob(); | |
775 id_ = handler_->GetId(); | |
776 priority_ = handler_->GetPriority(); | |
777 } | |
778 } | |
779 | |
780 | |
781 JobsRegistry::RunningJob::~RunningJob() | |
782 { | |
783 if (IsValid()) | |
784 { | |
785 boost::mutex::scoped_lock lock(registry_.mutex_); | |
786 | |
787 switch (targetState_) | |
788 { | |
789 case JobState_Failure: | |
790 registry_.MarkRunningAsCompleted(*handler_, false); | |
791 break; | |
792 | |
793 case JobState_Success: | |
794 registry_.MarkRunningAsCompleted(*handler_, true); | |
795 break; | |
796 | |
797 case JobState_Paused: | |
798 registry_.MarkRunningAsPaused(*handler_); | |
799 break; | |
800 | |
801 case JobState_Retry: | |
802 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); | |
803 break; | |
804 | |
805 default: | |
806 assert(0); | |
807 } | |
808 } | |
809 } | |
810 | |
811 | |
812 bool JobsRegistry::RunningJob::IsValid() const | |
813 { | |
814 return (handler_ != NULL && | |
815 job_ != NULL); | |
816 } | |
817 | |
818 | |
819 const std::string& JobsRegistry::RunningJob::GetId() const | |
820 { | |
821 if (!IsValid()) | |
822 { | |
823 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
824 } | |
825 else | |
826 { | |
827 return id_; | |
828 } | |
829 } | |
830 | |
831 | |
832 int JobsRegistry::RunningJob::GetPriority() const | |
833 { | |
834 if (!IsValid()) | |
835 { | |
836 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
837 } | |
838 else | |
839 { | |
840 return priority_; | |
841 } | |
842 } | |
843 | |
844 | |
845 IJob& JobsRegistry::RunningJob::GetJob() | |
846 { | |
847 if (!IsValid()) | |
848 { | |
849 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
850 } | |
851 else | |
852 { | |
853 return *job_; | |
854 } | |
855 } | |
856 | |
857 | |
858 bool JobsRegistry::RunningJob::IsPauseScheduled() | |
859 { | |
860 if (!IsValid()) | |
861 { | |
862 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
863 } | |
864 else | |
865 { | |
866 boost::mutex::scoped_lock lock(registry_.mutex_); | |
867 registry_.CheckInvariants(); | |
868 assert(handler_->GetState() == JobState_Running); | |
869 | |
870 return handler_->IsPauseScheduled(); | |
871 } | |
872 } | |
873 | |
874 | |
875 void JobsRegistry::RunningJob::MarkSuccess() | |
876 { | |
877 if (!IsValid()) | |
878 { | |
879 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
880 } | |
881 else | |
882 { | |
883 targetState_ = JobState_Success; | |
884 } | |
885 } | |
886 | |
887 | |
888 void JobsRegistry::RunningJob::MarkFailure() | |
889 { | |
890 if (!IsValid()) | |
891 { | |
892 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
893 } | |
894 else | |
895 { | |
896 targetState_ = JobState_Failure; | |
897 } | |
898 } | |
899 | |
900 | |
901 void JobsRegistry::RunningJob::MarkPause() | |
902 { | |
903 if (!IsValid()) | |
904 { | |
905 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
906 } | |
907 else | |
908 { | |
909 targetState_ = JobState_Paused; | |
910 } | |
911 } | |
912 | |
913 | |
914 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) | |
915 { | |
916 if (!IsValid()) | |
917 { | |
918 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
919 } | |
920 else | |
921 { | |
922 targetState_ = JobState_Retry; | |
923 targetRetryTimeout_ = timeout; | |
924 } | |
925 } | |
926 | |
927 | |
928 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code) | |
929 { | |
930 if (!IsValid()) | |
931 { | |
932 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
933 } | |
934 else | |
935 { | |
936 JobStatus status(code, *job_); | |
937 | |
938 boost::mutex::scoped_lock lock(registry_.mutex_); | |
939 registry_.CheckInvariants(); | |
940 assert(handler_->GetState() == JobState_Running); | |
941 | |
942 handler_->SetLastStatus(status); | |
943 } | |
944 } | |
945 } |