comparison UnitTestsSources/MultiThreadingTests.cpp @ 2557:b4516a6f214b jobs

state machine
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 May 2018 13:45:31 +0200
parents 91e944c8389b
children 57f81b988713
comparison
equal deleted inserted replaced
2556:91e944c8389b 2557:b4516a6f214b
270 270
271 #if ORTHANC_SANDBOXED == 1 271 #if ORTHANC_SANDBOXED == 1
272 # error The job engine cannot be used in sandboxed environments 272 # error The job engine cannot be used in sandboxed environments
273 #endif 273 #endif
274 274
275 #include "../Core/Logging.h"
276
275 #include <boost/date_time/posix_time/posix_time.hpp> 277 #include <boost/date_time/posix_time/posix_time.hpp>
278 #include <queue>
276 279
277 namespace Orthanc 280 namespace Orthanc
278 { 281 {
279 enum JobState 282 enum JobState
280 { 283 {
287 }; 290 };
288 291
289 enum JobStepStatus 292 enum JobStepStatus
290 { 293 {
291 JobStepStatus_Success, 294 JobStepStatus_Success,
292 JobStepStatus_Error, 295 JobStepStatus_Failure,
293 JobStepStatus_Continue, 296 JobStepStatus_Continue,
294 JobStepStatus_Retry 297 JobStepStatus_Retry
295 }; 298 };
296 299
297 300
298 class IJobStepResult : public boost::noncopyable 301 class JobStepResult
299 { 302 {
300 private: 303 private:
301 JobStepStatus status_; 304 JobStepStatus status_;
302 305
303 public: 306 public:
304 explicit IJobStepResult(JobStepStatus status) : 307 explicit JobStepResult(JobStepStatus status) :
305 status_(status) 308 status_(status)
306 { 309 {
307 } 310 }
308 311
309 virtual ~IJobStepResult() 312 virtual ~JobStepResult()
310 { 313 {
311 } 314 }
312 315
313 JobStepStatus GetStatus() const 316 JobStepStatus GetStatus() const
314 { 317 {
315 return status_; 318 return status_;
316 } 319 }
317 }; 320 };
318 321
319 322
320 class RetryResult : public IJobStepResult 323 class RetryResult : public JobStepResult
321 { 324 {
322 private: 325 private:
323 unsigned int timeout_; // Retry after "timeout_" milliseconds 326 unsigned int timeout_; // Retry after "timeout_" milliseconds
324 327
325 public: 328 public:
326 RetryResult(unsigned int timeout) : 329 RetryResult(unsigned int timeout) :
327 IJobStepResult(JobStepStatus_Retry), 330 JobStepResult(JobStepStatus_Retry),
328 timeout_(timeout) 331 timeout_(timeout)
329 { 332 {
330 } 333 }
331 334
332 unsigned int GetTimeout() const 335 unsigned int GetTimeout() const
333 { 336 {
341 public: 344 public:
342 virtual ~IJob() 345 virtual ~IJob()
343 { 346 {
344 } 347 }
345 348
346 virtual IJobStepResult* ExecuteStep() = 0; 349 virtual JobStepResult* ExecuteStep() = 0;
347 350
348 virtual void ReleaseResources() = 0; // For pausing jobs 351 virtual void ReleaseResources() = 0; // For pausing jobs
349 352
350 virtual float GetProgress() = 0; 353 virtual float GetProgress() = 0;
351 354
352 virtual void FormatStatus(Json::Value& value) = 0; 355 virtual void FormatStatus(Json::Value& value) = 0;
353 }; 356 };
354 357
355 358
359 class JobHandler : public boost::noncopyable
360 {
361 private:
362 std::string id_;
363 JobState state_;
364 std::auto_ptr<IJob> job_;
365 int priority_; // "+inf()" means highest priority
366 boost::posix_time::ptime creationTime_;
367 boost::posix_time::ptime lastUpdateTime_;
368 boost::posix_time::ptime retryTime_;
369 uint64_t runtime_; // In milliseconds
370 bool pauseScheduled_;
371
372 void SetStateInternal(JobState state)
373 {
374 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
375
376 if (state_ == JobState_Running)
377 {
378 runtime_ += (now - lastUpdateTime_).total_milliseconds();
379 }
380
381 state_ = state;
382 lastUpdateTime_ = now;
383 pauseScheduled_ = false;
384 }
385
386 public:
387 JobHandler(IJob* job,
388 int priority) :
389 id_(Toolbox::GenerateUuid()),
390 state_(JobState_Pending),
391 job_(job),
392 priority_(priority),
393 creationTime_(boost::posix_time::microsec_clock::universal_time()),
394 lastUpdateTime_(creationTime_),
395 runtime_(0),
396 pauseScheduled_(false)
397 {
398 if (job == NULL)
399 {
400 throw OrthancException(ErrorCode_NullPointer);
401 }
402 }
403
404 const std::string& GetId() const
405 {
406 return id_;
407 }
408
409 IJob& GetJob() const
410 {
411 assert(job_.get() != NULL);
412 return *job_;
413 }
414
415 void SetPriority(int priority)
416 {
417 priority_ = priority;
418 }
419
420 int GetPriority() const
421 {
422 return priority_;
423 }
424
425 JobState GetState() const
426 {
427 return state_;
428 }
429
430 void SetState(JobState state)
431 {
432 if (state == JobState_Retry)
433 {
434 // Use "SetRetryState()"
435 throw OrthancException(ErrorCode_BadSequenceOfCalls);
436 }
437 else
438 {
439 SetStateInternal(state);
440 }
441 }
442
443 void SetRetryState(unsigned int timeout)
444 {
445 if (state_ == JobState_Running)
446 {
447 SetStateInternal(JobState_Retry);
448 retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
449 boost::posix_time::milliseconds(timeout));
450 }
451 else
452 {
453 // Only valid for running jobs
454 throw OrthancException(ErrorCode_BadSequenceOfCalls);
455 }
456 }
457
458 void SchedulePause()
459 {
460 if (state_ == JobState_Running)
461 {
462 pauseScheduled_ = true;
463 }
464 else
465 {
466 // Only valid for running jobs
467 throw OrthancException(ErrorCode_BadSequenceOfCalls);
468 }
469 }
470
471 bool IsPauseScheduled()
472 {
473 return pauseScheduled_;
474 }
475
476 bool IsRetryReady(const boost::posix_time::ptime& now) const
477 {
478 if (state_ != JobState_Retry)
479 {
480 throw OrthancException(ErrorCode_BadSequenceOfCalls);
481 }
482 else
483 {
484 return retryTime_ >= now;
485 }
486 }
487 };
488
489
356 class JobsMonitor : public boost::noncopyable 490 class JobsMonitor : public boost::noncopyable
357 { 491 {
358 private: 492 private:
359 class JobHandler : public boost::noncopyable 493 struct PriorityComparator
360 { 494 {
361 private: 495 bool operator() (JobHandler*& a,
362 std::string id_; 496 JobHandler*& b) const
363 JobState state_; 497 {
364 std::auto_ptr<IJob> job_; 498 return a->GetPriority() < b->GetPriority();
365 int priority_; // "+inf()" means highest priority 499 }
366 boost::posix_time::ptime creationTime_;
367 boost::posix_time::ptime lastUpdateTime_;
368 uint64_t runtime_; // In milliseconds
369
370 public:
371 JobHandler(IJob* job,
372 int priority) :
373 id_(Toolbox::GenerateUuid()),
374 state_(JobState_Pending),
375 job_(job),
376 priority_(priority),
377 creationTime_(boost::posix_time::microsec_clock::universal_time()),
378 lastUpdateTime_(creationTime_),
379 runtime_(0)
380 {
381 if (job == NULL)
382 {
383 throw OrthancException(ErrorCode_NullPointer);
384 }
385 }
386
387 const std::string& GetId() const
388 {
389 return id_;
390 }
391 }; 500 };
392 501
502 typedef std::map<std::string, JobHandler*> JobsIndex;
503 typedef std::list<const JobHandler*> CompletedJobs;
504 typedef std::set<JobHandler*> RetryJobs;
505 typedef std::priority_queue<JobHandler*,
506 std::vector<JobHandler*>, // Could be a "std::deque"
507 PriorityComparator> PendingJobs;
508
509 boost::mutex mutex_;
510 JobsIndex jobsIndex_;
511 PendingJobs pendingJobs_;
512 CompletedJobs completedJobs_;
513 RetryJobs retryJobs_;
514
515 boost::condition_variable pendingJobAvailable_;
516 size_t maxCompletedJobs_;
517
518
519 #ifndef NDEBUG
520 bool IsPendingJob(const JobHandler& job) const
521 {
522 PendingJobs copy = pendingJobs_;
523 while (!copy.empty())
524 {
525 if (copy.top() == &job)
526 {
527 return true;
528 }
529
530 copy.pop();
531 }
532
533 return false;
534 }
535
536 bool IsCompletedJob(const JobHandler& job) const
537 {
538 for (CompletedJobs::const_iterator it = completedJobs_.begin();
539 it != completedJobs_.end(); ++it)
540 {
541 if (*it == &job)
542 {
543 return true;
544 }
545 }
546
547 return false;
548 }
549
550 bool IsRetryJob(JobHandler& job) const
551 {
552 return retryJobs_.find(&job) != retryJobs_.end();
553 }
554 #endif
555
556
557 void CheckInvariants()
558 {
559 #ifndef NDEBUG
560 {
561 PendingJobs copy = pendingJobs_;
562 while (!copy.empty())
563 {
564 assert(copy.top()->GetState() == JobState_Pending);
565 copy.pop();
566 }
567 }
568
569 assert(completedJobs_.size() <= maxCompletedJobs_);
570
571 for (CompletedJobs::const_iterator it = completedJobs_.begin();
572 it != completedJobs_.end(); ++it)
573 {
574 assert((*it)->GetState() == JobState_Success ||
575 (*it)->GetState() == JobState_Failure);
576 }
577
578 for (RetryJobs::const_iterator it = retryJobs_.begin();
579 it != retryJobs_.end(); ++it)
580 {
581 assert((*it)->GetState() == JobState_Retry);
582 }
583
584 for (JobsIndex::iterator it = jobsIndex_.begin();
585 it != jobsIndex_.end(); ++it)
586 {
587 JobHandler& job = *it->second;
588
589 assert(job.GetId() == it->first);
590
591 switch (job.GetState())
592 {
593 case JobState_Pending:
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
595 break;
596
597 case JobState_Success:
598 case JobState_Failure:
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
600 break;
601
602 case JobState_Retry:
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
604 break;
605
606 case JobState_Running:
607 case JobState_Paused:
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
609 break;
610
611 default:
612 throw OrthancException(ErrorCode_InternalError);
613 }
614 }
615 #endif
616 }
617
618
619 void ForgetOldCompletedJobs()
620 {
621 if (maxCompletedJobs_ != 0)
622 {
623 while (completedJobs_.size() > maxCompletedJobs_)
624 {
625 assert(completedJobs_.front() != NULL);
626
627 std::string id = completedJobs_.front()->GetId();
628
629 assert(jobsIndex_.find(id) != jobsIndex_.end());
630
631 jobsIndex_.erase(id);
632 delete(completedJobs_.front());
633 completedJobs_.pop_front();
634 }
635 }
636 }
637
638
639 void MarkRunningAsCompleted(JobHandler& job,
640 bool success)
641 {
642 boost::mutex::scoped_lock lock(mutex_);
643 CheckInvariants();
644 assert(job.GetState() == JobState_Running);
645
646 job.SetState(success ? JobState_Success : JobState_Failure);
647
648 completedJobs_.push_back(&job);
649 ForgetOldCompletedJobs();
650
651 CheckInvariants();
652 }
653
654
655 void MarkRunningAsRetry(JobHandler& job,
656 unsigned int timeout)
657 {
658 boost::mutex::scoped_lock lock(mutex_);
659 CheckInvariants();
660
661 assert(job.GetState() == JobState_Running &&
662 retryJobs_.find(&job) == retryJobs_.end());
663
664 retryJobs_.insert(&job);
665 job.SetRetryState(timeout);
666
667 CheckInvariants();
668 }
669
670
671 void MarkRunningAsPaused(JobHandler& job)
672 {
673 boost::mutex::scoped_lock lock(mutex_);
674 CheckInvariants();
675 assert(job.GetState() == JobState_Running);
676
677 job.SetState(JobState_Paused);
678
679 CheckInvariants();
680 }
681
682
683 JobHandler* WaitPendingJob(unsigned int timeout)
684 {
685 boost::mutex::scoped_lock lock(mutex_);
686
687 while (pendingJobs_.empty())
688 {
689 if (timeout == 0)
690 {
691 pendingJobAvailable_.wait(lock);
692 }
693 else
694 {
695 bool success = pendingJobAvailable_.timed_wait
696 (lock, boost::posix_time::milliseconds(timeout));
697 if (!success)
698 {
699 return NULL;
700 }
701 }
702 }
703
704 JobHandler* job = pendingJobs_.top();
705 pendingJobs_.pop();
706
707 job->SetState(JobState_Running);
708 return job;
709 }
710
711
393 public: 712 public:
713 JobsMonitor() :
714 maxCompletedJobs_(10)
715 {
716 }
717
718
719 ~JobsMonitor()
720 {
721 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
722 {
723 assert(it->second != NULL);
724 delete it->second;
725 }
726 }
727
728
729 void SetMaxCompletedJobs(size_t i)
730 {
731 boost::mutex::scoped_lock lock(mutex_);
732 CheckInvariants();
733
734 maxCompletedJobs_ = i;
735 ForgetOldCompletedJobs();
736
737 CheckInvariants();
738 }
739
740
741 void ListJobs(std::set<std::string>& target)
742 {
743 boost::mutex::scoped_lock lock(mutex_);
744 CheckInvariants();
745
746 for (JobsIndex::const_iterator it = jobsIndex_.begin();
747 it != jobsIndex_.end(); ++it)
748 {
749 target.insert(it->first);
750 }
751 }
752
753
394 void Submit(std::string& id, 754 void Submit(std::string& id,
395 IJob* job, 755 IJob* job, // Takes ownership
396 int priority) 756 int priority)
397 { 757 {
398 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); 758 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
759
760 boost::mutex::scoped_lock lock(mutex_);
761 CheckInvariants();
762
399 id = handler->GetId(); 763 id = handler->GetId();
400 } 764 pendingJobs_.push(handler.get());
765 jobsIndex_.insert(std::make_pair(id, handler.release()));
766
767 pendingJobAvailable_.notify_one();
768
769 CheckInvariants();
770 }
771
772
773 void Submit(IJob* job, // Takes ownership
774 int priority)
775 {
776 std::string id;
777 Submit(id, job, priority);
778 }
779
401 780
402 void SetPriority(const std::string& id, 781 void SetPriority(const std::string& id,
403 int priority) 782 int priority)
404 { 783 {
405 // TODO 784 boost::mutex::scoped_lock lock(mutex_);
406 } 785 CheckInvariants();
786
787 JobsIndex::iterator found = jobsIndex_.find(id);
788
789 if (found == jobsIndex_.end())
790 {
791 LOG(WARNING) << "Unknown job: " << id;
792 }
793 else
794 {
795 found->second->SetPriority(priority);
796
797 if (found->second->GetState() == JobState_Pending)
798 {
799 // If the job is pending, we need to reconstruct the
800 // priority queue, as the heap condition has changed
801
802 PendingJobs copy;
803 std::swap(copy, pendingJobs_);
804
805 assert(pendingJobs_.empty());
806 while (!copy.empty())
807 {
808 pendingJobs_.push(copy.top());
809 copy.pop();
810 }
811 }
812 }
813
814 CheckInvariants();
815 }
816
407 817
408 void Pause(const std::string& id) 818 void Pause(const std::string& id)
409 { 819 {
410 // TODO 820 boost::mutex::scoped_lock lock(mutex_);
411 } 821 CheckInvariants();
822
823 JobsIndex::iterator found = jobsIndex_.find(id);
824
825 if (found == jobsIndex_.end())
826 {
827 LOG(WARNING) << "Unknown job: " << id;
828 }
829 else
830 {
831 switch (found->second->GetState())
832 {
833 case JobState_Pending:
834 {
835 // If the job is pending, we need to reconstruct the
836 // priority queue to remove it
837 PendingJobs copy;
838 std::swap(copy, pendingJobs_);
839
840 assert(pendingJobs_.empty());
841 while (!copy.empty())
842 {
843 if (copy.top()->GetId() != id)
844 {
845 pendingJobs_.push(copy.top());
846 }
847
848 copy.pop();
849 }
850
851 found->second->SetState(JobState_Paused);
852
853 break;
854 }
855
856 case JobState_Retry:
857 {
858 RetryJobs::iterator item = retryJobs_.find(found->second);
859 assert(item != retryJobs_.end());
860 retryJobs_.erase(item);
861
862 found->second->SetState(JobState_Paused);
863
864 break;
865 }
866
867 case JobState_Paused:
868 case JobState_Success:
869 case JobState_Failure:
870 // Nothing to be done
871 break;
872
873 case JobState_Running:
874 found->second->SchedulePause();
875 break;
876
877 default:
878 throw OrthancException(ErrorCode_InternalError);
879 }
880 }
881
882 CheckInvariants();
883 }
884
412 885
413 void Resume(const std::string& id) 886 void Resume(const std::string& id)
414 { 887 {
415 // TODO 888 boost::mutex::scoped_lock lock(mutex_);
416 } 889 CheckInvariants();
890
891 JobsIndex::iterator found = jobsIndex_.find(id);
892
893 if (found == jobsIndex_.end())
894 {
895 LOG(WARNING) << "Unknown job: " << id;
896 }
897 else if (found->second->GetState() != JobState_Paused)
898 {
899 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
900 }
901 else
902 {
903 found->second->SetState(JobState_Pending);
904 pendingJobs_.push(found->second);
905 pendingJobAvailable_.notify_one();
906 }
907
908 CheckInvariants();
909 }
910
417 911
418 void Resubmit(const std::string& id) 912 void Resubmit(const std::string& id)
419 { 913 {
420 // TODO 914 boost::mutex::scoped_lock lock(mutex_);
421 } 915 CheckInvariants();
422 916
423 class JobToRun : public boost::noncopyable 917 JobsIndex::iterator found = jobsIndex_.find(id);
918
919 if (found == jobsIndex_.end())
920 {
921 LOG(WARNING) << "Unknown job: " << id;
922 }
923 else if (found->second->GetState() != JobState_Failure)
924 {
925 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
926 }
927 else
928 {
929 bool ok = false;
930 for (CompletedJobs::iterator it = completedJobs_.begin();
931 it != completedJobs_.end(); ++it)
932 {
933 if (*it == found->second)
934 {
935 ok = true;
936 completedJobs_.erase(it);
937 break;
938 }
939 }
940
941 assert(ok);
942
943 found->second->SetState(JobState_Pending);
944 pendingJobs_.push(found->second);
945 pendingJobAvailable_.notify_one();
946 }
947
948 CheckInvariants();
949 }
950
951
952 void ScheduleRetries()
953 {
954 boost::mutex::scoped_lock lock(mutex_);
955 CheckInvariants();
956
957 RetryJobs copy;
958 std::swap(copy, retryJobs_);
959
960 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
961
962 assert(retryJobs_.empty());
963 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
964 {
965 if ((*it)->IsRetryReady(now))
966 {
967 (*it)->SetState(JobState_Pending);
968 }
969 else
970 {
971 retryJobs_.insert(*it);
972 }
973 }
974
975 CheckInvariants();
976 }
977
978
979 bool GetState(JobState& state,
980 const std::string& id)
981 {
982 boost::mutex::scoped_lock lock(mutex_);
983 CheckInvariants();
984
985 JobsIndex::const_iterator it = jobsIndex_.find(id);
986 if (it == jobsIndex_.end())
987 {
988 return false;
989 }
990 else
991 {
992 state = it->second->GetState();
993 return true;
994 }
995 }
996
997
998 class RunningJob : public boost::noncopyable
424 { 999 {
425 private: 1000 private:
426 JobHandler* handler_; 1001 JobsMonitor& that_;
1002 JobHandler* handler_;
1003 JobState targetState_;
1004 unsigned int retryTimeout_;
427 1005
428 public: 1006 public:
429 JobToRun(JobsMonitor& that, 1007 RunningJob(JobsMonitor& that,
430 unsigned int timeout) : 1008 unsigned int timeout) :
431 handler_(NULL) 1009 that_(that),
432 { 1010 handler_(NULL),
1011 targetState_(JobState_Failure),
1012 retryTimeout_(0)
1013 {
1014 handler_ = that_.WaitPendingJob(timeout);
1015 }
1016
1017 ~RunningJob()
1018 {
1019 if (IsValid())
1020 {
1021 switch (targetState_)
1022 {
1023 case JobState_Failure:
1024 that_.MarkRunningAsCompleted(*handler_, false);
1025 break;
1026
1027 case JobState_Success:
1028 that_.MarkRunningAsCompleted(*handler_, true);
1029 break;
1030
1031 case JobState_Paused:
1032 that_.MarkRunningAsPaused(*handler_);
1033 break;
1034
1035 case JobState_Retry:
1036 that_.MarkRunningAsRetry(*handler_, retryTimeout_);
1037 break;
1038
1039 default:
1040 assert(0);
1041 }
1042 }
433 } 1043 }
434 1044
435 bool IsValid() const 1045 bool IsValid() const
436 { 1046 {
437 return handler_ != NULL; 1047 return handler_ != NULL;
438 } 1048 }
439 1049
440 1050 const std::string& GetId() const
1051 {
1052 if (IsValid())
1053 {
1054 return handler_->GetId();
1055 }
1056 else
1057 {
1058 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1059 }
1060 }
1061
1062 int GetPriority() const
1063 {
1064 if (IsValid())
1065 {
1066 return handler_->GetPriority();
1067 }
1068 else
1069 {
1070 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1071 }
1072 }
1073
1074 bool IsPauseScheduled()
1075 {
1076 if (!IsValid())
1077 {
1078 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1079 }
1080
1081 boost::mutex::scoped_lock lock(that_.mutex_);
1082 that_.CheckInvariants();
1083 assert(handler_->GetState() == JobState_Running);
1084
1085 return handler_->IsPauseScheduled();
1086 }
1087
1088 IJob& GetJob()
1089 {
1090 if (!IsValid())
1091 {
1092 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1093 }
1094
1095 boost::mutex::scoped_lock lock(that_.mutex_);
1096 that_.CheckInvariants();
1097 assert(handler_->GetState() == JobState_Running);
1098
1099 return handler_->GetJob();
1100 }
1101
1102 void MarkSuccess()
1103 {
1104 if (!IsValid())
1105 {
1106 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1107 }
1108
1109 targetState_ = JobState_Success;
1110 }
1111
1112 void MarkFailure()
1113 {
1114 if (!IsValid())
1115 {
1116 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1117 }
1118
1119 targetState_ = JobState_Failure;
1120 }
1121
1122 void MarkPause()
1123 {
1124 if (!IsValid())
1125 {
1126 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1127 }
1128
1129 targetState_ = JobState_Paused;
1130 }
1131
1132 void MarkRetry(unsigned int timeout)
1133 {
1134 if (!IsValid())
1135 {
1136 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1137 }
1138
1139 targetState_ = JobState_Retry;
1140 retryTimeout_ = timeout;
1141 }
441 }; 1142 };
442 }; 1143 };
443 } 1144 }
1145
1146
1147
1148 class DummyJob : public Orthanc::IJob
1149 {
1150 private:
1151 JobStepResult result_;
1152
1153 public:
1154 DummyJob() :
1155 result_(Orthanc::JobStepStatus_Success)
1156 {
1157 }
1158
1159 explicit DummyJob(JobStepResult result) :
1160 result_(result)
1161 {
1162 }
1163
1164 virtual JobStepResult* ExecuteStep()
1165 {
1166 return new JobStepResult(result_);
1167 }
1168
1169 virtual void ReleaseResources()
1170 {
1171 }
1172
1173 virtual float GetProgress()
1174 {
1175 return 0;
1176 }
1177
1178 virtual void FormatStatus(Json::Value& value)
1179 {
1180 }
1181 };
1182
1183
1184 static bool CheckState(Orthanc::JobsMonitor& monitor,
1185 const std::string& id,
1186 Orthanc::JobState state)
1187 {
1188 Orthanc::JobState s;
1189 if (monitor.GetState(s, id))
1190 {
1191 return state == s;
1192 }
1193 else
1194 {
1195 return false;
1196 }
1197 }
1198
1199
1200 TEST(JobsMonitor, Priority)
1201 {
1202 JobsMonitor monitor;
1203
1204 std::string i1, i2, i3, i4;
1205 monitor.Submit(i1, new DummyJob(), 10);
1206 monitor.Submit(i2, new DummyJob(), 30);
1207 monitor.Submit(i3, new DummyJob(), 20);
1208 monitor.Submit(i4, new DummyJob(), 5);
1209
1210 monitor.SetMaxCompletedJobs(2);
1211
1212 std::set<std::string> id;
1213 monitor.ListJobs(id);
1214
1215 ASSERT_EQ(4u, id.size());
1216 ASSERT_TRUE(id.find(i1) != id.end());
1217 ASSERT_TRUE(id.find(i2) != id.end());
1218 ASSERT_TRUE(id.find(i3) != id.end());
1219 ASSERT_TRUE(id.find(i4) != id.end());
1220
1221 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending));
1222
1223 {
1224 JobsMonitor::RunningJob job(monitor, 0);
1225 ASSERT_TRUE(job.IsValid());
1226 ASSERT_EQ(30, job.GetPriority());
1227 ASSERT_EQ(i2, job.GetId());
1228
1229 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running));
1230 }
1231
1232 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure));
1233 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending));
1234
1235 {
1236 JobsMonitor::RunningJob job(monitor, 0);
1237 ASSERT_TRUE(job.IsValid());
1238 ASSERT_EQ(20, job.GetPriority());
1239 ASSERT_EQ(i3, job.GetId());
1240
1241 job.MarkSuccess();
1242
1243 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running));
1244 }
1245
1246 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success));
1247
1248 {
1249 JobsMonitor::RunningJob job(monitor, 0);
1250 ASSERT_TRUE(job.IsValid());
1251 ASSERT_EQ(10, job.GetPriority());
1252 ASSERT_EQ(i1, job.GetId());
1253 }
1254
1255 {
1256 JobsMonitor::RunningJob job(monitor, 0);
1257 ASSERT_TRUE(job.IsValid());
1258 ASSERT_EQ(5, job.GetPriority());
1259 ASSERT_EQ(i4, job.GetId());
1260 }
1261
1262 {
1263 JobsMonitor::RunningJob job(monitor, 1);
1264 ASSERT_FALSE(job.IsValid());
1265 }
1266
1267 Orthanc::JobState s;
1268 ASSERT_TRUE(monitor.GetState(s, i1));
1269 ASSERT_FALSE(monitor.GetState(s, i2)); // Removed because oldest
1270 ASSERT_FALSE(monitor.GetState(s, i3)); // Removed because second oldest
1271 ASSERT_TRUE(monitor.GetState(s, i4));
1272
1273 monitor.SetMaxCompletedJobs(1); // (*)
1274 ASSERT_FALSE(monitor.GetState(s, i1)); // Just discarded by (*)
1275 ASSERT_TRUE(monitor.GetState(s, i4));
1276 }
1277
1278
1279 TEST(JobsMonitor, Resubmit)
1280 {
1281 JobsMonitor monitor;
1282
1283 std::string id;
1284 monitor.Submit(id, new DummyJob(), 10);
1285
1286 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending));
1287
1288 monitor.Resubmit(id);
1289 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending));
1290
1291 {
1292 JobsMonitor::RunningJob job(monitor, 0);
1293 ASSERT_TRUE(job.IsValid());
1294 job.MarkFailure();
1295
1296 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running));
1297
1298 monitor.Resubmit(id);
1299 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running));
1300 }
1301
1302 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure));
1303
1304 monitor.Resubmit(id);
1305 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending));
1306
1307 {
1308 JobsMonitor::RunningJob job(monitor, 0);
1309 ASSERT_TRUE(job.IsValid());
1310 ASSERT_EQ(id, job.GetId());
1311
1312 job.MarkSuccess();
1313 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running));
1314 }
1315
1316 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success));
1317 }