Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2569:2af17cd5eb1f jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 07 May 2018 15:37:20 +0200 |
parents | a46094602346 |
children | 2e879c796ec7 |
comparison
equal
deleted
inserted
replaced
2568:a46094602346 | 2569:2af17cd5eb1f |
---|---|
32 | 32 |
33 | 33 |
34 #include "PrecompiledHeadersUnitTests.h" | 34 #include "PrecompiledHeadersUnitTests.h" |
35 #include "gtest/gtest.h" | 35 #include "gtest/gtest.h" |
36 | 36 |
37 #include "../OrthancServer/Scheduler/ServerScheduler.h" | 37 #include "../Core/JobsEngine/JobStepRetry.h" |
38 #include "../Core/JobsEngine/JobsEngine.h" | |
39 #include "../Core/MultiThreading/Locker.h" | |
38 #include "../Core/OrthancException.h" | 40 #include "../Core/OrthancException.h" |
39 #include "../Core/SystemToolbox.h" | 41 #include "../Core/SystemToolbox.h" |
40 #include "../Core/Toolbox.h" | 42 #include "../Core/Toolbox.h" |
41 #include "../Core/MultiThreading/Locker.h" | 43 #include "../OrthancServer/Scheduler/ServerScheduler.h" |
42 | 44 |
43 using namespace Orthanc; | 45 using namespace Orthanc; |
44 | 46 |
45 namespace | 47 namespace |
46 { | 48 { |
237 } | 239 } |
238 } | 240 } |
239 | 241 |
240 | 242 |
241 | 243 |
242 | |
243 | |
244 #if !defined(ORTHANC_SANDBOXED) | |
245 # error The macro ORTHANC_SANDBOXED must be defined | |
246 #endif | |
247 | |
248 #if ORTHANC_SANDBOXED == 1 | |
249 # error The job engine cannot be used in sandboxed environments | |
250 #endif | |
251 | |
252 #include "../Core/Logging.h" | |
253 | |
254 #include <boost/math/special_functions/round.hpp> | |
255 #include <boost/date_time/posix_time/posix_time.hpp> | |
256 #include <queue> | |
257 | |
258 namespace Orthanc | |
259 { | |
/**
 * Lifecycle state of a job inside the jobs registry. The numeric values
 * are spelled out explicitly; they match the implicit 0-based numbering.
 */
enum JobState
{
  JobState_Pending = 0,   // Waiting in the queue of pending jobs
  JobState_Running = 1,   // Currently executed by a worker
  JobState_Success = 2,   // Completed successfully
  JobState_Failure = 3,   // Completed with an error
  JobState_Paused  = 4,   // Paused, waiting for "Resume()"
  JobState_Retry   = 5    // Waiting for its retry timeout to elapse
};
269 | |
270 static const char* EnumerationToString(JobState state) | |
271 { | |
272 switch (state) | |
273 { | |
274 case JobState_Pending: | |
275 return "Pending"; | |
276 | |
277 case JobState_Running: | |
278 return "Running"; | |
279 | |
280 case JobState_Success: | |
281 return "Success"; | |
282 | |
283 case JobState_Failure: | |
284 return "Failure"; | |
285 | |
286 case JobState_Paused: | |
287 return "Paused"; | |
288 | |
289 case JobState_Retry: | |
290 return "Retry"; | |
291 | |
292 default: | |
293 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
294 } | |
295 } | |
296 | |
/**
 * Outcome of one execution step of a job, as reported by "JobStepResult".
 * The numeric values match the implicit 0-based numbering.
 */
enum JobStepCode
{
  JobStepCode_Success  = 0,   // The job has completed successfully
  JobStepCode_Failure  = 1,   // The job has failed
  JobStepCode_Continue = 2,   // More steps are needed
  JobStepCode_Retry    = 3    // Retry later (see "JobStepRetry")
};
304 | |
305 class JobStepResult | |
306 { | |
307 private: | |
308 JobStepCode code_; | |
309 | |
310 public: | |
311 explicit JobStepResult(JobStepCode code) : | |
312 code_(code) | |
313 { | |
314 } | |
315 | |
316 virtual ~JobStepResult() | |
317 { | |
318 } | |
319 | |
320 JobStepCode GetCode() const | |
321 { | |
322 return code_; | |
323 } | |
324 }; | |
325 | |
326 | |
327 class JobStepRetry : public JobStepResult | |
328 { | |
329 private: | |
330 unsigned int timeout_; // Retry after "timeout_" milliseconds | |
331 | |
332 public: | |
333 JobStepRetry(unsigned int timeout) : | |
334 JobStepResult(JobStepCode_Retry), | |
335 timeout_(timeout) | |
336 { | |
337 } | |
338 | |
339 unsigned int GetTimeout() const | |
340 { | |
341 return timeout_; | |
342 } | |
343 }; | |
344 | |
345 | |
346 class IJob : public boost::noncopyable | |
347 { | |
348 public: | |
349 virtual ~IJob() | |
350 { | |
351 } | |
352 | |
353 virtual JobStepResult* ExecuteStep() = 0; | |
354 | |
355 virtual void ReleaseResources() = 0; // For pausing jobs | |
356 | |
357 virtual float GetProgress() = 0; | |
358 | |
359 virtual void GetDescription(Json::Value& value) = 0; | |
360 }; | |
361 | |
362 | |
363 class JobStatus | |
364 { | |
365 private: | |
366 ErrorCode errorCode_; | |
367 float progress_; | |
368 Json::Value description_; | |
369 | |
370 public: | |
371 JobStatus() : | |
372 errorCode_(ErrorCode_InternalError), | |
373 progress_(0), | |
374 description_(Json::objectValue) | |
375 { | |
376 } | |
377 | |
378 JobStatus(ErrorCode code, | |
379 IJob& job) : | |
380 errorCode_(code), | |
381 progress_(job.GetProgress()) | |
382 { | |
383 if (progress_ < 0) | |
384 { | |
385 progress_ = 0; | |
386 } | |
387 | |
388 if (progress_ > 1) | |
389 { | |
390 progress_ = 1; | |
391 } | |
392 | |
393 job.GetDescription(description_); | |
394 } | |
395 | |
396 ErrorCode GetErrorCode() const | |
397 { | |
398 return errorCode_; | |
399 } | |
400 | |
401 float GetProgress() const | |
402 { | |
403 return progress_; | |
404 } | |
405 | |
406 const Json::Value& GetDescription() const | |
407 { | |
408 return description_; | |
409 } | |
410 }; | |
411 | |
412 | |
413 class JobInfo | |
414 { | |
415 private: | |
416 std::string id_; | |
417 int priority_; | |
418 JobState state_; | |
419 boost::posix_time::ptime timestamp_; | |
420 boost::posix_time::ptime creationTime_; | |
421 boost::posix_time::ptime lastStateChangeTime_; | |
422 boost::posix_time::time_duration runtime_; | |
423 bool hasEta_; | |
424 boost::posix_time::ptime eta_; | |
425 JobStatus status_; | |
426 | |
427 public: | |
428 JobInfo(const std::string& id, | |
429 int priority, | |
430 JobState state, | |
431 const JobStatus& status, | |
432 const boost::posix_time::ptime& creationTime, | |
433 const boost::posix_time::ptime& lastStateChangeTime, | |
434 const boost::posix_time::time_duration& runtime) : | |
435 id_(id), | |
436 priority_(priority), | |
437 state_(state), | |
438 timestamp_(boost::posix_time::microsec_clock::universal_time()), | |
439 creationTime_(creationTime), | |
440 lastStateChangeTime_(lastStateChangeTime), | |
441 runtime_(runtime), | |
442 hasEta_(false), | |
443 status_(status) | |
444 { | |
445 if (state_ == JobState_Running) | |
446 { | |
447 float ms = static_cast<float>(runtime_.total_milliseconds()); | |
448 | |
449 if (status_.GetProgress() > 0.01f && | |
450 ms > 0.01f) | |
451 { | |
452 float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; | |
453 eta_ = timestamp_ + boost::posix_time::milliseconds(remaining); | |
454 hasEta_ = true; | |
455 } | |
456 } | |
457 } | |
458 | |
459 JobInfo() : | |
460 priority_(0), | |
461 state_(JobState_Failure), | |
462 timestamp_(boost::posix_time::microsec_clock::universal_time()), | |
463 creationTime_(timestamp_), | |
464 lastStateChangeTime_(timestamp_), | |
465 runtime_(boost::posix_time::milliseconds(0)), | |
466 hasEta_(false) | |
467 { | |
468 } | |
469 | |
470 const std::string& GetIdentifier() const | |
471 { | |
472 return id_; | |
473 } | |
474 | |
475 int GetPriority() const | |
476 { | |
477 return priority_; | |
478 } | |
479 | |
480 JobState GetState() const | |
481 { | |
482 return state_; | |
483 } | |
484 | |
485 const boost::posix_time::ptime& GetInfoTime() const | |
486 { | |
487 return timestamp_; | |
488 } | |
489 | |
490 const boost::posix_time::ptime& GetCreationTime() const | |
491 { | |
492 return creationTime_; | |
493 } | |
494 | |
495 const boost::posix_time::time_duration& GetRuntime() const | |
496 { | |
497 return runtime_; | |
498 } | |
499 | |
500 bool HasEstimatedTimeOfArrival() const | |
501 { | |
502 return hasEta_; | |
503 } | |
504 | |
505 bool HasCompletionTime() const | |
506 { | |
507 return (state_ == JobState_Success || | |
508 state_ == JobState_Failure); | |
509 } | |
510 | |
511 const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const | |
512 { | |
513 if (hasEta_) | |
514 { | |
515 return eta_; | |
516 } | |
517 else | |
518 { | |
519 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
520 } | |
521 } | |
522 | |
523 const boost::posix_time::ptime& GetCompletionTime() const | |
524 { | |
525 if (HasCompletionTime()) | |
526 { | |
527 return lastStateChangeTime_; | |
528 } | |
529 else | |
530 { | |
531 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
532 } | |
533 } | |
534 | |
535 const JobStatus& GetStatus() const | |
536 { | |
537 return status_; | |
538 } | |
539 | |
540 JobStatus& GetStatus() | |
541 { | |
542 return status_; | |
543 } | |
544 | |
545 void Format(Json::Value& target) const | |
546 { | |
547 target = Json::objectValue; | |
548 target["ID"] = id_; | |
549 target["Priority"] = priority_; | |
550 target["ErrorCode"] = static_cast<int>(status_.GetErrorCode()); | |
551 target["ErrorDescription"] = EnumerationToString(status_.GetErrorCode()); | |
552 target["State"] = EnumerationToString(state_); | |
553 target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_); | |
554 target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); | |
555 target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds()); | |
556 target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f); | |
557 target["Description"] = status_.GetDescription(); | |
558 | |
559 if (HasEstimatedTimeOfArrival()) | |
560 { | |
561 target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(GetEstimatedTimeOfArrival()); | |
562 } | |
563 | |
564 if (HasCompletionTime()) | |
565 { | |
566 target["CompletionTime"] = boost::posix_time::to_iso_string(GetCompletionTime()); | |
567 } | |
568 } | |
569 }; | |
570 | |
571 | |
572 | |
573 | |
574 class JobsRegistry : public boost::noncopyable | |
575 { | |
576 private: | |
577 class JobHandler : public boost::noncopyable | |
578 { | |
579 private: | |
580 std::string id_; | |
581 JobState state_; | |
582 std::auto_ptr<IJob> job_; | |
583 int priority_; // "+inf()" means highest priority | |
584 boost::posix_time::ptime creationTime_; | |
585 boost::posix_time::ptime lastStateChangeTime_; | |
586 boost::posix_time::time_duration runtime_; | |
587 boost::posix_time::ptime retryTime_; | |
588 bool pauseScheduled_; | |
589 JobStatus lastStatus_; | |
590 | |
591 void Touch() | |
592 { | |
593 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
594 | |
595 if (state_ == JobState_Running) | |
596 { | |
597 runtime_ += (now - lastStateChangeTime_); | |
598 } | |
599 | |
600 lastStateChangeTime_ = now; | |
601 } | |
602 | |
603 void SetStateInternal(JobState state) | |
604 { | |
605 state_ = state; | |
606 pauseScheduled_ = false; | |
607 Touch(); | |
608 } | |
609 | |
610 public: | |
611 JobHandler(IJob* job, | |
612 int priority) : | |
613 id_(Toolbox::GenerateUuid()), | |
614 state_(JobState_Pending), | |
615 job_(job), | |
616 priority_(priority), | |
617 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
618 lastStateChangeTime_(creationTime_), | |
619 runtime_(boost::posix_time::milliseconds(0)), | |
620 retryTime_(creationTime_), | |
621 pauseScheduled_(false) | |
622 { | |
623 if (job == NULL) | |
624 { | |
625 throw OrthancException(ErrorCode_NullPointer); | |
626 } | |
627 | |
628 lastStatus_ = JobStatus(ErrorCode_Success, *job); | |
629 } | |
630 | |
631 const std::string& GetId() const | |
632 { | |
633 return id_; | |
634 } | |
635 | |
636 IJob& GetJob() const | |
637 { | |
638 assert(job_.get() != NULL); | |
639 return *job_; | |
640 } | |
641 | |
642 void SetPriority(int priority) | |
643 { | |
644 priority_ = priority; | |
645 } | |
646 | |
647 int GetPriority() const | |
648 { | |
649 return priority_; | |
650 } | |
651 | |
652 JobState GetState() const | |
653 { | |
654 return state_; | |
655 } | |
656 | |
657 void SetState(JobState state) | |
658 { | |
659 if (state == JobState_Retry) | |
660 { | |
661 // Use "SetRetryState()" | |
662 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
663 } | |
664 else | |
665 { | |
666 SetStateInternal(state); | |
667 } | |
668 } | |
669 | |
670 void SetRetryState(unsigned int timeout) | |
671 { | |
672 if (state_ == JobState_Running) | |
673 { | |
674 SetStateInternal(JobState_Retry); | |
675 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + | |
676 boost::posix_time::milliseconds(timeout)); | |
677 } | |
678 else | |
679 { | |
680 // Only valid for running jobs | |
681 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
682 } | |
683 } | |
684 | |
685 void SchedulePause() | |
686 { | |
687 if (state_ == JobState_Running) | |
688 { | |
689 pauseScheduled_ = true; | |
690 } | |
691 else | |
692 { | |
693 // Only valid for running jobs | |
694 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
695 } | |
696 } | |
697 | |
698 bool IsPauseScheduled() | |
699 { | |
700 return pauseScheduled_; | |
701 } | |
702 | |
703 bool IsRetryReady(const boost::posix_time::ptime& now) const | |
704 { | |
705 if (state_ != JobState_Retry) | |
706 { | |
707 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
708 } | |
709 else | |
710 { | |
711 return retryTime_ <= now; | |
712 } | |
713 } | |
714 | |
715 const boost::posix_time::ptime& GetCreationTime() const | |
716 { | |
717 return creationTime_; | |
718 } | |
719 | |
720 const boost::posix_time::ptime& GetLastStateChangeTime() const | |
721 { | |
722 return lastStateChangeTime_; | |
723 } | |
724 | |
725 const boost::posix_time::time_duration& GetRuntime() const | |
726 { | |
727 return runtime_; | |
728 } | |
729 | |
730 const JobStatus& GetLastStatus() const | |
731 { | |
732 return lastStatus_; | |
733 } | |
734 | |
735 void SetLastStatus(const JobStatus& status) | |
736 { | |
737 lastStatus_ = status; | |
738 Touch(); | |
739 } | |
740 }; | |
741 | |
742 struct PriorityComparator | |
743 { | |
744 bool operator() (JobHandler*& a, | |
745 JobHandler*& b) const | |
746 { | |
747 return a->GetPriority() < b->GetPriority(); | |
748 } | |
749 }; | |
750 | |
751 typedef std::map<std::string, JobHandler*> JobsIndex; | |
752 typedef std::list<JobHandler*> CompletedJobs; | |
753 typedef std::set<JobHandler*> RetryJobs; | |
754 typedef std::priority_queue<JobHandler*, | |
755 std::vector<JobHandler*>, // Could be a "std::deque" | |
756 PriorityComparator> PendingJobs; | |
757 | |
758 boost::mutex mutex_; | |
759 JobsIndex jobsIndex_; | |
760 PendingJobs pendingJobs_; | |
761 CompletedJobs completedJobs_; | |
762 RetryJobs retryJobs_; | |
763 | |
764 boost::condition_variable pendingJobAvailable_; | |
765 size_t maxCompletedJobs_; | |
766 | |
767 | |
768 #ifndef NDEBUG | |
769 bool IsPendingJob(const JobHandler& job) const | |
770 { | |
771 PendingJobs copy = pendingJobs_; | |
772 while (!copy.empty()) | |
773 { | |
774 if (copy.top() == &job) | |
775 { | |
776 return true; | |
777 } | |
778 | |
779 copy.pop(); | |
780 } | |
781 | |
782 return false; | |
783 } | |
784 | |
785 bool IsCompletedJob(JobHandler& job) const | |
786 { | |
787 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
788 it != completedJobs_.end(); ++it) | |
789 { | |
790 if (*it == &job) | |
791 { | |
792 return true; | |
793 } | |
794 } | |
795 | |
796 return false; | |
797 } | |
798 | |
799 bool IsRetryJob(JobHandler& job) const | |
800 { | |
801 return retryJobs_.find(&job) != retryJobs_.end(); | |
802 } | |
803 #endif | |
804 | |
805 | |
806 void CheckInvariants() const | |
807 { | |
808 #ifndef NDEBUG | |
809 { | |
810 PendingJobs copy = pendingJobs_; | |
811 while (!copy.empty()) | |
812 { | |
813 assert(copy.top()->GetState() == JobState_Pending); | |
814 copy.pop(); | |
815 } | |
816 } | |
817 | |
818 assert(completedJobs_.size() <= maxCompletedJobs_); | |
819 | |
820 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
821 it != completedJobs_.end(); ++it) | |
822 { | |
823 assert((*it)->GetState() == JobState_Success || | |
824 (*it)->GetState() == JobState_Failure); | |
825 } | |
826 | |
827 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
828 it != retryJobs_.end(); ++it) | |
829 { | |
830 assert((*it)->GetState() == JobState_Retry); | |
831 } | |
832 | |
833 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
834 it != jobsIndex_.end(); ++it) | |
835 { | |
836 JobHandler& job = *it->second; | |
837 | |
838 assert(job.GetId() == it->first); | |
839 | |
840 switch (job.GetState()) | |
841 { | |
842 case JobState_Pending: | |
843 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
844 break; | |
845 | |
846 case JobState_Success: | |
847 case JobState_Failure: | |
848 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
849 break; | |
850 | |
851 case JobState_Retry: | |
852 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
853 break; | |
854 | |
855 case JobState_Running: | |
856 case JobState_Paused: | |
857 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
858 break; | |
859 | |
860 default: | |
861 throw OrthancException(ErrorCode_InternalError); | |
862 } | |
863 } | |
864 #endif | |
865 } | |
866 | |
867 | |
868 void ForgetOldCompletedJobs() | |
869 { | |
870 if (maxCompletedJobs_ != 0) | |
871 { | |
872 while (completedJobs_.size() > maxCompletedJobs_) | |
873 { | |
874 assert(completedJobs_.front() != NULL); | |
875 | |
876 std::string id = completedJobs_.front()->GetId(); | |
877 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
878 | |
879 jobsIndex_.erase(id); | |
880 delete(completedJobs_.front()); | |
881 completedJobs_.pop_front(); | |
882 } | |
883 } | |
884 } | |
885 | |
886 | |
887 void MarkRunningAsCompleted(JobHandler& job, | |
888 bool success) | |
889 { | |
890 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | |
891 << ": " << job.GetId(); | |
892 | |
893 CheckInvariants(); | |
894 assert(job.GetState() == JobState_Running); | |
895 | |
896 job.SetState(success ? JobState_Success : JobState_Failure); | |
897 | |
898 completedJobs_.push_back(&job); | |
899 ForgetOldCompletedJobs(); | |
900 | |
901 CheckInvariants(); | |
902 } | |
903 | |
904 | |
905 void MarkRunningAsRetry(JobHandler& job, | |
906 unsigned int timeout) | |
907 { | |
908 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); | |
909 | |
910 CheckInvariants(); | |
911 | |
912 assert(job.GetState() == JobState_Running && | |
913 retryJobs_.find(&job) == retryJobs_.end()); | |
914 | |
915 retryJobs_.insert(&job); | |
916 job.SetRetryState(timeout); | |
917 | |
918 CheckInvariants(); | |
919 } | |
920 | |
921 | |
922 void MarkRunningAsPaused(JobHandler& job) | |
923 { | |
924 LOG(INFO) << "Job paused: " << job.GetId(); | |
925 | |
926 CheckInvariants(); | |
927 assert(job.GetState() == JobState_Running); | |
928 | |
929 job.SetState(JobState_Paused); | |
930 | |
931 CheckInvariants(); | |
932 } | |
933 | |
934 | |
935 public: | |
936 JobsRegistry() : | |
937 maxCompletedJobs_(10) | |
938 { | |
939 } | |
940 | |
941 | |
942 ~JobsRegistry() | |
943 { | |
944 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
945 { | |
946 assert(it->second != NULL); | |
947 delete it->second; | |
948 } | |
949 } | |
950 | |
951 | |
952 void SetMaxCompletedJobs(size_t i) | |
953 { | |
954 boost::mutex::scoped_lock lock(mutex_); | |
955 CheckInvariants(); | |
956 | |
957 maxCompletedJobs_ = i; | |
958 ForgetOldCompletedJobs(); | |
959 | |
960 CheckInvariants(); | |
961 } | |
962 | |
963 | |
964 void ListJobs(std::set<std::string>& target) | |
965 { | |
966 boost::mutex::scoped_lock lock(mutex_); | |
967 CheckInvariants(); | |
968 | |
969 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
970 it != jobsIndex_.end(); ++it) | |
971 { | |
972 target.insert(it->first); | |
973 } | |
974 } | |
975 | |
976 | |
977 bool GetJobInfo(JobInfo& target, | |
978 const std::string& id) | |
979 { | |
980 boost::mutex::scoped_lock lock(mutex_); | |
981 CheckInvariants(); | |
982 | |
983 JobsIndex::const_iterator found = jobsIndex_.find(id); | |
984 | |
985 if (found == jobsIndex_.end()) | |
986 { | |
987 return false; | |
988 } | |
989 else | |
990 { | |
991 const JobHandler& handler = *found->second; | |
992 target = JobInfo(handler.GetId(), | |
993 handler.GetPriority(), | |
994 handler.GetState(), | |
995 handler.GetLastStatus(), | |
996 handler.GetCreationTime(), | |
997 handler.GetLastStateChangeTime(), | |
998 handler.GetRuntime()); | |
999 return true; | |
1000 } | |
1001 } | |
1002 | |
1003 | |
1004 void Submit(std::string& id, | |
1005 IJob* job, // Takes ownership | |
1006 int priority) | |
1007 { | |
1008 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | |
1009 | |
1010 boost::mutex::scoped_lock lock(mutex_); | |
1011 CheckInvariants(); | |
1012 | |
1013 id = handler->GetId(); | |
1014 | |
1015 pendingJobs_.push(handler.get()); | |
1016 pendingJobAvailable_.notify_one(); | |
1017 | |
1018 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
1019 | |
1020 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; | |
1021 | |
1022 CheckInvariants(); | |
1023 } | |
1024 | |
1025 | |
1026 void Submit(IJob* job, // Takes ownership | |
1027 int priority) | |
1028 { | |
1029 std::string id; | |
1030 Submit(id, job, priority); | |
1031 } | |
1032 | |
1033 | |
1034 void SetPriority(const std::string& id, | |
1035 int priority) | |
1036 { | |
1037 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; | |
1038 | |
1039 boost::mutex::scoped_lock lock(mutex_); | |
1040 CheckInvariants(); | |
1041 | |
1042 JobsIndex::iterator found = jobsIndex_.find(id); | |
1043 | |
1044 if (found == jobsIndex_.end()) | |
1045 { | |
1046 LOG(WARNING) << "Unknown job: " << id; | |
1047 } | |
1048 else | |
1049 { | |
1050 found->second->SetPriority(priority); | |
1051 | |
1052 if (found->second->GetState() == JobState_Pending) | |
1053 { | |
1054 // If the job is pending, we need to reconstruct the | |
1055 // priority queue, as the heap condition has changed | |
1056 | |
1057 PendingJobs copy; | |
1058 std::swap(copy, pendingJobs_); | |
1059 | |
1060 assert(pendingJobs_.empty()); | |
1061 while (!copy.empty()) | |
1062 { | |
1063 pendingJobs_.push(copy.top()); | |
1064 copy.pop(); | |
1065 } | |
1066 } | |
1067 } | |
1068 | |
1069 CheckInvariants(); | |
1070 } | |
1071 | |
1072 | |
1073 void Pause(const std::string& id) | |
1074 { | |
1075 LOG(INFO) << "Pausing job: " << id; | |
1076 | |
1077 boost::mutex::scoped_lock lock(mutex_); | |
1078 CheckInvariants(); | |
1079 | |
1080 JobsIndex::iterator found = jobsIndex_.find(id); | |
1081 | |
1082 if (found == jobsIndex_.end()) | |
1083 { | |
1084 LOG(WARNING) << "Unknown job: " << id; | |
1085 } | |
1086 else | |
1087 { | |
1088 switch (found->second->GetState()) | |
1089 { | |
1090 case JobState_Pending: | |
1091 { | |
1092 // If the job is pending, we need to reconstruct the | |
1093 // priority queue to remove it | |
1094 PendingJobs copy; | |
1095 std::swap(copy, pendingJobs_); | |
1096 | |
1097 assert(pendingJobs_.empty()); | |
1098 while (!copy.empty()) | |
1099 { | |
1100 if (copy.top()->GetId() != id) | |
1101 { | |
1102 pendingJobs_.push(copy.top()); | |
1103 } | |
1104 | |
1105 copy.pop(); | |
1106 } | |
1107 | |
1108 found->second->SetState(JobState_Paused); | |
1109 | |
1110 break; | |
1111 } | |
1112 | |
1113 case JobState_Retry: | |
1114 { | |
1115 RetryJobs::iterator item = retryJobs_.find(found->second); | |
1116 assert(item != retryJobs_.end()); | |
1117 retryJobs_.erase(item); | |
1118 | |
1119 found->second->SetState(JobState_Paused); | |
1120 | |
1121 break; | |
1122 } | |
1123 | |
1124 case JobState_Paused: | |
1125 case JobState_Success: | |
1126 case JobState_Failure: | |
1127 // Nothing to be done | |
1128 break; | |
1129 | |
1130 case JobState_Running: | |
1131 found->second->SchedulePause(); | |
1132 break; | |
1133 | |
1134 default: | |
1135 throw OrthancException(ErrorCode_InternalError); | |
1136 } | |
1137 } | |
1138 | |
1139 CheckInvariants(); | |
1140 } | |
1141 | |
1142 | |
1143 void Resume(const std::string& id) | |
1144 { | |
1145 LOG(INFO) << "Resuming job: " << id; | |
1146 | |
1147 boost::mutex::scoped_lock lock(mutex_); | |
1148 CheckInvariants(); | |
1149 | |
1150 JobsIndex::iterator found = jobsIndex_.find(id); | |
1151 | |
1152 if (found == jobsIndex_.end()) | |
1153 { | |
1154 LOG(WARNING) << "Unknown job: " << id; | |
1155 } | |
1156 else if (found->second->GetState() != JobState_Paused) | |
1157 { | |
1158 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
1159 } | |
1160 else | |
1161 { | |
1162 found->second->SetState(JobState_Pending); | |
1163 pendingJobs_.push(found->second); | |
1164 pendingJobAvailable_.notify_one(); | |
1165 } | |
1166 | |
1167 CheckInvariants(); | |
1168 } | |
1169 | |
1170 | |
1171 void Resubmit(const std::string& id) | |
1172 { | |
1173 LOG(INFO) << "Resubmitting failed job: " << id; | |
1174 | |
1175 boost::mutex::scoped_lock lock(mutex_); | |
1176 CheckInvariants(); | |
1177 | |
1178 JobsIndex::iterator found = jobsIndex_.find(id); | |
1179 | |
1180 if (found == jobsIndex_.end()) | |
1181 { | |
1182 LOG(WARNING) << "Unknown job: " << id; | |
1183 } | |
1184 else if (found->second->GetState() != JobState_Failure) | |
1185 { | |
1186 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
1187 } | |
1188 else | |
1189 { | |
1190 bool ok = false; | |
1191 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
1192 it != completedJobs_.end(); ++it) | |
1193 { | |
1194 if (*it == found->second) | |
1195 { | |
1196 ok = true; | |
1197 completedJobs_.erase(it); | |
1198 break; | |
1199 } | |
1200 } | |
1201 | |
1202 assert(ok); | |
1203 | |
1204 found->second->SetState(JobState_Pending); | |
1205 pendingJobs_.push(found->second); | |
1206 pendingJobAvailable_.notify_one(); | |
1207 } | |
1208 | |
1209 CheckInvariants(); | |
1210 } | |
1211 | |
1212 | |
1213 void ScheduleRetries() | |
1214 { | |
1215 boost::mutex::scoped_lock lock(mutex_); | |
1216 CheckInvariants(); | |
1217 | |
1218 RetryJobs copy; | |
1219 std::swap(copy, retryJobs_); | |
1220 | |
1221 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
1222 | |
1223 assert(retryJobs_.empty()); | |
1224 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
1225 { | |
1226 if ((*it)->IsRetryReady(now)) | |
1227 { | |
1228 LOG(INFO) << "Retrying job: " << (*it)->GetId(); | |
1229 (*it)->SetState(JobState_Pending); | |
1230 pendingJobs_.push(*it); | |
1231 pendingJobAvailable_.notify_one(); | |
1232 } | |
1233 else | |
1234 { | |
1235 retryJobs_.insert(*it); | |
1236 } | |
1237 } | |
1238 | |
1239 CheckInvariants(); | |
1240 } | |
1241 | |
1242 | |
1243 bool GetState(JobState& state, | |
1244 const std::string& id) | |
1245 { | |
1246 boost::mutex::scoped_lock lock(mutex_); | |
1247 CheckInvariants(); | |
1248 | |
1249 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
1250 if (it == jobsIndex_.end()) | |
1251 { | |
1252 return false; | |
1253 } | |
1254 else | |
1255 { | |
1256 state = it->second->GetState(); | |
1257 return true; | |
1258 } | |
1259 } | |
1260 | |
1261 | |
1262 class RunningJob : public boost::noncopyable | |
1263 { | |
1264 private: | |
1265 JobsRegistry& registry_; | |
1266 JobHandler* handler_; // Can only be accessed if the registry | |
1267 // mutex is locked! | |
1268 IJob* job_; // Will by design be in mutual exclusion, | |
1269 // because only one RunningJob can be | |
1270 // executed at a time on a JobHandler | |
1271 | |
1272 std::string id_; | |
1273 int priority_; | |
1274 JobState targetState_; | |
1275 unsigned int targetRetryTimeout_; | |
1276 | |
1277 public: | |
1278 RunningJob(JobsRegistry& registry, | |
1279 unsigned int timeout) : | |
1280 registry_(registry), | |
1281 handler_(NULL), | |
1282 targetState_(JobState_Failure), | |
1283 targetRetryTimeout_(0) | |
1284 { | |
1285 { | |
1286 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1287 | |
1288 while (registry_.pendingJobs_.empty()) | |
1289 { | |
1290 if (timeout == 0) | |
1291 { | |
1292 registry_.pendingJobAvailable_.wait(lock); | |
1293 } | |
1294 else | |
1295 { | |
1296 bool success = registry_.pendingJobAvailable_.timed_wait | |
1297 (lock, boost::posix_time::milliseconds(timeout)); | |
1298 if (!success) | |
1299 { | |
1300 // No pending job | |
1301 return; | |
1302 } | |
1303 } | |
1304 } | |
1305 | |
1306 handler_ = registry_.pendingJobs_.top(); | |
1307 registry_.pendingJobs_.pop(); | |
1308 | |
1309 assert(handler_->GetState() == JobState_Pending); | |
1310 handler_->SetState(JobState_Running); | |
1311 | |
1312 job_ = &handler_->GetJob(); | |
1313 id_ = handler_->GetId(); | |
1314 priority_ = handler_->GetPriority(); | |
1315 } | |
1316 } | |
1317 | |
1318 ~RunningJob() | |
1319 { | |
1320 if (IsValid()) | |
1321 { | |
1322 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1323 | |
1324 switch (targetState_) | |
1325 { | |
1326 case JobState_Failure: | |
1327 registry_.MarkRunningAsCompleted(*handler_, false); | |
1328 break; | |
1329 | |
1330 case JobState_Success: | |
1331 registry_.MarkRunningAsCompleted(*handler_, true); | |
1332 break; | |
1333 | |
1334 case JobState_Paused: | |
1335 registry_.MarkRunningAsPaused(*handler_); | |
1336 break; | |
1337 | |
1338 case JobState_Retry: | |
1339 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); | |
1340 break; | |
1341 | |
1342 default: | |
1343 assert(0); | |
1344 } | |
1345 } | |
1346 } | |
1347 | |
1348 bool IsValid() const | |
1349 { | |
1350 return (handler_ != NULL && | |
1351 job_ != NULL); | |
1352 } | |
1353 | |
1354 const std::string& GetId() const | |
1355 { | |
1356 if (!IsValid()) | |
1357 { | |
1358 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1359 } | |
1360 else | |
1361 { | |
1362 return id_; | |
1363 } | |
1364 } | |
1365 | |
1366 int GetPriority() const | |
1367 { | |
1368 if (!IsValid()) | |
1369 { | |
1370 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1371 } | |
1372 else | |
1373 { | |
1374 return priority_; | |
1375 } | |
1376 } | |
1377 | |
1378 IJob& GetJob() | |
1379 { | |
1380 if (!IsValid()) | |
1381 { | |
1382 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1383 } | |
1384 else | |
1385 { | |
1386 return *job_; | |
1387 } | |
1388 } | |
1389 | |
1390 bool IsPauseScheduled() | |
1391 { | |
1392 if (!IsValid()) | |
1393 { | |
1394 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1395 } | |
1396 else | |
1397 { | |
1398 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1399 registry_.CheckInvariants(); | |
1400 assert(handler_->GetState() == JobState_Running); | |
1401 | |
1402 return handler_->IsPauseScheduled(); | |
1403 } | |
1404 } | |
1405 | |
1406 void MarkSuccess() | |
1407 { | |
1408 if (!IsValid()) | |
1409 { | |
1410 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1411 } | |
1412 else | |
1413 { | |
1414 targetState_ = JobState_Success; | |
1415 } | |
1416 } | |
1417 | |
1418 void MarkFailure() | |
1419 { | |
1420 if (!IsValid()) | |
1421 { | |
1422 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1423 } | |
1424 else | |
1425 { | |
1426 targetState_ = JobState_Failure; | |
1427 } | |
1428 } | |
1429 | |
1430 void MarkPause() | |
1431 { | |
1432 if (!IsValid()) | |
1433 { | |
1434 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1435 } | |
1436 else | |
1437 { | |
1438 targetState_ = JobState_Paused; | |
1439 } | |
1440 } | |
1441 | |
1442 void MarkRetry(unsigned int timeout) | |
1443 { | |
1444 if (!IsValid()) | |
1445 { | |
1446 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1447 } | |
1448 else | |
1449 { | |
1450 targetState_ = JobState_Retry; | |
1451 targetRetryTimeout_ = timeout; | |
1452 } | |
1453 } | |
1454 | |
1455 void UpdateStatus(ErrorCode code) | |
1456 { | |
1457 if (!IsValid()) | |
1458 { | |
1459 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1460 } | |
1461 else | |
1462 { | |
1463 JobStatus status(code, *job_); | |
1464 | |
1465 boost::mutex::scoped_lock lock(registry_.mutex_); | |
1466 registry_.CheckInvariants(); | |
1467 assert(handler_->GetState() == JobState_Running); | |
1468 | |
1469 handler_->SetLastStatus(status); | |
1470 } | |
1471 } | |
1472 }; | |
1473 }; | |
1474 | |
1475 | |
1476 | |
1477 class JobsEngine | |
1478 { | |
1479 private: | |
1480 enum State | |
1481 { | |
1482 State_Setup, | |
1483 State_Running, | |
1484 State_Stopping, | |
1485 State_Done | |
1486 }; | |
1487 | |
1488 boost::mutex stateMutex_; | |
1489 State state_; | |
1490 JobsRegistry registry_; | |
1491 boost::thread retryHandler_; | |
1492 std::vector<boost::thread> workers_; | |
1493 | |
1494 bool ExecuteStep(JobsRegistry::RunningJob& running, | |
1495 size_t workerIndex) | |
1496 { | |
1497 assert(running.IsValid()); | |
1498 | |
1499 LOG(INFO) << "Executing job with priority " << running.GetPriority() | |
1500 << " in worker thread " << workerIndex << ": " << running.GetId(); | |
1501 | |
1502 if (running.IsPauseScheduled()) | |
1503 { | |
1504 running.GetJob().ReleaseResources(); | |
1505 running.MarkPause(); | |
1506 return false; | |
1507 } | |
1508 | |
1509 std::auto_ptr<JobStepResult> result; | |
1510 | |
1511 { | |
1512 try | |
1513 { | |
1514 result.reset(running.GetJob().ExecuteStep()); | |
1515 | |
1516 if (result->GetCode() == JobStepCode_Failure) | |
1517 { | |
1518 running.UpdateStatus(ErrorCode_InternalError); | |
1519 } | |
1520 else | |
1521 { | |
1522 running.UpdateStatus(ErrorCode_Success); | |
1523 } | |
1524 } | |
1525 catch (OrthancException& e) | |
1526 { | |
1527 running.UpdateStatus(e.GetErrorCode()); | |
1528 } | |
1529 catch (boost::bad_lexical_cast&) | |
1530 { | |
1531 running.UpdateStatus(ErrorCode_BadFileFormat); | |
1532 } | |
1533 catch (...) | |
1534 { | |
1535 running.UpdateStatus(ErrorCode_InternalError); | |
1536 } | |
1537 | |
1538 if (result.get() == NULL) | |
1539 { | |
1540 result.reset(new JobStepResult(JobStepCode_Failure)); | |
1541 } | |
1542 } | |
1543 | |
1544 switch (result->GetCode()) | |
1545 { | |
1546 case JobStepCode_Success: | |
1547 running.MarkSuccess(); | |
1548 return false; | |
1549 | |
1550 case JobStepCode_Failure: | |
1551 running.MarkFailure(); | |
1552 return false; | |
1553 | |
1554 case JobStepCode_Retry: | |
1555 running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout()); | |
1556 return false; | |
1557 | |
1558 case JobStepCode_Continue: | |
1559 return true; | |
1560 | |
1561 default: | |
1562 throw OrthancException(ErrorCode_InternalError); | |
1563 } | |
1564 } | |
1565 | |
1566 static void RetryHandler(JobsEngine* engine) | |
1567 { | |
1568 assert(engine != NULL); | |
1569 | |
1570 for (;;) | |
1571 { | |
1572 boost::this_thread::sleep(boost::posix_time::milliseconds(200)); | |
1573 | |
1574 { | |
1575 boost::mutex::scoped_lock lock(engine->stateMutex_); | |
1576 | |
1577 if (engine->state_ != State_Running) | |
1578 { | |
1579 return; | |
1580 } | |
1581 } | |
1582 | |
1583 engine->GetRegistry().ScheduleRetries(); | |
1584 } | |
1585 } | |
1586 | |
1587 static void Worker(JobsEngine* engine, | |
1588 size_t workerIndex) | |
1589 { | |
1590 assert(engine != NULL); | |
1591 | |
1592 LOG(INFO) << "Worker thread " << workerIndex << " has started"; | |
1593 | |
1594 for (;;) | |
1595 { | |
1596 { | |
1597 boost::mutex::scoped_lock lock(engine->stateMutex_); | |
1598 | |
1599 if (engine->state_ != State_Running) | |
1600 { | |
1601 return; | |
1602 } | |
1603 } | |
1604 | |
1605 JobsRegistry::RunningJob running(engine->GetRegistry(), 100); | |
1606 | |
1607 if (running.IsValid()) | |
1608 { | |
1609 for (;;) | |
1610 { | |
1611 if (!engine->ExecuteStep(running, workerIndex)) | |
1612 { | |
1613 break; | |
1614 } | |
1615 } | |
1616 } | |
1617 } | |
1618 } | |
1619 | |
1620 public: | |
1621 JobsEngine() : | |
1622 state_(State_Setup), | |
1623 workers_(1) | |
1624 { | |
1625 } | |
1626 | |
1627 ~JobsEngine() | |
1628 { | |
1629 if (state_ != State_Setup && | |
1630 state_ != State_Done) | |
1631 { | |
1632 LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!"; | |
1633 Stop(); | |
1634 } | |
1635 } | |
1636 | |
1637 void SetWorkersCount(size_t count) | |
1638 { | |
1639 if (count == 0) | |
1640 { | |
1641 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
1642 } | |
1643 | |
1644 boost::mutex::scoped_lock lock(stateMutex_); | |
1645 | |
1646 if (state_ != State_Setup) | |
1647 { | |
1648 // Can only be invoked before calling "Start()" | |
1649 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1650 } | |
1651 | |
1652 workers_.resize(count); | |
1653 } | |
1654 | |
1655 JobsRegistry& GetRegistry() | |
1656 { | |
1657 return registry_; | |
1658 } | |
1659 | |
1660 void Start() | |
1661 { | |
1662 boost::mutex::scoped_lock lock(stateMutex_); | |
1663 | |
1664 if (state_ != State_Setup) | |
1665 { | |
1666 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1667 } | |
1668 | |
1669 retryHandler_ = boost::thread(RetryHandler, this); | |
1670 | |
1671 for (size_t i = 0; i < workers_.size(); i++) | |
1672 { | |
1673 workers_[i] = boost::thread(Worker, this, i); | |
1674 } | |
1675 | |
1676 state_ = State_Running; | |
1677 | |
1678 LOG(WARNING) << "The jobs engine has started"; | |
1679 } | |
1680 | |
1681 | |
1682 void Stop() | |
1683 { | |
1684 { | |
1685 boost::mutex::scoped_lock lock(stateMutex_); | |
1686 | |
1687 if (state_ != State_Running) | |
1688 { | |
1689 return; | |
1690 } | |
1691 | |
1692 state_ = State_Stopping; | |
1693 } | |
1694 | |
1695 LOG(INFO) << "Stopping the jobs engine"; | |
1696 | |
1697 if (retryHandler_.joinable()) | |
1698 { | |
1699 retryHandler_.join(); | |
1700 } | |
1701 | |
1702 for (size_t i = 0; i < workers_.size(); i++) | |
1703 { | |
1704 if (workers_[i].joinable()) | |
1705 { | |
1706 workers_[i].join(); | |
1707 } | |
1708 } | |
1709 | |
1710 { | |
1711 boost::mutex::scoped_lock lock(stateMutex_); | |
1712 state_ = State_Done; | |
1713 } | |
1714 | |
1715 LOG(WARNING) << "The jobs engine has stopped"; | |
1716 } | |
1717 }; | |
1718 } | |
1719 | |
1720 | |
1721 | |
1722 class DummyJob : public Orthanc::IJob | 244 class DummyJob : public Orthanc::IJob |
1723 { | 245 { |
1724 private: | 246 private: |
1725 JobStepResult result_; | 247 JobStepResult result_; |
1726 unsigned int count_; | 248 unsigned int count_; |