Mercurial > hg > orthanc
annotate Core/JobsEngine/JobsRegistry.cpp @ 2573:3372c5255333 jobs
StoreScuJob, Orthanc Explorer for jobs
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 09 May 2018 17:56:14 +0200 |
parents | 2e879c796ec7 |
children | 8da2cffc2378 |
rev | line source |
---|---|
2569 | 1 /** |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium | |
6 * | |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU General Public License as | |
9 * published by the Free Software Foundation, either version 3 of the | |
10 * License, or (at your option) any later version. | |
11 * | |
12 * In addition, as a special exception, the copyright holders of this | |
13 * program give permission to link the code of its release with the | |
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it | |
15 * that use the same license as the "OpenSSL" library), and distribute | |
16 * the linked executables. You must obey the GNU General Public License | |
17 * in all respects for all of the code used other than "OpenSSL". If you | |
18 * modify file(s) with this exception, you may extend this exception to | |
19 * your version of the file(s), but you are not obligated to do so. If | |
20 * you do not wish to do so, delete this exception statement from your | |
21 * version. If you delete this exception statement from all source files | |
22 * in the program, then also delete it here. | |
23 * | |
24 * This program is distributed in the hope that it will be useful, but | |
25 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
27 * General Public License for more details. | |
28 * | |
29 * You should have received a copy of the GNU General Public License | |
30 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
31 **/ | |
32 | |
33 | |
34 #include "../PrecompiledHeaders.h" | |
35 #include "JobsRegistry.h" | |
36 | |
37 #include "../Logging.h" | |
38 #include "../OrthancException.h" | |
39 #include "../Toolbox.h" | |
40 | |
41 namespace Orthanc | |
42 { | |
43 class JobsRegistry::JobHandler : public boost::noncopyable | |
44 { | |
45 private: | |
46 std::string id_; | |
47 JobState state_; | |
48 std::auto_ptr<IJob> job_; | |
49 int priority_; // "+inf()" means highest priority | |
50 boost::posix_time::ptime creationTime_; | |
51 boost::posix_time::ptime lastStateChangeTime_; | |
52 boost::posix_time::time_duration runtime_; | |
53 boost::posix_time::ptime retryTime_; | |
54 bool pauseScheduled_; | |
55 JobStatus lastStatus_; | |
56 | |
57 void Touch() | |
58 { | |
59 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
60 | |
61 if (state_ == JobState_Running) | |
62 { | |
63 runtime_ += (now - lastStateChangeTime_); | |
64 } | |
65 | |
66 lastStateChangeTime_ = now; | |
67 } | |
68 | |
69 void SetStateInternal(JobState state) | |
70 { | |
71 state_ = state; | |
72 pauseScheduled_ = false; | |
73 Touch(); | |
74 } | |
75 | |
76 public: | |
77 JobHandler(IJob* job, | |
78 int priority) : | |
79 id_(Toolbox::GenerateUuid()), | |
80 state_(JobState_Pending), | |
81 job_(job), | |
82 priority_(priority), | |
83 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
84 lastStateChangeTime_(creationTime_), | |
85 runtime_(boost::posix_time::milliseconds(0)), | |
86 retryTime_(creationTime_), | |
87 pauseScheduled_(false) | |
88 { | |
89 if (job == NULL) | |
90 { | |
91 throw OrthancException(ErrorCode_NullPointer); | |
92 } | |
93 | |
94 lastStatus_ = JobStatus(ErrorCode_Success, *job); | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
95 job->Start(); |
2569 | 96 } |
97 | |
98 const std::string& GetId() const | |
99 { | |
100 return id_; | |
101 } | |
102 | |
103 IJob& GetJob() const | |
104 { | |
105 assert(job_.get() != NULL); | |
106 return *job_; | |
107 } | |
108 | |
109 void SetPriority(int priority) | |
110 { | |
111 priority_ = priority; | |
112 } | |
113 | |
114 int GetPriority() const | |
115 { | |
116 return priority_; | |
117 } | |
118 | |
119 JobState GetState() const | |
120 { | |
121 return state_; | |
122 } | |
123 | |
124 void SetState(JobState state) | |
125 { | |
126 if (state == JobState_Retry) | |
127 { | |
128 // Use "SetRetryState()" | |
129 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
130 } | |
131 else | |
132 { | |
133 SetStateInternal(state); | |
134 } | |
135 } | |
136 | |
137 void SetRetryState(unsigned int timeout) | |
138 { | |
139 if (state_ == JobState_Running) | |
140 { | |
141 SetStateInternal(JobState_Retry); | |
142 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + | |
143 boost::posix_time::milliseconds(timeout)); | |
144 } | |
145 else | |
146 { | |
147 // Only valid for running jobs | |
148 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
149 } | |
150 } | |
151 | |
152 void SchedulePause() | |
153 { | |
154 if (state_ == JobState_Running) | |
155 { | |
156 pauseScheduled_ = true; | |
157 } | |
158 else | |
159 { | |
160 // Only valid for running jobs | |
161 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
162 } | |
163 } | |
164 | |
165 bool IsPauseScheduled() | |
166 { | |
167 return pauseScheduled_; | |
168 } | |
169 | |
170 bool IsRetryReady(const boost::posix_time::ptime& now) const | |
171 { | |
172 if (state_ != JobState_Retry) | |
173 { | |
174 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
175 } | |
176 else | |
177 { | |
178 return retryTime_ <= now; | |
179 } | |
180 } | |
181 | |
182 const boost::posix_time::ptime& GetCreationTime() const | |
183 { | |
184 return creationTime_; | |
185 } | |
186 | |
187 const boost::posix_time::ptime& GetLastStateChangeTime() const | |
188 { | |
189 return lastStateChangeTime_; | |
190 } | |
191 | |
192 const boost::posix_time::time_duration& GetRuntime() const | |
193 { | |
194 return runtime_; | |
195 } | |
196 | |
197 const JobStatus& GetLastStatus() const | |
198 { | |
199 return lastStatus_; | |
200 } | |
201 | |
202 void SetLastStatus(const JobStatus& status) | |
203 { | |
204 lastStatus_ = status; | |
205 Touch(); | |
206 } | |
207 }; | |
208 | |
209 | |
210 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, | |
211 JobHandler*& b) const | |
212 { | |
213 return a->GetPriority() < b->GetPriority(); | |
214 } | |
215 | |
216 | |
217 #if defined(NDEBUG) | |
218 void JobsRegistry::CheckInvariants() const | |
219 { | |
220 } | |
221 | |
222 #else | |
223 bool JobsRegistry::IsPendingJob(const JobHandler& job) const | |
224 { | |
225 PendingJobs copy = pendingJobs_; | |
226 while (!copy.empty()) | |
227 { | |
228 if (copy.top() == &job) | |
229 { | |
230 return true; | |
231 } | |
232 | |
233 copy.pop(); | |
234 } | |
235 | |
236 return false; | |
237 } | |
238 | |
239 bool JobsRegistry::IsCompletedJob(JobHandler& job) const | |
240 { | |
241 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
242 it != completedJobs_.end(); ++it) | |
243 { | |
244 if (*it == &job) | |
245 { | |
246 return true; | |
247 } | |
248 } | |
249 | |
250 return false; | |
251 } | |
252 | |
253 bool JobsRegistry::IsRetryJob(JobHandler& job) const | |
254 { | |
255 return retryJobs_.find(&job) != retryJobs_.end(); | |
256 } | |
257 | |
258 void JobsRegistry::CheckInvariants() const | |
259 { | |
260 { | |
261 PendingJobs copy = pendingJobs_; | |
262 while (!copy.empty()) | |
263 { | |
264 assert(copy.top()->GetState() == JobState_Pending); | |
265 copy.pop(); | |
266 } | |
267 } | |
268 | |
269 assert(completedJobs_.size() <= maxCompletedJobs_); | |
270 | |
271 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
272 it != completedJobs_.end(); ++it) | |
273 { | |
274 assert((*it)->GetState() == JobState_Success || | |
275 (*it)->GetState() == JobState_Failure); | |
276 } | |
277 | |
278 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
279 it != retryJobs_.end(); ++it) | |
280 { | |
281 assert((*it)->GetState() == JobState_Retry); | |
282 } | |
283 | |
284 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
285 it != jobsIndex_.end(); ++it) | |
286 { | |
287 JobHandler& job = *it->second; | |
288 | |
289 assert(job.GetId() == it->first); | |
290 | |
291 switch (job.GetState()) | |
292 { | |
293 case JobState_Pending: | |
294 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
295 break; | |
296 | |
297 case JobState_Success: | |
298 case JobState_Failure: | |
299 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
300 break; | |
301 | |
302 case JobState_Retry: | |
303 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
304 break; | |
305 | |
306 case JobState_Running: | |
307 case JobState_Paused: | |
308 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
309 break; | |
310 | |
311 default: | |
312 throw OrthancException(ErrorCode_InternalError); | |
313 } | |
314 } | |
315 } | |
316 #endif | |
317 | |
318 | |
319 void JobsRegistry::ForgetOldCompletedJobs() | |
320 { | |
321 if (maxCompletedJobs_ != 0) | |
322 { | |
323 while (completedJobs_.size() > maxCompletedJobs_) | |
324 { | |
325 assert(completedJobs_.front() != NULL); | |
326 | |
327 std::string id = completedJobs_.front()->GetId(); | |
328 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
329 | |
330 jobsIndex_.erase(id); | |
331 delete(completedJobs_.front()); | |
332 completedJobs_.pop_front(); | |
333 } | |
334 } | |
335 } | |
336 | |
337 | |
338 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, | |
339 bool success) | |
340 { | |
341 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | |
342 << ": " << job.GetId(); | |
343 | |
344 CheckInvariants(); | |
345 assert(job.GetState() == JobState_Running); | |
346 | |
347 job.SetState(success ? JobState_Success : JobState_Failure); | |
348 | |
349 completedJobs_.push_back(&job); | |
350 ForgetOldCompletedJobs(); | |
351 | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
352 someJobComplete_.notify_all(); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
353 |
2569 | 354 CheckInvariants(); |
355 } | |
356 | |
357 | |
358 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, | |
359 unsigned int timeout) | |
360 { | |
361 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); | |
362 | |
363 CheckInvariants(); | |
364 | |
365 assert(job.GetState() == JobState_Running && | |
366 retryJobs_.find(&job) == retryJobs_.end()); | |
367 | |
368 retryJobs_.insert(&job); | |
369 job.SetRetryState(timeout); | |
370 | |
371 CheckInvariants(); | |
372 } | |
373 | |
374 | |
375 void JobsRegistry::MarkRunningAsPaused(JobHandler& job) | |
376 { | |
377 LOG(INFO) << "Job paused: " << job.GetId(); | |
378 | |
379 CheckInvariants(); | |
380 assert(job.GetState() == JobState_Running); | |
381 | |
382 job.SetState(JobState_Paused); | |
383 | |
384 CheckInvariants(); | |
385 } | |
386 | |
387 | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
388 bool JobsRegistry::GetStateInternal(JobState& state, |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
389 const std::string& id) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
390 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
391 CheckInvariants(); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
392 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
393 JobsIndex::const_iterator it = jobsIndex_.find(id); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
394 if (it == jobsIndex_.end()) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
395 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
396 return false; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
397 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
398 else |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
399 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
400 state = it->second->GetState(); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
401 return true; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
402 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
403 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
404 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
405 |
2569 | 406 JobsRegistry::~JobsRegistry() |
407 { | |
408 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
409 { | |
410 assert(it->second != NULL); | |
411 delete it->second; | |
412 } | |
413 } | |
414 | |
415 | |
416 void JobsRegistry::SetMaxCompletedJobs(size_t i) | |
417 { | |
418 boost::mutex::scoped_lock lock(mutex_); | |
419 CheckInvariants(); | |
420 | |
421 maxCompletedJobs_ = i; | |
422 ForgetOldCompletedJobs(); | |
423 | |
424 CheckInvariants(); | |
425 } | |
426 | |
427 | |
428 void JobsRegistry::ListJobs(std::set<std::string>& target) | |
429 { | |
430 boost::mutex::scoped_lock lock(mutex_); | |
431 CheckInvariants(); | |
432 | |
433 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
434 it != jobsIndex_.end(); ++it) | |
435 { | |
436 target.insert(it->first); | |
437 } | |
438 } | |
439 | |
440 | |
441 bool JobsRegistry::GetJobInfo(JobInfo& target, | |
442 const std::string& id) | |
443 { | |
444 boost::mutex::scoped_lock lock(mutex_); | |
445 CheckInvariants(); | |
446 | |
447 JobsIndex::const_iterator found = jobsIndex_.find(id); | |
448 | |
449 if (found == jobsIndex_.end()) | |
450 { | |
451 return false; | |
452 } | |
453 else | |
454 { | |
455 const JobHandler& handler = *found->second; | |
456 target = JobInfo(handler.GetId(), | |
457 handler.GetPriority(), | |
458 handler.GetState(), | |
459 handler.GetLastStatus(), | |
460 handler.GetCreationTime(), | |
461 handler.GetLastStateChangeTime(), | |
462 handler.GetRuntime()); | |
463 return true; | |
464 } | |
465 } | |
466 | |
467 | |
468 void JobsRegistry::Submit(std::string& id, | |
469 IJob* job, // Takes ownership | |
470 int priority) | |
471 { | |
472 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | |
473 | |
474 boost::mutex::scoped_lock lock(mutex_); | |
475 CheckInvariants(); | |
476 | |
477 id = handler->GetId(); | |
478 | |
479 pendingJobs_.push(handler.get()); | |
480 pendingJobAvailable_.notify_one(); | |
481 | |
482 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
483 | |
484 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; | |
485 | |
486 CheckInvariants(); | |
487 } | |
488 | |
489 | |
490 void JobsRegistry::Submit(IJob* job, // Takes ownership | |
491 int priority) | |
492 { | |
493 std::string id; | |
494 Submit(id, job, priority); | |
495 } | |
496 | |
497 | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
498 bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
499 int priority) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
500 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
501 std::string id; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
502 Submit(id, job, priority); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
503 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
504 printf(">> %s\n", id.c_str()); fflush(stdout); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
505 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
506 JobState state; |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
507 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
508 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
509 boost::mutex::scoped_lock lock(mutex_); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
510 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
511 while (GetStateInternal(state, id) && |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
512 state != JobState_Success && |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
513 state != JobState_Failure) |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
514 { |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
515 someJobComplete_.wait(lock); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
516 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
517 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
518 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
519 return (state == JobState_Success); |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
520 } |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
521 |
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
522 |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
523 bool JobsRegistry::SetPriority(const std::string& id, |
2569 | 524 int priority) |
525 { | |
526 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; | |
527 | |
528 boost::mutex::scoped_lock lock(mutex_); | |
529 CheckInvariants(); | |
530 | |
531 JobsIndex::iterator found = jobsIndex_.find(id); | |
532 | |
533 if (found == jobsIndex_.end()) | |
534 { | |
535 LOG(WARNING) << "Unknown job: " << id; | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
536 return false; |
2569 | 537 } |
538 else | |
539 { | |
540 found->second->SetPriority(priority); | |
541 | |
542 if (found->second->GetState() == JobState_Pending) | |
543 { | |
544 // If the job is pending, we need to reconstruct the | |
545 // priority queue, as the heap condition has changed | |
546 | |
547 PendingJobs copy; | |
548 std::swap(copy, pendingJobs_); | |
549 | |
550 assert(pendingJobs_.empty()); | |
551 while (!copy.empty()) | |
552 { | |
553 pendingJobs_.push(copy.top()); | |
554 copy.pop(); | |
555 } | |
556 } | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
557 |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
558 CheckInvariants(); |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
559 return true; |
2569 | 560 } |
561 } | |
562 | |
563 | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
564 bool JobsRegistry::Pause(const std::string& id) |
2569 | 565 { |
566 LOG(INFO) << "Pausing job: " << id; | |
567 | |
568 boost::mutex::scoped_lock lock(mutex_); | |
569 CheckInvariants(); | |
570 | |
571 JobsIndex::iterator found = jobsIndex_.find(id); | |
572 | |
573 if (found == jobsIndex_.end()) | |
574 { | |
575 LOG(WARNING) << "Unknown job: " << id; | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
576 return false; |
2569 | 577 } |
578 else | |
579 { | |
580 switch (found->second->GetState()) | |
581 { | |
582 case JobState_Pending: | |
583 { | |
584 // If the job is pending, we need to reconstruct the | |
585 // priority queue to remove it | |
586 PendingJobs copy; | |
587 std::swap(copy, pendingJobs_); | |
588 | |
589 assert(pendingJobs_.empty()); | |
590 while (!copy.empty()) | |
591 { | |
592 if (copy.top()->GetId() != id) | |
593 { | |
594 pendingJobs_.push(copy.top()); | |
595 } | |
596 | |
597 copy.pop(); | |
598 } | |
599 | |
600 found->second->SetState(JobState_Paused); | |
601 | |
602 break; | |
603 } | |
604 | |
605 case JobState_Retry: | |
606 { | |
607 RetryJobs::iterator item = retryJobs_.find(found->second); | |
608 assert(item != retryJobs_.end()); | |
609 retryJobs_.erase(item); | |
610 | |
611 found->second->SetState(JobState_Paused); | |
612 | |
613 break; | |
614 } | |
615 | |
616 case JobState_Paused: | |
617 case JobState_Success: | |
618 case JobState_Failure: | |
619 // Nothing to be done | |
620 break; | |
621 | |
622 case JobState_Running: | |
623 found->second->SchedulePause(); | |
624 break; | |
625 | |
626 default: | |
627 throw OrthancException(ErrorCode_InternalError); | |
628 } | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
629 |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
630 CheckInvariants(); |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
631 return true; |
2569 | 632 } |
633 } | |
634 | |
635 | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
636 bool JobsRegistry::Resume(const std::string& id) |
2569 | 637 { |
638 LOG(INFO) << "Resuming job: " << id; | |
639 | |
640 boost::mutex::scoped_lock lock(mutex_); | |
641 CheckInvariants(); | |
642 | |
643 JobsIndex::iterator found = jobsIndex_.find(id); | |
644 | |
645 if (found == jobsIndex_.end()) | |
646 { | |
647 LOG(WARNING) << "Unknown job: " << id; | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
648 return false; |
2569 | 649 } |
650 else if (found->second->GetState() != JobState_Paused) | |
651 { | |
652 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
653 return false; |
2569 | 654 } |
655 else | |
656 { | |
657 found->second->SetState(JobState_Pending); | |
658 pendingJobs_.push(found->second); | |
659 pendingJobAvailable_.notify_one(); | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
660 CheckInvariants(); |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
661 return true; |
2569 | 662 } |
663 } | |
664 | |
665 | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
666 bool JobsRegistry::Resubmit(const std::string& id) |
2569 | 667 { |
668 LOG(INFO) << "Resubmitting failed job: " << id; | |
669 | |
670 boost::mutex::scoped_lock lock(mutex_); | |
671 CheckInvariants(); | |
672 | |
673 JobsIndex::iterator found = jobsIndex_.find(id); | |
674 | |
675 if (found == jobsIndex_.end()) | |
676 { | |
677 LOG(WARNING) << "Unknown job: " << id; | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
678 return false; |
2569 | 679 } |
680 else if (found->second->GetState() != JobState_Failure) | |
681 { | |
682 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
683 return false; |
2569 | 684 } |
685 else | |
686 { | |
687 bool ok = false; | |
688 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
689 it != completedJobs_.end(); ++it) | |
690 { | |
691 if (*it == found->second) | |
692 { | |
693 ok = true; | |
694 completedJobs_.erase(it); | |
695 break; | |
696 } | |
697 } | |
698 | |
699 assert(ok); | |
700 | |
701 found->second->SetState(JobState_Pending); | |
702 pendingJobs_.push(found->second); | |
703 pendingJobAvailable_.notify_one(); | |
2573
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
704 |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
705 CheckInvariants(); |
3372c5255333
StoreScuJob, Orthanc Explorer for jobs
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2570
diff
changeset
|
706 return true; |
2569 | 707 } |
708 } | |
709 | |
710 | |
711 void JobsRegistry::ScheduleRetries() | |
712 { | |
713 boost::mutex::scoped_lock lock(mutex_); | |
714 CheckInvariants(); | |
715 | |
716 RetryJobs copy; | |
717 std::swap(copy, retryJobs_); | |
718 | |
719 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
720 | |
721 assert(retryJobs_.empty()); | |
722 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
723 { | |
724 if ((*it)->IsRetryReady(now)) | |
725 { | |
726 LOG(INFO) << "Retrying job: " << (*it)->GetId(); | |
727 (*it)->SetState(JobState_Pending); | |
728 pendingJobs_.push(*it); | |
729 pendingJobAvailable_.notify_one(); | |
730 } | |
731 else | |
732 { | |
733 retryJobs_.insert(*it); | |
734 } | |
735 } | |
736 | |
737 CheckInvariants(); | |
738 } | |
739 | |
740 | |
741 bool JobsRegistry::GetState(JobState& state, | |
742 const std::string& id) | |
743 { | |
744 boost::mutex::scoped_lock lock(mutex_); | |
2570
2e879c796ec7
JobsRegistry::SubmitAndWait(), StoreScuJob
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2569
diff
changeset
|
745 return GetStateInternal(state, id); |
2569 | 746 } |
747 | |
748 | |
749 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, | |
750 unsigned int timeout) : | |
751 registry_(registry), | |
752 handler_(NULL), | |
753 targetState_(JobState_Failure), | |
754 targetRetryTimeout_(0) | |
755 { | |
756 { | |
757 boost::mutex::scoped_lock lock(registry_.mutex_); | |
758 | |
759 while (registry_.pendingJobs_.empty()) | |
760 { | |
761 if (timeout == 0) | |
762 { | |
763 registry_.pendingJobAvailable_.wait(lock); | |
764 } | |
765 else | |
766 { | |
767 bool success = registry_.pendingJobAvailable_.timed_wait | |
768 (lock, boost::posix_time::milliseconds(timeout)); | |
769 if (!success) | |
770 { | |
771 // No pending job | |
772 return; | |
773 } | |
774 } | |
775 } | |
776 | |
777 handler_ = registry_.pendingJobs_.top(); | |
778 registry_.pendingJobs_.pop(); | |
779 | |
780 assert(handler_->GetState() == JobState_Pending); | |
781 handler_->SetState(JobState_Running); | |
782 | |
783 job_ = &handler_->GetJob(); | |
784 id_ = handler_->GetId(); | |
785 priority_ = handler_->GetPriority(); | |
786 } | |
787 } | |
788 | |
789 | |
790 JobsRegistry::RunningJob::~RunningJob() | |
791 { | |
792 if (IsValid()) | |
793 { | |
794 boost::mutex::scoped_lock lock(registry_.mutex_); | |
795 | |
796 switch (targetState_) | |
797 { | |
798 case JobState_Failure: | |
799 registry_.MarkRunningAsCompleted(*handler_, false); | |
800 break; | |
801 | |
802 case JobState_Success: | |
803 registry_.MarkRunningAsCompleted(*handler_, true); | |
804 break; | |
805 | |
806 case JobState_Paused: | |
807 registry_.MarkRunningAsPaused(*handler_); | |
808 break; | |
809 | |
810 case JobState_Retry: | |
811 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); | |
812 break; | |
813 | |
814 default: | |
815 assert(0); | |
816 } | |
817 } | |
818 } | |
819 | |
820 | |
821 bool JobsRegistry::RunningJob::IsValid() const | |
822 { | |
823 return (handler_ != NULL && | |
824 job_ != NULL); | |
825 } | |
826 | |
827 | |
828 const std::string& JobsRegistry::RunningJob::GetId() const | |
829 { | |
830 if (!IsValid()) | |
831 { | |
832 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
833 } | |
834 else | |
835 { | |
836 return id_; | |
837 } | |
838 } | |
839 | |
840 | |
841 int JobsRegistry::RunningJob::GetPriority() const | |
842 { | |
843 if (!IsValid()) | |
844 { | |
845 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
846 } | |
847 else | |
848 { | |
849 return priority_; | |
850 } | |
851 } | |
852 | |
853 | |
854 IJob& JobsRegistry::RunningJob::GetJob() | |
855 { | |
856 if (!IsValid()) | |
857 { | |
858 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
859 } | |
860 else | |
861 { | |
862 return *job_; | |
863 } | |
864 } | |
865 | |
866 | |
867 bool JobsRegistry::RunningJob::IsPauseScheduled() | |
868 { | |
869 if (!IsValid()) | |
870 { | |
871 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
872 } | |
873 else | |
874 { | |
875 boost::mutex::scoped_lock lock(registry_.mutex_); | |
876 registry_.CheckInvariants(); | |
877 assert(handler_->GetState() == JobState_Running); | |
878 | |
879 return handler_->IsPauseScheduled(); | |
880 } | |
881 } | |
882 | |
883 | |
884 void JobsRegistry::RunningJob::MarkSuccess() | |
885 { | |
886 if (!IsValid()) | |
887 { | |
888 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
889 } | |
890 else | |
891 { | |
892 targetState_ = JobState_Success; | |
893 } | |
894 } | |
895 | |
896 | |
897 void JobsRegistry::RunningJob::MarkFailure() | |
898 { | |
899 if (!IsValid()) | |
900 { | |
901 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
902 } | |
903 else | |
904 { | |
905 targetState_ = JobState_Failure; | |
906 } | |
907 } | |
908 | |
909 | |
910 void JobsRegistry::RunningJob::MarkPause() | |
911 { | |
912 if (!IsValid()) | |
913 { | |
914 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
915 } | |
916 else | |
917 { | |
918 targetState_ = JobState_Paused; | |
919 } | |
920 } | |
921 | |
922 | |
923 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) | |
924 { | |
925 if (!IsValid()) | |
926 { | |
927 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
928 } | |
929 else | |
930 { | |
931 targetState_ = JobState_Retry; | |
932 targetRetryTimeout_ = timeout; | |
933 } | |
934 } | |
935 | |
936 | |
937 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code) | |
938 { | |
939 if (!IsValid()) | |
940 { | |
941 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
942 } | |
943 else | |
944 { | |
945 JobStatus status(code, *job_); | |
946 | |
947 boost::mutex::scoped_lock lock(registry_.mutex_); | |
948 registry_.CheckInvariants(); | |
949 assert(handler_->GetState() == JobState_Running); | |
950 | |
951 handler_->SetLastStatus(status); | |
952 } | |
953 } | |
954 } |