comparison UnitTestsSources/MultiThreadingTests.cpp @ 2563:98dfc1948d00 jobs

RunningJob::ExecuteStep()
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 04 May 2018 17:47:02 +0200
parents 1e66fe3ddf9f
children f8681f251caa
comparison
equal deleted inserted replaced
2562:1e66fe3ddf9f 2563:98dfc1948d00
304 private: 304 private:
305 JobStepCode status_; 305 JobStepCode status_;
306 306
307 public: 307 public:
308 explicit JobStepResult(JobStepCode status) : 308 explicit JobStepResult(JobStepCode status) :
309 status_(status) 309 status_(status)
310 { 310 {
311 } 311 }
312 312
313 virtual ~JobStepResult() 313 virtual ~JobStepResult()
314 { 314 {
326 private: 326 private:
327 unsigned int timeout_; // Retry after "timeout_" milliseconds 327 unsigned int timeout_; // Retry after "timeout_" milliseconds
328 328
329 public: 329 public:
330 RetryResult(unsigned int timeout) : 330 RetryResult(unsigned int timeout) :
331 JobStepResult(JobStepCode_Retry), 331 JobStepResult(JobStepCode_Retry),
332 timeout_(timeout) 332 timeout_(timeout)
333 { 333 {
334 } 334 }
335 335
336 unsigned int GetTimeout() const 336 unsigned int GetTimeout() const
337 { 337 {
351 351
352 virtual void ReleaseResources() = 0; // For pausing jobs 352 virtual void ReleaseResources() = 0; // For pausing jobs
353 353
354 virtual float GetProgress() = 0; 354 virtual float GetProgress() = 0;
355 355
356 virtual void FormatStatus(Json::Value& value) = 0; 356 virtual void GetDescription(Json::Value& value) = 0;
357 }; 357 };
358 358
359 359
360 struct JobStatus 360 class JobStatus
361 { 361 {
362 private:
362 ErrorCode errorCode_; 363 ErrorCode errorCode_;
363 float progress_; 364 float progress_;
364 Json::Value description_; 365 Json::Value description_;
365 366
367 public:
366 JobStatus() : 368 JobStatus() :
367 errorCode_(ErrorCode_Success), 369 errorCode_(ErrorCode_Success),
368 progress_(0), 370 progress_(0),
369 description_(Json::objectValue) 371 description_(Json::objectValue)
370 { 372 {
371 } 373 }
372 374
373 JobStatus(ErrorCode code, 375 JobStatus(ErrorCode code,
374 float progress) : 376 IJob& job) :
375 errorCode_(code), 377 errorCode_(code),
376 progress_(progress), 378 progress_(job.GetProgress())
377 description_(Json::objectValue) 379 {
378 { 380 if (progress_ < 0 ||
379 if (progress < 0 || 381 progress_ > 1)
380 progress > 1)
381 { 382 {
382 throw OrthancException(ErrorCode_ParameterOutOfRange); 383 throw OrthancException(ErrorCode_ParameterOutOfRange);
383 } 384 }
385
386 job.GetDescription(description_);
387 }
388
389 ErrorCode GetErrorCode() const
390 {
391 return errorCode_;
392 }
393
394 float GetProgress() const
395 {
396 return progress_;
397 }
398
399 const Json::Value& GetDescription() const
400 {
401 return description_;
384 } 402 }
385 }; 403 };
386 404
387 405
388 class JobInfo 406 class JobInfo
412 creationTime_(creationTime), 430 creationTime_(creationTime),
413 runtime_(runtime), 431 runtime_(runtime),
414 status_(status) 432 status_(status)
415 { 433 {
416 float ms = static_cast<float>(runtime_.total_milliseconds()); 434 float ms = static_cast<float>(runtime_.total_milliseconds());
417 float remaining = boost::math::llround(1.0f - status_.progress_) * ms; 435 float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms;
418 eta_ = infoTime_ + boost::posix_time::milliseconds(remaining); 436 eta_ = infoTime_ + boost::posix_time::milliseconds(remaining);
419 } 437 }
420 438
421 const std::string& GetIdentifier() const 439 const std::string& GetIdentifier() const
422 { 440 {
599 { 617 {
600 return retryTime_ <= now; 618 return retryTime_ <= now;
601 } 619 }
602 } 620 }
603 621
604 JobStatus& GetLastStatus() 622 const JobStatus& GetLastStatus() const
605 { 623 {
606 return lastStatus_; 624 return lastStatus_;
607 } 625 }
608 626
609 const JobStatus& GetLastStatus() const 627 void SetLastStatus(const JobStatus& status)
610 { 628 {
611 return lastStatus_; 629 lastStatus_ = status;
612 } 630 }
613 }; 631 };
614 632
615 633
616 class JobsRegistry : public boost::noncopyable 634 class JobsRegistry : public boost::noncopyable
624 return a->GetPriority() < b->GetPriority(); 642 return a->GetPriority() < b->GetPriority();
625 } 643 }
626 }; 644 };
627 645
628 typedef std::map<std::string, JobHandler*> JobsIndex; 646 typedef std::map<std::string, JobHandler*> JobsIndex;
629 typedef std::list<const JobHandler*> CompletedJobs; 647 typedef std::list<JobHandler*> CompletedJobs;
630 typedef std::set<JobHandler*> RetryJobs; 648 typedef std::set<JobHandler*> RetryJobs;
631 typedef std::priority_queue<JobHandler*, 649 typedef std::priority_queue<JobHandler*,
632 std::vector<JobHandler*>, // Could be a "std::deque" 650 std::vector<JobHandler*>, // Could be a "std::deque"
633 PriorityComparator> PendingJobs; 651 PriorityComparator> PendingJobs;
634 652
635 boost::mutex mutex_; 653 boost::mutex mutex_;
636 JobsIndex jobsIndex_; 654 JobsIndex jobsIndex_;
637 PendingJobs pendingJobs_; 655 PendingJobs pendingJobs_;
638 CompletedJobs completedJobs_; 656 CompletedJobs completedJobs_;
639 RetryJobs retryJobs_; 657 RetryJobs retryJobs_;
640 658
641 boost::condition_variable pendingJobAvailable_; 659 boost::condition_variable pendingJobAvailable_;
642 size_t maxCompletedJobs_; 660 size_t maxCompletedJobs_;
643 661
644 662
645 #ifndef NDEBUG 663 #ifndef NDEBUG
646 bool IsPendingJob(const JobHandler& job) const 664 bool IsPendingJob(const JobHandler& job) const
647 {
648 PendingJobs copy = pendingJobs_;
649 while (!copy.empty())
650 {
651 if (copy.top() == &job)
652 {
653 return true;
654 }
655
656 copy.pop();
657 }
658
659 return false;
660 }
661
662 bool IsCompletedJob(const JobHandler& job) const
663 {
664 for (CompletedJobs::const_iterator it = completedJobs_.begin();
665 it != completedJobs_.end(); ++it)
666 {
667 if (*it == &job)
668 {
669 return true;
670 }
671 }
672
673 return false;
674 }
675
676 bool IsRetryJob(JobHandler& job) const
677 {
678 return retryJobs_.find(&job) != retryJobs_.end();
679 }
680 #endif
681
682
683 void CheckInvariants()
684 {
685 #ifndef NDEBUG
686 { 665 {
687 PendingJobs copy = pendingJobs_; 666 PendingJobs copy = pendingJobs_;
688 while (!copy.empty()) 667 while (!copy.empty())
689 { 668 {
690 assert(copy.top()->GetState() == JobState_Pending); 669 if (copy.top() == &job)
670 {
671 return true;
672 }
673
691 copy.pop(); 674 copy.pop();
692 } 675 }
693 } 676
694 677 return false;
695 assert(completedJobs_.size() <= maxCompletedJobs_); 678 }
696 679
697 for (CompletedJobs::const_iterator it = completedJobs_.begin(); 680 bool IsCompletedJob(JobHandler& job) const
698 it != completedJobs_.end(); ++it) 681 {
699 { 682 for (CompletedJobs::const_iterator it = completedJobs_.begin();
700 assert((*it)->GetState() == JobState_Success || 683 it != completedJobs_.end(); ++it)
701 (*it)->GetState() == JobState_Failure); 684 {
702 } 685 if (*it == &job)
703 686 {
704 for (RetryJobs::const_iterator it = retryJobs_.begin(); 687 return true;
705 it != retryJobs_.end(); ++it) 688 }
706 { 689 }
707 assert((*it)->GetState() == JobState_Retry); 690
708 } 691 return false;
709 692 }
710 for (JobsIndex::iterator it = jobsIndex_.begin(); 693
711 it != jobsIndex_.end(); ++it) 694 bool IsRetryJob(JobHandler& job) const
712 { 695 {
713 JobHandler& job = *it->second; 696 return retryJobs_.find(&job) != retryJobs_.end();
714 697 }
715 assert(job.GetId() == it->first); 698 #endif
716 699
717 switch (job.GetState()) 700
718 { 701 void CheckInvariants() const
719 case JobState_Pending: 702 {
720 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); 703 #ifndef NDEBUG
721 break; 704 {
705 PendingJobs copy = pendingJobs_;
706 while (!copy.empty())
707 {
708 assert(copy.top()->GetState() == JobState_Pending);
709 copy.pop();
710 }
711 }
712
713 assert(completedJobs_.size() <= maxCompletedJobs_);
714
715 for (CompletedJobs::const_iterator it = completedJobs_.begin();
716 it != completedJobs_.end(); ++it)
717 {
718 assert((*it)->GetState() == JobState_Success ||
719 (*it)->GetState() == JobState_Failure);
720 }
721
722 for (RetryJobs::const_iterator it = retryJobs_.begin();
723 it != retryJobs_.end(); ++it)
724 {
725 assert((*it)->GetState() == JobState_Retry);
726 }
727
728 for (JobsIndex::const_iterator it = jobsIndex_.begin();
729 it != jobsIndex_.end(); ++it)
730 {
731 JobHandler& job = *it->second;
732
733 assert(job.GetId() == it->first);
734
735 switch (job.GetState())
736 {
737 case JobState_Pending:
738 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
739 break;
722 740
723 case JobState_Success: 741 case JobState_Success:
724 case JobState_Failure: 742 case JobState_Failure:
725 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); 743 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
726 break; 744 break;
727 745
728 case JobState_Retry: 746 case JobState_Retry:
729 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); 747 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
730 break; 748 break;
731 749
732 case JobState_Running: 750 case JobState_Running:
733 case JobState_Paused: 751 case JobState_Paused:
734 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); 752 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
735 break; 753 break;
736 754
737 default: 755 default:
738 throw OrthancException(ErrorCode_InternalError); 756 throw OrthancException(ErrorCode_InternalError);
739 } 757 }
740 } 758 }
741 #endif 759 #endif
742 } 760 }
743 761
744 762
745 void ForgetOldCompletedJobs() 763 void ForgetOldCompletedJobs()
746 { 764 {
747 if (maxCompletedJobs_ != 0) 765 if (maxCompletedJobs_ != 0)
748 { 766 {
749 while (completedJobs_.size() > maxCompletedJobs_) 767 while (completedJobs_.size() > maxCompletedJobs_)
750 { 768 {
751 assert(completedJobs_.front() != NULL); 769 assert(completedJobs_.front() != NULL);
752 770
753 std::string id = completedJobs_.front()->GetId(); 771 std::string id = completedJobs_.front()->GetId();
754 assert(jobsIndex_.find(id) != jobsIndex_.end()); 772 assert(jobsIndex_.find(id) != jobsIndex_.end());
755 773
756 jobsIndex_.erase(id); 774 jobsIndex_.erase(id);
757 delete(completedJobs_.front()); 775 delete(completedJobs_.front());
758 completedJobs_.pop_front(); 776 completedJobs_.pop_front();
759 } 777 }
760 } 778 }
761 } 779 }
762 780
763 781
764 void MarkRunningAsCompleted(JobHandler& job, 782 void MarkRunningAsCompleted(JobHandler& job,
765 bool success) 783 bool success)
766 { 784 {
767 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") 785 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
768 << ": " << job.GetId(); 786 << ": " << job.GetId();
769 787
770 CheckInvariants(); 788 CheckInvariants();
771 assert(job.GetState() == JobState_Running); 789 assert(job.GetState() == JobState_Running);
772 790
773 job.SetState(success ? JobState_Success : JobState_Failure); 791 job.SetState(success ? JobState_Success : JobState_Failure);
774 792
775 completedJobs_.push_back(&job); 793 completedJobs_.push_back(&job);
776 ForgetOldCompletedJobs(); 794 ForgetOldCompletedJobs();
777 795
778 CheckInvariants(); 796 CheckInvariants();
779 } 797 }
780 798
781 799
782 void MarkRunningAsRetry(JobHandler& job, 800 void MarkRunningAsRetry(JobHandler& job,
783 unsigned int timeout) 801 unsigned int timeout)
784 { 802 {
785 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); 803 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
786 804
787 CheckInvariants(); 805 CheckInvariants();
788 806
789 assert(job.GetState() == JobState_Running && 807 assert(job.GetState() == JobState_Running &&
790 retryJobs_.find(&job) == retryJobs_.end()); 808 retryJobs_.find(&job) == retryJobs_.end());
791 809
792 retryJobs_.insert(&job); 810 retryJobs_.insert(&job);
793 job.SetRetryState(timeout); 811 job.SetRetryState(timeout);
794 812
795 CheckInvariants(); 813 CheckInvariants();
796 } 814 }
797 815
798 816
799 void MarkRunningAsPaused(JobHandler& job) 817 void MarkRunningAsPaused(JobHandler& job)
800 { 818 {
801 LOG(INFO) << "Job paused: " << job.GetId(); 819 LOG(INFO) << "Job paused: " << job.GetId();
802 820
803 CheckInvariants(); 821 CheckInvariants();
804 assert(job.GetState() == JobState_Running); 822 assert(job.GetState() == JobState_Running);
805 823
806 job.SetState(JobState_Paused); 824 job.SetState(JobState_Paused);
807 825
808 CheckInvariants(); 826 CheckInvariants();
809 } 827 }
810 828
811 829
812 public: 830 public:
813 JobsRegistry() : 831 JobsRegistry() :
814 maxCompletedJobs_(10) 832 maxCompletedJobs_(10)
815 { 833 {
816 } 834 }
817 835
818 836
819 ~JobsRegistry() 837 ~JobsRegistry()
820 { 838 {
821 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) 839 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
822 { 840 {
823 assert(it->second != NULL); 841 assert(it->second != NULL);
824 delete it->second; 842 delete it->second;
825 } 843 }
826 } 844 }
827 845
828 846
829 void SetMaxCompletedJobs(size_t i) 847 void SetMaxCompletedJobs(size_t i)
830 { 848 {
831 boost::mutex::scoped_lock lock(mutex_); 849 boost::mutex::scoped_lock lock(mutex_);
832 CheckInvariants(); 850 CheckInvariants();
833 851
834 maxCompletedJobs_ = i; 852 maxCompletedJobs_ = i;
835 ForgetOldCompletedJobs(); 853 ForgetOldCompletedJobs();
836 854
837 CheckInvariants(); 855 CheckInvariants();
838 } 856 }
839 857
840 858
841 void ListJobs(std::set<std::string>& target) 859 void ListJobs(std::set<std::string>& target)
842 { 860 {
843 boost::mutex::scoped_lock lock(mutex_); 861 boost::mutex::scoped_lock lock(mutex_);
844 CheckInvariants(); 862 CheckInvariants();
845 863
846 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 864 for (JobsIndex::const_iterator it = jobsIndex_.begin();
847 it != jobsIndex_.end(); ++it) 865 it != jobsIndex_.end(); ++it)
848 { 866 {
849 target.insert(it->first); 867 target.insert(it->first);
850 } 868 }
851 } 869 }
852 870
853 871
854 void Submit(std::string& id, 872 void Submit(std::string& id,
855 IJob* job, // Takes ownership 873 IJob* job, // Takes ownership
856 int priority) 874 int priority)
857 { 875 {
858 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); 876 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
859 877
860 boost::mutex::scoped_lock lock(mutex_); 878 boost::mutex::scoped_lock lock(mutex_);
861 CheckInvariants(); 879 CheckInvariants();
862 880
863 id = handler->GetId(); 881 id = handler->GetId();
864 882
865 pendingJobs_.push(handler.get()); 883 pendingJobs_.push(handler.get());
866 pendingJobAvailable_.notify_one(); 884 pendingJobAvailable_.notify_one();
867 885
868 jobsIndex_.insert(std::make_pair(id, handler.release())); 886 jobsIndex_.insert(std::make_pair(id, handler.release()));
869 887
870 LOG(INFO) << "New job submitted: " << id; 888 LOG(INFO) << "New job submitted: " << id;
871 889
872 CheckInvariants(); 890 CheckInvariants();
873 } 891 }
874 892
875 893
876 void Submit(IJob* job, // Takes ownership 894 void Submit(IJob* job, // Takes ownership
877 int priority) 895 int priority)
878 { 896 {
879 std::string id; 897 std::string id;
880 Submit(id, job, priority); 898 Submit(id, job, priority);
881 } 899 }
882 900
883 901
884 void SetPriority(const std::string& id, 902 void SetPriority(const std::string& id,
885 int priority) 903 int priority)
886 { 904 {
887 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; 905 LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
888 906
889 boost::mutex::scoped_lock lock(mutex_); 907 boost::mutex::scoped_lock lock(mutex_);
890 CheckInvariants(); 908 CheckInvariants();
891 909
892 JobsIndex::iterator found = jobsIndex_.find(id); 910 JobsIndex::iterator found = jobsIndex_.find(id);
893 911
894 if (found == jobsIndex_.end()) 912 if (found == jobsIndex_.end())
895 { 913 {
896 LOG(WARNING) << "Unknown job: " << id; 914 LOG(WARNING) << "Unknown job: " << id;
897 } 915 }
898 else 916 else
899 { 917 {
900 found->second->SetPriority(priority); 918 found->second->SetPriority(priority);
901 919
902 if (found->second->GetState() == JobState_Pending) 920 if (found->second->GetState() == JobState_Pending)
903 {
904 // If the job is pending, we need to reconstruct the
905 // priority queue, as the heap condition has changed
906
907 PendingJobs copy;
908 std::swap(copy, pendingJobs_);
909
910 assert(pendingJobs_.empty());
911 while (!copy.empty())
912 {
913 pendingJobs_.push(copy.top());
914 copy.pop();
915 }
916 }
917 }
918
919 CheckInvariants();
920 }
921
922
923 void Pause(const std::string& id)
924 {
925 LOG(INFO) << "Pausing job: " << id;
926
927 boost::mutex::scoped_lock lock(mutex_);
928 CheckInvariants();
929
930 JobsIndex::iterator found = jobsIndex_.find(id);
931
932 if (found == jobsIndex_.end())
933 {
934 LOG(WARNING) << "Unknown job: " << id;
935 }
936 else
937 {
938 switch (found->second->GetState())
939 {
940 case JobState_Pending:
941 { 921 {
942 // If the job is pending, we need to reconstruct the 922 // If the job is pending, we need to reconstruct the
943 // priority queue to remove it 923 // priority queue, as the heap condition has changed
924
944 PendingJobs copy; 925 PendingJobs copy;
945 std::swap(copy, pendingJobs_); 926 std::swap(copy, pendingJobs_);
946 927
947 assert(pendingJobs_.empty()); 928 assert(pendingJobs_.empty());
948 while (!copy.empty()) 929 while (!copy.empty())
949 { 930 {
950 if (copy.top()->GetId() != id) 931 pendingJobs_.push(copy.top());
951 {
952 pendingJobs_.push(copy.top());
953 }
954
955 copy.pop(); 932 copy.pop();
956 } 933 }
957 934 }
958 found->second->SetState(JobState_Paused); 935 }
959 936
960 break; 937 CheckInvariants();
961 } 938 }
962 939
963 case JobState_Retry: 940
964 { 941 void Pause(const std::string& id)
965 RetryJobs::iterator item = retryJobs_.find(found->second); 942 {
966 assert(item != retryJobs_.end()); 943 LOG(INFO) << "Pausing job: " << id;
967 retryJobs_.erase(item); 944
968 945 boost::mutex::scoped_lock lock(mutex_);
969 found->second->SetState(JobState_Paused); 946 CheckInvariants();
970 947
971 break; 948 JobsIndex::iterator found = jobsIndex_.find(id);
972 } 949
973 950 if (found == jobsIndex_.end())
974 case JobState_Paused: 951 {
975 case JobState_Success: 952 LOG(WARNING) << "Unknown job: " << id;
976 case JobState_Failure: 953 }
977 // Nothing to be done 954 else
978 break; 955 {
979 956 switch (found->second->GetState())
980 case JobState_Running: 957 {
981 found->second->SchedulePause(); 958 case JobState_Pending:
982 break; 959 {
983 960 // If the job is pending, we need to reconstruct the
984 default: 961 // priority queue to remove it
985 throw OrthancException(ErrorCode_InternalError); 962 PendingJobs copy;
986 } 963 std::swap(copy, pendingJobs_);
987 } 964
988 965 assert(pendingJobs_.empty());
989 CheckInvariants(); 966 while (!copy.empty())
990 } 967 {
991 968 if (copy.top()->GetId() != id)
992 969 {
993 void Resume(const std::string& id) 970 pendingJobs_.push(copy.top());
994 { 971 }
995 LOG(INFO) << "Resuming job: " << id; 972
996 973 copy.pop();
997 boost::mutex::scoped_lock lock(mutex_); 974 }
998 CheckInvariants(); 975
999 976 found->second->SetState(JobState_Paused);
1000 JobsIndex::iterator found = jobsIndex_.find(id); 977
1001 978 break;
1002 if (found == jobsIndex_.end()) 979 }
1003 { 980
1004 LOG(WARNING) << "Unknown job: " << id; 981 case JobState_Retry:
1005 } 982 {
1006 else if (found->second->GetState() != JobState_Paused) 983 RetryJobs::iterator item = retryJobs_.find(found->second);
1007 { 984 assert(item != retryJobs_.end());
1008 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; 985 retryJobs_.erase(item);
1009 } 986
1010 else 987 found->second->SetState(JobState_Paused);
1011 { 988
1012 found->second->SetState(JobState_Pending); 989 break;
1013 pendingJobs_.push(found->second); 990 }
1014 pendingJobAvailable_.notify_one(); 991
1015 } 992 case JobState_Paused:
1016 993 case JobState_Success:
1017 CheckInvariants(); 994 case JobState_Failure:
1018 } 995 // Nothing to be done
1019 996 break;
1020 997
1021 void Resubmit(const std::string& id) 998 case JobState_Running:
1022 { 999 found->second->SchedulePause();
1023 LOG(INFO) << "Resubmitting failed job: " << id; 1000 break;
1024 1001
1025 boost::mutex::scoped_lock lock(mutex_); 1002 default:
1026 CheckInvariants(); 1003 throw OrthancException(ErrorCode_InternalError);
1027 1004 }
1028 JobsIndex::iterator found = jobsIndex_.find(id); 1005 }
1029 1006
1030 if (found == jobsIndex_.end()) 1007 CheckInvariants();
1031 { 1008 }
1032 LOG(WARNING) << "Unknown job: " << id; 1009
1033 } 1010
1034 else if (found->second->GetState() != JobState_Failure) 1011 void Resume(const std::string& id)
1035 { 1012 {
1036 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; 1013 LOG(INFO) << "Resuming job: " << id;
1037 } 1014
1038 else 1015 boost::mutex::scoped_lock lock(mutex_);
1039 { 1016 CheckInvariants();
1040 bool ok = false; 1017
1041 for (CompletedJobs::iterator it = completedJobs_.begin(); 1018 JobsIndex::iterator found = jobsIndex_.find(id);
1042 it != completedJobs_.end(); ++it) 1019
1043 { 1020 if (found == jobsIndex_.end())
1044 if (*it == found->second) 1021 {
1045 { 1022 LOG(WARNING) << "Unknown job: " << id;
1046 ok = true; 1023 }
1047 completedJobs_.erase(it); 1024 else if (found->second->GetState() != JobState_Paused)
1048 break; 1025 {
1049 } 1026 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
1050 } 1027 }
1051 1028 else
1052 assert(ok); 1029 {
1053 1030 found->second->SetState(JobState_Pending);
1054 found->second->SetState(JobState_Pending); 1031 pendingJobs_.push(found->second);
1055 pendingJobs_.push(found->second);
1056 pendingJobAvailable_.notify_one();
1057 }
1058
1059 CheckInvariants();
1060 }
1061
1062
1063 void ScheduleRetries()
1064 {
1065 boost::mutex::scoped_lock lock(mutex_);
1066 CheckInvariants();
1067
1068 RetryJobs copy;
1069 std::swap(copy, retryJobs_);
1070
1071 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
1072
1073 assert(retryJobs_.empty());
1074 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
1075 {
1076 if ((*it)->IsRetryReady(now))
1077 {
1078 LOG(INFO) << "Retrying job: " << (*it)->GetId();
1079 (*it)->SetState(JobState_Pending);
1080 pendingJobs_.push(*it);
1081 pendingJobAvailable_.notify_one(); 1032 pendingJobAvailable_.notify_one();
1082 } 1033 }
1034
1035 CheckInvariants();
1036 }
1037
1038
1039 void Resubmit(const std::string& id)
1040 {
1041 LOG(INFO) << "Resubmitting failed job: " << id;
1042
1043 boost::mutex::scoped_lock lock(mutex_);
1044 CheckInvariants();
1045
1046 JobsIndex::iterator found = jobsIndex_.find(id);
1047
1048 if (found == jobsIndex_.end())
1049 {
1050 LOG(WARNING) << "Unknown job: " << id;
1051 }
1052 else if (found->second->GetState() != JobState_Failure)
1053 {
1054 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
1055 }
1083 else 1056 else
1084 { 1057 {
1085 retryJobs_.insert(*it); 1058 bool ok = false;
1086 } 1059 for (CompletedJobs::iterator it = completedJobs_.begin();
1087 } 1060 it != completedJobs_.end(); ++it)
1088 1061 {
1089 CheckInvariants(); 1062 if (*it == found->second)
1090 } 1063 {
1091 1064 ok = true;
1092 1065 completedJobs_.erase(it);
1093 bool GetState(JobState& state, 1066 break;
1094 const std::string& id) 1067 }
1095 { 1068 }
1096 boost::mutex::scoped_lock lock(mutex_); 1069
1097 CheckInvariants(); 1070 assert(ok);
1098 1071
1099 JobsIndex::const_iterator it = jobsIndex_.find(id); 1072 found->second->SetState(JobState_Pending);
1100 if (it == jobsIndex_.end()) 1073 pendingJobs_.push(found->second);
1101 { 1074 pendingJobAvailable_.notify_one();
1102 return false; 1075 }
1103 } 1076
1104 else 1077 CheckInvariants();
1105 { 1078 }
1106 state = it->second->GetState(); 1079
1107 return true; 1080
1108 } 1081 void ScheduleRetries()
1109 } 1082 {
1110 1083 boost::mutex::scoped_lock lock(mutex_);
1111 1084 CheckInvariants();
1112 class RunningJob : public boost::noncopyable 1085
1113 { 1086 RetryJobs copy;
1114 private: 1087 std::swap(copy, retryJobs_);
1115 JobsRegistry& registry_; 1088
1116 JobHandler* handler_; // Can only be accessed if the registry 1089 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
1117 // mutex is locked! 1090
1118 IJob* job_; // Will by design be in mutual exclusion, 1091 assert(retryJobs_.empty());
1119 // because only one RunningJob can be 1092 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
1120 // executed at a time on a JobHandler 1093 {
1121 1094 if ((*it)->IsRetryReady(now))
1122 std::string id_; 1095 {
1123 int priority_; 1096 LOG(INFO) << "Retrying job: " << (*it)->GetId();
1124 JobState targetState_; 1097 (*it)->SetState(JobState_Pending);
1125 unsigned int targetRetryTimeout_; 1098 pendingJobs_.push(*it);
1099 pendingJobAvailable_.notify_one();
1100 }
1101 else
1102 {
1103 retryJobs_.insert(*it);
1104 }
1105 }
1106
1107 CheckInvariants();
1108 }
1109
1110
1111 bool GetState(JobState& state,
1112 const std::string& id)
1113 {
1114 boost::mutex::scoped_lock lock(mutex_);
1115 CheckInvariants();
1116
1117 JobsIndex::const_iterator it = jobsIndex_.find(id);
1118 if (it == jobsIndex_.end())
1119 {
1120 return false;
1121 }
1122 else
1123 {
1124 state = it->second->GetState();
1125 return true;
1126 }
1127 }
1128
1129
1130 class RunningJob : public boost::noncopyable
1131 {
1132 private:
1133 JobsRegistry& registry_;
1134 JobHandler* handler_; // Can only be accessed if the registry
1135 // mutex is locked!
1136 IJob* job_; // Will by design be in mutual exclusion,
1137 // because only one RunningJob can be
1138 // executed at a time on a JobHandler
1139
1140 std::string id_;
1141 int priority_;
1142 JobState targetState_;
1143 unsigned int targetRetryTimeout_;
1126 1144
1127 public: 1145 public:
1128 RunningJob(JobsRegistry& registry, 1146 RunningJob(JobsRegistry& registry,
1129 unsigned int timeout) : 1147 unsigned int timeout) :
1130 registry_(registry), 1148 registry_(registry),
1131 handler_(NULL), 1149 handler_(NULL),
1132 targetState_(JobState_Failure), 1150 targetState_(JobState_Failure),
1133 targetRetryTimeout_(0) 1151 targetRetryTimeout_(0)
1134 { 1152 {
1135 { 1153 {
1136 boost::mutex::scoped_lock lock(registry_.mutex_); 1154 boost::mutex::scoped_lock lock(registry_.mutex_);
1137 1155
1138 while (registry_.pendingJobs_.empty()) 1156 while (registry_.pendingJobs_.empty())
1139 {
1140 if (timeout == 0)
1141 { 1157 {
1142 registry_.pendingJobAvailable_.wait(lock); 1158 if (timeout == 0)
1159 {
1160 registry_.pendingJobAvailable_.wait(lock);
1161 }
1162 else
1163 {
1164 bool success = registry_.pendingJobAvailable_.timed_wait
1165 (lock, boost::posix_time::milliseconds(timeout));
1166 if (!success)
1167 {
1168 // No pending job
1169 return;
1170 }
1171 }
1172 }
1173
1174 handler_ = registry_.pendingJobs_.top();
1175 registry_.pendingJobs_.pop();
1176
1177 assert(handler_->GetState() == JobState_Pending);
1178 handler_->SetState(JobState_Running);
1179
1180 job_ = &handler_->GetJob();
1181 id_ = handler_->GetId();
1182 priority_ = handler_->GetPriority();
1183 }
1184 }
1185
1186 ~RunningJob()
1187 {
1188 if (IsValid())
1189 {
1190 boost::mutex::scoped_lock lock(registry_.mutex_);
1191
1192 switch (targetState_)
1193 {
1194 case JobState_Failure:
1195 registry_.MarkRunningAsCompleted(*handler_, false);
1196 break;
1197
1198 case JobState_Success:
1199 registry_.MarkRunningAsCompleted(*handler_, true);
1200 break;
1201
1202 case JobState_Paused:
1203 registry_.MarkRunningAsPaused(*handler_);
1204 break;
1205
1206 case JobState_Retry:
1207 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
1208 break;
1209
1210 default:
1211 assert(0);
1212 }
1213 }
1214 }
1215
1216 bool IsValid() const
1217 {
1218 return (handler_ != NULL &&
1219 job_ != NULL);
1220 }
1221
1222 const std::string& GetId() const
1223 {
1224 if (!IsValid())
1225 {
1226 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1227 }
1228 else
1229 {
1230 return id_;
1231 }
1232 }
1233
1234 int GetPriority() const
1235 {
1236 if (!IsValid())
1237 {
1238 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1239 }
1240 else
1241 {
1242 return priority_;
1243 }
1244 }
1245
1246 bool IsPauseScheduled()
1247 {
1248 if (!IsValid())
1249 {
1250 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1251 }
1252 else
1253 {
1254 boost::mutex::scoped_lock lock(registry_.mutex_);
1255 registry_.CheckInvariants();
1256 assert(handler_->GetState() == JobState_Running);
1257
1258 return handler_->IsPauseScheduled();
1259 }
1260 }
1261
1262 void MarkSuccess()
1263 {
1264 if (!IsValid())
1265 {
1266 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1267 }
1268 else
1269 {
1270 targetState_ = JobState_Success;
1271 }
1272 }
1273
1274 void MarkFailure()
1275 {
1276 if (!IsValid())
1277 {
1278 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1279 }
1280 else
1281 {
1282 targetState_ = JobState_Failure;
1283 }
1284 }
1285
1286 void MarkPause()
1287 {
1288 if (!IsValid())
1289 {
1290 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1291 }
1292 else
1293 {
1294 targetState_ = JobState_Paused;
1295 }
1296 }
1297
1298 void MarkRetry(unsigned int timeout)
1299 {
1300 if (!IsValid())
1301 {
1302 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1303 }
1304 else
1305 {
1306 targetState_ = JobState_Retry;
1307 targetRetryTimeout_ = timeout;
1308 }
1309 }
1310
1311 void UpdateStatus(const JobStatus& status)
1312 {
1313 if (!IsValid())
1314 {
1315 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1316 }
1317 else
1318 {
1319 boost::mutex::scoped_lock lock(registry_.mutex_);
1320 registry_.CheckInvariants();
1321 assert(handler_->GetState() == JobState_Running);
1322
1323 handler_->SetLastStatus(status);
1324 }
1325 }
1326
1327 bool ExecuteStep()
1328 {
1329 if (!IsValid())
1330 {
1331 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1332 }
1333
1334 if (IsPauseScheduled())
1335 {
1336 targetState_ = JobState_Paused;
1337 return false;
1338 }
1339
1340 std::auto_ptr<JobStepResult> result;
1341 ErrorCode code;
1342
1343 {
1344 bool ok = false;
1345
1346 try
1347 {
1348 result.reset(job_->ExecuteStep());
1349 ok = true;
1350
1351 if (result->GetCode() == JobStepCode_Failure)
1352 {
1353 code = ErrorCode_InternalError;
1354 }
1355 }
1356 catch (OrthancException& e)
1357 {
1358 code = e.GetErrorCode();
1359 }
1360 catch (boost::bad_lexical_cast&)
1361 {
1362 code = ErrorCode_BadFileFormat;
1363 }
1364 catch (...)
1365 {
1366 code = ErrorCode_InternalError;
1367 }
1368
1369 if (ok)
1370 {
1371 code = ErrorCode_Success;
1143 } 1372 }
1144 else 1373 else
1145 { 1374 {
1146 bool success = registry_.pendingJobAvailable_.timed_wait 1375 result.reset(new JobStepResult(JobStepCode_Failure));
1147 (lock, boost::posix_time::milliseconds(timeout));
1148 if (!success)
1149 {
1150 // No pending job
1151 return;
1152 }
1153 } 1376 }
1154 } 1377 }
1155 1378
1156 handler_ = registry_.pendingJobs_.top(); 1379 {
1157 registry_.pendingJobs_.pop(); 1380 JobStatus status(code, *job_);
1158 1381 UpdateStatus(status);
1159 assert(handler_->GetState() == JobState_Pending); 1382 }
1160 handler_->SetState(JobState_Running); 1383
1161 1384 switch (result->GetCode())
1162 job_ = &handler_->GetJob(); 1385 {
1163 id_ = handler_->GetId(); 1386 case JobStepCode_Success:
1164 priority_ = handler_->GetPriority(); 1387 targetState_ = JobState_Success;
1165 } 1388 return false;
1166 } 1389
1167 1390 case JobStepCode_Failure:
1168 ~RunningJob() 1391 targetState_ = JobState_Failure;
1169 { 1392 return false;
1170 if (IsValid()) 1393
1171 { 1394 case JobStepCode_Retry:
1172 boost::mutex::scoped_lock lock(registry_.mutex_); 1395 targetState_ = JobState_Retry;
1173 1396 targetRetryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
1174 switch (targetState_) 1397 return false;
1175 { 1398
1176 case JobState_Failure: 1399 case JobStepCode_Continue:
1177 registry_.MarkRunningAsCompleted(*handler_, false); 1400 return true;
1178 break;
1179
1180 case JobState_Success:
1181 registry_.MarkRunningAsCompleted(*handler_, true);
1182 break;
1183
1184 case JobState_Paused:
1185 registry_.MarkRunningAsPaused(*handler_);
1186 break;
1187
1188 case JobState_Retry:
1189 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
1190 break;
1191 1401
1192 default: 1402 default:
1193 assert(0); 1403 throw OrthancException(ErrorCode_InternalError);
1194 } 1404 }
1195 } 1405 }
1196 } 1406 };
1197
1198 bool IsValid() const
1199 {
1200 return (handler_ != NULL &&
1201 job_ != NULL);
1202 }
1203
1204 const std::string& GetId() const
1205 {
1206 if (!IsValid())
1207 {
1208 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1209 }
1210 else
1211 {
1212 return id_;
1213 }
1214 }
1215
1216 int GetPriority() const
1217 {
1218 if (!IsValid())
1219 {
1220 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1221 }
1222 else
1223 {
1224 return priority_;
1225 }
1226 }
1227
1228 bool IsPauseScheduled()
1229 {
1230 if (!IsValid())
1231 {
1232 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1233 }
1234 else
1235 {
1236 boost::mutex::scoped_lock lock(registry_.mutex_);
1237 registry_.CheckInvariants();
1238 assert(handler_->GetState() == JobState_Running);
1239
1240 return handler_->IsPauseScheduled();
1241 }
1242 }
1243
1244 void MarkSuccess()
1245 {
1246 if (!IsValid())
1247 {
1248 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1249 }
1250 else
1251 {
1252 targetState_ = JobState_Success;
1253 }
1254 }
1255
1256 void MarkFailure()
1257 {
1258 if (!IsValid())
1259 {
1260 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1261 }
1262 else
1263 {
1264 targetState_ = JobState_Failure;
1265 }
1266 }
1267
1268 void MarkPause()
1269 {
1270 if (!IsValid())
1271 {
1272 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1273 }
1274 else
1275 {
1276 targetState_ = JobState_Paused;
1277 }
1278 }
1279
1280 void MarkRetry(unsigned int timeout)
1281 {
1282 if (!IsValid())
1283 {
1284 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1285 }
1286 else
1287 {
1288 targetState_ = JobState_Retry;
1289 targetRetryTimeout_ = timeout;
1290 }
1291 }
1292
1293 /*void ExecuteStep()
1294 {
1295 if (!IsValid())
1296 {
1297 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1298 }
1299
1300 if (IsPauseScheduled())
1301 {
1302 targetState_ = JobState_Paused;
1303 return;
1304 }
1305
1306 std::auto_ptr<JobStepResult> result;
1307 ErrorCode code;
1308
1309 {
1310 bool ok = false;
1311
1312 try
1313 {
1314 result.reset(job_->ExecuteStep());
1315 ok = true;
1316
1317 if (result->GetCode() == JobStepCode_Failure)
1318 {
1319 code = ErrorCode_InternalError;
1320 }
1321 }
1322 catch (OrthancException& e)
1323 {
1324 code = e.GetErrorCode();
1325 }
1326 catch (boost::bad_lexical_cast&)
1327 {
1328 code = ErrorCode_BadFileFormat;
1329 }
1330 catch (...)
1331 {
1332 code = ErrorCode_InternalError;
1333 }
1334
1335 if (ok)
1336 {
1337 code = ErrorCode_Success;
1338 }
1339 else
1340 {
1341 result.reset(new JobStepResult(JobStepCode_Failure));
1342 }
1343 }
1344
1345 switch (result->GetCode())
1346 {
1347 case JobStepCode_Success:
1348 targetState_ = JobState_Success;
1349 break;
1350
1351 case JobStepCode_Failure:
1352 targetState_ = JobState_Failure;
1353 break;
1354
1355 case JobStepCode_Continue:
1356 targetState_ = JobState_Running;
1357 break;
1358
1359 case JobStepCode_Retry:
1360 targetState_ = JobState_Retry;
1361 targetRetryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
1362 break;
1363
1364 default:
1365 throw OrthancException(ErrorCode_InternalError);
1366 }
1367 }*/
1368 }; 1407 };
1369 };
1370 } 1408 }
1371 1409
1372 1410
1373 1411
1374 class DummyJob : public Orthanc::IJob 1412 class DummyJob : public Orthanc::IJob
1381 result_(Orthanc::JobStepCode_Success) 1419 result_(Orthanc::JobStepCode_Success)
1382 { 1420 {
1383 } 1421 }
1384 1422
1385 explicit DummyJob(JobStepResult result) : 1423 explicit DummyJob(JobStepResult result) :
1386 result_(result) 1424 result_(result)
1387 { 1425 {
1388 } 1426 }
1389 1427
1390 virtual JobStepResult* ExecuteStep() 1428 virtual JobStepResult* ExecuteStep()
1391 { 1429 {
1399 virtual float GetProgress() 1437 virtual float GetProgress()
1400 { 1438 {
1401 return 0; 1439 return 0;
1402 } 1440 }
1403 1441
1404 virtual void FormatStatus(Json::Value& value) 1442 virtual void GetDescription(Json::Value& value)
1405 { 1443 {
1406 } 1444 }
1407 }; 1445 };
1408 1446
1409 1447