Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2557:b4516a6f214b jobs
state machine
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 03 May 2018 13:45:31 +0200 |
parents | 91e944c8389b |
children | 57f81b988713 |
comparison
equal
deleted
inserted
replaced
2556:91e944c8389b | 2557:b4516a6f214b |
---|---|
270 | 270 |
271 #if ORTHANC_SANDBOXED == 1 | 271 #if ORTHANC_SANDBOXED == 1 |
272 # error The job engine cannot be used in sandboxed environments | 272 # error The job engine cannot be used in sandboxed environments |
273 #endif | 273 #endif |
274 | 274 |
275 #include "../Core/Logging.h" | |
276 | |
275 #include <boost/date_time/posix_time/posix_time.hpp> | 277 #include <boost/date_time/posix_time/posix_time.hpp> |
278 #include <queue> | |
276 | 279 |
277 namespace Orthanc | 280 namespace Orthanc |
278 { | 281 { |
279 enum JobState | 282 enum JobState |
280 { | 283 { |
287 }; | 290 }; |
288 | 291 |
289 enum JobStepStatus | 292 enum JobStepStatus |
290 { | 293 { |
291 JobStepStatus_Success, | 294 JobStepStatus_Success, |
292 JobStepStatus_Error, | 295 JobStepStatus_Failure, |
293 JobStepStatus_Continue, | 296 JobStepStatus_Continue, |
294 JobStepStatus_Retry | 297 JobStepStatus_Retry |
295 }; | 298 }; |
296 | 299 |
297 | 300 |
298 class IJobStepResult : public boost::noncopyable | 301 class JobStepResult |
299 { | 302 { |
300 private: | 303 private: |
301 JobStepStatus status_; | 304 JobStepStatus status_; |
302 | 305 |
303 public: | 306 public: |
304 explicit IJobStepResult(JobStepStatus status) : | 307 explicit JobStepResult(JobStepStatus status) : |
305 status_(status) | 308 status_(status) |
306 { | 309 { |
307 } | 310 } |
308 | 311 |
309 virtual ~IJobStepResult() | 312 virtual ~JobStepResult() |
310 { | 313 { |
311 } | 314 } |
312 | 315 |
313 JobStepStatus GetStatus() const | 316 JobStepStatus GetStatus() const |
314 { | 317 { |
315 return status_; | 318 return status_; |
316 } | 319 } |
317 }; | 320 }; |
318 | 321 |
319 | 322 |
320 class RetryResult : public IJobStepResult | 323 class RetryResult : public JobStepResult |
321 { | 324 { |
322 private: | 325 private: |
323 unsigned int timeout_; // Retry after "timeout_" milliseconds | 326 unsigned int timeout_; // Retry after "timeout_" milliseconds |
324 | 327 |
325 public: | 328 public: |
326 RetryResult(unsigned int timeout) : | 329 RetryResult(unsigned int timeout) : |
327 IJobStepResult(JobStepStatus_Retry), | 330 JobStepResult(JobStepStatus_Retry), |
328 timeout_(timeout) | 331 timeout_(timeout) |
329 { | 332 { |
330 } | 333 } |
331 | 334 |
332 unsigned int GetTimeout() const | 335 unsigned int GetTimeout() const |
333 { | 336 { |
341 public: | 344 public: |
342 virtual ~IJob() | 345 virtual ~IJob() |
343 { | 346 { |
344 } | 347 } |
345 | 348 |
346 virtual IJobStepResult* ExecuteStep() = 0; | 349 virtual JobStepResult* ExecuteStep() = 0; |
347 | 350 |
348 virtual void ReleaseResources() = 0; // For pausing jobs | 351 virtual void ReleaseResources() = 0; // For pausing jobs |
349 | 352 |
350 virtual float GetProgress() = 0; | 353 virtual float GetProgress() = 0; |
351 | 354 |
352 virtual void FormatStatus(Json::Value& value) = 0; | 355 virtual void FormatStatus(Json::Value& value) = 0; |
353 }; | 356 }; |
354 | 357 |
355 | 358 |
359 class JobHandler : public boost::noncopyable | |
360 { | |
361 private: | |
362 std::string id_; | |
363 JobState state_; | |
364 std::auto_ptr<IJob> job_; | |
365 int priority_; // "+inf()" means highest priority | |
366 boost::posix_time::ptime creationTime_; | |
367 boost::posix_time::ptime lastUpdateTime_; | |
368 boost::posix_time::ptime retryTime_; | |
369 uint64_t runtime_; // In milliseconds | |
370 bool pauseScheduled_; | |
371 | |
372 void SetStateInternal(JobState state) | |
373 { | |
374 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
375 | |
376 if (state_ == JobState_Running) | |
377 { | |
378 runtime_ += (now - lastUpdateTime_).total_milliseconds(); | |
379 } | |
380 | |
381 state_ = state; | |
382 lastUpdateTime_ = now; | |
383 pauseScheduled_ = false; | |
384 } | |
385 | |
386 public: | |
387 JobHandler(IJob* job, | |
388 int priority) : | |
389 id_(Toolbox::GenerateUuid()), | |
390 state_(JobState_Pending), | |
391 job_(job), | |
392 priority_(priority), | |
393 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
394 lastUpdateTime_(creationTime_), | |
395 runtime_(0), | |
396 pauseScheduled_(false) | |
397 { | |
398 if (job == NULL) | |
399 { | |
400 throw OrthancException(ErrorCode_NullPointer); | |
401 } | |
402 } | |
403 | |
404 const std::string& GetId() const | |
405 { | |
406 return id_; | |
407 } | |
408 | |
409 IJob& GetJob() const | |
410 { | |
411 assert(job_.get() != NULL); | |
412 return *job_; | |
413 } | |
414 | |
415 void SetPriority(int priority) | |
416 { | |
417 priority_ = priority; | |
418 } | |
419 | |
420 int GetPriority() const | |
421 { | |
422 return priority_; | |
423 } | |
424 | |
425 JobState GetState() const | |
426 { | |
427 return state_; | |
428 } | |
429 | |
430 void SetState(JobState state) | |
431 { | |
432 if (state == JobState_Retry) | |
433 { | |
434 // Use "SetRetryState()" | |
435 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
436 } | |
437 else | |
438 { | |
439 SetStateInternal(state); | |
440 } | |
441 } | |
442 | |
443 void SetRetryState(unsigned int timeout) | |
444 { | |
445 if (state_ == JobState_Running) | |
446 { | |
447 SetStateInternal(JobState_Retry); | |
448 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + | |
449 boost::posix_time::milliseconds(timeout)); | |
450 } | |
451 else | |
452 { | |
453 // Only valid for running jobs | |
454 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
455 } | |
456 } | |
457 | |
458 void SchedulePause() | |
459 { | |
460 if (state_ == JobState_Running) | |
461 { | |
462 pauseScheduled_ = true; | |
463 } | |
464 else | |
465 { | |
466 // Only valid for running jobs | |
467 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
468 } | |
469 } | |
470 | |
471 bool IsPauseScheduled() | |
472 { | |
473 return pauseScheduled_; | |
474 } | |
475 | |
476 bool IsRetryReady(const boost::posix_time::ptime& now) const | |
477 { | |
478 if (state_ != JobState_Retry) | |
479 { | |
480 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
481 } | |
482 else | |
483 { | |
484 return retryTime_ >= now; | |
485 } | |
486 } | |
487 }; | |
488 | |
489 | |
356 class JobsMonitor : public boost::noncopyable | 490 class JobsMonitor : public boost::noncopyable |
357 { | 491 { |
358 private: | 492 private: |
359 class JobHandler : public boost::noncopyable | 493 struct PriorityComparator |
360 { | 494 { |
361 private: | 495 bool operator() (JobHandler*& a, |
362 std::string id_; | 496 JobHandler*& b) const |
363 JobState state_; | 497 { |
364 std::auto_ptr<IJob> job_; | 498 return a->GetPriority() < b->GetPriority(); |
365 int priority_; // "+inf()" means highest priority | 499 } |
366 boost::posix_time::ptime creationTime_; | |
367 boost::posix_time::ptime lastUpdateTime_; | |
368 uint64_t runtime_; // In milliseconds | |
369 | |
370 public: | |
371 JobHandler(IJob* job, | |
372 int priority) : | |
373 id_(Toolbox::GenerateUuid()), | |
374 state_(JobState_Pending), | |
375 job_(job), | |
376 priority_(priority), | |
377 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
378 lastUpdateTime_(creationTime_), | |
379 runtime_(0) | |
380 { | |
381 if (job == NULL) | |
382 { | |
383 throw OrthancException(ErrorCode_NullPointer); | |
384 } | |
385 } | |
386 | |
387 const std::string& GetId() const | |
388 { | |
389 return id_; | |
390 } | |
391 }; | 500 }; |
392 | 501 |
502 typedef std::map<std::string, JobHandler*> JobsIndex; | |
503 typedef std::list<const JobHandler*> CompletedJobs; | |
504 typedef std::set<JobHandler*> RetryJobs; | |
505 typedef std::priority_queue<JobHandler*, | |
506 std::vector<JobHandler*>, // Could be a "std::deque" | |
507 PriorityComparator> PendingJobs; | |
508 | |
509 boost::mutex mutex_; | |
510 JobsIndex jobsIndex_; | |
511 PendingJobs pendingJobs_; | |
512 CompletedJobs completedJobs_; | |
513 RetryJobs retryJobs_; | |
514 | |
515 boost::condition_variable pendingJobAvailable_; | |
516 size_t maxCompletedJobs_; | |
517 | |
518 | |
519 #ifndef NDEBUG | |
520 bool IsPendingJob(const JobHandler& job) const | |
521 { | |
522 PendingJobs copy = pendingJobs_; | |
523 while (!copy.empty()) | |
524 { | |
525 if (copy.top() == &job) | |
526 { | |
527 return true; | |
528 } | |
529 | |
530 copy.pop(); | |
531 } | |
532 | |
533 return false; | |
534 } | |
535 | |
536 bool IsCompletedJob(const JobHandler& job) const | |
537 { | |
538 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
539 it != completedJobs_.end(); ++it) | |
540 { | |
541 if (*it == &job) | |
542 { | |
543 return true; | |
544 } | |
545 } | |
546 | |
547 return false; | |
548 } | |
549 | |
550 bool IsRetryJob(JobHandler& job) const | |
551 { | |
552 return retryJobs_.find(&job) != retryJobs_.end(); | |
553 } | |
554 #endif | |
555 | |
556 | |
557 void CheckInvariants() | |
558 { | |
559 #ifndef NDEBUG | |
560 { | |
561 PendingJobs copy = pendingJobs_; | |
562 while (!copy.empty()) | |
563 { | |
564 assert(copy.top()->GetState() == JobState_Pending); | |
565 copy.pop(); | |
566 } | |
567 } | |
568 | |
569 assert(completedJobs_.size() <= maxCompletedJobs_); | |
570 | |
571 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
572 it != completedJobs_.end(); ++it) | |
573 { | |
574 assert((*it)->GetState() == JobState_Success || | |
575 (*it)->GetState() == JobState_Failure); | |
576 } | |
577 | |
578 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
579 it != retryJobs_.end(); ++it) | |
580 { | |
581 assert((*it)->GetState() == JobState_Retry); | |
582 } | |
583 | |
584 for (JobsIndex::iterator it = jobsIndex_.begin(); | |
585 it != jobsIndex_.end(); ++it) | |
586 { | |
587 JobHandler& job = *it->second; | |
588 | |
589 assert(job.GetId() == it->first); | |
590 | |
591 switch (job.GetState()) | |
592 { | |
593 case JobState_Pending: | |
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
595 break; | |
596 | |
597 case JobState_Success: | |
598 case JobState_Failure: | |
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
600 break; | |
601 | |
602 case JobState_Retry: | |
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
604 break; | |
605 | |
606 case JobState_Running: | |
607 case JobState_Paused: | |
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
609 break; | |
610 | |
611 default: | |
612 throw OrthancException(ErrorCode_InternalError); | |
613 } | |
614 } | |
615 #endif | |
616 } | |
617 | |
618 | |
619 void ForgetOldCompletedJobs() | |
620 { | |
621 if (maxCompletedJobs_ != 0) | |
622 { | |
623 while (completedJobs_.size() > maxCompletedJobs_) | |
624 { | |
625 assert(completedJobs_.front() != NULL); | |
626 | |
627 std::string id = completedJobs_.front()->GetId(); | |
628 | |
629 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
630 | |
631 jobsIndex_.erase(id); | |
632 delete(completedJobs_.front()); | |
633 completedJobs_.pop_front(); | |
634 } | |
635 } | |
636 } | |
637 | |
638 | |
639 void MarkRunningAsCompleted(JobHandler& job, | |
640 bool success) | |
641 { | |
642 boost::mutex::scoped_lock lock(mutex_); | |
643 CheckInvariants(); | |
644 assert(job.GetState() == JobState_Running); | |
645 | |
646 job.SetState(success ? JobState_Success : JobState_Failure); | |
647 | |
648 completedJobs_.push_back(&job); | |
649 ForgetOldCompletedJobs(); | |
650 | |
651 CheckInvariants(); | |
652 } | |
653 | |
654 | |
655 void MarkRunningAsRetry(JobHandler& job, | |
656 unsigned int timeout) | |
657 { | |
658 boost::mutex::scoped_lock lock(mutex_); | |
659 CheckInvariants(); | |
660 | |
661 assert(job.GetState() == JobState_Running && | |
662 retryJobs_.find(&job) == retryJobs_.end()); | |
663 | |
664 retryJobs_.insert(&job); | |
665 job.SetRetryState(timeout); | |
666 | |
667 CheckInvariants(); | |
668 } | |
669 | |
670 | |
671 void MarkRunningAsPaused(JobHandler& job) | |
672 { | |
673 boost::mutex::scoped_lock lock(mutex_); | |
674 CheckInvariants(); | |
675 assert(job.GetState() == JobState_Running); | |
676 | |
677 job.SetState(JobState_Paused); | |
678 | |
679 CheckInvariants(); | |
680 } | |
681 | |
682 | |
683 JobHandler* WaitPendingJob(unsigned int timeout) | |
684 { | |
685 boost::mutex::scoped_lock lock(mutex_); | |
686 | |
687 while (pendingJobs_.empty()) | |
688 { | |
689 if (timeout == 0) | |
690 { | |
691 pendingJobAvailable_.wait(lock); | |
692 } | |
693 else | |
694 { | |
695 bool success = pendingJobAvailable_.timed_wait | |
696 (lock, boost::posix_time::milliseconds(timeout)); | |
697 if (!success) | |
698 { | |
699 return NULL; | |
700 } | |
701 } | |
702 } | |
703 | |
704 JobHandler* job = pendingJobs_.top(); | |
705 pendingJobs_.pop(); | |
706 | |
707 job->SetState(JobState_Running); | |
708 return job; | |
709 } | |
710 | |
711 | |
393 public: | 712 public: |
713 JobsMonitor() : | |
714 maxCompletedJobs_(10) | |
715 { | |
716 } | |
717 | |
718 | |
719 ~JobsMonitor() | |
720 { | |
721 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
722 { | |
723 assert(it->second != NULL); | |
724 delete it->second; | |
725 } | |
726 } | |
727 | |
728 | |
729 void SetMaxCompletedJobs(size_t i) | |
730 { | |
731 boost::mutex::scoped_lock lock(mutex_); | |
732 CheckInvariants(); | |
733 | |
734 maxCompletedJobs_ = i; | |
735 ForgetOldCompletedJobs(); | |
736 | |
737 CheckInvariants(); | |
738 } | |
739 | |
740 | |
741 void ListJobs(std::set<std::string>& target) | |
742 { | |
743 boost::mutex::scoped_lock lock(mutex_); | |
744 CheckInvariants(); | |
745 | |
746 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
747 it != jobsIndex_.end(); ++it) | |
748 { | |
749 target.insert(it->first); | |
750 } | |
751 } | |
752 | |
753 | |
394 void Submit(std::string& id, | 754 void Submit(std::string& id, |
395 IJob* job, | 755 IJob* job, // Takes ownership |
396 int priority) | 756 int priority) |
397 { | 757 { |
398 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | 758 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); |
759 | |
760 boost::mutex::scoped_lock lock(mutex_); | |
761 CheckInvariants(); | |
762 | |
399 id = handler->GetId(); | 763 id = handler->GetId(); |
400 } | 764 pendingJobs_.push(handler.get()); |
765 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
766 | |
767 pendingJobAvailable_.notify_one(); | |
768 | |
769 CheckInvariants(); | |
770 } | |
771 | |
772 | |
773 void Submit(IJob* job, // Takes ownership | |
774 int priority) | |
775 { | |
776 std::string id; | |
777 Submit(id, job, priority); | |
778 } | |
779 | |
401 | 780 |
402 void SetPriority(const std::string& id, | 781 void SetPriority(const std::string& id, |
403 int priority) | 782 int priority) |
404 { | 783 { |
405 // TODO | 784 boost::mutex::scoped_lock lock(mutex_); |
406 } | 785 CheckInvariants(); |
786 | |
787 JobsIndex::iterator found = jobsIndex_.find(id); | |
788 | |
789 if (found == jobsIndex_.end()) | |
790 { | |
791 LOG(WARNING) << "Unknown job: " << id; | |
792 } | |
793 else | |
794 { | |
795 found->second->SetPriority(priority); | |
796 | |
797 if (found->second->GetState() == JobState_Pending) | |
798 { | |
799 // If the job is pending, we need to reconstruct the | |
800 // priority queue, as the heap condition has changed | |
801 | |
802 PendingJobs copy; | |
803 std::swap(copy, pendingJobs_); | |
804 | |
805 assert(pendingJobs_.empty()); | |
806 while (!copy.empty()) | |
807 { | |
808 pendingJobs_.push(copy.top()); | |
809 copy.pop(); | |
810 } | |
811 } | |
812 } | |
813 | |
814 CheckInvariants(); | |
815 } | |
816 | |
407 | 817 |
408 void Pause(const std::string& id) | 818 void Pause(const std::string& id) |
409 { | 819 { |
410 // TODO | 820 boost::mutex::scoped_lock lock(mutex_); |
411 } | 821 CheckInvariants(); |
822 | |
823 JobsIndex::iterator found = jobsIndex_.find(id); | |
824 | |
825 if (found == jobsIndex_.end()) | |
826 { | |
827 LOG(WARNING) << "Unknown job: " << id; | |
828 } | |
829 else | |
830 { | |
831 switch (found->second->GetState()) | |
832 { | |
833 case JobState_Pending: | |
834 { | |
835 // If the job is pending, we need to reconstruct the | |
836 // priority queue to remove it | |
837 PendingJobs copy; | |
838 std::swap(copy, pendingJobs_); | |
839 | |
840 assert(pendingJobs_.empty()); | |
841 while (!copy.empty()) | |
842 { | |
843 if (copy.top()->GetId() != id) | |
844 { | |
845 pendingJobs_.push(copy.top()); | |
846 } | |
847 | |
848 copy.pop(); | |
849 } | |
850 | |
851 found->second->SetState(JobState_Paused); | |
852 | |
853 break; | |
854 } | |
855 | |
856 case JobState_Retry: | |
857 { | |
858 RetryJobs::iterator item = retryJobs_.find(found->second); | |
859 assert(item != retryJobs_.end()); | |
860 retryJobs_.erase(item); | |
861 | |
862 found->second->SetState(JobState_Paused); | |
863 | |
864 break; | |
865 } | |
866 | |
867 case JobState_Paused: | |
868 case JobState_Success: | |
869 case JobState_Failure: | |
870 // Nothing to be done | |
871 break; | |
872 | |
873 case JobState_Running: | |
874 found->second->SchedulePause(); | |
875 break; | |
876 | |
877 default: | |
878 throw OrthancException(ErrorCode_InternalError); | |
879 } | |
880 } | |
881 | |
882 CheckInvariants(); | |
883 } | |
884 | |
412 | 885 |
413 void Resume(const std::string& id) | 886 void Resume(const std::string& id) |
414 { | 887 { |
415 // TODO | 888 boost::mutex::scoped_lock lock(mutex_); |
416 } | 889 CheckInvariants(); |
890 | |
891 JobsIndex::iterator found = jobsIndex_.find(id); | |
892 | |
893 if (found == jobsIndex_.end()) | |
894 { | |
895 LOG(WARNING) << "Unknown job: " << id; | |
896 } | |
897 else if (found->second->GetState() != JobState_Paused) | |
898 { | |
899 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
900 } | |
901 else | |
902 { | |
903 found->second->SetState(JobState_Pending); | |
904 pendingJobs_.push(found->second); | |
905 pendingJobAvailable_.notify_one(); | |
906 } | |
907 | |
908 CheckInvariants(); | |
909 } | |
910 | |
417 | 911 |
418 void Resubmit(const std::string& id) | 912 void Resubmit(const std::string& id) |
419 { | 913 { |
420 // TODO | 914 boost::mutex::scoped_lock lock(mutex_); |
421 } | 915 CheckInvariants(); |
422 | 916 |
423 class JobToRun : public boost::noncopyable | 917 JobsIndex::iterator found = jobsIndex_.find(id); |
918 | |
919 if (found == jobsIndex_.end()) | |
920 { | |
921 LOG(WARNING) << "Unknown job: " << id; | |
922 } | |
923 else if (found->second->GetState() != JobState_Failure) | |
924 { | |
925 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
926 } | |
927 else | |
928 { | |
929 bool ok = false; | |
930 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
931 it != completedJobs_.end(); ++it) | |
932 { | |
933 if (*it == found->second) | |
934 { | |
935 ok = true; | |
936 completedJobs_.erase(it); | |
937 break; | |
938 } | |
939 } | |
940 | |
941 assert(ok); | |
942 | |
943 found->second->SetState(JobState_Pending); | |
944 pendingJobs_.push(found->second); | |
945 pendingJobAvailable_.notify_one(); | |
946 } | |
947 | |
948 CheckInvariants(); | |
949 } | |
950 | |
951 | |
952 void ScheduleRetries() | |
953 { | |
954 boost::mutex::scoped_lock lock(mutex_); | |
955 CheckInvariants(); | |
956 | |
957 RetryJobs copy; | |
958 std::swap(copy, retryJobs_); | |
959 | |
960 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
961 | |
962 assert(retryJobs_.empty()); | |
963 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
964 { | |
965 if ((*it)->IsRetryReady(now)) | |
966 { | |
967 (*it)->SetState(JobState_Pending); | |
968 } | |
969 else | |
970 { | |
971 retryJobs_.insert(*it); | |
972 } | |
973 } | |
974 | |
975 CheckInvariants(); | |
976 } | |
977 | |
978 | |
979 bool GetState(JobState& state, | |
980 const std::string& id) | |
981 { | |
982 boost::mutex::scoped_lock lock(mutex_); | |
983 CheckInvariants(); | |
984 | |
985 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
986 if (it == jobsIndex_.end()) | |
987 { | |
988 return false; | |
989 } | |
990 else | |
991 { | |
992 state = it->second->GetState(); | |
993 return true; | |
994 } | |
995 } | |
996 | |
997 | |
998 class RunningJob : public boost::noncopyable | |
424 { | 999 { |
425 private: | 1000 private: |
426 JobHandler* handler_; | 1001 JobsMonitor& that_; |
1002 JobHandler* handler_; | |
1003 JobState targetState_; | |
1004 unsigned int retryTimeout_; | |
427 | 1005 |
428 public: | 1006 public: |
429 JobToRun(JobsMonitor& that, | 1007 RunningJob(JobsMonitor& that, |
430 unsigned int timeout) : | 1008 unsigned int timeout) : |
431 handler_(NULL) | 1009 that_(that), |
432 { | 1010 handler_(NULL), |
1011 targetState_(JobState_Failure), | |
1012 retryTimeout_(0) | |
1013 { | |
1014 handler_ = that_.WaitPendingJob(timeout); | |
1015 } | |
1016 | |
1017 ~RunningJob() | |
1018 { | |
1019 if (IsValid()) | |
1020 { | |
1021 switch (targetState_) | |
1022 { | |
1023 case JobState_Failure: | |
1024 that_.MarkRunningAsCompleted(*handler_, false); | |
1025 break; | |
1026 | |
1027 case JobState_Success: | |
1028 that_.MarkRunningAsCompleted(*handler_, true); | |
1029 break; | |
1030 | |
1031 case JobState_Paused: | |
1032 that_.MarkRunningAsPaused(*handler_); | |
1033 break; | |
1034 | |
1035 case JobState_Retry: | |
1036 that_.MarkRunningAsRetry(*handler_, retryTimeout_); | |
1037 break; | |
1038 | |
1039 default: | |
1040 assert(0); | |
1041 } | |
1042 } | |
433 } | 1043 } |
434 | 1044 |
435 bool IsValid() const | 1045 bool IsValid() const |
436 { | 1046 { |
437 return handler_ != NULL; | 1047 return handler_ != NULL; |
438 } | 1048 } |
439 | 1049 |
440 | 1050 const std::string& GetId() const |
1051 { | |
1052 if (IsValid()) | |
1053 { | |
1054 return handler_->GetId(); | |
1055 } | |
1056 else | |
1057 { | |
1058 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1059 } | |
1060 } | |
1061 | |
1062 int GetPriority() const | |
1063 { | |
1064 if (IsValid()) | |
1065 { | |
1066 return handler_->GetPriority(); | |
1067 } | |
1068 else | |
1069 { | |
1070 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1071 } | |
1072 } | |
1073 | |
1074 bool IsPauseScheduled() | |
1075 { | |
1076 if (!IsValid()) | |
1077 { | |
1078 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1079 } | |
1080 | |
1081 boost::mutex::scoped_lock lock(that_.mutex_); | |
1082 that_.CheckInvariants(); | |
1083 assert(handler_->GetState() == JobState_Running); | |
1084 | |
1085 return handler_->IsPauseScheduled(); | |
1086 } | |
1087 | |
1088 IJob& GetJob() | |
1089 { | |
1090 if (!IsValid()) | |
1091 { | |
1092 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1093 } | |
1094 | |
1095 boost::mutex::scoped_lock lock(that_.mutex_); | |
1096 that_.CheckInvariants(); | |
1097 assert(handler_->GetState() == JobState_Running); | |
1098 | |
1099 return handler_->GetJob(); | |
1100 } | |
1101 | |
1102 void MarkSuccess() | |
1103 { | |
1104 if (!IsValid()) | |
1105 { | |
1106 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1107 } | |
1108 | |
1109 targetState_ = JobState_Success; | |
1110 } | |
1111 | |
1112 void MarkFailure() | |
1113 { | |
1114 if (!IsValid()) | |
1115 { | |
1116 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1117 } | |
1118 | |
1119 targetState_ = JobState_Failure; | |
1120 } | |
1121 | |
1122 void MarkPause() | |
1123 { | |
1124 if (!IsValid()) | |
1125 { | |
1126 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1127 } | |
1128 | |
1129 targetState_ = JobState_Paused; | |
1130 } | |
1131 | |
1132 void MarkRetry(unsigned int timeout) | |
1133 { | |
1134 if (!IsValid()) | |
1135 { | |
1136 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1137 } | |
1138 | |
1139 targetState_ = JobState_Retry; | |
1140 retryTimeout_ = timeout; | |
1141 } | |
441 }; | 1142 }; |
442 }; | 1143 }; |
443 } | 1144 } |
1145 | |
1146 | |
1147 | |
1148 class DummyJob : public Orthanc::IJob | |
1149 { | |
1150 private: | |
1151 JobStepResult result_; | |
1152 | |
1153 public: | |
1154 DummyJob() : | |
1155 result_(Orthanc::JobStepStatus_Success) | |
1156 { | |
1157 } | |
1158 | |
1159 explicit DummyJob(JobStepResult result) : | |
1160 result_(result) | |
1161 { | |
1162 } | |
1163 | |
1164 virtual JobStepResult* ExecuteStep() | |
1165 { | |
1166 return new JobStepResult(result_); | |
1167 } | |
1168 | |
1169 virtual void ReleaseResources() | |
1170 { | |
1171 } | |
1172 | |
1173 virtual float GetProgress() | |
1174 { | |
1175 return 0; | |
1176 } | |
1177 | |
1178 virtual void FormatStatus(Json::Value& value) | |
1179 { | |
1180 } | |
1181 }; | |
1182 | |
1183 | |
1184 static bool CheckState(Orthanc::JobsMonitor& monitor, | |
1185 const std::string& id, | |
1186 Orthanc::JobState state) | |
1187 { | |
1188 Orthanc::JobState s; | |
1189 if (monitor.GetState(s, id)) | |
1190 { | |
1191 return state == s; | |
1192 } | |
1193 else | |
1194 { | |
1195 return false; | |
1196 } | |
1197 } | |
1198 | |
1199 | |
1200 TEST(JobsMonitor, Priority) | |
1201 { | |
1202 JobsMonitor monitor; | |
1203 | |
1204 std::string i1, i2, i3, i4; | |
1205 monitor.Submit(i1, new DummyJob(), 10); | |
1206 monitor.Submit(i2, new DummyJob(), 30); | |
1207 monitor.Submit(i3, new DummyJob(), 20); | |
1208 monitor.Submit(i4, new DummyJob(), 5); | |
1209 | |
1210 monitor.SetMaxCompletedJobs(2); | |
1211 | |
1212 std::set<std::string> id; | |
1213 monitor.ListJobs(id); | |
1214 | |
1215 ASSERT_EQ(4u, id.size()); | |
1216 ASSERT_TRUE(id.find(i1) != id.end()); | |
1217 ASSERT_TRUE(id.find(i2) != id.end()); | |
1218 ASSERT_TRUE(id.find(i3) != id.end()); | |
1219 ASSERT_TRUE(id.find(i4) != id.end()); | |
1220 | |
1221 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending)); | |
1222 | |
1223 { | |
1224 JobsMonitor::RunningJob job(monitor, 0); | |
1225 ASSERT_TRUE(job.IsValid()); | |
1226 ASSERT_EQ(30, job.GetPriority()); | |
1227 ASSERT_EQ(i2, job.GetId()); | |
1228 | |
1229 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running)); | |
1230 } | |
1231 | |
1232 ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure)); | |
1233 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending)); | |
1234 | |
1235 { | |
1236 JobsMonitor::RunningJob job(monitor, 0); | |
1237 ASSERT_TRUE(job.IsValid()); | |
1238 ASSERT_EQ(20, job.GetPriority()); | |
1239 ASSERT_EQ(i3, job.GetId()); | |
1240 | |
1241 job.MarkSuccess(); | |
1242 | |
1243 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running)); | |
1244 } | |
1245 | |
1246 ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success)); | |
1247 | |
1248 { | |
1249 JobsMonitor::RunningJob job(monitor, 0); | |
1250 ASSERT_TRUE(job.IsValid()); | |
1251 ASSERT_EQ(10, job.GetPriority()); | |
1252 ASSERT_EQ(i1, job.GetId()); | |
1253 } | |
1254 | |
1255 { | |
1256 JobsMonitor::RunningJob job(monitor, 0); | |
1257 ASSERT_TRUE(job.IsValid()); | |
1258 ASSERT_EQ(5, job.GetPriority()); | |
1259 ASSERT_EQ(i4, job.GetId()); | |
1260 } | |
1261 | |
1262 { | |
1263 JobsMonitor::RunningJob job(monitor, 1); | |
1264 ASSERT_FALSE(job.IsValid()); | |
1265 } | |
1266 | |
1267 Orthanc::JobState s; | |
1268 ASSERT_TRUE(monitor.GetState(s, i1)); | |
1269 ASSERT_FALSE(monitor.GetState(s, i2)); // Removed because oldest | |
1270 ASSERT_FALSE(monitor.GetState(s, i3)); // Removed because second oldest | |
1271 ASSERT_TRUE(monitor.GetState(s, i4)); | |
1272 | |
1273 monitor.SetMaxCompletedJobs(1); // (*) | |
1274 ASSERT_FALSE(monitor.GetState(s, i1)); // Just discarded by (*) | |
1275 ASSERT_TRUE(monitor.GetState(s, i4)); | |
1276 } | |
1277 | |
1278 | |
1279 TEST(JobsMonitor, Resubmit) | |
1280 { | |
1281 JobsMonitor monitor; | |
1282 | |
1283 std::string id; | |
1284 monitor.Submit(id, new DummyJob(), 10); | |
1285 | |
1286 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); | |
1287 | |
1288 monitor.Resubmit(id); | |
1289 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); | |
1290 | |
1291 { | |
1292 JobsMonitor::RunningJob job(monitor, 0); | |
1293 ASSERT_TRUE(job.IsValid()); | |
1294 job.MarkFailure(); | |
1295 | |
1296 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); | |
1297 | |
1298 monitor.Resubmit(id); | |
1299 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); | |
1300 } | |
1301 | |
1302 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure)); | |
1303 | |
1304 monitor.Resubmit(id); | |
1305 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); | |
1306 | |
1307 { | |
1308 JobsMonitor::RunningJob job(monitor, 0); | |
1309 ASSERT_TRUE(job.IsValid()); | |
1310 ASSERT_EQ(id, job.GetId()); | |
1311 | |
1312 job.MarkSuccess(); | |
1313 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); | |
1314 } | |
1315 | |
1316 ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success)); | |
1317 } |