comparison UnitTestsSources/MultiThreadingTests.cpp @ 2558:57f81b988713 jobs

cont
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 May 2018 15:24:33 +0200
parents b4516a6f214b
children 9b7680dee75d
comparison
equal deleted inserted replaced
2557:b4516a6f214b 2558:57f81b988713
479 { 479 {
480 throw OrthancException(ErrorCode_BadSequenceOfCalls); 480 throw OrthancException(ErrorCode_BadSequenceOfCalls);
481 } 481 }
482 else 482 else
483 { 483 {
484 return retryTime_ >= now; 484 return retryTime_ <= now;
485 } 485 }
486 } 486 }
487 }; 487 };
488 488
489 489
490 class JobsMonitor : public boost::noncopyable 490 class JobsRegistry : public boost::noncopyable
491 { 491 {
492 private: 492 private:
493 struct PriorityComparator 493 struct PriorityComparator
494 { 494 {
495 bool operator() (JobHandler*& a, 495 bool operator() (JobHandler*& a,
504 typedef std::set<JobHandler*> RetryJobs; 504 typedef std::set<JobHandler*> RetryJobs;
505 typedef std::priority_queue<JobHandler*, 505 typedef std::priority_queue<JobHandler*,
506 std::vector<JobHandler*>, // Could be a "std::deque" 506 std::vector<JobHandler*>, // Could be a "std::deque"
507 PriorityComparator> PendingJobs; 507 PriorityComparator> PendingJobs;
508 508
509 boost::mutex mutex_; 509 boost::mutex mutex_;
510 JobsIndex jobsIndex_; 510 JobsIndex jobsIndex_;
511 PendingJobs pendingJobs_; 511 PendingJobs pendingJobs_;
512 CompletedJobs completedJobs_; 512 CompletedJobs completedJobs_;
513 RetryJobs retryJobs_; 513 RetryJobs retryJobs_;
514 514
515 boost::condition_variable pendingJobAvailable_; 515 boost::condition_variable pendingJobAvailable_;
516 size_t maxCompletedJobs_; 516 size_t maxCompletedJobs_;
517 517
518 518
519 #ifndef NDEBUG 519 #ifndef NDEBUG
520 bool IsPendingJob(const JobHandler& job) const 520 bool IsPendingJob(const JobHandler& job) const
521 {
522 PendingJobs copy = pendingJobs_;
523 while (!copy.empty())
524 {
525 if (copy.top() == &job)
526 {
527 return true;
528 }
529
530 copy.pop();
531 }
532
533 return false;
534 }
535
536 bool IsCompletedJob(const JobHandler& job) const
537 {
538 for (CompletedJobs::const_iterator it = completedJobs_.begin();
539 it != completedJobs_.end(); ++it)
540 {
541 if (*it == &job)
542 {
543 return true;
544 }
545 }
546
547 return false;
548 }
549
550 bool IsRetryJob(JobHandler& job) const
551 {
552 return retryJobs_.find(&job) != retryJobs_.end();
553 }
554 #endif
555
556
557 void CheckInvariants()
558 {
559 #ifndef NDEBUG
521 { 560 {
522 PendingJobs copy = pendingJobs_; 561 PendingJobs copy = pendingJobs_;
523 while (!copy.empty()) 562 while (!copy.empty())
524 { 563 {
525 if (copy.top() == &job) 564 assert(copy.top()->GetState() == JobState_Pending);
565 copy.pop();
566 }
567 }
568
569 assert(completedJobs_.size() <= maxCompletedJobs_);
570
571 for (CompletedJobs::const_iterator it = completedJobs_.begin();
572 it != completedJobs_.end(); ++it)
573 {
574 assert((*it)->GetState() == JobState_Success ||
575 (*it)->GetState() == JobState_Failure);
576 }
577
578 for (RetryJobs::const_iterator it = retryJobs_.begin();
579 it != retryJobs_.end(); ++it)
580 {
581 assert((*it)->GetState() == JobState_Retry);
582 }
583
584 for (JobsIndex::iterator it = jobsIndex_.begin();
585 it != jobsIndex_.end(); ++it)
586 {
587 JobHandler& job = *it->second;
588
589 assert(job.GetId() == it->first);
590
591 switch (job.GetState())
592 {
593 case JobState_Pending:
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
595 break;
596
597 case JobState_Success:
598 case JobState_Failure:
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
600 break;
601
602 case JobState_Retry:
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
604 break;
605
606 case JobState_Running:
607 case JobState_Paused:
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
609 break;
610
611 default:
612 throw OrthancException(ErrorCode_InternalError);
613 }
614 }
615 #endif
616 }
617
618
619 void ForgetOldCompletedJobs()
620 {
621 if (maxCompletedJobs_ != 0)
622 {
623 while (completedJobs_.size() > maxCompletedJobs_)
624 {
625 assert(completedJobs_.front() != NULL);
626
627 std::string id = completedJobs_.front()->GetId();
628 assert(jobsIndex_.find(id) != jobsIndex_.end());
629
630 jobsIndex_.erase(id);
631 delete(completedJobs_.front());
632 completedJobs_.pop_front();
633 }
634 }
635 }
636
637
638 void MarkRunningAsCompleted(JobHandler& job,
639 bool success)
640 {
641 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
642 << ": " << job.GetId();
643
644 boost::mutex::scoped_lock lock(mutex_);
645 CheckInvariants();
646 assert(job.GetState() == JobState_Running);
647
648 job.SetState(success ? JobState_Success : JobState_Failure);
649
650 completedJobs_.push_back(&job);
651 ForgetOldCompletedJobs();
652
653 CheckInvariants();
654 }
655
656
657 void MarkRunningAsRetry(JobHandler& job,
658 unsigned int timeout)
659 {
660 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
661
662 boost::mutex::scoped_lock lock(mutex_);
663 CheckInvariants();
664
665 assert(job.GetState() == JobState_Running &&
666 retryJobs_.find(&job) == retryJobs_.end());
667
668 retryJobs_.insert(&job);
669 job.SetRetryState(timeout);
670
671 CheckInvariants();
672 }
673
674
675 void MarkRunningAsPaused(JobHandler& job)
676 {
677 LOG(INFO) << "Job paused: " << job.GetId();
678
679 boost::mutex::scoped_lock lock(mutex_);
680 CheckInvariants();
681 assert(job.GetState() == JobState_Running);
682
683 job.SetState(JobState_Paused);
684
685 CheckInvariants();
686 }
687
688
689 JobHandler* WaitPendingJob(unsigned int timeout)
690 {
691 boost::mutex::scoped_lock lock(mutex_);
692
693 while (pendingJobs_.empty())
694 {
695 if (timeout == 0)
696 {
697 pendingJobAvailable_.wait(lock);
698 }
699 else
700 {
701 bool success = pendingJobAvailable_.timed_wait
702 (lock, boost::posix_time::milliseconds(timeout));
703 if (!success)
526 { 704 {
527 return true; 705 return NULL;
528 } 706 }
529 707 }
530 copy.pop(); 708 }
531 } 709
532 710 JobHandler* job = pendingJobs_.top();
533 return false; 711 pendingJobs_.pop();
534 } 712
535 713 job->SetState(JobState_Running);
536 bool IsCompletedJob(const JobHandler& job) const 714 return job;
537 { 715 }
538 for (CompletedJobs::const_iterator it = completedJobs_.begin(); 716
539 it != completedJobs_.end(); ++it) 717
540 { 718 public:
541 if (*it == &job) 719 JobsRegistry() :
542 { 720 maxCompletedJobs_(10)
543 return true; 721 {
544 } 722 }
545 } 723
546 724
547 return false; 725 ~JobsRegistry()
548 } 726 {
549 727 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
550 bool IsRetryJob(JobHandler& job) const 728 {
551 { 729 assert(it->second != NULL);
552 return retryJobs_.find(&job) != retryJobs_.end(); 730 delete it->second;
553 } 731 }
554 #endif 732 }
555 733
556 734
557 void CheckInvariants() 735 void SetMaxCompletedJobs(size_t i)
558 { 736 {
559 #ifndef NDEBUG 737 boost::mutex::scoped_lock lock(mutex_);
560 { 738 CheckInvariants();
561 PendingJobs copy = pendingJobs_; 739
740 maxCompletedJobs_ = i;
741 ForgetOldCompletedJobs();
742
743 CheckInvariants();
744 }
745
746
747 void ListJobs(std::set<std::string>& target)
748 {
749 boost::mutex::scoped_lock lock(mutex_);
750 CheckInvariants();
751
752 for (JobsIndex::const_iterator it = jobsIndex_.begin();
753 it != jobsIndex_.end(); ++it)
754 {
755 target.insert(it->first);
756 }
757 }
758
759
760 void Submit(std::string& id,
761 IJob* job, // Takes ownership
762 int priority)
763 {
764 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
765
766 boost::mutex::scoped_lock lock(mutex_);
767 CheckInvariants();
768
769 id = handler->GetId();
770
771 pendingJobs_.push(handler.get());
772 pendingJobAvailable_.notify_one();
773
774 jobsIndex_.insert(std::make_pair(id, handler.release()));
775
776 LOG(INFO) << "New job submitted: " << id;
777
778 CheckInvariants();
779 }
780
781
782 void Submit(IJob* job, // Takes ownership
783 int priority)
784 {
785 std::string id;
786 Submit(id, job, priority);
787 }
788
789
790 void SetPriority(const std::string& id,
791 int priority)
792 {
793 LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
794
795 boost::mutex::scoped_lock lock(mutex_);
796 CheckInvariants();
797
798 JobsIndex::iterator found = jobsIndex_.find(id);
799
800 if (found == jobsIndex_.end())
801 {
802 LOG(WARNING) << "Unknown job: " << id;
803 }
804 else
805 {
806 found->second->SetPriority(priority);
807
808 if (found->second->GetState() == JobState_Pending)
809 {
810 // If the job is pending, we need to reconstruct the
811 // priority queue, as the heap condition has changed
812
813 PendingJobs copy;
814 std::swap(copy, pendingJobs_);
815
816 assert(pendingJobs_.empty());
562 while (!copy.empty()) 817 while (!copy.empty())
563 { 818 {
564 assert(copy.top()->GetState() == JobState_Pending); 819 pendingJobs_.push(copy.top());
565 copy.pop(); 820 copy.pop();
566 } 821 }
567 } 822 }
568 823 }
569 assert(completedJobs_.size() <= maxCompletedJobs_); 824
570 825 CheckInvariants();
571 for (CompletedJobs::const_iterator it = completedJobs_.begin(); 826 }
572 it != completedJobs_.end(); ++it) 827
573 { 828
574 assert((*it)->GetState() == JobState_Success || 829 void Pause(const std::string& id)
575 (*it)->GetState() == JobState_Failure); 830 {
576 } 831 LOG(INFO) << "Pausing job: " << id;
577 832
578 for (RetryJobs::const_iterator it = retryJobs_.begin(); 833 boost::mutex::scoped_lock lock(mutex_);
579 it != retryJobs_.end(); ++it) 834 CheckInvariants();
580 { 835
581 assert((*it)->GetState() == JobState_Retry); 836 JobsIndex::iterator found = jobsIndex_.find(id);
582 } 837
583 838 if (found == jobsIndex_.end())
584 for (JobsIndex::iterator it = jobsIndex_.begin(); 839 {
585 it != jobsIndex_.end(); ++it) 840 LOG(WARNING) << "Unknown job: " << id;
586 { 841 }
587 JobHandler& job = *it->second; 842 else
588 843 {
589 assert(job.GetId() == it->first); 844 switch (found->second->GetState())
590 845 {
591 switch (job.GetState()) 846 case JobState_Pending:
592 {
593 case JobState_Pending:
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
595 break;
596
597 case JobState_Success:
598 case JobState_Failure:
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
600 break;
601
602 case JobState_Retry:
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
604 break;
605
606 case JobState_Running:
607 case JobState_Paused:
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
609 break;
610
611 default:
612 throw OrthancException(ErrorCode_InternalError);
613 }
614 }
615 #endif
616 }
617
618
619 void ForgetOldCompletedJobs()
620 {
621 if (maxCompletedJobs_ != 0)
622 {
623 while (completedJobs_.size() > maxCompletedJobs_)
624 {
625 assert(completedJobs_.front() != NULL);
626
627 std::string id = completedJobs_.front()->GetId();
628
629 assert(jobsIndex_.find(id) != jobsIndex_.end());
630
631 jobsIndex_.erase(id);
632 delete(completedJobs_.front());
633 completedJobs_.pop_front();
634 }
635 }
636 }
637
638
639 void MarkRunningAsCompleted(JobHandler& job,
640 bool success)
641 {
642 boost::mutex::scoped_lock lock(mutex_);
643 CheckInvariants();
644 assert(job.GetState() == JobState_Running);
645
646 job.SetState(success ? JobState_Success : JobState_Failure);
647
648 completedJobs_.push_back(&job);
649 ForgetOldCompletedJobs();
650
651 CheckInvariants();
652 }
653
654
655 void MarkRunningAsRetry(JobHandler& job,
656 unsigned int timeout)
657 {
658 boost::mutex::scoped_lock lock(mutex_);
659 CheckInvariants();
660
661 assert(job.GetState() == JobState_Running &&
662 retryJobs_.find(&job) == retryJobs_.end());
663
664 retryJobs_.insert(&job);
665 job.SetRetryState(timeout);
666
667 CheckInvariants();
668 }
669
670
671 void MarkRunningAsPaused(JobHandler& job)
672 {
673 boost::mutex::scoped_lock lock(mutex_);
674 CheckInvariants();
675 assert(job.GetState() == JobState_Running);
676
677 job.SetState(JobState_Paused);
678
679 CheckInvariants();
680 }
681
682
683 JobHandler* WaitPendingJob(unsigned int timeout)
684 {
685 boost::mutex::scoped_lock lock(mutex_);
686
687 while (pendingJobs_.empty())
688 {
689 if (timeout == 0)
690 {
691 pendingJobAvailable_.wait(lock);
692 }
693 else
694 {
695 bool success = pendingJobAvailable_.timed_wait
696 (lock, boost::posix_time::milliseconds(timeout));
697 if (!success)
698 {
699 return NULL;
700 }
701 }
702 }
703
704 JobHandler* job = pendingJobs_.top();
705 pendingJobs_.pop();
706
707 job->SetState(JobState_Running);
708 return job;
709 }
710
711
712 public:
713 JobsMonitor() :
714 maxCompletedJobs_(10)
715 {
716 }
717
718
719 ~JobsMonitor()
720 {
721 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
722 {
723 assert(it->second != NULL);
724 delete it->second;
725 }
726 }
727
728
729 void SetMaxCompletedJobs(size_t i)
730 {
731 boost::mutex::scoped_lock lock(mutex_);
732 CheckInvariants();
733
734 maxCompletedJobs_ = i;
735 ForgetOldCompletedJobs();
736
737 CheckInvariants();
738 }
739
740
741 void ListJobs(std::set<std::string>& target)
742 {
743 boost::mutex::scoped_lock lock(mutex_);
744 CheckInvariants();
745
746 for (JobsIndex::const_iterator it = jobsIndex_.begin();
747 it != jobsIndex_.end(); ++it)
748 {
749 target.insert(it->first);
750 }
751 }
752
753
754 void Submit(std::string& id,
755 IJob* job, // Takes ownership
756 int priority)
757 {
758 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
759
760 boost::mutex::scoped_lock lock(mutex_);
761 CheckInvariants();
762
763 id = handler->GetId();
764 pendingJobs_.push(handler.get());
765 jobsIndex_.insert(std::make_pair(id, handler.release()));
766
767 pendingJobAvailable_.notify_one();
768
769 CheckInvariants();
770 }
771
772
773 void Submit(IJob* job, // Takes ownership
774 int priority)
775 {
776 std::string id;
777 Submit(id, job, priority);
778 }
779
780
781 void SetPriority(const std::string& id,
782 int priority)
783 {
784 boost::mutex::scoped_lock lock(mutex_);
785 CheckInvariants();
786
787 JobsIndex::iterator found = jobsIndex_.find(id);
788
789 if (found == jobsIndex_.end())
790 {
791 LOG(WARNING) << "Unknown job: " << id;
792 }
793 else
794 {
795 found->second->SetPriority(priority);
796
797 if (found->second->GetState() == JobState_Pending)
798 { 847 {
799 // If the job is pending, we need to reconstruct the 848 // If the job is pending, we need to reconstruct the
800 // priority queue, as the heap condition has changed 849 // priority queue to remove it
801
802 PendingJobs copy; 850 PendingJobs copy;
803 std::swap(copy, pendingJobs_); 851 std::swap(copy, pendingJobs_);
804 852
805 assert(pendingJobs_.empty()); 853 assert(pendingJobs_.empty());
806 while (!copy.empty()) 854 while (!copy.empty())
807 { 855 {
808 pendingJobs_.push(copy.top()); 856 if (copy.top()->GetId() != id)
857 {
858 pendingJobs_.push(copy.top());
859 }
860
809 copy.pop(); 861 copy.pop();
810 } 862 }
863
864 found->second->SetState(JobState_Paused);
865
866 break;
811 } 867 }
812 } 868
813 869 case JobState_Retry:
814 CheckInvariants(); 870 {
815 } 871 RetryJobs::iterator item = retryJobs_.find(found->second);
816 872 assert(item != retryJobs_.end());
817 873 retryJobs_.erase(item);
818 void Pause(const std::string& id) 874
819 { 875 found->second->SetState(JobState_Paused);
820 boost::mutex::scoped_lock lock(mutex_); 876
821 CheckInvariants(); 877 break;
822 878 }
823 JobsIndex::iterator found = jobsIndex_.find(id); 879
824 880 case JobState_Paused:
825 if (found == jobsIndex_.end()) 881 case JobState_Success:
826 { 882 case JobState_Failure:
827 LOG(WARNING) << "Unknown job: " << id; 883 // Nothing to be done
884 break;
885
886 case JobState_Running:
887 found->second->SchedulePause();
888 break;
889
890 default:
891 throw OrthancException(ErrorCode_InternalError);
892 }
893 }
894
895 CheckInvariants();
896 }
897
898
899 void Resume(const std::string& id)
900 {
901 LOG(INFO) << "Resuming job: " << id;
902
903 boost::mutex::scoped_lock lock(mutex_);
904 CheckInvariants();
905
906 JobsIndex::iterator found = jobsIndex_.find(id);
907
908 if (found == jobsIndex_.end())
909 {
910 LOG(WARNING) << "Unknown job: " << id;
911 }
912 else if (found->second->GetState() != JobState_Paused)
913 {
914 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
915 }
916 else
917 {
918 found->second->SetState(JobState_Pending);
919 pendingJobs_.push(found->second);
920 pendingJobAvailable_.notify_one();
921 }
922
923 CheckInvariants();
924 }
925
926
927 void Resubmit(const std::string& id)
928 {
929 LOG(INFO) << "Resubmitting failed job: " << id;
930
931 boost::mutex::scoped_lock lock(mutex_);
932 CheckInvariants();
933
934 JobsIndex::iterator found = jobsIndex_.find(id);
935
936 if (found == jobsIndex_.end())
937 {
938 LOG(WARNING) << "Unknown job: " << id;
939 }
940 else if (found->second->GetState() != JobState_Failure)
941 {
942 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
943 }
944 else
945 {
946 bool ok = false;
947 for (CompletedJobs::iterator it = completedJobs_.begin();
948 it != completedJobs_.end(); ++it)
949 {
950 if (*it == found->second)
951 {
952 ok = true;
953 completedJobs_.erase(it);
954 break;
955 }
956 }
957
958 assert(ok);
959
960 found->second->SetState(JobState_Pending);
961 pendingJobs_.push(found->second);
962 pendingJobAvailable_.notify_one();
963 }
964
965 CheckInvariants();
966 }
967
968
969 void ScheduleRetries()
970 {
971 boost::mutex::scoped_lock lock(mutex_);
972 CheckInvariants();
973
974 RetryJobs copy;
975 std::swap(copy, retryJobs_);
976
977 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
978
979 assert(retryJobs_.empty());
980 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
981 {
982 if ((*it)->IsRetryReady(now))
983 {
984 LOG(INFO) << "Retrying job: " << (*it)->GetId();
985 (*it)->SetState(JobState_Pending);
986 pendingJobs_.push(*it);
987 pendingJobAvailable_.notify_one();
828 } 988 }
829 else 989 else
830 { 990 {
831 switch (found->second->GetState()) 991 retryJobs_.insert(*it);
992 }
993 }
994
995 CheckInvariants();
996 }
997
998
999 bool GetState(JobState& state,
1000 const std::string& id)
1001 {
1002 boost::mutex::scoped_lock lock(mutex_);
1003 CheckInvariants();
1004
1005 JobsIndex::const_iterator it = jobsIndex_.find(id);
1006 if (it == jobsIndex_.end())
1007 {
1008 return false;
1009 }
1010 else
1011 {
1012 state = it->second->GetState();
1013 return true;
1014 }
1015 }
1016
1017
1018 class RunningJob : public boost::noncopyable
1019 {
1020 private:
1021 JobsRegistry& that_;
1022 JobHandler* handler_;
1023 JobState targetState_;
1024 unsigned int retryTimeout_;
1025
1026 public:
1027 RunningJob(JobsRegistry& that,
1028 unsigned int timeout) :
1029 that_(that),
1030 handler_(NULL),
1031 targetState_(JobState_Failure),
1032 retryTimeout_(0)
1033 {
1034 handler_ = that_.WaitPendingJob(timeout);
1035 }
1036
1037 ~RunningJob()
1038 {
1039 if (IsValid())
1040 {
1041 switch (targetState_)
832 { 1042 {
833 case JobState_Pending: 1043 case JobState_Failure:
834 { 1044 that_.MarkRunningAsCompleted(*handler_, false);
835 // If the job is pending, we need to reconstruct the
836 // priority queue to remove it
837 PendingJobs copy;
838 std::swap(copy, pendingJobs_);
839
840 assert(pendingJobs_.empty());
841 while (!copy.empty())
842 {
843 if (copy.top()->GetId() != id)
844 {
845 pendingJobs_.push(copy.top());
846 }
847
848 copy.pop();
849 }
850
851 found->second->SetState(JobState_Paused);
852
853 break; 1045 break;
854 } 1046
1047 case JobState_Success:
1048 that_.MarkRunningAsCompleted(*handler_, true);
1049 break;
1050
1051 case JobState_Paused:
1052 that_.MarkRunningAsPaused(*handler_);
1053 break;
855 1054
856 case JobState_Retry: 1055 case JobState_Retry:
857 { 1056 that_.MarkRunningAsRetry(*handler_, retryTimeout_);
858 RetryJobs::iterator item = retryJobs_.find(found->second);
859 assert(item != retryJobs_.end());
860 retryJobs_.erase(item);
861
862 found->second->SetState(JobState_Paused);
863
864 break; 1057 break;
865 } 1058
866
867 case JobState_Paused:
868 case JobState_Success:
869 case JobState_Failure:
870 // Nothing to be done
871 break;
872
873 case JobState_Running:
874 found->second->SchedulePause();
875 break;
876
877 default: 1059 default:
878 throw OrthancException(ErrorCode_InternalError); 1060 assert(0);
879 } 1061 }
880 } 1062 }
881 1063 }
882 CheckInvariants(); 1064
883 } 1065 bool IsValid() const
884 1066 {
885 1067 return handler_ != NULL;
886 void Resume(const std::string& id) 1068 }
887 { 1069
888 boost::mutex::scoped_lock lock(mutex_); 1070 const std::string& GetId() const
889 CheckInvariants(); 1071 {
890 1072 if (IsValid())
891 JobsIndex::iterator found = jobsIndex_.find(id); 1073 {
892 1074 return handler_->GetId();
893 if (found == jobsIndex_.end())
894 {
895 LOG(WARNING) << "Unknown job: " << id;
896 }
897 else if (found->second->GetState() != JobState_Paused)
898 {
899 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
900 } 1075 }
901 else 1076 else
902 { 1077 {
903 found->second->SetState(JobState_Pending); 1078 throw OrthancException(ErrorCode_BadSequenceOfCalls);
904 pendingJobs_.push(found->second); 1079 }
905 pendingJobAvailable_.notify_one(); 1080 }
906 } 1081
907 1082 int GetPriority() const
908 CheckInvariants(); 1083 {
909 } 1084 if (IsValid())
910 1085 {
911 1086 return handler_->GetPriority();
912 void Resubmit(const std::string& id)
913 {
914 boost::mutex::scoped_lock lock(mutex_);
915 CheckInvariants();
916
917 JobsIndex::iterator found = jobsIndex_.find(id);
918
919 if (found == jobsIndex_.end())
920 {
921 LOG(WARNING) << "Unknown job: " << id;
922 }
923 else if (found->second->GetState() != JobState_Failure)
924 {
925 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
926 } 1087 }
927 else 1088 else
928 { 1089 {
929 bool ok = false; 1090 throw OrthancException(ErrorCode_BadSequenceOfCalls);
930 for (CompletedJobs::iterator it = completedJobs_.begin(); 1091 }
931 it != completedJobs_.end(); ++it) 1092 }
932 { 1093
933 if (*it == found->second) 1094 bool IsPauseScheduled()
934 { 1095 {
935 ok = true; 1096 if (!IsValid())
936 completedJobs_.erase(it); 1097 {
937 break; 1098 throw OrthancException(ErrorCode_BadSequenceOfCalls);
938 } 1099 }
939 } 1100
940 1101 boost::mutex::scoped_lock lock(that_.mutex_);
941 assert(ok); 1102 that_.CheckInvariants();
942 1103 assert(handler_->GetState() == JobState_Running);
943 found->second->SetState(JobState_Pending); 1104
944 pendingJobs_.push(found->second); 1105 return handler_->IsPauseScheduled();
945 pendingJobAvailable_.notify_one(); 1106 }
946 } 1107
947 1108 IJob& GetJob()
948 CheckInvariants(); 1109 {
949 } 1110 if (!IsValid())
950 1111 {
951 1112 throw OrthancException(ErrorCode_BadSequenceOfCalls);
952 void ScheduleRetries() 1113 }
953 { 1114
954 boost::mutex::scoped_lock lock(mutex_); 1115 boost::mutex::scoped_lock lock(that_.mutex_);
955 CheckInvariants(); 1116 that_.CheckInvariants();
956 1117 assert(handler_->GetState() == JobState_Running);
957 RetryJobs copy; 1118
958 std::swap(copy, retryJobs_); 1119 return handler_->GetJob();
959 1120 }
960 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); 1121
961 1122 void MarkSuccess()
962 assert(retryJobs_.empty()); 1123 {
963 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) 1124 if (!IsValid())
964 { 1125 {
965 if ((*it)->IsRetryReady(now)) 1126 throw OrthancException(ErrorCode_BadSequenceOfCalls);
966 { 1127 }
967 (*it)->SetState(JobState_Pending); 1128
968 } 1129 targetState_ = JobState_Success;
969 else 1130 }
970 { 1131
971 retryJobs_.insert(*it); 1132 void MarkFailure()
972 } 1133 {
973 } 1134 if (!IsValid())
974 1135 {
975 CheckInvariants(); 1136 throw OrthancException(ErrorCode_BadSequenceOfCalls);
976 } 1137 }
977 1138
978 1139 targetState_ = JobState_Failure;
979 bool GetState(JobState& state, 1140 }
980 const std::string& id) 1141
981 { 1142 void SchedulePause()
982 boost::mutex::scoped_lock lock(mutex_); 1143 {
983 CheckInvariants(); 1144 if (!IsValid())
984 1145 {
985 JobsIndex::const_iterator it = jobsIndex_.find(id); 1146 throw OrthancException(ErrorCode_BadSequenceOfCalls);
986 if (it == jobsIndex_.end()) 1147 }
987 { 1148
988 return false; 1149 targetState_ = JobState_Paused;
989 } 1150 }
990 else 1151
991 { 1152 void MarkRetry(unsigned int timeout)
992 state = it->second->GetState(); 1153 {
993 return true; 1154 if (!IsValid())
994 } 1155 {
995 } 1156 throw OrthancException(ErrorCode_BadSequenceOfCalls);
996 1157 }
997 1158
998 class RunningJob : public boost::noncopyable 1159 targetState_ = JobState_Retry;
999 { 1160 retryTimeout_ = timeout;
1000 private: 1161 }
1001 JobsMonitor& that_;
1002 JobHandler* handler_;
1003 JobState targetState_;
1004 unsigned int retryTimeout_;
1005
1006 public:
1007 RunningJob(JobsMonitor& that,
1008 unsigned int timeout) :
1009 that_(that),
1010 handler_(NULL),
1011 targetState_(JobState_Failure),
1012 retryTimeout_(0)
1013 {
1014 handler_ = that_.WaitPendingJob(timeout);
1015 }
1016
1017 ~RunningJob()
1018 {
1019 if (IsValid())
1020 {
1021 switch (targetState_)
1022 {
1023 case JobState_Failure:
1024 that_.MarkRunningAsCompleted(*handler_, false);
1025 break;
1026
1027 case JobState_Success:
1028 that_.MarkRunningAsCompleted(*handler_, true);
1029 break;
1030
1031 case JobState_Paused:
1032 that_.MarkRunningAsPaused(*handler_);
1033 break;
1034
1035 case JobState_Retry:
1036 that_.MarkRunningAsRetry(*handler_, retryTimeout_);
1037 break;
1038
1039 default:
1040 assert(0);
1041 }
1042 }
1043 }
1044
1045 bool IsValid() const
1046 {
1047 return handler_ != NULL;
1048 }
1049
1050 const std::string& GetId() const
1051 {
1052 if (IsValid())
1053 {
1054 return handler_->GetId();
1055 }
1056 else
1057 {
1058 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1059 }
1060 }
1061
1062 int GetPriority() const
1063 {
1064 if (IsValid())
1065 {
1066 return handler_->GetPriority();
1067 }
1068 else
1069 {
1070 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1071 }
1072 }
1073
1074 bool IsPauseScheduled()
1075 {
1076 if (!IsValid())
1077 {
1078 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1079 }
1080
1081 boost::mutex::scoped_lock lock(that_.mutex_);
1082 that_.CheckInvariants();
1083 assert(handler_->GetState() == JobState_Running);
1084
1085 return handler_->IsPauseScheduled();
1086 }
1087
1088 IJob& GetJob()
1089 {
1090 if (!IsValid())
1091 {
1092 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1093 }
1094
1095 boost::mutex::scoped_lock lock(that_.mutex_);
1096 that_.CheckInvariants();
1097 assert(handler_->GetState() == JobState_Running);
1098
1099 return handler_->GetJob();
1100 }
1101
1102 void MarkSuccess()
1103 {
1104 if (!IsValid())
1105 {
1106 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1107 }
1108
1109 targetState_ = JobState_Success;
1110 }
1111
1112 void MarkFailure()
1113 {
1114 if (!IsValid())
1115 {
1116 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1117 }
1118
1119 targetState_ = JobState_Failure;
1120 }
1121
1122 void MarkPause()
1123 {
1124 if (!IsValid())
1125 {
1126 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1127 }
1128
1129 targetState_ = JobState_Paused;
1130 }
1131
1132 void MarkRetry(unsigned int timeout)
1133 {
1134 if (!IsValid())
1135 {
1136 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1137 }
1138
1139 targetState_ = JobState_Retry;
1140 retryTimeout_ = timeout;
1141 }
1142 };
1143 }; 1162 };
1163 };
1144 } 1164 }
1145 1165
1146 1166
1147 1167
1148 class DummyJob : public Orthanc::IJob 1168 class DummyJob : public Orthanc::IJob
1179 { 1199 {
1180 } 1200 }
1181 }; 1201 };
1182 1202
1183 1203
1184 static bool CheckState(Orthanc::JobsMonitor& monitor, 1204 static bool CheckState(Orthanc::JobsRegistry& registry,
1185 const std::string& id, 1205 const std::string& id,
1186 Orthanc::JobState state) 1206 Orthanc::JobState state)
1187 { 1207 {
1188 Orthanc::JobState s; 1208 Orthanc::JobState s;
1189 if (monitor.GetState(s, id)) 1209 if (registry.GetState(s, id))
1190 { 1210 {
1191 return state == s; 1211 return state == s;
1192 } 1212 }
1193 else 1213 else
1194 { 1214 {
1195 return false; 1215 return false;
1196 } 1216 }
1197 } 1217 }
1198 1218
1199 1219
1200 TEST(JobsMonitor, Priority) 1220 TEST(JobsRegistry, Priority)
1201 { 1221 {
1202 JobsMonitor monitor; 1222 JobsRegistry registry;
1203 1223
1204 std::string i1, i2, i3, i4; 1224 std::string i1, i2, i3, i4;
1205 monitor.Submit(i1, new DummyJob(), 10); 1225 registry.Submit(i1, new DummyJob(), 10);
1206 monitor.Submit(i2, new DummyJob(), 30); 1226 registry.Submit(i2, new DummyJob(), 30);
1207 monitor.Submit(i3, new DummyJob(), 20); 1227 registry.Submit(i3, new DummyJob(), 20);
1208 monitor.Submit(i4, new DummyJob(), 5); 1228 registry.Submit(i4, new DummyJob(), 5);
1209 1229
1210 monitor.SetMaxCompletedJobs(2); 1230 registry.SetMaxCompletedJobs(2);
1211 1231
1212 std::set<std::string> id; 1232 std::set<std::string> id;
1213 monitor.ListJobs(id); 1233 registry.ListJobs(id);
1214 1234
1215 ASSERT_EQ(4u, id.size()); 1235 ASSERT_EQ(4u, id.size());
1216 ASSERT_TRUE(id.find(i1) != id.end()); 1236 ASSERT_TRUE(id.find(i1) != id.end());
1217 ASSERT_TRUE(id.find(i2) != id.end()); 1237 ASSERT_TRUE(id.find(i2) != id.end());
1218 ASSERT_TRUE(id.find(i3) != id.end()); 1238 ASSERT_TRUE(id.find(i3) != id.end());
1219 ASSERT_TRUE(id.find(i4) != id.end()); 1239 ASSERT_TRUE(id.find(i4) != id.end());
1220 1240
1221 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending)); 1241 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending));
1222 1242
1223 { 1243 {
1224 JobsMonitor::RunningJob job(monitor, 0); 1244 JobsRegistry::RunningJob job(registry, 0);
1225 ASSERT_TRUE(job.IsValid()); 1245 ASSERT_TRUE(job.IsValid());
1226 ASSERT_EQ(30, job.GetPriority()); 1246 ASSERT_EQ(30, job.GetPriority());
1227 ASSERT_EQ(i2, job.GetId()); 1247 ASSERT_EQ(i2, job.GetId());
1228 1248
1229 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running)); 1249 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running));
1230 } 1250 }
1231 1251
1232 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure)); 1252 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Failure));
1233 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending)); 1253 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Pending));
1234 1254
1235 { 1255 {
1236 JobsMonitor::RunningJob job(monitor, 0); 1256 JobsRegistry::RunningJob job(registry, 0);
1237 ASSERT_TRUE(job.IsValid()); 1257 ASSERT_TRUE(job.IsValid());
1238 ASSERT_EQ(20, job.GetPriority()); 1258 ASSERT_EQ(20, job.GetPriority());
1239 ASSERT_EQ(i3, job.GetId()); 1259 ASSERT_EQ(i3, job.GetId());
1240 1260
1241 job.MarkSuccess(); 1261 job.MarkSuccess();
1242 1262
1243 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running)); 1263 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Running));
1244 } 1264 }
1245 1265
1246 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success)); 1266 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Success));
1247 1267
1248 { 1268 {
1249 JobsMonitor::RunningJob job(monitor, 0); 1269 JobsRegistry::RunningJob job(registry, 0);
1250 ASSERT_TRUE(job.IsValid()); 1270 ASSERT_TRUE(job.IsValid());
1251 ASSERT_EQ(10, job.GetPriority()); 1271 ASSERT_EQ(10, job.GetPriority());
1252 ASSERT_EQ(i1, job.GetId()); 1272 ASSERT_EQ(i1, job.GetId());
1253 } 1273 }
1254 1274
1255 { 1275 {
1256 JobsMonitor::RunningJob job(monitor, 0); 1276 JobsRegistry::RunningJob job(registry, 0);
1257 ASSERT_TRUE(job.IsValid()); 1277 ASSERT_TRUE(job.IsValid());
1258 ASSERT_EQ(5, job.GetPriority()); 1278 ASSERT_EQ(5, job.GetPriority());
1259 ASSERT_EQ(i4, job.GetId()); 1279 ASSERT_EQ(i4, job.GetId());
1260 } 1280 }
1261 1281
1262 { 1282 {
1263 JobsMonitor::RunningJob job(monitor, 1); 1283 JobsRegistry::RunningJob job(registry, 1);
1264 ASSERT_FALSE(job.IsValid()); 1284 ASSERT_FALSE(job.IsValid());
1265 } 1285 }
1266 1286
1267 Orthanc::JobState s; 1287 Orthanc::JobState s;
1268 ASSERT_TRUE(monitor.GetState(s, i1)); 1288 ASSERT_TRUE(registry.GetState(s, i1));
1269 ASSERT_FALSE(monitor.GetState(s, i2)); // Removed because oldest 1289 ASSERT_FALSE(registry.GetState(s, i2)); // Removed because oldest
1270 ASSERT_FALSE(monitor.GetState(s, i3)); // Removed because second oldest 1290 ASSERT_FALSE(registry.GetState(s, i3)); // Removed because second oldest
1271 ASSERT_TRUE(monitor.GetState(s, i4)); 1291 ASSERT_TRUE(registry.GetState(s, i4));
1272 1292
1273 monitor.SetMaxCompletedJobs(1); // (*) 1293 registry.SetMaxCompletedJobs(1); // (*)
1274 ASSERT_FALSE(monitor.GetState(s, i1)); // Just discarded by (*) 1294 ASSERT_FALSE(registry.GetState(s, i1)); // Just discarded by (*)
1275 ASSERT_TRUE(monitor.GetState(s, i4)); 1295 ASSERT_TRUE(registry.GetState(s, i4));
1276 } 1296 }
1277 1297
1278 1298
// Test "Simultaneous" (present only in the new revision; the left-hand text
// on the next three lines is the previous revision's "Resubmit" header).
// Checks that two RunningJob handles can be alive in the same scope and
// that each one reports its own outcome to the registry.
1279 TEST(JobsMonitor, Resubmit) 1299 TEST(JobsRegistry, Simultaneous)
1280 { 1300 {
1281 JobsMonitor monitor; 1301 JobsRegistry registry;
1302
// Two jobs with different priorities (20 and 10); both start as Pending.
1303 std::string i1, i2;
1304 registry.Submit(i1, new DummyJob(), 20);
1305 registry.Submit(i2, new DummyJob(), 10);
1306
1307 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Pending));
1308 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending));
1309
1310 {
// Both handles are acquired before either is released.
// NOTE(review): the second constructor argument (0) is presumably a
// timeout -- confirm against RunningJob.
1311 JobsRegistry::RunningJob job1(registry, 0);
1312 JobsRegistry::RunningJob job2(registry, 0);
1313
1314 ASSERT_TRUE(job1.IsValid());
1315 ASSERT_TRUE(job2.IsValid());
1316
// The final assertions below expect job1 to have received i1 and job2 to
// have received i2.
1317 job1.MarkFailure();
1318 job2.MarkSuccess();
1319
// While the handles are still in scope, both jobs are expected to be
// reported as Running even though an outcome has already been marked.
1320 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Running));
1321 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running));
1322 }
1323
// After the scope ends, the marked outcomes are expected to be visible.
1324 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Failure));
1325 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Success));
1326 }
1327
1328
// Test "Resubmit": checks the effect of Resubmit() in each state a job
// passes through. The assertions expect Resubmit() to change the state
// only when the job is in the Failure state.
1329 TEST(JobsRegistry, Resubmit)
1330 {
1331 JobsRegistry registry;
1282 1332
1283 std::string id; 1333 std::string id;
1284 monitor.Submit(id, new DummyJob(), 10); 1334 registry.Submit(id, new DummyJob(), 10);
1285 1335
1286 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); 1336 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1287 1337
// Resubmitting a job that is still Pending is expected to leave it Pending.
1288 monitor.Resubmit(id); 1338 registry.Resubmit(id);
1289 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); 1339 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1290 1340
1291 { 1341 {
1292 JobsMonitor::RunningJob job(monitor, 0); 1342 JobsRegistry::RunningJob job(registry, 0);
1293 ASSERT_TRUE(job.IsValid()); 1343 ASSERT_TRUE(job.IsValid());
1294 job.MarkFailure(); 1344 job.MarkFailure();
1295 1345
1296 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); 1346 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1297 1347
// Resubmitting while the handle is still in scope is expected to leave the
// job Running.
1298 monitor.Resubmit(id); 1348 registry.Resubmit(id);
1299 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); 1349 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1300 } 1350 }
1301 1351
// The failure marked above is expected to be visible once the scope ends.
1302 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure)); 1352 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));
1303 1353
// Resubmitting a failed job is expected to put it back into Pending.
1304 monitor.Resubmit(id); 1354 registry.Resubmit(id);
1305 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); 1355 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1306 1356
1307 { 1357 {
// The resubmitted job must be handed out again under the same identifier.
1308 JobsMonitor::RunningJob job(monitor, 0); 1358 JobsRegistry::RunningJob job(registry, 0);
1309 ASSERT_TRUE(job.IsValid()); 1359 ASSERT_TRUE(job.IsValid());
1310 ASSERT_EQ(id, job.GetId()); 1360 ASSERT_EQ(id, job.GetId());
1311 1361
1312 job.MarkSuccess(); 1362 job.MarkSuccess();
1313 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); 1363 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1314 } 1364 }
1315 1365
1316 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success)); 1366 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
1317 } 1367
// New in this revision: resubmitting a successful job is expected to leave
// it in the Success state.
1368 registry.Resubmit(id);
1369 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
1370 }
1371
1372
// Test "Retry": a job marked for retry is expected to go through
// Running -> Retry -> Pending -> Running -> Success.
1373 TEST(JobsRegistry, Retry)
1374 {
1375 JobsRegistry registry;
1376
1377 std::string id;
1378 registry.Submit(id, new DummyJob(), 10);
1379
1380 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1381
1382 {
1383 JobsRegistry::RunningJob job(registry, 0);
1384 ASSERT_TRUE(job.IsValid());
// NOTE(review): the argument 0 is presumably the retry delay -- confirm
// against RunningJob::MarkRetry.
1385 job.MarkRetry(0);
1386
// The job is expected to stay Running until the handle leaves its scope.
1387 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1388 }
1389
1390 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));
1391
// Resubmit() is expected to have no effect on a job in the Retry state.
1392 registry.Resubmit(id);
1393 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));
1394
// ScheduleRetries() is expected to move the job from Retry back to Pending.
1395 registry.ScheduleRetries();
1396 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1397
1398 {
// Second run: the job is picked up again and completes successfully.
1399 JobsRegistry::RunningJob job(registry, 0);
1400 ASSERT_TRUE(job.IsValid());
1401 job.MarkSuccess();
1402
1403 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1404 }
1405
1406 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
1407 }
1408
1409
// Test "PausePending": pausing and resuming a job that has never been run.
1410 TEST(JobsRegistry, PausePending)
1411 {
1412 JobsRegistry registry;
1413
1414 std::string id;
1415 registry.Submit(id, new DummyJob(), 10);
1416
1417 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1418
// Pausing a Pending job is expected to move it to Paused.
1419 registry.Pause(id);
1420 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
1421
// Pausing a second time is expected to leave the job Paused.
1422 registry.Pause(id);
1423 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
1424
// Resubmit() is expected to have no effect on a Paused job.
1425 registry.Resubmit(id);
1426 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
1427
// Resume() is expected to bring the job back to Pending.
1428 registry.Resume(id);
1429 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1430 }
1431
1432
// Test "PauseRunning": a pause is requested through the RunningJob handle
// while the job is being run, then the job is resumed and completed.
1433 TEST(JobsRegistry, PauseRunning)
1434 {
1435 JobsRegistry registry;
1436
1437 std::string id;
1438 registry.Submit(id, new DummyJob(), 10);
1439
1440 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1441
1442 {
1443 JobsRegistry::RunningJob job(registry, 0);
1444 ASSERT_TRUE(job.IsValid());
1445
// A Resubmit() issued while the handle is in scope, followed by
// SchedulePause(), is expected to leave the job Running for now.
1446 registry.Resubmit(id);
1447 job.SchedulePause();
1448 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1449 }
1450
// The scheduled pause is expected to be visible once the scope ends.
1451 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
1452
// Resubmit() is expected to have no effect on a Paused job.
1453 registry.Resubmit(id);
1454 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
1455
// Resume() is expected to bring the job back to Pending.
1456 registry.Resume(id);
1457 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1458
1459 {
// Second run: the resumed job is picked up and completes successfully.
1460 JobsRegistry::RunningJob job(registry, 0);
1461 ASSERT_TRUE(job.IsValid());
1462
1463 job.MarkSuccess();
1464 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1465 }
1466
1467 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
1468 }
1469
1470
// Test "PauseRetry": pausing a job that is waiting in the Retry state, then
// resuming it and running it to completion.
1471 TEST(JobsRegistry, PauseRetry)
1472 {
1473 JobsRegistry registry;
1474
1475 std::string id;
1476 registry.Submit(id, new DummyJob(), 10);
1477
1478 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1479
1480 {
1481 JobsRegistry::RunningJob job(registry, 0);
1482 ASSERT_TRUE(job.IsValid());
1483
// NOTE(review): the argument 0 is presumably the retry delay -- confirm
// against RunningJob::MarkRetry.
1484 job.MarkRetry(0);
1485 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1486 }
1487
1488 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));
1489
// Pausing a job in the Retry state is expected to move it to Paused.
1490 registry.Pause(id);
1491 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
1492
// Resume() is expected to yield Pending, not a return to Retry.
1493 registry.Resume(id);
1494 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
1495
1496 {
// Second run: the resumed job is picked up and completes successfully.
1497 JobsRegistry::RunningJob job(registry, 0);
1498 ASSERT_TRUE(job.IsValid());
1499
1500 job.MarkSuccess();
1501 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1502 }
1503
1504 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
1505 }