comparison UnitTestsSources/MultiThreadingTests.cpp @ 2568:a46094602346 jobs

improvements
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 15:02:34 +0200
parents 3caca43371f5
children 2af17cd5eb1f
comparison
equal deleted inserted replaced
2567:3caca43371f5 2568:a46094602346
298 { 298 {
299 JobStepCode_Success, 299 JobStepCode_Success,
300 JobStepCode_Failure, 300 JobStepCode_Failure,
301 JobStepCode_Continue, 301 JobStepCode_Continue,
302 JobStepCode_Retry 302 JobStepCode_Retry
303 }; 303 };
304 304
305
306 class JobStepResult 305 class JobStepResult
307 { 306 {
308 private: 307 private:
309 JobStepCode status_; 308 JobStepCode code_;
310 309
311 public: 310 public:
312 explicit JobStepResult(JobStepCode status) : 311 explicit JobStepResult(JobStepCode code) :
313 status_(status) 312 code_(code)
314 { 313 {
315 } 314 }
316 315
317 virtual ~JobStepResult() 316 virtual ~JobStepResult()
318 { 317 {
319 } 318 }
320 319
321 JobStepCode GetCode() const 320 JobStepCode GetCode() const
322 { 321 {
323 return status_; 322 return code_;
324 } 323 }
325 }; 324 };
326 325
327 326
328 class RetryResult : public JobStepResult 327 class JobStepRetry : public JobStepResult
329 { 328 {
330 private: 329 private:
331 unsigned int timeout_; // Retry after "timeout_" milliseconds 330 unsigned int timeout_; // Retry after "timeout_" milliseconds
332 331
333 public: 332 public:
334 RetryResult(unsigned int timeout) : 333 JobStepRetry(unsigned int timeout) :
335 JobStepResult(JobStepCode_Retry), 334 JobStepResult(JobStepCode_Retry),
336 timeout_(timeout) 335 timeout_(timeout)
337 { 336 {
338 } 337 }
339 338
568 } 567 }
569 } 568 }
570 }; 569 };
571 570
572 571
573 class JobHandler : public boost::noncopyable 572
574 { 573
574 class JobsRegistry : public boost::noncopyable
575 {
575 private: 576 private:
576 std::string id_; 577 class JobHandler : public boost::noncopyable
577 JobState state_; 578 {
578 std::auto_ptr<IJob> job_; 579 private:
579 int priority_; // "+inf()" means highest priority 580 std::string id_;
580 boost::posix_time::ptime creationTime_; 581 JobState state_;
581 boost::posix_time::ptime lastStateChangeTime_; 582 std::auto_ptr<IJob> job_;
582 boost::posix_time::time_duration runtime_; 583 int priority_; // "+inf()" means highest priority
583 boost::posix_time::ptime retryTime_; 584 boost::posix_time::ptime creationTime_;
584 bool pauseScheduled_; 585 boost::posix_time::ptime lastStateChangeTime_;
585 JobStatus lastStatus_; 586 boost::posix_time::time_duration runtime_;
586 587 boost::posix_time::ptime retryTime_;
587 void Touch() 588 bool pauseScheduled_;
588 { 589 JobStatus lastStatus_;
589 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); 590
590 591 void Touch()
591 if (state_ == JobState_Running) 592 {
592 { 593 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
593 runtime_ += (now - lastStateChangeTime_); 594
594 } 595 if (state_ == JobState_Running)
595 596 {
596 lastStateChangeTime_ = now; 597 runtime_ += (now - lastStateChangeTime_);
597 } 598 }
598 599
599 void SetStateInternal(JobState state) 600 lastStateChangeTime_ = now;
600 { 601 }
601 state_ = state; 602
602 pauseScheduled_ = false; 603 void SetStateInternal(JobState state)
603 Touch(); 604 {
604 } 605 state_ = state;
605 606 pauseScheduled_ = false;
606 public: 607 Touch();
607 JobHandler(IJob* job, 608 }
608 int priority) : 609
609 id_(Toolbox::GenerateUuid()), 610 public:
610 state_(JobState_Pending), 611 JobHandler(IJob* job,
611 job_(job), 612 int priority) :
612 priority_(priority), 613 id_(Toolbox::GenerateUuid()),
613 creationTime_(boost::posix_time::microsec_clock::universal_time()), 614 state_(JobState_Pending),
614 lastStateChangeTime_(creationTime_), 615 job_(job),
615 runtime_(boost::posix_time::milliseconds(0)), 616 priority_(priority),
616 retryTime_(creationTime_), 617 creationTime_(boost::posix_time::microsec_clock::universal_time()),
617 pauseScheduled_(false) 618 lastStateChangeTime_(creationTime_),
618 { 619 runtime_(boost::posix_time::milliseconds(0)),
619 if (job == NULL) 620 retryTime_(creationTime_),
620 { 621 pauseScheduled_(false)
621 throw OrthancException(ErrorCode_NullPointer); 622 {
622 } 623 if (job == NULL)
623 624 {
624 lastStatus_ = JobStatus(ErrorCode_Success, *job); 625 throw OrthancException(ErrorCode_NullPointer);
625 } 626 }
626 627
627 const std::string& GetId() const 628 lastStatus_ = JobStatus(ErrorCode_Success, *job);
628 { 629 }
629 return id_; 630
630 } 631 const std::string& GetId() const
631 632 {
632 IJob& GetJob() const 633 return id_;
633 { 634 }
634 assert(job_.get() != NULL); 635
635 return *job_; 636 IJob& GetJob() const
636 } 637 {
637 638 assert(job_.get() != NULL);
638 void SetPriority(int priority) 639 return *job_;
639 { 640 }
640 priority_ = priority; 641
641 } 642 void SetPriority(int priority)
642 643 {
643 int GetPriority() const 644 priority_ = priority;
644 { 645 }
645 return priority_; 646
646 } 647 int GetPriority() const
647 648 {
648 JobState GetState() const 649 return priority_;
649 { 650 }
650 return state_; 651
651 } 652 JobState GetState() const
652 653 {
653 void SetState(JobState state) 654 return state_;
654 { 655 }
655 if (state == JobState_Retry) 656
656 { 657 void SetState(JobState state)
657 // Use "SetRetryState()" 658 {
658 throw OrthancException(ErrorCode_BadSequenceOfCalls); 659 if (state == JobState_Retry)
659 } 660 {
660 else 661 // Use "SetRetryState()"
661 { 662 throw OrthancException(ErrorCode_BadSequenceOfCalls);
662 SetStateInternal(state); 663 }
663 } 664 else
664 } 665 {
665 666 SetStateInternal(state);
666 void SetRetryState(unsigned int timeout) 667 }
667 { 668 }
668 if (state_ == JobState_Running) 669
669 { 670 void SetRetryState(unsigned int timeout)
670 SetStateInternal(JobState_Retry); 671 {
671 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + 672 if (state_ == JobState_Running)
672 boost::posix_time::milliseconds(timeout)); 673 {
673 } 674 SetStateInternal(JobState_Retry);
674 else 675 retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
675 { 676 boost::posix_time::milliseconds(timeout));
676 // Only valid for running jobs 677 }
677 throw OrthancException(ErrorCode_BadSequenceOfCalls); 678 else
678 } 679 {
679 } 680 // Only valid for running jobs
680 681 throw OrthancException(ErrorCode_BadSequenceOfCalls);
681 void SchedulePause() 682 }
682 { 683 }
683 if (state_ == JobState_Running) 684
684 { 685 void SchedulePause()
685 pauseScheduled_ = true; 686 {
686 } 687 if (state_ == JobState_Running)
687 else 688 {
688 { 689 pauseScheduled_ = true;
689 // Only valid for running jobs 690 }
690 throw OrthancException(ErrorCode_BadSequenceOfCalls); 691 else
691 } 692 {
692 } 693 // Only valid for running jobs
693 694 throw OrthancException(ErrorCode_BadSequenceOfCalls);
694 bool IsPauseScheduled() 695 }
695 { 696 }
696 return pauseScheduled_; 697
697 } 698 bool IsPauseScheduled()
698 699 {
699 bool IsRetryReady(const boost::posix_time::ptime& now) const 700 return pauseScheduled_;
700 { 701 }
701 if (state_ != JobState_Retry) 702
702 { 703 bool IsRetryReady(const boost::posix_time::ptime& now) const
703 throw OrthancException(ErrorCode_BadSequenceOfCalls); 704 {
704 } 705 if (state_ != JobState_Retry)
705 else 706 {
706 { 707 throw OrthancException(ErrorCode_BadSequenceOfCalls);
707 return retryTime_ <= now; 708 }
708 } 709 else
709 } 710 {
710 711 return retryTime_ <= now;
711 const boost::posix_time::ptime& GetCreationTime() const 712 }
712 { 713 }
713 return creationTime_; 714
714 } 715 const boost::posix_time::ptime& GetCreationTime() const
715 716 {
716 const boost::posix_time::ptime& GetLastStateChangeTime() const 717 return creationTime_;
717 { 718 }
718 return lastStateChangeTime_; 719
719 } 720 const boost::posix_time::ptime& GetLastStateChangeTime() const
720 721 {
721 const boost::posix_time::time_duration& GetRuntime() const 722 return lastStateChangeTime_;
722 { 723 }
723 return runtime_; 724
724 } 725 const boost::posix_time::time_duration& GetRuntime() const
725 726 {
726 const JobStatus& GetLastStatus() const 727 return runtime_;
727 { 728 }
728 return lastStatus_; 729
729 } 730 const JobStatus& GetLastStatus() const
730 731 {
731 void SetLastStatus(const JobStatus& status) 732 return lastStatus_;
732 { 733 }
733 lastStatus_ = status; 734
734 Touch(); 735 void SetLastStatus(const JobStatus& status)
735 } 736 {
736 }; 737 lastStatus_ = status;
737 738 Touch();
738 739 }
739 class JobsRegistry : public boost::noncopyable 740 };
740 { 741
741 private:
742 struct PriorityComparator 742 struct PriorityComparator
743 { 743 {
744 bool operator() (JobHandler*& a, 744 bool operator() (JobHandler*& a,
745 JobHandler*& b) const 745 JobHandler*& b) const
746 { 746 {
972 target.insert(it->first); 972 target.insert(it->first);
973 } 973 }
974 } 974 }
975 975
976 976
977 void GetJobsInfo(std::map<std::string, JobInfo>& target) 977 bool GetJobInfo(JobInfo& target,
978 const std::string& id)
978 { 979 {
979 boost::mutex::scoped_lock lock(mutex_); 980 boost::mutex::scoped_lock lock(mutex_);
980 CheckInvariants(); 981 CheckInvariants();
981 982
982 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 983 JobsIndex::const_iterator found = jobsIndex_.find(id);
983 it != jobsIndex_.end(); ++it) 984
984 { 985 if (found == jobsIndex_.end())
985 const JobHandler& handler = *it->second; 986 {
986 target[it->first] = JobInfo(handler.GetId(), 987 return false;
987 handler.GetPriority(), 988 }
988 handler.GetState(), 989 else
989 handler.GetLastStatus(), 990 {
990 handler.GetCreationTime(), 991 const JobHandler& handler = *found->second;
991 handler.GetLastStateChangeTime(), 992 target = JobInfo(handler.GetId(),
992 handler.GetRuntime()); 993 handler.GetPriority(),
994 handler.GetState(),
995 handler.GetLastStatus(),
996 handler.GetCreationTime(),
997 handler.GetLastStateChangeTime(),
998 handler.GetRuntime());
999 return true;
993 } 1000 }
994 } 1001 }
995 1002
996 1003
997 void Submit(std::string& id, 1004 void Submit(std::string& id,
1543 case JobStepCode_Failure: 1550 case JobStepCode_Failure:
1544 running.MarkFailure(); 1551 running.MarkFailure();
1545 return false; 1552 return false;
1546 1553
1547 case JobStepCode_Retry: 1554 case JobStepCode_Retry:
1548 running.MarkRetry(dynamic_cast<RetryResult&>(*result).GetTimeout()); 1555 running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout());
1549 return false; 1556 return false;
1550 1557
1551 case JobStepCode_Continue: 1558 case JobStepCode_Continue:
1552 return true; 1559 return true;
1553 1560
2082 engine.Start(); 2089 engine.Start();
2083 2090
2084 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 2091 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
2085 2092
2086 { 2093 {
2087 typedef std::map<std::string, JobInfo> Jobs; 2094 typedef std::set<std::string> Jobs;
2088 2095
2089 Jobs jobs; 2096 Jobs jobs;
2090 engine.GetRegistry().GetJobsInfo(jobs); 2097 engine.GetRegistry().ListJobs(jobs);
2091 2098
2092 Json::Value v; 2099 Json::Value v = Json::arrayValue;
2093 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) 2100 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
2094 { 2101 {
2095 Json::Value vv; 2102 JobInfo info;
2096 it->second.Format(vv); 2103
2097 v[it->first] = vv; 2104 if (engine.GetRegistry().GetJobInfo(info, *it))
2105 {
2106 Json::Value vv;
2107 info.Format(vv);
2108 v.append(vv);
2109 }
2098 } 2110 }
2099 2111
2100 std::cout << v << std::endl; 2112 std::cout << v << std::endl;
2101 } 2113 }
2102 std::cout << "====================================================" << std::endl; 2114 std::cout << "====================================================" << std::endl;
2105 2117
2106 engine.Stop(); 2118 engine.Stop();
2107 2119
2108 2120
2109 { 2121 {
2110 typedef std::map<std::string, JobInfo> Jobs; 2122 typedef std::set<std::string> Jobs;
2111 2123
2112 Jobs jobs; 2124 Jobs jobs;
2113 engine.GetRegistry().GetJobsInfo(jobs); 2125 engine.GetRegistry().ListJobs(jobs);
2114 2126
2115 Json::Value v; 2127 Json::Value v = Json::arrayValue;
2116 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) 2128 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
2117 { 2129 {
2118 Json::Value vv; 2130 JobInfo info;
2119 it->second.Format(vv); 2131
2120 v[it->first] = vv; 2132 if (engine.GetRegistry().GetJobInfo(info, *it))
2133 {
2134 Json::Value vv;
2135 info.Format(vv);
2136 v.append(vv);
2137 }
2121 } 2138 }
2122 2139
2123 std::cout << v << std::endl; 2140 std::cout << v << std::endl;
2124 } 2141 }
2125 } 2142 }