Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2558:57f81b988713 jobs
cont
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 03 May 2018 15:24:33 +0200 |
parents | b4516a6f214b |
children | 9b7680dee75d |
comparison
equal
deleted
inserted
replaced
2557:b4516a6f214b | 2558:57f81b988713 |
---|---|
479 { | 479 { |
480 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 480 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
481 } | 481 } |
482 else | 482 else |
483 { | 483 { |
484 return retryTime_ >= now; | 484 return retryTime_ <= now; |
485 } | 485 } |
486 } | 486 } |
487 }; | 487 }; |
488 | 488 |
489 | 489 |
490 class JobsMonitor : public boost::noncopyable | 490 class JobsRegistry : public boost::noncopyable |
491 { | 491 { |
492 private: | 492 private: |
493 struct PriorityComparator | 493 struct PriorityComparator |
494 { | 494 { |
495 bool operator() (JobHandler*& a, | 495 bool operator() (JobHandler*& a, |
504 typedef std::set<JobHandler*> RetryJobs; | 504 typedef std::set<JobHandler*> RetryJobs; |
505 typedef std::priority_queue<JobHandler*, | 505 typedef std::priority_queue<JobHandler*, |
506 std::vector<JobHandler*>, // Could be a "std::deque" | 506 std::vector<JobHandler*>, // Could be a "std::deque" |
507 PriorityComparator> PendingJobs; | 507 PriorityComparator> PendingJobs; |
508 | 508 |
509 boost::mutex mutex_; | 509 boost::mutex mutex_; |
510 JobsIndex jobsIndex_; | 510 JobsIndex jobsIndex_; |
511 PendingJobs pendingJobs_; | 511 PendingJobs pendingJobs_; |
512 CompletedJobs completedJobs_; | 512 CompletedJobs completedJobs_; |
513 RetryJobs retryJobs_; | 513 RetryJobs retryJobs_; |
514 | 514 |
515 boost::condition_variable pendingJobAvailable_; | 515 boost::condition_variable pendingJobAvailable_; |
516 size_t maxCompletedJobs_; | 516 size_t maxCompletedJobs_; |
517 | 517 |
518 | 518 |
519 #ifndef NDEBUG | 519 #ifndef NDEBUG |
520 bool IsPendingJob(const JobHandler& job) const | 520 bool IsPendingJob(const JobHandler& job) const |
521 { | |
522 PendingJobs copy = pendingJobs_; | |
523 while (!copy.empty()) | |
524 { | |
525 if (copy.top() == &job) | |
526 { | |
527 return true; | |
528 } | |
529 | |
530 copy.pop(); | |
531 } | |
532 | |
533 return false; | |
534 } | |
535 | |
536 bool IsCompletedJob(const JobHandler& job) const | |
537 { | |
538 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
539 it != completedJobs_.end(); ++it) | |
540 { | |
541 if (*it == &job) | |
542 { | |
543 return true; | |
544 } | |
545 } | |
546 | |
547 return false; | |
548 } | |
549 | |
550 bool IsRetryJob(JobHandler& job) const | |
551 { | |
552 return retryJobs_.find(&job) != retryJobs_.end(); | |
553 } | |
554 #endif | |
555 | |
556 | |
557 void CheckInvariants() | |
558 { | |
559 #ifndef NDEBUG | |
521 { | 560 { |
522 PendingJobs copy = pendingJobs_; | 561 PendingJobs copy = pendingJobs_; |
523 while (!copy.empty()) | 562 while (!copy.empty()) |
524 { | 563 { |
525 if (copy.top() == &job) | 564 assert(copy.top()->GetState() == JobState_Pending); |
565 copy.pop(); | |
566 } | |
567 } | |
568 | |
569 assert(completedJobs_.size() <= maxCompletedJobs_); | |
570 | |
571 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
572 it != completedJobs_.end(); ++it) | |
573 { | |
574 assert((*it)->GetState() == JobState_Success || | |
575 (*it)->GetState() == JobState_Failure); | |
576 } | |
577 | |
578 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
579 it != retryJobs_.end(); ++it) | |
580 { | |
581 assert((*it)->GetState() == JobState_Retry); | |
582 } | |
583 | |
584 for (JobsIndex::iterator it = jobsIndex_.begin(); | |
585 it != jobsIndex_.end(); ++it) | |
586 { | |
587 JobHandler& job = *it->second; | |
588 | |
589 assert(job.GetId() == it->first); | |
590 | |
591 switch (job.GetState()) | |
592 { | |
593 case JobState_Pending: | |
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
595 break; | |
596 | |
597 case JobState_Success: | |
598 case JobState_Failure: | |
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
600 break; | |
601 | |
602 case JobState_Retry: | |
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
604 break; | |
605 | |
606 case JobState_Running: | |
607 case JobState_Paused: | |
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
609 break; | |
610 | |
611 default: | |
612 throw OrthancException(ErrorCode_InternalError); | |
613 } | |
614 } | |
615 #endif | |
616 } | |
617 | |
618 | |
619 void ForgetOldCompletedJobs() | |
620 { | |
621 if (maxCompletedJobs_ != 0) | |
622 { | |
623 while (completedJobs_.size() > maxCompletedJobs_) | |
624 { | |
625 assert(completedJobs_.front() != NULL); | |
626 | |
627 std::string id = completedJobs_.front()->GetId(); | |
628 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
629 | |
630 jobsIndex_.erase(id); | |
631 delete(completedJobs_.front()); | |
632 completedJobs_.pop_front(); | |
633 } | |
634 } | |
635 } | |
636 | |
637 | |
638 void MarkRunningAsCompleted(JobHandler& job, | |
639 bool success) | |
640 { | |
641 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | |
642 << ": " << job.GetId(); | |
643 | |
644 boost::mutex::scoped_lock lock(mutex_); | |
645 CheckInvariants(); | |
646 assert(job.GetState() == JobState_Running); | |
647 | |
648 job.SetState(success ? JobState_Success : JobState_Failure); | |
649 | |
650 completedJobs_.push_back(&job); | |
651 ForgetOldCompletedJobs(); | |
652 | |
653 CheckInvariants(); | |
654 } | |
655 | |
656 | |
657 void MarkRunningAsRetry(JobHandler& job, | |
658 unsigned int timeout) | |
659 { | |
660 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); | |
661 | |
662 boost::mutex::scoped_lock lock(mutex_); | |
663 CheckInvariants(); | |
664 | |
665 assert(job.GetState() == JobState_Running && | |
666 retryJobs_.find(&job) == retryJobs_.end()); | |
667 | |
668 retryJobs_.insert(&job); | |
669 job.SetRetryState(timeout); | |
670 | |
671 CheckInvariants(); | |
672 } | |
673 | |
674 | |
675 void MarkRunningAsPaused(JobHandler& job) | |
676 { | |
677 LOG(INFO) << "Job paused: " << job.GetId(); | |
678 | |
679 boost::mutex::scoped_lock lock(mutex_); | |
680 CheckInvariants(); | |
681 assert(job.GetState() == JobState_Running); | |
682 | |
683 job.SetState(JobState_Paused); | |
684 | |
685 CheckInvariants(); | |
686 } | |
687 | |
688 | |
689 JobHandler* WaitPendingJob(unsigned int timeout) | |
690 { | |
691 boost::mutex::scoped_lock lock(mutex_); | |
692 | |
693 while (pendingJobs_.empty()) | |
694 { | |
695 if (timeout == 0) | |
696 { | |
697 pendingJobAvailable_.wait(lock); | |
698 } | |
699 else | |
700 { | |
701 bool success = pendingJobAvailable_.timed_wait | |
702 (lock, boost::posix_time::milliseconds(timeout)); | |
703 if (!success) | |
526 { | 704 { |
527 return true; | 705 return NULL; |
528 } | 706 } |
529 | 707 } |
530 copy.pop(); | 708 } |
531 } | 709 |
532 | 710 JobHandler* job = pendingJobs_.top(); |
533 return false; | 711 pendingJobs_.pop(); |
534 } | 712 |
535 | 713 job->SetState(JobState_Running); |
536 bool IsCompletedJob(const JobHandler& job) const | 714 return job; |
537 { | 715 } |
538 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | 716 |
539 it != completedJobs_.end(); ++it) | 717 |
540 { | 718 public: |
541 if (*it == &job) | 719 JobsRegistry() : |
542 { | 720 maxCompletedJobs_(10) |
543 return true; | 721 { |
544 } | 722 } |
545 } | 723 |
546 | 724 |
547 return false; | 725 ~JobsRegistry() |
548 } | 726 { |
549 | 727 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) |
550 bool IsRetryJob(JobHandler& job) const | 728 { |
551 { | 729 assert(it->second != NULL); |
552 return retryJobs_.find(&job) != retryJobs_.end(); | 730 delete it->second; |
553 } | 731 } |
554 #endif | 732 } |
555 | 733 |
556 | 734 |
557 void CheckInvariants() | 735 void SetMaxCompletedJobs(size_t i) |
558 { | 736 { |
559 #ifndef NDEBUG | 737 boost::mutex::scoped_lock lock(mutex_); |
560 { | 738 CheckInvariants(); |
561 PendingJobs copy = pendingJobs_; | 739 |
740 maxCompletedJobs_ = i; | |
741 ForgetOldCompletedJobs(); | |
742 | |
743 CheckInvariants(); | |
744 } | |
745 | |
746 | |
747 void ListJobs(std::set<std::string>& target) | |
748 { | |
749 boost::mutex::scoped_lock lock(mutex_); | |
750 CheckInvariants(); | |
751 | |
752 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
753 it != jobsIndex_.end(); ++it) | |
754 { | |
755 target.insert(it->first); | |
756 } | |
757 } | |
758 | |
759 | |
760 void Submit(std::string& id, | |
761 IJob* job, // Takes ownership | |
762 int priority) | |
763 { | |
764 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | |
765 | |
766 boost::mutex::scoped_lock lock(mutex_); | |
767 CheckInvariants(); | |
768 | |
769 id = handler->GetId(); | |
770 | |
771 pendingJobs_.push(handler.get()); | |
772 pendingJobAvailable_.notify_one(); | |
773 | |
774 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
775 | |
776 LOG(INFO) << "New job submitted: " << id; | |
777 | |
778 CheckInvariants(); | |
779 } | |
780 | |
781 | |
782 void Submit(IJob* job, // Takes ownership | |
783 int priority) | |
784 { | |
785 std::string id; | |
786 Submit(id, job, priority); | |
787 } | |
788 | |
789 | |
790 void SetPriority(const std::string& id, | |
791 int priority) | |
792 { | |
793 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; | |
794 | |
795 boost::mutex::scoped_lock lock(mutex_); | |
796 CheckInvariants(); | |
797 | |
798 JobsIndex::iterator found = jobsIndex_.find(id); | |
799 | |
800 if (found == jobsIndex_.end()) | |
801 { | |
802 LOG(WARNING) << "Unknown job: " << id; | |
803 } | |
804 else | |
805 { | |
806 found->second->SetPriority(priority); | |
807 | |
808 if (found->second->GetState() == JobState_Pending) | |
809 { | |
810 // If the job is pending, we need to reconstruct the | |
811 // priority queue, as the heap condition has changed | |
812 | |
813 PendingJobs copy; | |
814 std::swap(copy, pendingJobs_); | |
815 | |
816 assert(pendingJobs_.empty()); | |
562 while (!copy.empty()) | 817 while (!copy.empty()) |
563 { | 818 { |
564 assert(copy.top()->GetState() == JobState_Pending); | 819 pendingJobs_.push(copy.top()); |
565 copy.pop(); | 820 copy.pop(); |
566 } | 821 } |
567 } | 822 } |
568 | 823 } |
569 assert(completedJobs_.size() <= maxCompletedJobs_); | 824 |
570 | 825 CheckInvariants(); |
571 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | 826 } |
572 it != completedJobs_.end(); ++it) | 827 |
573 { | 828 |
574 assert((*it)->GetState() == JobState_Success || | 829 void Pause(const std::string& id) |
575 (*it)->GetState() == JobState_Failure); | 830 { |
576 } | 831 LOG(INFO) << "Pausing job: " << id; |
577 | 832 |
578 for (RetryJobs::const_iterator it = retryJobs_.begin(); | 833 boost::mutex::scoped_lock lock(mutex_); |
579 it != retryJobs_.end(); ++it) | 834 CheckInvariants(); |
580 { | 835 |
581 assert((*it)->GetState() == JobState_Retry); | 836 JobsIndex::iterator found = jobsIndex_.find(id); |
582 } | 837 |
583 | 838 if (found == jobsIndex_.end()) |
584 for (JobsIndex::iterator it = jobsIndex_.begin(); | 839 { |
585 it != jobsIndex_.end(); ++it) | 840 LOG(WARNING) << "Unknown job: " << id; |
586 { | 841 } |
587 JobHandler& job = *it->second; | 842 else |
588 | 843 { |
589 assert(job.GetId() == it->first); | 844 switch (found->second->GetState()) |
590 | 845 { |
591 switch (job.GetState()) | 846 case JobState_Pending: |
592 { | |
593 case JobState_Pending: | |
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
595 break; | |
596 | |
597 case JobState_Success: | |
598 case JobState_Failure: | |
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
600 break; | |
601 | |
602 case JobState_Retry: | |
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
604 break; | |
605 | |
606 case JobState_Running: | |
607 case JobState_Paused: | |
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
609 break; | |
610 | |
611 default: | |
612 throw OrthancException(ErrorCode_InternalError); | |
613 } | |
614 } | |
615 #endif | |
616 } | |
617 | |
618 | |
619 void ForgetOldCompletedJobs() | |
620 { | |
621 if (maxCompletedJobs_ != 0) | |
622 { | |
623 while (completedJobs_.size() > maxCompletedJobs_) | |
624 { | |
625 assert(completedJobs_.front() != NULL); | |
626 | |
627 std::string id = completedJobs_.front()->GetId(); | |
628 | |
629 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
630 | |
631 jobsIndex_.erase(id); | |
632 delete(completedJobs_.front()); | |
633 completedJobs_.pop_front(); | |
634 } | |
635 } | |
636 } | |
637 | |
638 | |
639 void MarkRunningAsCompleted(JobHandler& job, | |
640 bool success) | |
641 { | |
642 boost::mutex::scoped_lock lock(mutex_); | |
643 CheckInvariants(); | |
644 assert(job.GetState() == JobState_Running); | |
645 | |
646 job.SetState(success ? JobState_Success : JobState_Failure); | |
647 | |
648 completedJobs_.push_back(&job); | |
649 ForgetOldCompletedJobs(); | |
650 | |
651 CheckInvariants(); | |
652 } | |
653 | |
654 | |
655 void MarkRunningAsRetry(JobHandler& job, | |
656 unsigned int timeout) | |
657 { | |
658 boost::mutex::scoped_lock lock(mutex_); | |
659 CheckInvariants(); | |
660 | |
661 assert(job.GetState() == JobState_Running && | |
662 retryJobs_.find(&job) == retryJobs_.end()); | |
663 | |
664 retryJobs_.insert(&job); | |
665 job.SetRetryState(timeout); | |
666 | |
667 CheckInvariants(); | |
668 } | |
669 | |
670 | |
671 void MarkRunningAsPaused(JobHandler& job) | |
672 { | |
673 boost::mutex::scoped_lock lock(mutex_); | |
674 CheckInvariants(); | |
675 assert(job.GetState() == JobState_Running); | |
676 | |
677 job.SetState(JobState_Paused); | |
678 | |
679 CheckInvariants(); | |
680 } | |
681 | |
682 | |
683 JobHandler* WaitPendingJob(unsigned int timeout) | |
684 { | |
685 boost::mutex::scoped_lock lock(mutex_); | |
686 | |
687 while (pendingJobs_.empty()) | |
688 { | |
689 if (timeout == 0) | |
690 { | |
691 pendingJobAvailable_.wait(lock); | |
692 } | |
693 else | |
694 { | |
695 bool success = pendingJobAvailable_.timed_wait | |
696 (lock, boost::posix_time::milliseconds(timeout)); | |
697 if (!success) | |
698 { | |
699 return NULL; | |
700 } | |
701 } | |
702 } | |
703 | |
704 JobHandler* job = pendingJobs_.top(); | |
705 pendingJobs_.pop(); | |
706 | |
707 job->SetState(JobState_Running); | |
708 return job; | |
709 } | |
710 | |
711 | |
712 public: | |
713 JobsMonitor() : | |
714 maxCompletedJobs_(10) | |
715 { | |
716 } | |
717 | |
718 | |
719 ~JobsMonitor() | |
720 { | |
721 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
722 { | |
723 assert(it->second != NULL); | |
724 delete it->second; | |
725 } | |
726 } | |
727 | |
728 | |
729 void SetMaxCompletedJobs(size_t i) | |
730 { | |
731 boost::mutex::scoped_lock lock(mutex_); | |
732 CheckInvariants(); | |
733 | |
734 maxCompletedJobs_ = i; | |
735 ForgetOldCompletedJobs(); | |
736 | |
737 CheckInvariants(); | |
738 } | |
739 | |
740 | |
741 void ListJobs(std::set<std::string>& target) | |
742 { | |
743 boost::mutex::scoped_lock lock(mutex_); | |
744 CheckInvariants(); | |
745 | |
746 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
747 it != jobsIndex_.end(); ++it) | |
748 { | |
749 target.insert(it->first); | |
750 } | |
751 } | |
752 | |
753 | |
754 void Submit(std::string& id, | |
755 IJob* job, // Takes ownership | |
756 int priority) | |
757 { | |
758 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | |
759 | |
760 boost::mutex::scoped_lock lock(mutex_); | |
761 CheckInvariants(); | |
762 | |
763 id = handler->GetId(); | |
764 pendingJobs_.push(handler.get()); | |
765 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
766 | |
767 pendingJobAvailable_.notify_one(); | |
768 | |
769 CheckInvariants(); | |
770 } | |
771 | |
772 | |
773 void Submit(IJob* job, // Takes ownership | |
774 int priority) | |
775 { | |
776 std::string id; | |
777 Submit(id, job, priority); | |
778 } | |
779 | |
780 | |
781 void SetPriority(const std::string& id, | |
782 int priority) | |
783 { | |
784 boost::mutex::scoped_lock lock(mutex_); | |
785 CheckInvariants(); | |
786 | |
787 JobsIndex::iterator found = jobsIndex_.find(id); | |
788 | |
789 if (found == jobsIndex_.end()) | |
790 { | |
791 LOG(WARNING) << "Unknown job: " << id; | |
792 } | |
793 else | |
794 { | |
795 found->second->SetPriority(priority); | |
796 | |
797 if (found->second->GetState() == JobState_Pending) | |
798 { | 847 { |
799 // If the job is pending, we need to reconstruct the | 848 // If the job is pending, we need to reconstruct the |
800 // priority queue, as the heap condition has changed | 849 // priority queue to remove it |
801 | |
802 PendingJobs copy; | 850 PendingJobs copy; |
803 std::swap(copy, pendingJobs_); | 851 std::swap(copy, pendingJobs_); |
804 | 852 |
805 assert(pendingJobs_.empty()); | 853 assert(pendingJobs_.empty()); |
806 while (!copy.empty()) | 854 while (!copy.empty()) |
807 { | 855 { |
808 pendingJobs_.push(copy.top()); | 856 if (copy.top()->GetId() != id) |
857 { | |
858 pendingJobs_.push(copy.top()); | |
859 } | |
860 | |
809 copy.pop(); | 861 copy.pop(); |
810 } | 862 } |
863 | |
864 found->second->SetState(JobState_Paused); | |
865 | |
866 break; | |
811 } | 867 } |
812 } | 868 |
813 | 869 case JobState_Retry: |
814 CheckInvariants(); | 870 { |
815 } | 871 RetryJobs::iterator item = retryJobs_.find(found->second); |
816 | 872 assert(item != retryJobs_.end()); |
817 | 873 retryJobs_.erase(item); |
818 void Pause(const std::string& id) | 874 |
819 { | 875 found->second->SetState(JobState_Paused); |
820 boost::mutex::scoped_lock lock(mutex_); | 876 |
821 CheckInvariants(); | 877 break; |
822 | 878 } |
823 JobsIndex::iterator found = jobsIndex_.find(id); | 879 |
824 | 880 case JobState_Paused: |
825 if (found == jobsIndex_.end()) | 881 case JobState_Success: |
826 { | 882 case JobState_Failure: |
827 LOG(WARNING) << "Unknown job: " << id; | 883 // Nothing to be done |
884 break; | |
885 | |
886 case JobState_Running: | |
887 found->second->SchedulePause(); | |
888 break; | |
889 | |
890 default: | |
891 throw OrthancException(ErrorCode_InternalError); | |
892 } | |
893 } | |
894 | |
895 CheckInvariants(); | |
896 } | |
897 | |
898 | |
899 void Resume(const std::string& id) | |
900 { | |
901 LOG(INFO) << "Resuming job: " << id; | |
902 | |
903 boost::mutex::scoped_lock lock(mutex_); | |
904 CheckInvariants(); | |
905 | |
906 JobsIndex::iterator found = jobsIndex_.find(id); | |
907 | |
908 if (found == jobsIndex_.end()) | |
909 { | |
910 LOG(WARNING) << "Unknown job: " << id; | |
911 } | |
912 else if (found->second->GetState() != JobState_Paused) | |
913 { | |
914 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
915 } | |
916 else | |
917 { | |
918 found->second->SetState(JobState_Pending); | |
919 pendingJobs_.push(found->second); | |
920 pendingJobAvailable_.notify_one(); | |
921 } | |
922 | |
923 CheckInvariants(); | |
924 } | |
925 | |
926 | |
927 void Resubmit(const std::string& id) | |
928 { | |
929 LOG(INFO) << "Resubmitting failed job: " << id; | |
930 | |
931 boost::mutex::scoped_lock lock(mutex_); | |
932 CheckInvariants(); | |
933 | |
934 JobsIndex::iterator found = jobsIndex_.find(id); | |
935 | |
936 if (found == jobsIndex_.end()) | |
937 { | |
938 LOG(WARNING) << "Unknown job: " << id; | |
939 } | |
940 else if (found->second->GetState() != JobState_Failure) | |
941 { | |
942 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
943 } | |
944 else | |
945 { | |
946 bool ok = false; | |
947 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
948 it != completedJobs_.end(); ++it) | |
949 { | |
950 if (*it == found->second) | |
951 { | |
952 ok = true; | |
953 completedJobs_.erase(it); | |
954 break; | |
955 } | |
956 } | |
957 | |
958 assert(ok); | |
959 | |
960 found->second->SetState(JobState_Pending); | |
961 pendingJobs_.push(found->second); | |
962 pendingJobAvailable_.notify_one(); | |
963 } | |
964 | |
965 CheckInvariants(); | |
966 } | |
967 | |
968 | |
969 void ScheduleRetries() | |
970 { | |
971 boost::mutex::scoped_lock lock(mutex_); | |
972 CheckInvariants(); | |
973 | |
974 RetryJobs copy; | |
975 std::swap(copy, retryJobs_); | |
976 | |
977 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
978 | |
979 assert(retryJobs_.empty()); | |
980 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
981 { | |
982 if ((*it)->IsRetryReady(now)) | |
983 { | |
984 LOG(INFO) << "Retrying job: " << (*it)->GetId(); | |
985 (*it)->SetState(JobState_Pending); | |
986 pendingJobs_.push(*it); | |
987 pendingJobAvailable_.notify_one(); | |
828 } | 988 } |
829 else | 989 else |
830 { | 990 { |
831 switch (found->second->GetState()) | 991 retryJobs_.insert(*it); |
992 } | |
993 } | |
994 | |
995 CheckInvariants(); | |
996 } | |
997 | |
998 | |
999 bool GetState(JobState& state, | |
1000 const std::string& id) | |
1001 { | |
1002 boost::mutex::scoped_lock lock(mutex_); | |
1003 CheckInvariants(); | |
1004 | |
1005 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
1006 if (it == jobsIndex_.end()) | |
1007 { | |
1008 return false; | |
1009 } | |
1010 else | |
1011 { | |
1012 state = it->second->GetState(); | |
1013 return true; | |
1014 } | |
1015 } | |
1016 | |
1017 | |
1018 class RunningJob : public boost::noncopyable | |
1019 { | |
1020 private: | |
1021 JobsRegistry& that_; | |
1022 JobHandler* handler_; | |
1023 JobState targetState_; | |
1024 unsigned int retryTimeout_; | |
1025 | |
1026 public: | |
1027 RunningJob(JobsRegistry& that, | |
1028 unsigned int timeout) : | |
1029 that_(that), | |
1030 handler_(NULL), | |
1031 targetState_(JobState_Failure), | |
1032 retryTimeout_(0) | |
1033 { | |
1034 handler_ = that_.WaitPendingJob(timeout); | |
1035 } | |
1036 | |
1037 ~RunningJob() | |
1038 { | |
1039 if (IsValid()) | |
1040 { | |
1041 switch (targetState_) | |
832 { | 1042 { |
833 case JobState_Pending: | 1043 case JobState_Failure: |
834 { | 1044 that_.MarkRunningAsCompleted(*handler_, false); |
835 // If the job is pending, we need to reconstruct the | |
836 // priority queue to remove it | |
837 PendingJobs copy; | |
838 std::swap(copy, pendingJobs_); | |
839 | |
840 assert(pendingJobs_.empty()); | |
841 while (!copy.empty()) | |
842 { | |
843 if (copy.top()->GetId() != id) | |
844 { | |
845 pendingJobs_.push(copy.top()); | |
846 } | |
847 | |
848 copy.pop(); | |
849 } | |
850 | |
851 found->second->SetState(JobState_Paused); | |
852 | |
853 break; | 1045 break; |
854 } | 1046 |
1047 case JobState_Success: | |
1048 that_.MarkRunningAsCompleted(*handler_, true); | |
1049 break; | |
1050 | |
1051 case JobState_Paused: | |
1052 that_.MarkRunningAsPaused(*handler_); | |
1053 break; | |
855 | 1054 |
856 case JobState_Retry: | 1055 case JobState_Retry: |
857 { | 1056 that_.MarkRunningAsRetry(*handler_, retryTimeout_); |
858 RetryJobs::iterator item = retryJobs_.find(found->second); | |
859 assert(item != retryJobs_.end()); | |
860 retryJobs_.erase(item); | |
861 | |
862 found->second->SetState(JobState_Paused); | |
863 | |
864 break; | 1057 break; |
865 } | 1058 |
866 | |
867 case JobState_Paused: | |
868 case JobState_Success: | |
869 case JobState_Failure: | |
870 // Nothing to be done | |
871 break; | |
872 | |
873 case JobState_Running: | |
874 found->second->SchedulePause(); | |
875 break; | |
876 | |
877 default: | 1059 default: |
878 throw OrthancException(ErrorCode_InternalError); | 1060 assert(0); |
879 } | 1061 } |
880 } | 1062 } |
881 | 1063 } |
882 CheckInvariants(); | 1064 |
883 } | 1065 bool IsValid() const |
884 | 1066 { |
885 | 1067 return handler_ != NULL; |
886 void Resume(const std::string& id) | 1068 } |
887 { | 1069 |
888 boost::mutex::scoped_lock lock(mutex_); | 1070 const std::string& GetId() const |
889 CheckInvariants(); | 1071 { |
890 | 1072 if (IsValid()) |
891 JobsIndex::iterator found = jobsIndex_.find(id); | 1073 { |
892 | 1074 return handler_->GetId(); |
893 if (found == jobsIndex_.end()) | |
894 { | |
895 LOG(WARNING) << "Unknown job: " << id; | |
896 } | |
897 else if (found->second->GetState() != JobState_Paused) | |
898 { | |
899 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
900 } | 1075 } |
901 else | 1076 else |
902 { | 1077 { |
903 found->second->SetState(JobState_Pending); | 1078 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
904 pendingJobs_.push(found->second); | 1079 } |
905 pendingJobAvailable_.notify_one(); | 1080 } |
906 } | 1081 |
907 | 1082 int GetPriority() const |
908 CheckInvariants(); | 1083 { |
909 } | 1084 if (IsValid()) |
910 | 1085 { |
911 | 1086 return handler_->GetPriority(); |
912 void Resubmit(const std::string& id) | |
913 { | |
914 boost::mutex::scoped_lock lock(mutex_); | |
915 CheckInvariants(); | |
916 | |
917 JobsIndex::iterator found = jobsIndex_.find(id); | |
918 | |
919 if (found == jobsIndex_.end()) | |
920 { | |
921 LOG(WARNING) << "Unknown job: " << id; | |
922 } | |
923 else if (found->second->GetState() != JobState_Failure) | |
924 { | |
925 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
926 } | 1087 } |
927 else | 1088 else |
928 { | 1089 { |
929 bool ok = false; | 1090 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
930 for (CompletedJobs::iterator it = completedJobs_.begin(); | 1091 } |
931 it != completedJobs_.end(); ++it) | 1092 } |
932 { | 1093 |
933 if (*it == found->second) | 1094 bool IsPauseScheduled() |
934 { | 1095 { |
935 ok = true; | 1096 if (!IsValid()) |
936 completedJobs_.erase(it); | 1097 { |
937 break; | 1098 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
938 } | 1099 } |
939 } | 1100 |
940 | 1101 boost::mutex::scoped_lock lock(that_.mutex_); |
941 assert(ok); | 1102 that_.CheckInvariants(); |
942 | 1103 assert(handler_->GetState() == JobState_Running); |
943 found->second->SetState(JobState_Pending); | 1104 |
944 pendingJobs_.push(found->second); | 1105 return handler_->IsPauseScheduled(); |
945 pendingJobAvailable_.notify_one(); | 1106 } |
946 } | 1107 |
947 | 1108 IJob& GetJob() |
948 CheckInvariants(); | 1109 { |
949 } | 1110 if (!IsValid()) |
950 | 1111 { |
951 | 1112 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
952 void ScheduleRetries() | 1113 } |
953 { | 1114 |
954 boost::mutex::scoped_lock lock(mutex_); | 1115 boost::mutex::scoped_lock lock(that_.mutex_); |
955 CheckInvariants(); | 1116 that_.CheckInvariants(); |
956 | 1117 assert(handler_->GetState() == JobState_Running); |
957 RetryJobs copy; | 1118 |
958 std::swap(copy, retryJobs_); | 1119 return handler_->GetJob(); |
959 | 1120 } |
960 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | 1121 |
961 | 1122 void MarkSuccess() |
962 assert(retryJobs_.empty()); | 1123 { |
963 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | 1124 if (!IsValid()) |
964 { | 1125 { |
965 if ((*it)->IsRetryReady(now)) | 1126 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
966 { | 1127 } |
967 (*it)->SetState(JobState_Pending); | 1128 |
968 } | 1129 targetState_ = JobState_Success; |
969 else | 1130 } |
970 { | 1131 |
971 retryJobs_.insert(*it); | 1132 void MarkFailure() |
972 } | 1133 { |
973 } | 1134 if (!IsValid()) |
974 | 1135 { |
975 CheckInvariants(); | 1136 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
976 } | 1137 } |
977 | 1138 |
978 | 1139 targetState_ = JobState_Failure; |
979 bool GetState(JobState& state, | 1140 } |
980 const std::string& id) | 1141 |
981 { | 1142 void SchedulePause() |
982 boost::mutex::scoped_lock lock(mutex_); | 1143 { |
983 CheckInvariants(); | 1144 if (!IsValid()) |
984 | 1145 { |
985 JobsIndex::const_iterator it = jobsIndex_.find(id); | 1146 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
986 if (it == jobsIndex_.end()) | 1147 } |
987 { | 1148 |
988 return false; | 1149 targetState_ = JobState_Paused; |
989 } | 1150 } |
990 else | 1151 |
991 { | 1152 void MarkRetry(unsigned int timeout) |
992 state = it->second->GetState(); | 1153 { |
993 return true; | 1154 if (!IsValid()) |
994 } | 1155 { |
995 } | 1156 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
996 | 1157 } |
997 | 1158 |
998 class RunningJob : public boost::noncopyable | 1159 targetState_ = JobState_Retry; |
999 { | 1160 retryTimeout_ = timeout; |
1000 private: | 1161 } |
1001 JobsMonitor& that_; | |
1002 JobHandler* handler_; | |
1003 JobState targetState_; | |
1004 unsigned int retryTimeout_; | |
1005 | |
1006 public: | |
1007 RunningJob(JobsMonitor& that, | |
1008 unsigned int timeout) : | |
1009 that_(that), | |
1010 handler_(NULL), | |
1011 targetState_(JobState_Failure), | |
1012 retryTimeout_(0) | |
1013 { | |
1014 handler_ = that_.WaitPendingJob(timeout); | |
1015 } | |
1016 | |
1017 ~RunningJob() | |
1018 { | |
1019 if (IsValid()) | |
1020 { | |
1021 switch (targetState_) | |
1022 { | |
1023 case JobState_Failure: | |
1024 that_.MarkRunningAsCompleted(*handler_, false); | |
1025 break; | |
1026 | |
1027 case JobState_Success: | |
1028 that_.MarkRunningAsCompleted(*handler_, true); | |
1029 break; | |
1030 | |
1031 case JobState_Paused: | |
1032 that_.MarkRunningAsPaused(*handler_); | |
1033 break; | |
1034 | |
1035 case JobState_Retry: | |
1036 that_.MarkRunningAsRetry(*handler_, retryTimeout_); | |
1037 break; | |
1038 | |
1039 default: | |
1040 assert(0); | |
1041 } | |
1042 } | |
1043 } | |
1044 | |
1045 bool IsValid() const | |
1046 { | |
1047 return handler_ != NULL; | |
1048 } | |
1049 | |
1050 const std::string& GetId() const | |
1051 { | |
1052 if (IsValid()) | |
1053 { | |
1054 return handler_->GetId(); | |
1055 } | |
1056 else | |
1057 { | |
1058 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1059 } | |
1060 } | |
1061 | |
1062 int GetPriority() const | |
1063 { | |
1064 if (IsValid()) | |
1065 { | |
1066 return handler_->GetPriority(); | |
1067 } | |
1068 else | |
1069 { | |
1070 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1071 } | |
1072 } | |
1073 | |
1074 bool IsPauseScheduled() | |
1075 { | |
1076 if (!IsValid()) | |
1077 { | |
1078 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1079 } | |
1080 | |
1081 boost::mutex::scoped_lock lock(that_.mutex_); | |
1082 that_.CheckInvariants(); | |
1083 assert(handler_->GetState() == JobState_Running); | |
1084 | |
1085 return handler_->IsPauseScheduled(); | |
1086 } | |
1087 | |
1088 IJob& GetJob() | |
1089 { | |
1090 if (!IsValid()) | |
1091 { | |
1092 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1093 } | |
1094 | |
1095 boost::mutex::scoped_lock lock(that_.mutex_); | |
1096 that_.CheckInvariants(); | |
1097 assert(handler_->GetState() == JobState_Running); | |
1098 | |
1099 return handler_->GetJob(); | |
1100 } | |
1101 | |
1102 void MarkSuccess() | |
1103 { | |
1104 if (!IsValid()) | |
1105 { | |
1106 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1107 } | |
1108 | |
1109 targetState_ = JobState_Success; | |
1110 } | |
1111 | |
1112 void MarkFailure() | |
1113 { | |
1114 if (!IsValid()) | |
1115 { | |
1116 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1117 } | |
1118 | |
1119 targetState_ = JobState_Failure; | |
1120 } | |
1121 | |
1122 void MarkPause() | |
1123 { | |
1124 if (!IsValid()) | |
1125 { | |
1126 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1127 } | |
1128 | |
1129 targetState_ = JobState_Paused; | |
1130 } | |
1131 | |
1132 void MarkRetry(unsigned int timeout) | |
1133 { | |
1134 if (!IsValid()) | |
1135 { | |
1136 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1137 } | |
1138 | |
1139 targetState_ = JobState_Retry; | |
1140 retryTimeout_ = timeout; | |
1141 } | |
1142 }; | |
1143 }; | 1162 }; |
1163 }; | |
1144 } | 1164 } |
1145 | 1165 |
1146 | 1166 |
1147 | 1167 |
1148 class DummyJob : public Orthanc::IJob | 1168 class DummyJob : public Orthanc::IJob |
1179 { | 1199 { |
1180 } | 1200 } |
1181 }; | 1201 }; |
1182 | 1202 |
1183 | 1203 |
1184 static bool CheckState(Orthanc::JobsMonitor& monitor, | 1204 static bool CheckState(Orthanc::JobsRegistry& registry, |
1185 const std::string& id, | 1205 const std::string& id, |
1186 Orthanc::JobState state) | 1206 Orthanc::JobState state) |
1187 { | 1207 { |
1188 Orthanc::JobState s; | 1208 Orthanc::JobState s; |
1189 if (monitor.GetState(s, id)) | 1209 if (registry.GetState(s, id)) |
1190 { | 1210 { |
1191 return state == s; | 1211 return state == s; |
1192 } | 1212 } |
1193 else | 1213 else |
1194 { | 1214 { |
1195 return false; | 1215 return false; |
1196 } | 1216 } |
1197 } | 1217 } |
1198 | 1218 |
1199 | 1219 |
1200 TEST(JobsMonitor, Priority) | 1220 TEST(JobsRegistry, Priority) |
1201 { | 1221 { |
1202 JobsMonitor monitor; | 1222 JobsRegistry registry; |
1203 | 1223 |
1204 std::string i1, i2, i3, i4; | 1224 std::string i1, i2, i3, i4; |
1205 monitor.Submit(i1, new DummyJob(), 10); | 1225 registry.Submit(i1, new DummyJob(), 10); |
1206 monitor.Submit(i2, new DummyJob(), 30); | 1226 registry.Submit(i2, new DummyJob(), 30); |
1207 monitor.Submit(i3, new DummyJob(), 20); | 1227 registry.Submit(i3, new DummyJob(), 20); |
1208 monitor.Submit(i4, new DummyJob(), 5); | 1228 registry.Submit(i4, new DummyJob(), 5); |
1209 | 1229 |
1210 monitor.SetMaxCompletedJobs(2); | 1230 registry.SetMaxCompletedJobs(2); |
1211 | 1231 |
1212 std::set<std::string> id; | 1232 std::set<std::string> id; |
1213 monitor.ListJobs(id); | 1233 registry.ListJobs(id); |
1214 | 1234 |
1215 ASSERT_EQ(4u, id.size()); | 1235 ASSERT_EQ(4u, id.size()); |
1216 ASSERT_TRUE(id.find(i1) != id.end()); | 1236 ASSERT_TRUE(id.find(i1) != id.end()); |
1217 ASSERT_TRUE(id.find(i2) != id.end()); | 1237 ASSERT_TRUE(id.find(i2) != id.end()); |
1218 ASSERT_TRUE(id.find(i3) != id.end()); | 1238 ASSERT_TRUE(id.find(i3) != id.end()); |
1219 ASSERT_TRUE(id.find(i4) != id.end()); | 1239 ASSERT_TRUE(id.find(i4) != id.end()); |
1220 | 1240 |
1221 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending)); | 1241 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending)); |
1222 | 1242 |
1223 { | 1243 { |
1224 JobsMonitor::RunningJob job(monitor, 0); | 1244 JobsRegistry::RunningJob job(registry, 0); |
1225 ASSERT_TRUE(job.IsValid()); | 1245 ASSERT_TRUE(job.IsValid()); |
1226 ASSERT_EQ(30, job.GetPriority()); | 1246 ASSERT_EQ(30, job.GetPriority()); |
1227 ASSERT_EQ(i2, job.GetId()); | 1247 ASSERT_EQ(i2, job.GetId()); |
1228 | 1248 |
1229 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running)); | 1249 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running)); |
1230 } | 1250 } |
1231 | 1251 |
1232 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure)); | 1252 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Failure)); |
1233 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending)); | 1253 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Pending)); |
1234 | 1254 |
1235 { | 1255 { |
1236 JobsMonitor::RunningJob job(monitor, 0); | 1256 JobsRegistry::RunningJob job(registry, 0); |
1237 ASSERT_TRUE(job.IsValid()); | 1257 ASSERT_TRUE(job.IsValid()); |
1238 ASSERT_EQ(20, job.GetPriority()); | 1258 ASSERT_EQ(20, job.GetPriority()); |
1239 ASSERT_EQ(i3, job.GetId()); | 1259 ASSERT_EQ(i3, job.GetId()); |
1240 | 1260 |
1241 job.MarkSuccess(); | 1261 job.MarkSuccess(); |
1242 | 1262 |
1243 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running)); | 1263 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Running)); |
1244 } | 1264 } |
1245 | 1265 |
1246 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success)); | 1266 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Success)); |
1247 | 1267 |
1248 { | 1268 { |
1249 JobsMonitor::RunningJob job(monitor, 0); | 1269 JobsRegistry::RunningJob job(registry, 0); |
1250 ASSERT_TRUE(job.IsValid()); | 1270 ASSERT_TRUE(job.IsValid()); |
1251 ASSERT_EQ(10, job.GetPriority()); | 1271 ASSERT_EQ(10, job.GetPriority()); |
1252 ASSERT_EQ(i1, job.GetId()); | 1272 ASSERT_EQ(i1, job.GetId()); |
1253 } | 1273 } |
1254 | 1274 |
1255 { | 1275 { |
1256 JobsMonitor::RunningJob job(monitor, 0); | 1276 JobsRegistry::RunningJob job(registry, 0); |
1257 ASSERT_TRUE(job.IsValid()); | 1277 ASSERT_TRUE(job.IsValid()); |
1258 ASSERT_EQ(5, job.GetPriority()); | 1278 ASSERT_EQ(5, job.GetPriority()); |
1259 ASSERT_EQ(i4, job.GetId()); | 1279 ASSERT_EQ(i4, job.GetId()); |
1260 } | 1280 } |
1261 | 1281 |
1262 { | 1282 { |
1263 JobsMonitor::RunningJob job(monitor, 1); | 1283 JobsRegistry::RunningJob job(registry, 1); |
1264 ASSERT_FALSE(job.IsValid()); | 1284 ASSERT_FALSE(job.IsValid()); |
1265 } | 1285 } |
1266 | 1286 |
1267 Orthanc::JobState s; | 1287 Orthanc::JobState s; |
1268 ASSERT_TRUE(monitor.GetState(s, i1)); | 1288 ASSERT_TRUE(registry.GetState(s, i1)); |
1269 ASSERT_FALSE(monitor.GetState(s, i2)); // Removed because oldest | 1289 ASSERT_FALSE(registry.GetState(s, i2)); // Removed because oldest |
1270 ASSERT_FALSE(monitor.GetState(s, i3)); // Removed because second oldest | 1290 ASSERT_FALSE(registry.GetState(s, i3)); // Removed because second oldest |
1271 ASSERT_TRUE(monitor.GetState(s, i4)); | 1291 ASSERT_TRUE(registry.GetState(s, i4)); |
1272 | 1292 |
1273 monitor.SetMaxCompletedJobs(1); // (*) | 1293 registry.SetMaxCompletedJobs(1); // (*) |
1274 ASSERT_FALSE(monitor.GetState(s, i1)); // Just discarded by (*) | 1294 ASSERT_FALSE(registry.GetState(s, i1)); // Just discarded by (*) |
1275 ASSERT_TRUE(monitor.GetState(s, i4)); | 1295 ASSERT_TRUE(registry.GetState(s, i4)); |
1276 } | 1296 } |
1277 | 1297 |
1278 | 1298 |
1279 TEST(JobsMonitor, Resubmit) | 1299 TEST(JobsRegistry, Simultaneous) |
1280 { | 1300 { |
1281 JobsMonitor monitor; | 1301 JobsRegistry registry; |
1302 | |
1303 std::string i1, i2; | |
1304 registry.Submit(i1, new DummyJob(), 20); | |
1305 registry.Submit(i2, new DummyJob(), 10); | |
1306 | |
1307 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Pending)); | |
1308 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending)); | |
1309 | |
1310 { | |
1311 JobsRegistry::RunningJob job1(registry, 0); | |
1312 JobsRegistry::RunningJob job2(registry, 0); | |
1313 | |
1314 ASSERT_TRUE(job1.IsValid()); | |
1315 ASSERT_TRUE(job2.IsValid()); | |
1316 | |
1317 job1.MarkFailure(); | |
1318 job2.MarkSuccess(); | |
1319 | |
1320 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Running)); | |
1321 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running)); | |
1322 } | |
1323 | |
1324 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Failure)); | |
1325 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Success)); | |
1326 } | |
1327 | |
1328 | |
1329 TEST(JobsRegistry, Resubmit) | |
1330 { | |
1331 JobsRegistry registry; | |
1282 | 1332 |
1283 std::string id; | 1333 std::string id; |
1284 monitor.Submit(id, new DummyJob(), 10); | 1334 registry.Submit(id, new DummyJob(), 10); |
1285 | 1335 |
1286 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); | 1336 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); |
1287 | 1337 |
1288 monitor.Resubmit(id); | 1338 registry.Resubmit(id); |
1289 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); | 1339 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); |
1290 | 1340 |
1291 { | 1341 { |
1292 JobsMonitor::RunningJob job(monitor, 0); | 1342 JobsRegistry::RunningJob job(registry, 0); |
1293 ASSERT_TRUE(job.IsValid()); | 1343 ASSERT_TRUE(job.IsValid()); |
1294 job.MarkFailure(); | 1344 job.MarkFailure(); |
1295 | 1345 |
1296 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); | 1346 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); |
1297 | 1347 |
1298 monitor.Resubmit(id); | 1348 registry.Resubmit(id); |
1299 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); | 1349 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); |
1300 } | 1350 } |
1301 | 1351 |
1302 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure)); | 1352 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); |
1303 | 1353 |
1304 monitor.Resubmit(id); | 1354 registry.Resubmit(id); |
1305 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); | 1355 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); |
1306 | 1356 |
1307 { | 1357 { |
1308 JobsMonitor::RunningJob job(monitor, 0); | 1358 JobsRegistry::RunningJob job(registry, 0); |
1309 ASSERT_TRUE(job.IsValid()); | 1359 ASSERT_TRUE(job.IsValid()); |
1310 ASSERT_EQ(id, job.GetId()); | 1360 ASSERT_EQ(id, job.GetId()); |
1311 | 1361 |
1312 job.MarkSuccess(); | 1362 job.MarkSuccess(); |
1313 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); | 1363 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); |
1314 } | 1364 } |
1315 | 1365 |
1316 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success)); | 1366 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); |
1317 } | 1367 |
1368 registry.Resubmit(id); | |
1369 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1370 } | |
1371 | |
1372 | |
1373 TEST(JobsRegistry, Retry) | |
1374 { | |
1375 JobsRegistry registry; | |
1376 | |
1377 std::string id; | |
1378 registry.Submit(id, new DummyJob(), 10); | |
1379 | |
1380 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1381 | |
1382 { | |
1383 JobsRegistry::RunningJob job(registry, 0); | |
1384 ASSERT_TRUE(job.IsValid()); | |
1385 job.MarkRetry(0); | |
1386 | |
1387 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1388 } | |
1389 | |
1390 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); | |
1391 | |
1392 registry.Resubmit(id); | |
1393 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); | |
1394 | |
1395 registry.ScheduleRetries(); | |
1396 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1397 | |
1398 { | |
1399 JobsRegistry::RunningJob job(registry, 0); | |
1400 ASSERT_TRUE(job.IsValid()); | |
1401 job.MarkSuccess(); | |
1402 | |
1403 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1404 } | |
1405 | |
1406 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1407 } | |
1408 | |
1409 | |
1410 TEST(JobsRegistry, PausePending) | |
1411 { | |
1412 JobsRegistry registry; | |
1413 | |
1414 std::string id; | |
1415 registry.Submit(id, new DummyJob(), 10); | |
1416 | |
1417 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1418 | |
1419 registry.Pause(id); | |
1420 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1421 | |
1422 registry.Pause(id); | |
1423 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1424 | |
1425 registry.Resubmit(id); | |
1426 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1427 | |
1428 registry.Resume(id); | |
1429 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1430 } | |
1431 | |
1432 | |
1433 TEST(JobsRegistry, PauseRunning) | |
1434 { | |
1435 JobsRegistry registry; | |
1436 | |
1437 std::string id; | |
1438 registry.Submit(id, new DummyJob(), 10); | |
1439 | |
1440 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1441 | |
1442 { | |
1443 JobsRegistry::RunningJob job(registry, 0); | |
1444 ASSERT_TRUE(job.IsValid()); | |
1445 | |
1446 registry.Resubmit(id); | |
1447 job.SchedulePause(); | |
1448 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1449 } | |
1450 | |
1451 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1452 | |
1453 registry.Resubmit(id); | |
1454 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1455 | |
1456 registry.Resume(id); | |
1457 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1458 | |
1459 { | |
1460 JobsRegistry::RunningJob job(registry, 0); | |
1461 ASSERT_TRUE(job.IsValid()); | |
1462 | |
1463 job.MarkSuccess(); | |
1464 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1465 } | |
1466 | |
1467 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1468 } | |
1469 | |
1470 | |
1471 TEST(JobsRegistry, PauseRetry) | |
1472 { | |
1473 JobsRegistry registry; | |
1474 | |
1475 std::string id; | |
1476 registry.Submit(id, new DummyJob(), 10); | |
1477 | |
1478 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1479 | |
1480 { | |
1481 JobsRegistry::RunningJob job(registry, 0); | |
1482 ASSERT_TRUE(job.IsValid()); | |
1483 | |
1484 job.MarkRetry(0); | |
1485 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1486 } | |
1487 | |
1488 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); | |
1489 | |
1490 registry.Pause(id); | |
1491 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1492 | |
1493 registry.Resume(id); | |
1494 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1495 | |
1496 { | |
1497 JobsRegistry::RunningJob job(registry, 0); | |
1498 ASSERT_TRUE(job.IsValid()); | |
1499 | |
1500 job.MarkSuccess(); | |
1501 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1502 } | |
1503 | |
1504 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1505 } |