comparison Core/JobsEngine/JobsRegistry.cpp @ 2581:8da2cffc2378 jobs

JobsRegistry::Cancel()
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 11 May 2018 17:33:19 +0200
parents 3372c5255333
children 1b6a6d80b6f2
comparison
equal deleted inserted replaced
2580:055d7d4a823f 2581:8da2cffc2378
50 boost::posix_time::ptime creationTime_; 50 boost::posix_time::ptime creationTime_;
51 boost::posix_time::ptime lastStateChangeTime_; 51 boost::posix_time::ptime lastStateChangeTime_;
52 boost::posix_time::time_duration runtime_; 52 boost::posix_time::time_duration runtime_;
53 boost::posix_time::ptime retryTime_; 53 boost::posix_time::ptime retryTime_;
54 bool pauseScheduled_; 54 bool pauseScheduled_;
55 bool cancelScheduled_;
55 JobStatus lastStatus_; 56 JobStatus lastStatus_;
56 57
57 void Touch() 58 void Touch()
58 { 59 {
59 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); 60 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
68 69
69 void SetStateInternal(JobState state) 70 void SetStateInternal(JobState state)
70 { 71 {
71 state_ = state; 72 state_ = state;
72 pauseScheduled_ = false; 73 pauseScheduled_ = false;
74 cancelScheduled_ = false;
73 Touch(); 75 Touch();
74 } 76 }
75 77
76 public: 78 public:
77 JobHandler(IJob* job, 79 JobHandler(IJob* job,
82 priority_(priority), 84 priority_(priority),
83 creationTime_(boost::posix_time::microsec_clock::universal_time()), 85 creationTime_(boost::posix_time::microsec_clock::universal_time()),
84 lastStateChangeTime_(creationTime_), 86 lastStateChangeTime_(creationTime_),
85 runtime_(boost::posix_time::milliseconds(0)), 87 runtime_(boost::posix_time::milliseconds(0)),
86 retryTime_(creationTime_), 88 retryTime_(creationTime_),
87 pauseScheduled_(false) 89 pauseScheduled_(false),
90 cancelScheduled_(false)
88 { 91 {
89 if (job == NULL) 92 if (job == NULL)
90 { 93 {
91 throw OrthancException(ErrorCode_NullPointer); 94 throw OrthancException(ErrorCode_NullPointer);
92 } 95 }
160 // Only valid for running jobs 163 // Only valid for running jobs
161 throw OrthancException(ErrorCode_BadSequenceOfCalls); 164 throw OrthancException(ErrorCode_BadSequenceOfCalls);
162 } 165 }
163 } 166 }
164 167
168 void ScheduleCancel()
169 {
170 if (state_ == JobState_Running)
171 {
172 cancelScheduled_ = true;
173 }
174 else
175 {
176 // Only valid for running jobs
177 throw OrthancException(ErrorCode_BadSequenceOfCalls);
178 }
179 }
180
165 bool IsPauseScheduled() 181 bool IsPauseScheduled()
166 { 182 {
167 return pauseScheduled_; 183 return pauseScheduled_;
184 }
185
186 bool IsCancelScheduled()
187 {
188 return cancelScheduled_;
168 } 189 }
169 190
170 bool IsRetryReady(const boost::posix_time::ptime& now) const 191 bool IsRetryReady(const boost::posix_time::ptime& now) const
171 { 192 {
172 if (state_ != JobState_Retry) 193 if (state_ != JobState_Retry)
201 222
202 void SetLastStatus(const JobStatus& status) 223 void SetLastStatus(const JobStatus& status)
203 { 224 {
204 lastStatus_ = status; 225 lastStatus_ = status;
205 Touch(); 226 Touch();
227 }
228
229 void SetLastErrorCode(ErrorCode code)
230 {
231 lastStatus_.SetErrorCode(code);
206 } 232 }
207 }; 233 };
208 234
209 235
210 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, 236 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a,
333 } 359 }
334 } 360 }
335 } 361 }
336 362
337 363
364 void JobsRegistry::SetCompletedJob(JobHandler& job,
365 bool success)
366 {
367 job.SetState(success ? JobState_Success : JobState_Failure);
368
369 completedJobs_.push_back(&job);
370 ForgetOldCompletedJobs();
371
372 someJobComplete_.notify_all();
373 }
374
375
338 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, 376 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
339 bool success) 377 bool success)
340 { 378 {
341 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") 379 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
342 << ": " << job.GetId(); 380 << ": " << job.GetId();
343 381
344 CheckInvariants(); 382 CheckInvariants();
383
345 assert(job.GetState() == JobState_Running); 384 assert(job.GetState() == JobState_Running);
346 385 SetCompletedJob(job, success);
347 job.SetState(success ? JobState_Success : JobState_Failure);
348
349 completedJobs_.push_back(&job);
350 ForgetOldCompletedJobs();
351
352 someJobComplete_.notify_all();
353 386
354 CheckInvariants(); 387 CheckInvariants();
355 } 388 }
356 389
357 390
499 int priority) 532 int priority)
500 { 533 {
501 std::string id; 534 std::string id;
502 Submit(id, job, priority); 535 Submit(id, job, priority);
503 536
504 printf(">> %s\n", id.c_str()); fflush(stdout);
505
506 JobState state; 537 JobState state;
507 538
508 { 539 {
509 boost::mutex::scoped_lock lock(mutex_); 540 boost::mutex::scoped_lock lock(mutex_);
510 541
559 return true; 590 return true;
560 } 591 }
561 } 592 }
562 593
563 594
595 void JobsRegistry::RemovePendingJob(const std::string& id)
596 {
597 // If the job is pending, we need to reconstruct the priority
598 // queue to remove it
599 PendingJobs copy;
600 std::swap(copy, pendingJobs_);
601
602 assert(pendingJobs_.empty());
603 while (!copy.empty())
604 {
605 if (copy.top()->GetId() != id)
606 {
607 pendingJobs_.push(copy.top());
608 }
609
610 copy.pop();
611 }
612 }
613
614
615 void JobsRegistry::RemoveRetryJob(JobHandler* handler)
616 {
617 RetryJobs::iterator item = retryJobs_.find(handler);
618 assert(item != retryJobs_.end());
619 retryJobs_.erase(item);
620 }
621
622
564 bool JobsRegistry::Pause(const std::string& id) 623 bool JobsRegistry::Pause(const std::string& id)
565 { 624 {
566 LOG(INFO) << "Pausing job: " << id; 625 LOG(INFO) << "Pausing job: " << id;
567 626
568 boost::mutex::scoped_lock lock(mutex_); 627 boost::mutex::scoped_lock lock(mutex_);
578 else 637 else
579 { 638 {
580 switch (found->second->GetState()) 639 switch (found->second->GetState())
581 { 640 {
582 case JobState_Pending: 641 case JobState_Pending:
583 { 642 RemovePendingJob(id);
584 // If the job is pending, we need to reconstruct the
585 // priority queue to remove it
586 PendingJobs copy;
587 std::swap(copy, pendingJobs_);
588
589 assert(pendingJobs_.empty());
590 while (!copy.empty())
591 {
592 if (copy.top()->GetId() != id)
593 {
594 pendingJobs_.push(copy.top());
595 }
596
597 copy.pop();
598 }
599
600 found->second->SetState(JobState_Paused); 643 found->second->SetState(JobState_Paused);
601 644 break;
602 break;
603 }
604 645
605 case JobState_Retry: 646 case JobState_Retry:
606 { 647 RemoveRetryJob(found->second);
607 RetryJobs::iterator item = retryJobs_.find(found->second);
608 assert(item != retryJobs_.end());
609 retryJobs_.erase(item);
610
611 found->second->SetState(JobState_Paused); 648 found->second->SetState(JobState_Paused);
612 649 break;
613 break;
614 }
615 650
616 case JobState_Paused: 651 case JobState_Paused:
617 case JobState_Success: 652 case JobState_Success:
618 case JobState_Failure: 653 case JobState_Failure:
619 // Nothing to be done 654 // Nothing to be done
620 break; 655 break;
621 656
622 case JobState_Running: 657 case JobState_Running:
623 found->second->SchedulePause(); 658 found->second->SchedulePause();
659 break;
660
661 default:
662 throw OrthancException(ErrorCode_InternalError);
663 }
664
665 CheckInvariants();
666 return true;
667 }
668 }
669
670
671 bool JobsRegistry::Cancel(const std::string& id)
672 {
673 LOG(INFO) << "Canceling job: " << id;
674
675 boost::mutex::scoped_lock lock(mutex_);
676 CheckInvariants();
677
678 JobsIndex::iterator found = jobsIndex_.find(id);
679
680 if (found == jobsIndex_.end())
681 {
682 LOG(WARNING) << "Unknown job: " << id;
683 return false;
684 }
685 else
686 {
687 switch (found->second->GetState())
688 {
689 case JobState_Pending:
690 RemovePendingJob(id);
691 SetCompletedJob(*found->second, false);
692 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
693 break;
694
695 case JobState_Retry:
696 RemoveRetryJob(found->second);
697 SetCompletedJob(*found->second, false);
698 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
699 break;
700
701 case JobState_Paused:
702 SetCompletedJob(*found->second, false);
703 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
704 break;
705
706 case JobState_Success:
707 case JobState_Failure:
708 // Nothing to be done
709 break;
710
711 case JobState_Running:
712 found->second->ScheduleCancel();
624 break; 713 break;
625 714
626 default: 715 default:
627 throw OrthancException(ErrorCode_InternalError); 716 throw OrthancException(ErrorCode_InternalError);
628 } 717 }
749 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, 838 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
750 unsigned int timeout) : 839 unsigned int timeout) :
751 registry_(registry), 840 registry_(registry),
752 handler_(NULL), 841 handler_(NULL),
753 targetState_(JobState_Failure), 842 targetState_(JobState_Failure),
754 targetRetryTimeout_(0) 843 targetRetryTimeout_(0),
844 canceled_(false)
755 { 845 {
756 { 846 {
757 boost::mutex::scoped_lock lock(registry_.mutex_); 847 boost::mutex::scoped_lock lock(registry_.mutex_);
758 848
759 while (registry_.pendingJobs_.empty()) 849 while (registry_.pendingJobs_.empty())
777 handler_ = registry_.pendingJobs_.top(); 867 handler_ = registry_.pendingJobs_.top();
778 registry_.pendingJobs_.pop(); 868 registry_.pendingJobs_.pop();
779 869
780 assert(handler_->GetState() == JobState_Pending); 870 assert(handler_->GetState() == JobState_Pending);
781 handler_->SetState(JobState_Running); 871 handler_->SetState(JobState_Running);
872 handler_->SetLastErrorCode(ErrorCode_Success);
782 873
783 job_ = &handler_->GetJob(); 874 job_ = &handler_->GetJob();
784 id_ = handler_->GetId(); 875 id_ = handler_->GetId();
785 priority_ = handler_->GetPriority(); 876 priority_ = handler_->GetPriority();
786 } 877 }
795 886
796 switch (targetState_) 887 switch (targetState_)
797 { 888 {
798 case JobState_Failure: 889 case JobState_Failure:
799 registry_.MarkRunningAsCompleted(*handler_, false); 890 registry_.MarkRunningAsCompleted(*handler_, false);
891
892 if (canceled_)
893 {
894 handler_->SetLastErrorCode(ErrorCode_CanceledJob);
895 }
896
800 break; 897 break;
801 898
802 case JobState_Success: 899 case JobState_Success:
803 registry_.MarkRunningAsCompleted(*handler_, true); 900 registry_.MarkRunningAsCompleted(*handler_, true);
804 break; 901 break;
879 return handler_->IsPauseScheduled(); 976 return handler_->IsPauseScheduled();
880 } 977 }
881 } 978 }
882 979
883 980
981 bool JobsRegistry::RunningJob::IsCancelScheduled()
982 {
983 if (!IsValid())
984 {
985 throw OrthancException(ErrorCode_BadSequenceOfCalls);
986 }
987 else
988 {
989 boost::mutex::scoped_lock lock(registry_.mutex_);
990 registry_.CheckInvariants();
991 assert(handler_->GetState() == JobState_Running);
992
993 return handler_->IsCancelScheduled();
994 }
995 }
996
997
884 void JobsRegistry::RunningJob::MarkSuccess() 998 void JobsRegistry::RunningJob::MarkSuccess()
885 { 999 {
886 if (!IsValid()) 1000 if (!IsValid())
887 { 1001 {
888 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1002 throw OrthancException(ErrorCode_BadSequenceOfCalls);
901 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1015 throw OrthancException(ErrorCode_BadSequenceOfCalls);
902 } 1016 }
903 else 1017 else
904 { 1018 {
905 targetState_ = JobState_Failure; 1019 targetState_ = JobState_Failure;
1020 }
1021 }
1022
1023
1024 void JobsRegistry::RunningJob::MarkCanceled()
1025 {
1026 if (!IsValid())
1027 {
1028 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1029 }
1030 else
1031 {
1032 targetState_ = JobState_Failure;
1033 canceled_ = true;
906 } 1034 }
907 } 1035 }
908 1036
909 1037
910 void JobsRegistry::RunningJob::MarkPause() 1038 void JobsRegistry::RunningJob::MarkPause()