comparison Core/JobsEngine/JobsRegistry.cpp @ 2569:2af17cd5eb1f jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 15:37:20 +0200
parents
children 2e879c796ec7
comparison
equal deleted inserted replaced
2568:a46094602346 2569:2af17cd5eb1f
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation, either version 3 of the
10 * License, or (at your option) any later version.
11 *
12 * In addition, as a special exception, the copyright holders of this
13 * program give permission to link the code of its release with the
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it
15 * that use the same license as the "OpenSSL" library), and distribute
16 * the linked executables. You must obey the GNU General Public License
17 * in all respects for all of the code used other than "OpenSSL". If you
18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here.
23 *
24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 **/
32
33
34 #include "../PrecompiledHeaders.h"
35 #include "JobsRegistry.h"
36
37 #include "../Logging.h"
38 #include "../OrthancException.h"
39 #include "../Toolbox.h"
40
41 namespace Orthanc
42 {
43 class JobsRegistry::JobHandler : public boost::noncopyable
44 {
45 private:
46 std::string id_;
47 JobState state_;
48 std::auto_ptr<IJob> job_;
49 int priority_; // "+inf()" means highest priority
50 boost::posix_time::ptime creationTime_;
51 boost::posix_time::ptime lastStateChangeTime_;
52 boost::posix_time::time_duration runtime_;
53 boost::posix_time::ptime retryTime_;
54 bool pauseScheduled_;
55 JobStatus lastStatus_;
56
57 void Touch()
58 {
59 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
60
61 if (state_ == JobState_Running)
62 {
63 runtime_ += (now - lastStateChangeTime_);
64 }
65
66 lastStateChangeTime_ = now;
67 }
68
69 void SetStateInternal(JobState state)
70 {
71 state_ = state;
72 pauseScheduled_ = false;
73 Touch();
74 }
75
76 public:
77 JobHandler(IJob* job,
78 int priority) :
79 id_(Toolbox::GenerateUuid()),
80 state_(JobState_Pending),
81 job_(job),
82 priority_(priority),
83 creationTime_(boost::posix_time::microsec_clock::universal_time()),
84 lastStateChangeTime_(creationTime_),
85 runtime_(boost::posix_time::milliseconds(0)),
86 retryTime_(creationTime_),
87 pauseScheduled_(false)
88 {
89 if (job == NULL)
90 {
91 throw OrthancException(ErrorCode_NullPointer);
92 }
93
94 lastStatus_ = JobStatus(ErrorCode_Success, *job);
95 }
96
97 const std::string& GetId() const
98 {
99 return id_;
100 }
101
102 IJob& GetJob() const
103 {
104 assert(job_.get() != NULL);
105 return *job_;
106 }
107
108 void SetPriority(int priority)
109 {
110 priority_ = priority;
111 }
112
113 int GetPriority() const
114 {
115 return priority_;
116 }
117
118 JobState GetState() const
119 {
120 return state_;
121 }
122
123 void SetState(JobState state)
124 {
125 if (state == JobState_Retry)
126 {
127 // Use "SetRetryState()"
128 throw OrthancException(ErrorCode_BadSequenceOfCalls);
129 }
130 else
131 {
132 SetStateInternal(state);
133 }
134 }
135
136 void SetRetryState(unsigned int timeout)
137 {
138 if (state_ == JobState_Running)
139 {
140 SetStateInternal(JobState_Retry);
141 retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
142 boost::posix_time::milliseconds(timeout));
143 }
144 else
145 {
146 // Only valid for running jobs
147 throw OrthancException(ErrorCode_BadSequenceOfCalls);
148 }
149 }
150
151 void SchedulePause()
152 {
153 if (state_ == JobState_Running)
154 {
155 pauseScheduled_ = true;
156 }
157 else
158 {
159 // Only valid for running jobs
160 throw OrthancException(ErrorCode_BadSequenceOfCalls);
161 }
162 }
163
164 bool IsPauseScheduled()
165 {
166 return pauseScheduled_;
167 }
168
169 bool IsRetryReady(const boost::posix_time::ptime& now) const
170 {
171 if (state_ != JobState_Retry)
172 {
173 throw OrthancException(ErrorCode_BadSequenceOfCalls);
174 }
175 else
176 {
177 return retryTime_ <= now;
178 }
179 }
180
181 const boost::posix_time::ptime& GetCreationTime() const
182 {
183 return creationTime_;
184 }
185
186 const boost::posix_time::ptime& GetLastStateChangeTime() const
187 {
188 return lastStateChangeTime_;
189 }
190
191 const boost::posix_time::time_duration& GetRuntime() const
192 {
193 return runtime_;
194 }
195
196 const JobStatus& GetLastStatus() const
197 {
198 return lastStatus_;
199 }
200
201 void SetLastStatus(const JobStatus& status)
202 {
203 lastStatus_ = status;
204 Touch();
205 }
206 };
207
208
209 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a,
210 JobHandler*& b) const
211 {
212 return a->GetPriority() < b->GetPriority();
213 }
214
215
216 #if defined(NDEBUG)
217 void JobsRegistry::CheckInvariants() const
218 {
219 }
220
221 #else
222 bool JobsRegistry::IsPendingJob(const JobHandler& job) const
223 {
224 PendingJobs copy = pendingJobs_;
225 while (!copy.empty())
226 {
227 if (copy.top() == &job)
228 {
229 return true;
230 }
231
232 copy.pop();
233 }
234
235 return false;
236 }
237
238 bool JobsRegistry::IsCompletedJob(JobHandler& job) const
239 {
240 for (CompletedJobs::const_iterator it = completedJobs_.begin();
241 it != completedJobs_.end(); ++it)
242 {
243 if (*it == &job)
244 {
245 return true;
246 }
247 }
248
249 return false;
250 }
251
252 bool JobsRegistry::IsRetryJob(JobHandler& job) const
253 {
254 return retryJobs_.find(&job) != retryJobs_.end();
255 }
256
257 void JobsRegistry::CheckInvariants() const
258 {
259 {
260 PendingJobs copy = pendingJobs_;
261 while (!copy.empty())
262 {
263 assert(copy.top()->GetState() == JobState_Pending);
264 copy.pop();
265 }
266 }
267
268 assert(completedJobs_.size() <= maxCompletedJobs_);
269
270 for (CompletedJobs::const_iterator it = completedJobs_.begin();
271 it != completedJobs_.end(); ++it)
272 {
273 assert((*it)->GetState() == JobState_Success ||
274 (*it)->GetState() == JobState_Failure);
275 }
276
277 for (RetryJobs::const_iterator it = retryJobs_.begin();
278 it != retryJobs_.end(); ++it)
279 {
280 assert((*it)->GetState() == JobState_Retry);
281 }
282
283 for (JobsIndex::const_iterator it = jobsIndex_.begin();
284 it != jobsIndex_.end(); ++it)
285 {
286 JobHandler& job = *it->second;
287
288 assert(job.GetId() == it->first);
289
290 switch (job.GetState())
291 {
292 case JobState_Pending:
293 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
294 break;
295
296 case JobState_Success:
297 case JobState_Failure:
298 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
299 break;
300
301 case JobState_Retry:
302 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
303 break;
304
305 case JobState_Running:
306 case JobState_Paused:
307 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
308 break;
309
310 default:
311 throw OrthancException(ErrorCode_InternalError);
312 }
313 }
314 }
315 #endif
316
317
318 void JobsRegistry::ForgetOldCompletedJobs()
319 {
320 if (maxCompletedJobs_ != 0)
321 {
322 while (completedJobs_.size() > maxCompletedJobs_)
323 {
324 assert(completedJobs_.front() != NULL);
325
326 std::string id = completedJobs_.front()->GetId();
327 assert(jobsIndex_.find(id) != jobsIndex_.end());
328
329 jobsIndex_.erase(id);
330 delete(completedJobs_.front());
331 completedJobs_.pop_front();
332 }
333 }
334 }
335
336
337 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
338 bool success)
339 {
340 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
341 << ": " << job.GetId();
342
343 CheckInvariants();
344 assert(job.GetState() == JobState_Running);
345
346 job.SetState(success ? JobState_Success : JobState_Failure);
347
348 completedJobs_.push_back(&job);
349 ForgetOldCompletedJobs();
350
351 CheckInvariants();
352 }
353
354
355 void JobsRegistry::MarkRunningAsRetry(JobHandler& job,
356 unsigned int timeout)
357 {
358 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
359
360 CheckInvariants();
361
362 assert(job.GetState() == JobState_Running &&
363 retryJobs_.find(&job) == retryJobs_.end());
364
365 retryJobs_.insert(&job);
366 job.SetRetryState(timeout);
367
368 CheckInvariants();
369 }
370
371
372 void JobsRegistry::MarkRunningAsPaused(JobHandler& job)
373 {
374 LOG(INFO) << "Job paused: " << job.GetId();
375
376 CheckInvariants();
377 assert(job.GetState() == JobState_Running);
378
379 job.SetState(JobState_Paused);
380
381 CheckInvariants();
382 }
383
384
385 JobsRegistry::~JobsRegistry()
386 {
387 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
388 {
389 assert(it->second != NULL);
390 delete it->second;
391 }
392 }
393
394
395 void JobsRegistry::SetMaxCompletedJobs(size_t i)
396 {
397 boost::mutex::scoped_lock lock(mutex_);
398 CheckInvariants();
399
400 maxCompletedJobs_ = i;
401 ForgetOldCompletedJobs();
402
403 CheckInvariants();
404 }
405
406
407 void JobsRegistry::ListJobs(std::set<std::string>& target)
408 {
409 boost::mutex::scoped_lock lock(mutex_);
410 CheckInvariants();
411
412 for (JobsIndex::const_iterator it = jobsIndex_.begin();
413 it != jobsIndex_.end(); ++it)
414 {
415 target.insert(it->first);
416 }
417 }
418
419
420 bool JobsRegistry::GetJobInfo(JobInfo& target,
421 const std::string& id)
422 {
423 boost::mutex::scoped_lock lock(mutex_);
424 CheckInvariants();
425
426 JobsIndex::const_iterator found = jobsIndex_.find(id);
427
428 if (found == jobsIndex_.end())
429 {
430 return false;
431 }
432 else
433 {
434 const JobHandler& handler = *found->second;
435 target = JobInfo(handler.GetId(),
436 handler.GetPriority(),
437 handler.GetState(),
438 handler.GetLastStatus(),
439 handler.GetCreationTime(),
440 handler.GetLastStateChangeTime(),
441 handler.GetRuntime());
442 return true;
443 }
444 }
445
446
447 void JobsRegistry::Submit(std::string& id,
448 IJob* job, // Takes ownership
449 int priority)
450 {
451 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
452
453 boost::mutex::scoped_lock lock(mutex_);
454 CheckInvariants();
455
456 id = handler->GetId();
457
458 pendingJobs_.push(handler.get());
459 pendingJobAvailable_.notify_one();
460
461 jobsIndex_.insert(std::make_pair(id, handler.release()));
462
463 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
464
465 CheckInvariants();
466 }
467
468
469 void JobsRegistry::Submit(IJob* job, // Takes ownership
470 int priority)
471 {
472 std::string id;
473 Submit(id, job, priority);
474 }
475
476
477 void JobsRegistry::SetPriority(const std::string& id,
478 int priority)
479 {
480 LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
481
482 boost::mutex::scoped_lock lock(mutex_);
483 CheckInvariants();
484
485 JobsIndex::iterator found = jobsIndex_.find(id);
486
487 if (found == jobsIndex_.end())
488 {
489 LOG(WARNING) << "Unknown job: " << id;
490 }
491 else
492 {
493 found->second->SetPriority(priority);
494
495 if (found->second->GetState() == JobState_Pending)
496 {
497 // If the job is pending, we need to reconstruct the
498 // priority queue, as the heap condition has changed
499
500 PendingJobs copy;
501 std::swap(copy, pendingJobs_);
502
503 assert(pendingJobs_.empty());
504 while (!copy.empty())
505 {
506 pendingJobs_.push(copy.top());
507 copy.pop();
508 }
509 }
510 }
511
512 CheckInvariants();
513 }
514
515
516 void JobsRegistry::Pause(const std::string& id)
517 {
518 LOG(INFO) << "Pausing job: " << id;
519
520 boost::mutex::scoped_lock lock(mutex_);
521 CheckInvariants();
522
523 JobsIndex::iterator found = jobsIndex_.find(id);
524
525 if (found == jobsIndex_.end())
526 {
527 LOG(WARNING) << "Unknown job: " << id;
528 }
529 else
530 {
531 switch (found->second->GetState())
532 {
533 case JobState_Pending:
534 {
535 // If the job is pending, we need to reconstruct the
536 // priority queue to remove it
537 PendingJobs copy;
538 std::swap(copy, pendingJobs_);
539
540 assert(pendingJobs_.empty());
541 while (!copy.empty())
542 {
543 if (copy.top()->GetId() != id)
544 {
545 pendingJobs_.push(copy.top());
546 }
547
548 copy.pop();
549 }
550
551 found->second->SetState(JobState_Paused);
552
553 break;
554 }
555
556 case JobState_Retry:
557 {
558 RetryJobs::iterator item = retryJobs_.find(found->second);
559 assert(item != retryJobs_.end());
560 retryJobs_.erase(item);
561
562 found->second->SetState(JobState_Paused);
563
564 break;
565 }
566
567 case JobState_Paused:
568 case JobState_Success:
569 case JobState_Failure:
570 // Nothing to be done
571 break;
572
573 case JobState_Running:
574 found->second->SchedulePause();
575 break;
576
577 default:
578 throw OrthancException(ErrorCode_InternalError);
579 }
580 }
581
582 CheckInvariants();
583 }
584
585
586 void JobsRegistry::Resume(const std::string& id)
587 {
588 LOG(INFO) << "Resuming job: " << id;
589
590 boost::mutex::scoped_lock lock(mutex_);
591 CheckInvariants();
592
593 JobsIndex::iterator found = jobsIndex_.find(id);
594
595 if (found == jobsIndex_.end())
596 {
597 LOG(WARNING) << "Unknown job: " << id;
598 }
599 else if (found->second->GetState() != JobState_Paused)
600 {
601 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
602 }
603 else
604 {
605 found->second->SetState(JobState_Pending);
606 pendingJobs_.push(found->second);
607 pendingJobAvailable_.notify_one();
608 }
609
610 CheckInvariants();
611 }
612
613
614 void JobsRegistry::Resubmit(const std::string& id)
615 {
616 LOG(INFO) << "Resubmitting failed job: " << id;
617
618 boost::mutex::scoped_lock lock(mutex_);
619 CheckInvariants();
620
621 JobsIndex::iterator found = jobsIndex_.find(id);
622
623 if (found == jobsIndex_.end())
624 {
625 LOG(WARNING) << "Unknown job: " << id;
626 }
627 else if (found->second->GetState() != JobState_Failure)
628 {
629 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
630 }
631 else
632 {
633 bool ok = false;
634 for (CompletedJobs::iterator it = completedJobs_.begin();
635 it != completedJobs_.end(); ++it)
636 {
637 if (*it == found->second)
638 {
639 ok = true;
640 completedJobs_.erase(it);
641 break;
642 }
643 }
644
645 assert(ok);
646
647 found->second->SetState(JobState_Pending);
648 pendingJobs_.push(found->second);
649 pendingJobAvailable_.notify_one();
650 }
651
652 CheckInvariants();
653 }
654
655
656 void JobsRegistry::ScheduleRetries()
657 {
658 boost::mutex::scoped_lock lock(mutex_);
659 CheckInvariants();
660
661 RetryJobs copy;
662 std::swap(copy, retryJobs_);
663
664 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
665
666 assert(retryJobs_.empty());
667 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
668 {
669 if ((*it)->IsRetryReady(now))
670 {
671 LOG(INFO) << "Retrying job: " << (*it)->GetId();
672 (*it)->SetState(JobState_Pending);
673 pendingJobs_.push(*it);
674 pendingJobAvailable_.notify_one();
675 }
676 else
677 {
678 retryJobs_.insert(*it);
679 }
680 }
681
682 CheckInvariants();
683 }
684
685
686 bool JobsRegistry::GetState(JobState& state,
687 const std::string& id)
688 {
689 boost::mutex::scoped_lock lock(mutex_);
690 CheckInvariants();
691
692 JobsIndex::const_iterator it = jobsIndex_.find(id);
693 if (it == jobsIndex_.end())
694 {
695 return false;
696 }
697 else
698 {
699 state = it->second->GetState();
700 return true;
701 }
702 }
703
704
705 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
706 unsigned int timeout) :
707 registry_(registry),
708 handler_(NULL),
709 targetState_(JobState_Failure),
710 targetRetryTimeout_(0)
711 {
712 {
713 boost::mutex::scoped_lock lock(registry_.mutex_);
714
715 while (registry_.pendingJobs_.empty())
716 {
717 if (timeout == 0)
718 {
719 registry_.pendingJobAvailable_.wait(lock);
720 }
721 else
722 {
723 bool success = registry_.pendingJobAvailable_.timed_wait
724 (lock, boost::posix_time::milliseconds(timeout));
725 if (!success)
726 {
727 // No pending job
728 return;
729 }
730 }
731 }
732
733 handler_ = registry_.pendingJobs_.top();
734 registry_.pendingJobs_.pop();
735
736 assert(handler_->GetState() == JobState_Pending);
737 handler_->SetState(JobState_Running);
738
739 job_ = &handler_->GetJob();
740 id_ = handler_->GetId();
741 priority_ = handler_->GetPriority();
742 }
743 }
744
745
746 JobsRegistry::RunningJob::~RunningJob()
747 {
748 if (IsValid())
749 {
750 boost::mutex::scoped_lock lock(registry_.mutex_);
751
752 switch (targetState_)
753 {
754 case JobState_Failure:
755 registry_.MarkRunningAsCompleted(*handler_, false);
756 break;
757
758 case JobState_Success:
759 registry_.MarkRunningAsCompleted(*handler_, true);
760 break;
761
762 case JobState_Paused:
763 registry_.MarkRunningAsPaused(*handler_);
764 break;
765
766 case JobState_Retry:
767 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
768 break;
769
770 default:
771 assert(0);
772 }
773 }
774 }
775
776
777 bool JobsRegistry::RunningJob::IsValid() const
778 {
779 return (handler_ != NULL &&
780 job_ != NULL);
781 }
782
783
784 const std::string& JobsRegistry::RunningJob::GetId() const
785 {
786 if (!IsValid())
787 {
788 throw OrthancException(ErrorCode_BadSequenceOfCalls);
789 }
790 else
791 {
792 return id_;
793 }
794 }
795
796
797 int JobsRegistry::RunningJob::GetPriority() const
798 {
799 if (!IsValid())
800 {
801 throw OrthancException(ErrorCode_BadSequenceOfCalls);
802 }
803 else
804 {
805 return priority_;
806 }
807 }
808
809
810 IJob& JobsRegistry::RunningJob::GetJob()
811 {
812 if (!IsValid())
813 {
814 throw OrthancException(ErrorCode_BadSequenceOfCalls);
815 }
816 else
817 {
818 return *job_;
819 }
820 }
821
822
823 bool JobsRegistry::RunningJob::IsPauseScheduled()
824 {
825 if (!IsValid())
826 {
827 throw OrthancException(ErrorCode_BadSequenceOfCalls);
828 }
829 else
830 {
831 boost::mutex::scoped_lock lock(registry_.mutex_);
832 registry_.CheckInvariants();
833 assert(handler_->GetState() == JobState_Running);
834
835 return handler_->IsPauseScheduled();
836 }
837 }
838
839
840 void JobsRegistry::RunningJob::MarkSuccess()
841 {
842 if (!IsValid())
843 {
844 throw OrthancException(ErrorCode_BadSequenceOfCalls);
845 }
846 else
847 {
848 targetState_ = JobState_Success;
849 }
850 }
851
852
853 void JobsRegistry::RunningJob::MarkFailure()
854 {
855 if (!IsValid())
856 {
857 throw OrthancException(ErrorCode_BadSequenceOfCalls);
858 }
859 else
860 {
861 targetState_ = JobState_Failure;
862 }
863 }
864
865
866 void JobsRegistry::RunningJob::MarkPause()
867 {
868 if (!IsValid())
869 {
870 throw OrthancException(ErrorCode_BadSequenceOfCalls);
871 }
872 else
873 {
874 targetState_ = JobState_Paused;
875 }
876 }
877
878
879 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout)
880 {
881 if (!IsValid())
882 {
883 throw OrthancException(ErrorCode_BadSequenceOfCalls);
884 }
885 else
886 {
887 targetState_ = JobState_Retry;
888 targetRetryTimeout_ = timeout;
889 }
890 }
891
892
893 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code)
894 {
895 if (!IsValid())
896 {
897 throw OrthancException(ErrorCode_BadSequenceOfCalls);
898 }
899 else
900 {
901 JobStatus status(code, *job_);
902
903 boost::mutex::scoped_lock lock(registry_.mutex_);
904 registry_.CheckInvariants();
905 assert(handler_->GetState() == JobState_Running);
906
907 handler_->SetLastStatus(status);
908 }
909 }
910 }