Mercurial > hg > orthanc
comparison Core/JobsEngine/JobsRegistry.cpp @ 2581:8da2cffc2378 jobs
JobsRegistry::Cancel()
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 11 May 2018 17:33:19 +0200 |
parents | 3372c5255333 |
children | 1b6a6d80b6f2 |
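comparison legend (line status; left column is revision 2580:055d7d4a823f, right column is revision 2581:8da2cffc2378):
- equal
- deleted
- inserted
- replaced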
2580:055d7d4a823f | 2581:8da2cffc2378 |
---|---|
50 boost::posix_time::ptime creationTime_; | 50 boost::posix_time::ptime creationTime_; |
51 boost::posix_time::ptime lastStateChangeTime_; | 51 boost::posix_time::ptime lastStateChangeTime_; |
52 boost::posix_time::time_duration runtime_; | 52 boost::posix_time::time_duration runtime_; |
53 boost::posix_time::ptime retryTime_; | 53 boost::posix_time::ptime retryTime_; |
54 bool pauseScheduled_; | 54 bool pauseScheduled_; |
55 bool cancelScheduled_; | |
55 JobStatus lastStatus_; | 56 JobStatus lastStatus_; |
56 | 57 |
57 void Touch() | 58 void Touch() |
58 { | 59 { |
59 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | 60 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); |
68 | 69 |
69 void SetStateInternal(JobState state) | 70 void SetStateInternal(JobState state) |
70 { | 71 { |
71 state_ = state; | 72 state_ = state; |
72 pauseScheduled_ = false; | 73 pauseScheduled_ = false; |
74 cancelScheduled_ = false; | |
73 Touch(); | 75 Touch(); |
74 } | 76 } |
75 | 77 |
76 public: | 78 public: |
77 JobHandler(IJob* job, | 79 JobHandler(IJob* job, |
82 priority_(priority), | 84 priority_(priority), |
83 creationTime_(boost::posix_time::microsec_clock::universal_time()), | 85 creationTime_(boost::posix_time::microsec_clock::universal_time()), |
84 lastStateChangeTime_(creationTime_), | 86 lastStateChangeTime_(creationTime_), |
85 runtime_(boost::posix_time::milliseconds(0)), | 87 runtime_(boost::posix_time::milliseconds(0)), |
86 retryTime_(creationTime_), | 88 retryTime_(creationTime_), |
87 pauseScheduled_(false) | 89 pauseScheduled_(false), |
90 cancelScheduled_(false) | |
88 { | 91 { |
89 if (job == NULL) | 92 if (job == NULL) |
90 { | 93 { |
91 throw OrthancException(ErrorCode_NullPointer); | 94 throw OrthancException(ErrorCode_NullPointer); |
92 } | 95 } |
160 // Only valid for running jobs | 163 // Only valid for running jobs |
161 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 164 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
162 } | 165 } |
163 } | 166 } |
164 | 167 |
168 void ScheduleCancel() | |
169 { | |
170 if (state_ == JobState_Running) | |
171 { | |
172 cancelScheduled_ = true; | |
173 } | |
174 else | |
175 { | |
176 // Only valid for running jobs | |
177 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
178 } | |
179 } | |
180 | |
165 bool IsPauseScheduled() | 181 bool IsPauseScheduled() |
166 { | 182 { |
167 return pauseScheduled_; | 183 return pauseScheduled_; |
184 } | |
185 | |
186 bool IsCancelScheduled() | |
187 { | |
188 return cancelScheduled_; | |
168 } | 189 } |
169 | 190 |
170 bool IsRetryReady(const boost::posix_time::ptime& now) const | 191 bool IsRetryReady(const boost::posix_time::ptime& now) const |
171 { | 192 { |
172 if (state_ != JobState_Retry) | 193 if (state_ != JobState_Retry) |
201 | 222 |
202 void SetLastStatus(const JobStatus& status) | 223 void SetLastStatus(const JobStatus& status) |
203 { | 224 { |
204 lastStatus_ = status; | 225 lastStatus_ = status; |
205 Touch(); | 226 Touch(); |
227 } | |
228 | |
229 void SetLastErrorCode(ErrorCode code) | |
230 { | |
231 lastStatus_.SetErrorCode(code); | |
206 } | 232 } |
207 }; | 233 }; |
208 | 234 |
209 | 235 |
210 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, | 236 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, |
333 } | 359 } |
334 } | 360 } |
335 } | 361 } |
336 | 362 |
337 | 363 |
364 void JobsRegistry::SetCompletedJob(JobHandler& job, | |
365 bool success) | |
366 { | |
367 job.SetState(success ? JobState_Success : JobState_Failure); | |
368 | |
369 completedJobs_.push_back(&job); | |
370 ForgetOldCompletedJobs(); | |
371 | |
372 someJobComplete_.notify_all(); | |
373 } | |
374 | |
375 | |
338 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, | 376 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, |
339 bool success) | 377 bool success) |
340 { | 378 { |
341 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | 379 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") |
342 << ": " << job.GetId(); | 380 << ": " << job.GetId(); |
343 | 381 |
344 CheckInvariants(); | 382 CheckInvariants(); |
383 | |
345 assert(job.GetState() == JobState_Running); | 384 assert(job.GetState() == JobState_Running); |
346 | 385 SetCompletedJob(job, success); |
347 job.SetState(success ? JobState_Success : JobState_Failure); | |
348 | |
349 completedJobs_.push_back(&job); | |
350 ForgetOldCompletedJobs(); | |
351 | |
352 someJobComplete_.notify_all(); | |
353 | 386 |
354 CheckInvariants(); | 387 CheckInvariants(); |
355 } | 388 } |
356 | 389 |
357 | 390 |
499 int priority) | 532 int priority) |
500 { | 533 { |
501 std::string id; | 534 std::string id; |
502 Submit(id, job, priority); | 535 Submit(id, job, priority); |
503 | 536 |
504 printf(">> %s\n", id.c_str()); fflush(stdout); | |
505 | |
506 JobState state; | 537 JobState state; |
507 | 538 |
508 { | 539 { |
509 boost::mutex::scoped_lock lock(mutex_); | 540 boost::mutex::scoped_lock lock(mutex_); |
510 | 541 |
559 return true; | 590 return true; |
560 } | 591 } |
561 } | 592 } |
562 | 593 |
563 | 594 |
595 void JobsRegistry::RemovePendingJob(const std::string& id) | |
596 { | |
597 // If the job is pending, we need to reconstruct the priority | |
598 // queue to remove it | |
599 PendingJobs copy; | |
600 std::swap(copy, pendingJobs_); | |
601 | |
602 assert(pendingJobs_.empty()); | |
603 while (!copy.empty()) | |
604 { | |
605 if (copy.top()->GetId() != id) | |
606 { | |
607 pendingJobs_.push(copy.top()); | |
608 } | |
609 | |
610 copy.pop(); | |
611 } | |
612 } | |
613 | |
614 | |
615 void JobsRegistry::RemoveRetryJob(JobHandler* handler) | |
616 { | |
617 RetryJobs::iterator item = retryJobs_.find(handler); | |
618 assert(item != retryJobs_.end()); | |
619 retryJobs_.erase(item); | |
620 } | |
621 | |
622 | |
564 bool JobsRegistry::Pause(const std::string& id) | 623 bool JobsRegistry::Pause(const std::string& id) |
565 { | 624 { |
566 LOG(INFO) << "Pausing job: " << id; | 625 LOG(INFO) << "Pausing job: " << id; |
567 | 626 |
568 boost::mutex::scoped_lock lock(mutex_); | 627 boost::mutex::scoped_lock lock(mutex_); |
578 else | 637 else |
579 { | 638 { |
580 switch (found->second->GetState()) | 639 switch (found->second->GetState()) |
581 { | 640 { |
582 case JobState_Pending: | 641 case JobState_Pending: |
583 { | 642 RemovePendingJob(id); |
584 // If the job is pending, we need to reconstruct the | |
585 // priority queue to remove it | |
586 PendingJobs copy; | |
587 std::swap(copy, pendingJobs_); | |
588 | |
589 assert(pendingJobs_.empty()); | |
590 while (!copy.empty()) | |
591 { | |
592 if (copy.top()->GetId() != id) | |
593 { | |
594 pendingJobs_.push(copy.top()); | |
595 } | |
596 | |
597 copy.pop(); | |
598 } | |
599 | |
600 found->second->SetState(JobState_Paused); | 643 found->second->SetState(JobState_Paused); |
601 | 644 break; |
602 break; | |
603 } | |
604 | 645 |
605 case JobState_Retry: | 646 case JobState_Retry: |
606 { | 647 RemoveRetryJob(found->second); |
607 RetryJobs::iterator item = retryJobs_.find(found->second); | |
608 assert(item != retryJobs_.end()); | |
609 retryJobs_.erase(item); | |
610 | |
611 found->second->SetState(JobState_Paused); | 648 found->second->SetState(JobState_Paused); |
612 | 649 break; |
613 break; | |
614 } | |
615 | 650 |
616 case JobState_Paused: | 651 case JobState_Paused: |
617 case JobState_Success: | 652 case JobState_Success: |
618 case JobState_Failure: | 653 case JobState_Failure: |
619 // Nothing to be done | 654 // Nothing to be done |
620 break; | 655 break; |
621 | 656 |
622 case JobState_Running: | 657 case JobState_Running: |
623 found->second->SchedulePause(); | 658 found->second->SchedulePause(); |
659 break; | |
660 | |
661 default: | |
662 throw OrthancException(ErrorCode_InternalError); | |
663 } | |
664 | |
665 CheckInvariants(); | |
666 return true; | |
667 } | |
668 } | |
669 | |
670 | |
671 bool JobsRegistry::Cancel(const std::string& id) | |
672 { | |
673 LOG(INFO) << "Canceling job: " << id; | |
674 | |
675 boost::mutex::scoped_lock lock(mutex_); | |
676 CheckInvariants(); | |
677 | |
678 JobsIndex::iterator found = jobsIndex_.find(id); | |
679 | |
680 if (found == jobsIndex_.end()) | |
681 { | |
682 LOG(WARNING) << "Unknown job: " << id; | |
683 return false; | |
684 } | |
685 else | |
686 { | |
687 switch (found->second->GetState()) | |
688 { | |
689 case JobState_Pending: | |
690 RemovePendingJob(id); | |
691 SetCompletedJob(*found->second, false); | |
692 found->second->SetLastErrorCode(ErrorCode_CanceledJob); | |
693 break; | |
694 | |
695 case JobState_Retry: | |
696 RemoveRetryJob(found->second); | |
697 SetCompletedJob(*found->second, false); | |
698 found->second->SetLastErrorCode(ErrorCode_CanceledJob); | |
699 break; | |
700 | |
701 case JobState_Paused: | |
702 SetCompletedJob(*found->second, false); | |
703 found->second->SetLastErrorCode(ErrorCode_CanceledJob); | |
704 break; | |
705 | |
706 case JobState_Success: | |
707 case JobState_Failure: | |
708 // Nothing to be done | |
709 break; | |
710 | |
711 case JobState_Running: | |
712 found->second->ScheduleCancel(); | |
624 break; | 713 break; |
625 | 714 |
626 default: | 715 default: |
627 throw OrthancException(ErrorCode_InternalError); | 716 throw OrthancException(ErrorCode_InternalError); |
628 } | 717 } |
749 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, | 838 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, |
750 unsigned int timeout) : | 839 unsigned int timeout) : |
751 registry_(registry), | 840 registry_(registry), |
752 handler_(NULL), | 841 handler_(NULL), |
753 targetState_(JobState_Failure), | 842 targetState_(JobState_Failure), |
754 targetRetryTimeout_(0) | 843 targetRetryTimeout_(0), |
844 canceled_(false) | |
755 { | 845 { |
756 { | 846 { |
757 boost::mutex::scoped_lock lock(registry_.mutex_); | 847 boost::mutex::scoped_lock lock(registry_.mutex_); |
758 | 848 |
759 while (registry_.pendingJobs_.empty()) | 849 while (registry_.pendingJobs_.empty()) |
777 handler_ = registry_.pendingJobs_.top(); | 867 handler_ = registry_.pendingJobs_.top(); |
778 registry_.pendingJobs_.pop(); | 868 registry_.pendingJobs_.pop(); |
779 | 869 |
780 assert(handler_->GetState() == JobState_Pending); | 870 assert(handler_->GetState() == JobState_Pending); |
781 handler_->SetState(JobState_Running); | 871 handler_->SetState(JobState_Running); |
872 handler_->SetLastErrorCode(ErrorCode_Success); | |
782 | 873 |
783 job_ = &handler_->GetJob(); | 874 job_ = &handler_->GetJob(); |
784 id_ = handler_->GetId(); | 875 id_ = handler_->GetId(); |
785 priority_ = handler_->GetPriority(); | 876 priority_ = handler_->GetPriority(); |
786 } | 877 } |
795 | 886 |
796 switch (targetState_) | 887 switch (targetState_) |
797 { | 888 { |
798 case JobState_Failure: | 889 case JobState_Failure: |
799 registry_.MarkRunningAsCompleted(*handler_, false); | 890 registry_.MarkRunningAsCompleted(*handler_, false); |
891 | |
892 if (canceled_) | |
893 { | |
894 handler_->SetLastErrorCode(ErrorCode_CanceledJob); | |
895 } | |
896 | |
800 break; | 897 break; |
801 | 898 |
802 case JobState_Success: | 899 case JobState_Success: |
803 registry_.MarkRunningAsCompleted(*handler_, true); | 900 registry_.MarkRunningAsCompleted(*handler_, true); |
804 break; | 901 break; |
879 return handler_->IsPauseScheduled(); | 976 return handler_->IsPauseScheduled(); |
880 } | 977 } |
881 } | 978 } |
882 | 979 |
883 | 980 |
981 bool JobsRegistry::RunningJob::IsCancelScheduled() | |
982 { | |
983 if (!IsValid()) | |
984 { | |
985 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
986 } | |
987 else | |
988 { | |
989 boost::mutex::scoped_lock lock(registry_.mutex_); | |
990 registry_.CheckInvariants(); | |
991 assert(handler_->GetState() == JobState_Running); | |
992 | |
993 return handler_->IsCancelScheduled(); | |
994 } | |
995 } | |
996 | |
997 | |
884 void JobsRegistry::RunningJob::MarkSuccess() | 998 void JobsRegistry::RunningJob::MarkSuccess() |
885 { | 999 { |
886 if (!IsValid()) | 1000 if (!IsValid()) |
887 { | 1001 { |
888 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 1002 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
901 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 1015 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
902 } | 1016 } |
903 else | 1017 else |
904 { | 1018 { |
905 targetState_ = JobState_Failure; | 1019 targetState_ = JobState_Failure; |
1020 } | |
1021 } | |
1022 | |
1023 | |
1024 void JobsRegistry::RunningJob::MarkCanceled() | |
1025 { | |
1026 if (!IsValid()) | |
1027 { | |
1028 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1029 } | |
1030 else | |
1031 { | |
1032 targetState_ = JobState_Failure; | |
1033 canceled_ = true; | |
906 } | 1034 } |
907 } | 1035 } |
908 | 1036 |
909 | 1037 |
910 void JobsRegistry::RunningJob::MarkPause() | 1038 void JobsRegistry::RunningJob::MarkPause() |