Mercurial > hg > orthanc
comparison OrthancFramework/Sources/JobsEngine/JobsRegistry.cpp @ 4044:d25f4c0fa160 framework
splitting code into OrthancFramework and OrthancServer
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 10 Jun 2020 20:30:34 +0200 |
parents | Core/JobsEngine/JobsRegistry.cpp@1d2b31fc782f |
children | bf7b9edf6b81 |
comparison
equal
deleted
inserted
replaced
4043:6c6239aec462 | 4044:d25f4c0fa160 |
---|---|
1 /** | |
2 * Orthanc - A Lightweight, RESTful DICOM Store | |
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
4 * Department, University Hospital of Liege, Belgium | |
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium | |
6 * | |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU General Public License as | |
9 * published by the Free Software Foundation, either version 3 of the | |
10 * License, or (at your option) any later version. | |
11 * | |
12 * In addition, as a special exception, the copyright holders of this | |
13 * program give permission to link the code of its release with the | |
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it | |
15 * that use the same license as the "OpenSSL" library), and distribute | |
16 * the linked executables. You must obey the GNU General Public License | |
17 * in all respects for all of the code used other than "OpenSSL". If you | |
18 * modify file(s) with this exception, you may extend this exception to | |
19 * your version of the file(s), but you are not obligated to do so. If | |
20 * you do not wish to do so, delete this exception statement from your | |
21 * version. If you delete this exception statement from all source files | |
22 * in the program, then also delete it here. | |
23 * | |
24 * This program is distributed in the hope that it will be useful, but | |
25 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
27 * General Public License for more details. | |
28 * | |
29 * You should have received a copy of the GNU General Public License | |
30 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
31 **/ | |
32 | |
33 | |
34 #include "../PrecompiledHeaders.h" | |
35 #include "JobsRegistry.h" | |
36 | |
37 #include "../Logging.h" | |
38 #include "../OrthancException.h" | |
39 #include "../Toolbox.h" | |
40 #include "../SerializationToolbox.h" | |
41 | |
42 namespace Orthanc | |
43 { | |
// Keys of the JSON fields read and written by the (un)serialization
// code of this file. Both the characters and the pointers are
// "const", so that a key cannot be reseated by accident.
static const char* const STATE = "State";
static const char* const TYPE = "Type";
static const char* const PRIORITY = "Priority";
static const char* const JOB = "Job";
static const char* const JOBS = "Jobs";
static const char* const JOBS_REGISTRY = "JobsRegistry";
static const char* const CREATION_TIME = "CreationTime";
static const char* const LAST_CHANGE_TIME = "LastChangeTime";
static const char* const RUNTIME = "Runtime";
53 | |
54 | |
55 class JobsRegistry::JobHandler : public boost::noncopyable | |
56 { | |
57 private: | |
58 std::string id_; | |
59 JobState state_; | |
60 std::string jobType_; | |
61 std::unique_ptr<IJob> job_; | |
62 int priority_; // "+inf()" means highest priority | |
63 boost::posix_time::ptime creationTime_; | |
64 boost::posix_time::ptime lastStateChangeTime_; | |
65 boost::posix_time::time_duration runtime_; | |
66 boost::posix_time::ptime retryTime_; | |
67 bool pauseScheduled_; | |
68 bool cancelScheduled_; | |
69 JobStatus lastStatus_; | |
70 | |
71 void Touch() | |
72 { | |
73 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
74 | |
75 if (state_ == JobState_Running) | |
76 { | |
77 runtime_ += (now - lastStateChangeTime_); | |
78 } | |
79 | |
80 lastStateChangeTime_ = now; | |
81 } | |
82 | |
83 void SetStateInternal(JobState state) | |
84 { | |
85 state_ = state; | |
86 pauseScheduled_ = false; | |
87 cancelScheduled_ = false; | |
88 Touch(); | |
89 } | |
90 | |
91 public: | |
92 JobHandler(IJob* job, | |
93 int priority) : | |
94 id_(Toolbox::GenerateUuid()), | |
95 state_(JobState_Pending), | |
96 job_(job), | |
97 priority_(priority), | |
98 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
99 lastStateChangeTime_(creationTime_), | |
100 runtime_(boost::posix_time::milliseconds(0)), | |
101 retryTime_(creationTime_), | |
102 pauseScheduled_(false), | |
103 cancelScheduled_(false) | |
104 { | |
105 if (job == NULL) | |
106 { | |
107 throw OrthancException(ErrorCode_NullPointer); | |
108 } | |
109 | |
110 job->GetJobType(jobType_); | |
111 job->Start(); | |
112 | |
113 lastStatus_ = JobStatus(ErrorCode_Success, "", *job_); | |
114 } | |
115 | |
116 const std::string& GetId() const | |
117 { | |
118 return id_; | |
119 } | |
120 | |
121 IJob& GetJob() const | |
122 { | |
123 assert(job_.get() != NULL); | |
124 return *job_; | |
125 } | |
126 | |
127 void SetPriority(int priority) | |
128 { | |
129 priority_ = priority; | |
130 } | |
131 | |
132 int GetPriority() const | |
133 { | |
134 return priority_; | |
135 } | |
136 | |
137 JobState GetState() const | |
138 { | |
139 return state_; | |
140 } | |
141 | |
142 void SetState(JobState state) | |
143 { | |
144 if (state == JobState_Retry) | |
145 { | |
146 // Use "SetRetryState()" | |
147 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
148 } | |
149 else | |
150 { | |
151 SetStateInternal(state); | |
152 } | |
153 } | |
154 | |
155 void SetRetryState(unsigned int timeout) | |
156 { | |
157 if (state_ == JobState_Running) | |
158 { | |
159 SetStateInternal(JobState_Retry); | |
160 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + | |
161 boost::posix_time::milliseconds(timeout)); | |
162 } | |
163 else | |
164 { | |
165 // Only valid for running jobs | |
166 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
167 } | |
168 } | |
169 | |
170 void SchedulePause() | |
171 { | |
172 if (state_ == JobState_Running) | |
173 { | |
174 pauseScheduled_ = true; | |
175 } | |
176 else | |
177 { | |
178 // Only valid for running jobs | |
179 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
180 } | |
181 } | |
182 | |
183 void ScheduleCancel() | |
184 { | |
185 if (state_ == JobState_Running) | |
186 { | |
187 cancelScheduled_ = true; | |
188 } | |
189 else | |
190 { | |
191 // Only valid for running jobs | |
192 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
193 } | |
194 } | |
195 | |
196 bool IsPauseScheduled() | |
197 { | |
198 return pauseScheduled_; | |
199 } | |
200 | |
201 bool IsCancelScheduled() | |
202 { | |
203 return cancelScheduled_; | |
204 } | |
205 | |
206 bool IsRetryReady(const boost::posix_time::ptime& now) const | |
207 { | |
208 if (state_ != JobState_Retry) | |
209 { | |
210 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
211 } | |
212 else | |
213 { | |
214 return retryTime_ <= now; | |
215 } | |
216 } | |
217 | |
218 const boost::posix_time::ptime& GetCreationTime() const | |
219 { | |
220 return creationTime_; | |
221 } | |
222 | |
223 const boost::posix_time::ptime& GetLastStateChangeTime() const | |
224 { | |
225 return lastStateChangeTime_; | |
226 } | |
227 | |
228 void SetLastStateChangeTime(const boost::posix_time::ptime& time) | |
229 { | |
230 lastStateChangeTime_ = time; | |
231 } | |
232 | |
233 const boost::posix_time::time_duration& GetRuntime() const | |
234 { | |
235 return runtime_; | |
236 } | |
237 | |
238 const JobStatus& GetLastStatus() const | |
239 { | |
240 return lastStatus_; | |
241 } | |
242 | |
243 void SetLastStatus(const JobStatus& status) | |
244 { | |
245 lastStatus_ = status; | |
246 Touch(); | |
247 } | |
248 | |
249 void SetLastErrorCode(ErrorCode code) | |
250 { | |
251 lastStatus_.SetErrorCode(code); | |
252 } | |
253 | |
254 bool Serialize(Json::Value& target) const | |
255 { | |
256 target = Json::objectValue; | |
257 | |
258 bool ok; | |
259 | |
260 if (state_ == JobState_Running) | |
261 { | |
262 // WARNING: Cannot directly access the "job_" member, as long | |
263 // as a "RunningJob" instance is running. We do not use a | |
264 // mutex at the "JobHandler" level, as serialization would be | |
265 // blocked while a step in the job is running. Instead, we | |
266 // save a snapshot of the serialized job. (*) | |
267 | |
268 if (lastStatus_.HasSerialized()) | |
269 { | |
270 target[JOB] = lastStatus_.GetSerialized(); | |
271 ok = true; | |
272 } | |
273 else | |
274 { | |
275 ok = false; | |
276 } | |
277 } | |
278 else | |
279 { | |
280 ok = job_->Serialize(target[JOB]); | |
281 } | |
282 | |
283 if (ok) | |
284 { | |
285 target[STATE] = EnumerationToString(state_); | |
286 target[PRIORITY] = priority_; | |
287 target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_); | |
288 target[LAST_CHANGE_TIME] = boost::posix_time::to_iso_string(lastStateChangeTime_); | |
289 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds()); | |
290 return true; | |
291 } | |
292 else | |
293 { | |
294 VLOG(1) << "Job backup is not supported for job of type: " << jobType_; | |
295 return false; | |
296 } | |
297 } | |
298 | |
299 JobHandler(IJobUnserializer& unserializer, | |
300 const Json::Value& serialized, | |
301 const std::string& id) : | |
302 id_(id), | |
303 pauseScheduled_(false), | |
304 cancelScheduled_(false) | |
305 { | |
306 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE)); | |
307 priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY); | |
308 creationTime_ = boost::posix_time::from_iso_string | |
309 (SerializationToolbox::ReadString(serialized, CREATION_TIME)); | |
310 lastStateChangeTime_ = boost::posix_time::from_iso_string | |
311 (SerializationToolbox::ReadString(serialized, LAST_CHANGE_TIME)); | |
312 runtime_ = boost::posix_time::milliseconds | |
313 (SerializationToolbox::ReadInteger(serialized, RUNTIME)); | |
314 | |
315 retryTime_ = creationTime_; | |
316 | |
317 job_.reset(unserializer.UnserializeJob(serialized[JOB])); | |
318 job_->GetJobType(jobType_); | |
319 job_->Start(); | |
320 | |
321 lastStatus_ = JobStatus(ErrorCode_Success, "", *job_); | |
322 } | |
323 }; | |
324 | |
325 | |
326 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, | |
327 JobHandler*& b) const | |
328 { | |
329 return a->GetPriority() < b->GetPriority(); | |
330 } | |
331 | |
332 | |
333 #if defined(NDEBUG) | |
334 void JobsRegistry::CheckInvariants() const | |
335 { | |
336 } | |
337 | |
338 #else | |
339 bool JobsRegistry::IsPendingJob(const JobHandler& job) const | |
340 { | |
341 PendingJobs copy = pendingJobs_; | |
342 while (!copy.empty()) | |
343 { | |
344 if (copy.top() == &job) | |
345 { | |
346 return true; | |
347 } | |
348 | |
349 copy.pop(); | |
350 } | |
351 | |
352 return false; | |
353 } | |
354 | |
355 bool JobsRegistry::IsCompletedJob(JobHandler& job) const | |
356 { | |
357 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
358 it != completedJobs_.end(); ++it) | |
359 { | |
360 if (*it == &job) | |
361 { | |
362 return true; | |
363 } | |
364 } | |
365 | |
366 return false; | |
367 } | |
368 | |
369 bool JobsRegistry::IsRetryJob(JobHandler& job) const | |
370 { | |
371 return retryJobs_.find(&job) != retryJobs_.end(); | |
372 } | |
373 | |
374 void JobsRegistry::CheckInvariants() const | |
375 { | |
376 { | |
377 PendingJobs copy = pendingJobs_; | |
378 while (!copy.empty()) | |
379 { | |
380 assert(copy.top()->GetState() == JobState_Pending); | |
381 copy.pop(); | |
382 } | |
383 } | |
384 | |
385 assert(completedJobs_.size() <= maxCompletedJobs_); | |
386 | |
387 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
388 it != completedJobs_.end(); ++it) | |
389 { | |
390 assert((*it)->GetState() == JobState_Success || | |
391 (*it)->GetState() == JobState_Failure); | |
392 } | |
393 | |
394 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
395 it != retryJobs_.end(); ++it) | |
396 { | |
397 assert((*it)->GetState() == JobState_Retry); | |
398 } | |
399 | |
400 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
401 it != jobsIndex_.end(); ++it) | |
402 { | |
403 JobHandler& job = *it->second; | |
404 | |
405 assert(job.GetId() == it->first); | |
406 | |
407 switch (job.GetState()) | |
408 { | |
409 case JobState_Pending: | |
410 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
411 break; | |
412 | |
413 case JobState_Success: | |
414 case JobState_Failure: | |
415 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
416 break; | |
417 | |
418 case JobState_Retry: | |
419 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
420 break; | |
421 | |
422 case JobState_Running: | |
423 case JobState_Paused: | |
424 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
425 break; | |
426 | |
427 default: | |
428 throw OrthancException(ErrorCode_InternalError); | |
429 } | |
430 } | |
431 } | |
432 #endif | |
433 | |
434 | |
435 void JobsRegistry::ForgetOldCompletedJobs() | |
436 { | |
437 while (completedJobs_.size() > maxCompletedJobs_) | |
438 { | |
439 assert(completedJobs_.front() != NULL); | |
440 | |
441 std::string id = completedJobs_.front()->GetId(); | |
442 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
443 | |
444 jobsIndex_.erase(id); | |
445 delete(completedJobs_.front()); | |
446 completedJobs_.pop_front(); | |
447 } | |
448 | |
449 CheckInvariants(); | |
450 } | |
451 | |
452 | |
453 void JobsRegistry::SetCompletedJob(JobHandler& job, | |
454 bool success) | |
455 { | |
456 job.SetState(success ? JobState_Success : JobState_Failure); | |
457 | |
458 completedJobs_.push_back(&job); | |
459 someJobComplete_.notify_all(); | |
460 } | |
461 | |
462 | |
463 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, | |
464 CompletedReason reason) | |
465 { | |
466 const char* tmp; | |
467 | |
468 switch (reason) | |
469 { | |
470 case CompletedReason_Success: | |
471 tmp = "success"; | |
472 break; | |
473 | |
474 case CompletedReason_Failure: | |
475 tmp = "failure"; | |
476 break; | |
477 | |
478 case CompletedReason_Canceled: | |
479 tmp = "cancel"; | |
480 break; | |
481 | |
482 default: | |
483 throw OrthancException(ErrorCode_InternalError); | |
484 } | |
485 | |
486 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId(); | |
487 | |
488 CheckInvariants(); | |
489 | |
490 assert(job.GetState() == JobState_Running); | |
491 SetCompletedJob(job, reason == CompletedReason_Success); | |
492 | |
493 if (reason == CompletedReason_Canceled) | |
494 { | |
495 job.SetLastErrorCode(ErrorCode_CanceledJob); | |
496 } | |
497 | |
498 if (observer_ != NULL) | |
499 { | |
500 if (reason == CompletedReason_Success) | |
501 { | |
502 observer_->SignalJobSuccess(job.GetId()); | |
503 } | |
504 else | |
505 { | |
506 observer_->SignalJobFailure(job.GetId()); | |
507 } | |
508 } | |
509 | |
510 // WARNING: The following call might make "job" invalid if the job | |
511 // history size is empty | |
512 ForgetOldCompletedJobs(); | |
513 } | |
514 | |
515 | |
516 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, | |
517 unsigned int timeout) | |
518 { | |
519 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); | |
520 | |
521 CheckInvariants(); | |
522 | |
523 assert(job.GetState() == JobState_Running && | |
524 retryJobs_.find(&job) == retryJobs_.end()); | |
525 | |
526 retryJobs_.insert(&job); | |
527 job.SetRetryState(timeout); | |
528 | |
529 CheckInvariants(); | |
530 } | |
531 | |
532 | |
533 void JobsRegistry::MarkRunningAsPaused(JobHandler& job) | |
534 { | |
535 LOG(INFO) << "Job paused: " << job.GetId(); | |
536 | |
537 CheckInvariants(); | |
538 assert(job.GetState() == JobState_Running); | |
539 | |
540 job.SetState(JobState_Paused); | |
541 | |
542 CheckInvariants(); | |
543 } | |
544 | |
545 | |
546 bool JobsRegistry::GetStateInternal(JobState& state, | |
547 const std::string& id) | |
548 { | |
549 CheckInvariants(); | |
550 | |
551 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
552 if (it == jobsIndex_.end()) | |
553 { | |
554 return false; | |
555 } | |
556 else | |
557 { | |
558 state = it->second->GetState(); | |
559 return true; | |
560 } | |
561 } | |
562 | |
563 | |
564 JobsRegistry::~JobsRegistry() | |
565 { | |
566 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
567 { | |
568 assert(it->second != NULL); | |
569 delete it->second; | |
570 } | |
571 } | |
572 | |
573 | |
574 void JobsRegistry::SetMaxCompletedJobs(size_t n) | |
575 { | |
576 boost::mutex::scoped_lock lock(mutex_); | |
577 CheckInvariants(); | |
578 | |
579 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)"; | |
580 | |
581 maxCompletedJobs_ = n; | |
582 ForgetOldCompletedJobs(); | |
583 } | |
584 | |
585 | |
586 size_t JobsRegistry::GetMaxCompletedJobs() | |
587 { | |
588 boost::mutex::scoped_lock lock(mutex_); | |
589 CheckInvariants(); | |
590 return maxCompletedJobs_; | |
591 } | |
592 | |
593 | |
594 void JobsRegistry::ListJobs(std::set<std::string>& target) | |
595 { | |
596 boost::mutex::scoped_lock lock(mutex_); | |
597 CheckInvariants(); | |
598 | |
599 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
600 it != jobsIndex_.end(); ++it) | |
601 { | |
602 target.insert(it->first); | |
603 } | |
604 } | |
605 | |
606 | |
607 bool JobsRegistry::GetJobInfo(JobInfo& target, | |
608 const std::string& id) | |
609 { | |
610 boost::mutex::scoped_lock lock(mutex_); | |
611 CheckInvariants(); | |
612 | |
613 JobsIndex::const_iterator found = jobsIndex_.find(id); | |
614 | |
615 if (found == jobsIndex_.end()) | |
616 { | |
617 return false; | |
618 } | |
619 else | |
620 { | |
621 const JobHandler& handler = *found->second; | |
622 target = JobInfo(handler.GetId(), | |
623 handler.GetPriority(), | |
624 handler.GetState(), | |
625 handler.GetLastStatus(), | |
626 handler.GetCreationTime(), | |
627 handler.GetLastStateChangeTime(), | |
628 handler.GetRuntime()); | |
629 return true; | |
630 } | |
631 } | |
632 | |
633 | |
634 bool JobsRegistry::GetJobOutput(std::string& output, | |
635 MimeType& mime, | |
636 const std::string& job, | |
637 const std::string& key) | |
638 { | |
639 boost::mutex::scoped_lock lock(mutex_); | |
640 CheckInvariants(); | |
641 | |
642 JobsIndex::const_iterator found = jobsIndex_.find(job); | |
643 | |
644 if (found == jobsIndex_.end()) | |
645 { | |
646 return false; | |
647 } | |
648 else | |
649 { | |
650 const JobHandler& handler = *found->second; | |
651 | |
652 if (handler.GetState() == JobState_Success) | |
653 { | |
654 return handler.GetJob().GetOutput(output, mime, key); | |
655 } | |
656 else | |
657 { | |
658 return false; | |
659 } | |
660 } | |
661 } | |
662 | |
663 | |
664 void JobsRegistry::SubmitInternal(std::string& id, | |
665 JobHandler* handler) | |
666 { | |
667 if (handler == NULL) | |
668 { | |
669 throw OrthancException(ErrorCode_NullPointer); | |
670 } | |
671 | |
672 std::unique_ptr<JobHandler> protection(handler); | |
673 | |
674 { | |
675 boost::mutex::scoped_lock lock(mutex_); | |
676 CheckInvariants(); | |
677 | |
678 id = handler->GetId(); | |
679 int priority = handler->GetPriority(); | |
680 | |
681 jobsIndex_.insert(std::make_pair(id, protection.release())); | |
682 | |
683 switch (handler->GetState()) | |
684 { | |
685 case JobState_Pending: | |
686 case JobState_Retry: | |
687 case JobState_Running: | |
688 handler->SetState(JobState_Pending); | |
689 pendingJobs_.push(handler); | |
690 pendingJobAvailable_.notify_one(); | |
691 break; | |
692 | |
693 case JobState_Success: | |
694 SetCompletedJob(*handler, true); | |
695 break; | |
696 | |
697 case JobState_Failure: | |
698 SetCompletedJob(*handler, false); | |
699 break; | |
700 | |
701 case JobState_Paused: | |
702 break; | |
703 | |
704 default: | |
705 { | |
706 std::string details = ("A job should not be loaded from state: " + | |
707 std::string(EnumerationToString(handler->GetState()))); | |
708 throw OrthancException(ErrorCode_InternalError, details); | |
709 } | |
710 } | |
711 | |
712 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; | |
713 | |
714 if (observer_ != NULL) | |
715 { | |
716 observer_->SignalJobSubmitted(id); | |
717 } | |
718 | |
719 // WARNING: The following call might make "handler" invalid if | |
720 // the job history size is empty | |
721 ForgetOldCompletedJobs(); | |
722 } | |
723 } | |
724 | |
725 | |
726 void JobsRegistry::Submit(std::string& id, | |
727 IJob* job, // Takes ownership | |
728 int priority) | |
729 { | |
730 SubmitInternal(id, new JobHandler(job, priority)); | |
731 } | |
732 | |
733 | |
734 void JobsRegistry::Submit(IJob* job, // Takes ownership | |
735 int priority) | |
736 { | |
737 std::string id; | |
738 SubmitInternal(id, new JobHandler(job, priority)); | |
739 } | |
740 | |
741 | |
742 void JobsRegistry::SubmitAndWait(Json::Value& successContent, | |
743 IJob* job, // Takes ownership | |
744 int priority) | |
745 { | |
746 std::string id; | |
747 Submit(id, job, priority); | |
748 | |
749 JobState state = JobState_Pending; // Dummy initialization | |
750 | |
751 { | |
752 boost::mutex::scoped_lock lock(mutex_); | |
753 | |
754 for (;;) | |
755 { | |
756 if (!GetStateInternal(state, id)) | |
757 { | |
758 // Job has finished and has been lost (typically happens if | |
759 // "JobsHistorySize" is 0) | |
760 throw OrthancException(ErrorCode_InexistentItem, | |
761 "Cannot retrieve the status of the job, " | |
762 "make sure that \"JobsHistorySize\" is not 0"); | |
763 } | |
764 else if (state == JobState_Failure) | |
765 { | |
766 // Failure | |
767 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
768 if (it != jobsIndex_.end()) // Should always be true, already tested in GetStateInternal() | |
769 { | |
770 ErrorCode code = it->second->GetLastStatus().GetErrorCode(); | |
771 const std::string& details = it->second->GetLastStatus().GetDetails(); | |
772 | |
773 if (details.empty()) | |
774 { | |
775 throw OrthancException(code); | |
776 } | |
777 else | |
778 { | |
779 throw OrthancException(code, details); | |
780 } | |
781 } | |
782 else | |
783 { | |
784 throw OrthancException(ErrorCode_InternalError); | |
785 } | |
786 } | |
787 else if (state == JobState_Success) | |
788 { | |
789 // Success, try and retrieve the status of the job | |
790 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
791 if (it == jobsIndex_.end()) | |
792 { | |
793 // Should not happen | |
794 state = JobState_Failure; | |
795 } | |
796 else | |
797 { | |
798 const JobStatus& status = it->second->GetLastStatus(); | |
799 successContent = status.GetPublicContent(); | |
800 } | |
801 | |
802 return; | |
803 } | |
804 else | |
805 { | |
806 // This job has not finished yet, wait for new completion | |
807 someJobComplete_.wait(lock); | |
808 } | |
809 } | |
810 } | |
811 } | |
812 | |
813 | |
814 bool JobsRegistry::SetPriority(const std::string& id, | |
815 int priority) | |
816 { | |
817 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; | |
818 | |
819 boost::mutex::scoped_lock lock(mutex_); | |
820 CheckInvariants(); | |
821 | |
822 JobsIndex::iterator found = jobsIndex_.find(id); | |
823 | |
824 if (found == jobsIndex_.end()) | |
825 { | |
826 LOG(WARNING) << "Unknown job: " << id; | |
827 return false; | |
828 } | |
829 else | |
830 { | |
831 found->second->SetPriority(priority); | |
832 | |
833 if (found->second->GetState() == JobState_Pending) | |
834 { | |
835 // If the job is pending, we need to reconstruct the | |
836 // priority queue, as the heap condition has changed | |
837 | |
838 PendingJobs copy; | |
839 std::swap(copy, pendingJobs_); | |
840 | |
841 assert(pendingJobs_.empty()); | |
842 while (!copy.empty()) | |
843 { | |
844 pendingJobs_.push(copy.top()); | |
845 copy.pop(); | |
846 } | |
847 } | |
848 | |
849 CheckInvariants(); | |
850 return true; | |
851 } | |
852 } | |
853 | |
854 | |
855 void JobsRegistry::RemovePendingJob(const std::string& id) | |
856 { | |
857 // If the job is pending, we need to reconstruct the priority | |
858 // queue to remove it | |
859 PendingJobs copy; | |
860 std::swap(copy, pendingJobs_); | |
861 | |
862 assert(pendingJobs_.empty()); | |
863 while (!copy.empty()) | |
864 { | |
865 if (copy.top()->GetId() != id) | |
866 { | |
867 pendingJobs_.push(copy.top()); | |
868 } | |
869 | |
870 copy.pop(); | |
871 } | |
872 } | |
873 | |
874 | |
875 void JobsRegistry::RemoveRetryJob(JobHandler* handler) | |
876 { | |
877 RetryJobs::iterator item = retryJobs_.find(handler); | |
878 assert(item != retryJobs_.end()); | |
879 retryJobs_.erase(item); | |
880 } | |
881 | |
882 | |
883 bool JobsRegistry::Pause(const std::string& id) | |
884 { | |
885 LOG(INFO) << "Pausing job: " << id; | |
886 | |
887 boost::mutex::scoped_lock lock(mutex_); | |
888 CheckInvariants(); | |
889 | |
890 JobsIndex::iterator found = jobsIndex_.find(id); | |
891 | |
892 if (found == jobsIndex_.end()) | |
893 { | |
894 LOG(WARNING) << "Unknown job: " << id; | |
895 return false; | |
896 } | |
897 else | |
898 { | |
899 switch (found->second->GetState()) | |
900 { | |
901 case JobState_Pending: | |
902 RemovePendingJob(id); | |
903 found->second->SetState(JobState_Paused); | |
904 break; | |
905 | |
906 case JobState_Retry: | |
907 RemoveRetryJob(found->second); | |
908 found->second->SetState(JobState_Paused); | |
909 break; | |
910 | |
911 case JobState_Paused: | |
912 case JobState_Success: | |
913 case JobState_Failure: | |
914 // Nothing to be done | |
915 break; | |
916 | |
917 case JobState_Running: | |
918 found->second->SchedulePause(); | |
919 break; | |
920 | |
921 default: | |
922 throw OrthancException(ErrorCode_InternalError); | |
923 } | |
924 | |
925 CheckInvariants(); | |
926 return true; | |
927 } | |
928 } | |
929 | |
930 | |
931 bool JobsRegistry::Cancel(const std::string& id) | |
932 { | |
933 LOG(INFO) << "Canceling job: " << id; | |
934 | |
935 boost::mutex::scoped_lock lock(mutex_); | |
936 CheckInvariants(); | |
937 | |
938 JobsIndex::iterator found = jobsIndex_.find(id); | |
939 | |
940 if (found == jobsIndex_.end()) | |
941 { | |
942 LOG(WARNING) << "Unknown job: " << id; | |
943 return false; | |
944 } | |
945 else | |
946 { | |
947 switch (found->second->GetState()) | |
948 { | |
949 case JobState_Pending: | |
950 RemovePendingJob(id); | |
951 SetCompletedJob(*found->second, false); | |
952 found->second->SetLastErrorCode(ErrorCode_CanceledJob); | |
953 break; | |
954 | |
955 case JobState_Retry: | |
956 RemoveRetryJob(found->second); | |
957 SetCompletedJob(*found->second, false); | |
958 found->second->SetLastErrorCode(ErrorCode_CanceledJob); | |
959 break; | |
960 | |
961 case JobState_Paused: | |
962 SetCompletedJob(*found->second, false); | |
963 found->second->SetLastErrorCode(ErrorCode_CanceledJob); | |
964 break; | |
965 | |
966 case JobState_Success: | |
967 case JobState_Failure: | |
968 // Nothing to be done | |
969 break; | |
970 | |
971 case JobState_Running: | |
972 found->second->ScheduleCancel(); | |
973 break; | |
974 | |
975 default: | |
976 throw OrthancException(ErrorCode_InternalError); | |
977 } | |
978 | |
979 // WARNING: The following call might make "handler" invalid if | |
980 // the job history size is empty | |
981 ForgetOldCompletedJobs(); | |
982 | |
983 return true; | |
984 } | |
985 } | |
986 | |
987 | |
988 bool JobsRegistry::Resume(const std::string& id) | |
989 { | |
990 LOG(INFO) << "Resuming job: " << id; | |
991 | |
992 boost::mutex::scoped_lock lock(mutex_); | |
993 CheckInvariants(); | |
994 | |
995 JobsIndex::iterator found = jobsIndex_.find(id); | |
996 | |
997 if (found == jobsIndex_.end()) | |
998 { | |
999 LOG(WARNING) << "Unknown job: " << id; | |
1000 return false; | |
1001 } | |
1002 else if (found->second->GetState() != JobState_Paused) | |
1003 { | |
1004 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
1005 return false; | |
1006 } | |
1007 else | |
1008 { | |
1009 found->second->SetState(JobState_Pending); | |
1010 pendingJobs_.push(found->second); | |
1011 pendingJobAvailable_.notify_one(); | |
1012 CheckInvariants(); | |
1013 return true; | |
1014 } | |
1015 } | |
1016 | |
1017 | |
1018 bool JobsRegistry::Resubmit(const std::string& id) | |
1019 { | |
1020 LOG(INFO) << "Resubmitting failed job: " << id; | |
1021 | |
1022 boost::mutex::scoped_lock lock(mutex_); | |
1023 CheckInvariants(); | |
1024 | |
1025 JobsIndex::iterator found = jobsIndex_.find(id); | |
1026 | |
1027 if (found == jobsIndex_.end()) | |
1028 { | |
1029 LOG(WARNING) << "Unknown job: " << id; | |
1030 return false; | |
1031 } | |
1032 else if (found->second->GetState() != JobState_Failure) | |
1033 { | |
1034 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
1035 return false; | |
1036 } | |
1037 else | |
1038 { | |
1039 found->second->GetJob().Reset(); | |
1040 | |
1041 bool ok = false; | |
1042 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
1043 it != completedJobs_.end(); ++it) | |
1044 { | |
1045 if (*it == found->second) | |
1046 { | |
1047 ok = true; | |
1048 completedJobs_.erase(it); | |
1049 break; | |
1050 } | |
1051 } | |
1052 | |
1053 assert(ok); | |
1054 | |
1055 found->second->SetState(JobState_Pending); | |
1056 pendingJobs_.push(found->second); | |
1057 pendingJobAvailable_.notify_one(); | |
1058 | |
1059 CheckInvariants(); | |
1060 return true; | |
1061 } | |
1062 } | |
1063 | |
1064 | |
1065 void JobsRegistry::ScheduleRetries() | |
1066 { | |
1067 boost::mutex::scoped_lock lock(mutex_); | |
1068 CheckInvariants(); | |
1069 | |
1070 RetryJobs copy; | |
1071 std::swap(copy, retryJobs_); | |
1072 | |
1073 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
1074 | |
1075 assert(retryJobs_.empty()); | |
1076 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
1077 { | |
1078 if ((*it)->IsRetryReady(now)) | |
1079 { | |
1080 LOG(INFO) << "Retrying job: " << (*it)->GetId(); | |
1081 (*it)->SetState(JobState_Pending); | |
1082 pendingJobs_.push(*it); | |
1083 pendingJobAvailable_.notify_one(); | |
1084 } | |
1085 else | |
1086 { | |
1087 retryJobs_.insert(*it); | |
1088 } | |
1089 } | |
1090 | |
1091 CheckInvariants(); | |
1092 } | |
1093 | |
1094 | |
1095 bool JobsRegistry::GetState(JobState& state, | |
1096 const std::string& id) | |
1097 { | |
1098 boost::mutex::scoped_lock lock(mutex_); | |
1099 return GetStateInternal(state, id); | |
1100 } | |
1101 | |
1102 | |
1103 void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer) | |
1104 { | |
1105 boost::mutex::scoped_lock lock(mutex_); | |
1106 observer_ = &observer; | |
1107 } | |
1108 | |
1109 | |
1110 void JobsRegistry::ResetObserver() | |
1111 { | |
1112 boost::mutex::scoped_lock lock(mutex_); | |
1113 observer_ = NULL; | |
1114 } | |
1115 | |
1116 | |
1117 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, | |
1118 unsigned int timeout) : | |
1119 registry_(registry), | |
1120 handler_(NULL), | |
1121 targetState_(JobState_Failure), | |
1122 targetRetryTimeout_(0), | |
1123 canceled_(false) | |
1124 { | |
1125 { | |
1126 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1127 | |
1128 while (registry_.pendingJobs_.empty()) | |
1129 { | |
1130 if (timeout == 0) | |
1131 { | |
1132 registry_.pendingJobAvailable_.wait(lock); | |
1133 } | |
1134 else | |
1135 { | |
1136 bool success = registry_.pendingJobAvailable_.timed_wait | |
1137 (lock, boost::posix_time::milliseconds(timeout)); | |
1138 if (!success) | |
1139 { | |
1140 // No pending job | |
1141 return; | |
1142 } | |
1143 } | |
1144 } | |
1145 | |
1146 handler_ = registry_.pendingJobs_.top(); | |
1147 registry_.pendingJobs_.pop(); | |
1148 | |
1149 assert(handler_->GetState() == JobState_Pending); | |
1150 handler_->SetState(JobState_Running); | |
1151 handler_->SetLastErrorCode(ErrorCode_Success); | |
1152 | |
1153 job_ = &handler_->GetJob(); | |
1154 id_ = handler_->GetId(); | |
1155 priority_ = handler_->GetPriority(); | |
1156 } | |
1157 } | |
1158 | |
1159 | |
1160 JobsRegistry::RunningJob::~RunningJob() | |
1161 { | |
1162 if (IsValid()) | |
1163 { | |
1164 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1165 | |
1166 switch (targetState_) | |
1167 { | |
1168 case JobState_Failure: | |
1169 registry_.MarkRunningAsCompleted | |
1170 (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure); | |
1171 break; | |
1172 | |
1173 case JobState_Success: | |
1174 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success); | |
1175 break; | |
1176 | |
1177 case JobState_Paused: | |
1178 registry_.MarkRunningAsPaused(*handler_); | |
1179 break; | |
1180 | |
1181 case JobState_Retry: | |
1182 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); | |
1183 break; | |
1184 | |
1185 default: | |
1186 assert(0); | |
1187 } | |
1188 } | |
1189 } | |
1190 | |
1191 | |
1192 bool JobsRegistry::RunningJob::IsValid() const | |
1193 { | |
1194 return (handler_ != NULL && | |
1195 job_ != NULL); | |
1196 } | |
1197 | |
1198 | |
1199 const std::string& JobsRegistry::RunningJob::GetId() const | |
1200 { | |
1201 if (!IsValid()) | |
1202 { | |
1203 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1204 } | |
1205 else | |
1206 { | |
1207 return id_; | |
1208 } | |
1209 } | |
1210 | |
1211 | |
1212 int JobsRegistry::RunningJob::GetPriority() const | |
1213 { | |
1214 if (!IsValid()) | |
1215 { | |
1216 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1217 } | |
1218 else | |
1219 { | |
1220 return priority_; | |
1221 } | |
1222 } | |
1223 | |
1224 | |
1225 IJob& JobsRegistry::RunningJob::GetJob() | |
1226 { | |
1227 if (!IsValid()) | |
1228 { | |
1229 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1230 } | |
1231 else | |
1232 { | |
1233 return *job_; | |
1234 } | |
1235 } | |
1236 | |
1237 | |
1238 bool JobsRegistry::RunningJob::IsPauseScheduled() | |
1239 { | |
1240 if (!IsValid()) | |
1241 { | |
1242 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1243 } | |
1244 else | |
1245 { | |
1246 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1247 registry_.CheckInvariants(); | |
1248 assert(handler_->GetState() == JobState_Running); | |
1249 | |
1250 return handler_->IsPauseScheduled(); | |
1251 } | |
1252 } | |
1253 | |
1254 | |
1255 bool JobsRegistry::RunningJob::IsCancelScheduled() | |
1256 { | |
1257 if (!IsValid()) | |
1258 { | |
1259 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1260 } | |
1261 else | |
1262 { | |
1263 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1264 registry_.CheckInvariants(); | |
1265 assert(handler_->GetState() == JobState_Running); | |
1266 | |
1267 return handler_->IsCancelScheduled(); | |
1268 } | |
1269 } | |
1270 | |
1271 | |
1272 void JobsRegistry::RunningJob::MarkSuccess() | |
1273 { | |
1274 if (!IsValid()) | |
1275 { | |
1276 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1277 } | |
1278 else | |
1279 { | |
1280 targetState_ = JobState_Success; | |
1281 } | |
1282 } | |
1283 | |
1284 | |
1285 void JobsRegistry::RunningJob::MarkFailure() | |
1286 { | |
1287 if (!IsValid()) | |
1288 { | |
1289 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1290 } | |
1291 else | |
1292 { | |
1293 targetState_ = JobState_Failure; | |
1294 } | |
1295 } | |
1296 | |
1297 | |
1298 void JobsRegistry::RunningJob::MarkCanceled() | |
1299 { | |
1300 if (!IsValid()) | |
1301 { | |
1302 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1303 } | |
1304 else | |
1305 { | |
1306 targetState_ = JobState_Failure; | |
1307 canceled_ = true; | |
1308 } | |
1309 } | |
1310 | |
1311 | |
1312 void JobsRegistry::RunningJob::MarkPause() | |
1313 { | |
1314 if (!IsValid()) | |
1315 { | |
1316 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1317 } | |
1318 else | |
1319 { | |
1320 targetState_ = JobState_Paused; | |
1321 } | |
1322 } | |
1323 | |
1324 | |
1325 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) | |
1326 { | |
1327 if (!IsValid()) | |
1328 { | |
1329 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1330 } | |
1331 else | |
1332 { | |
1333 targetState_ = JobState_Retry; | |
1334 targetRetryTimeout_ = timeout; | |
1335 } | |
1336 } | |
1337 | |
1338 | |
1339 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code, | |
1340 const std::string& details) | |
1341 { | |
1342 if (!IsValid()) | |
1343 { | |
1344 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1345 } | |
1346 else | |
1347 { | |
1348 JobStatus status(code, details, *job_); | |
1349 | |
1350 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1351 registry_.CheckInvariants(); | |
1352 assert(handler_->GetState() == JobState_Running); | |
1353 | |
1354 handler_->SetLastStatus(status); | |
1355 } | |
1356 } | |
1357 | |
1358 | |
1359 | |
1360 void JobsRegistry::Serialize(Json::Value& target) | |
1361 { | |
1362 boost::mutex::scoped_lock lock(mutex_); | |
1363 CheckInvariants(); | |
1364 | |
1365 target = Json::objectValue; | |
1366 target[TYPE] = JOBS_REGISTRY; | |
1367 target[JOBS] = Json::objectValue; | |
1368 | |
1369 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
1370 it != jobsIndex_.end(); ++it) | |
1371 { | |
1372 Json::Value v; | |
1373 if (it->second->Serialize(v)) | |
1374 { | |
1375 target[JOBS][it->first] = v; | |
1376 } | |
1377 } | |
1378 } | |
1379 | |
1380 | |
1381 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, | |
1382 const Json::Value& s, | |
1383 size_t maxCompletedJobs) : | |
1384 maxCompletedJobs_(maxCompletedJobs), | |
1385 observer_(NULL) | |
1386 { | |
1387 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || | |
1388 !s.isMember(JOBS) || | |
1389 s[JOBS].type() != Json::objectValue) | |
1390 { | |
1391 throw OrthancException(ErrorCode_BadFileFormat); | |
1392 } | |
1393 | |
1394 Json::Value::Members members = s[JOBS].getMemberNames(); | |
1395 | |
1396 for (Json::Value::Members::const_iterator it = members.begin(); | |
1397 it != members.end(); ++it) | |
1398 { | |
1399 std::unique_ptr<JobHandler> job; | |
1400 | |
1401 try | |
1402 { | |
1403 job.reset(new JobHandler(unserializer, s[JOBS][*it], *it)); | |
1404 } | |
1405 catch (OrthancException& e) | |
1406 { | |
1407 LOG(WARNING) << "Cannot unserialize one job from previous execution, " | |
1408 << "skipping it: " << e.What(); | |
1409 continue; | |
1410 } | |
1411 | |
1412 const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime(); | |
1413 | |
1414 std::string id; | |
1415 SubmitInternal(id, job.release()); | |
1416 | |
1417 // Check whether the job has not been removed (which could be | |
1418 // the case if the "maxCompletedJobs_" value gets smaller) | |
1419 JobsIndex::iterator found = jobsIndex_.find(id); | |
1420 if (found != jobsIndex_.end()) | |
1421 { | |
1422 // The job still lies in the history: Update the time of its | |
1423 // last change to the time that was serialized | |
1424 assert(found->second != NULL); | |
1425 found->second->SetLastStateChangeTime(lastChangeTime); | |
1426 } | |
1427 } | |
1428 } | |
1429 | |
1430 | |
1431 void JobsRegistry::GetStatistics(unsigned int& pending, | |
1432 unsigned int& running, | |
1433 unsigned int& success, | |
1434 unsigned int& failed) | |
1435 { | |
1436 boost::mutex::scoped_lock lock(mutex_); | |
1437 CheckInvariants(); | |
1438 | |
1439 pending = 0; | |
1440 running = 0; | |
1441 success = 0; | |
1442 failed = 0; | |
1443 | |
1444 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
1445 it != jobsIndex_.end(); ++it) | |
1446 { | |
1447 JobHandler& job = *it->second; | |
1448 | |
1449 switch (job.GetState()) | |
1450 { | |
1451 case JobState_Retry: | |
1452 case JobState_Pending: | |
1453 pending ++; | |
1454 break; | |
1455 | |
1456 case JobState_Paused: | |
1457 case JobState_Running: | |
1458 running ++; | |
1459 break; | |
1460 | |
1461 case JobState_Success: | |
1462 success ++; | |
1463 break; | |
1464 | |
1465 case JobState_Failure: | |
1466 failed ++; | |
1467 break; | |
1468 | |
1469 default: | |
1470 throw OrthancException(ErrorCode_InternalError); | |
1471 } | |
1472 } | |
1473 } | |
1474 } |