comparison UnitTestsSources/MultiThreadingTests.cpp @ 2569:2af17cd5eb1f jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 15:37:20 +0200
parents a46094602346
children 2e879c796ec7
comparison
equal deleted inserted replaced
2568:a46094602346 2569:2af17cd5eb1f
32 32
33 33
34 #include "PrecompiledHeadersUnitTests.h" 34 #include "PrecompiledHeadersUnitTests.h"
35 #include "gtest/gtest.h" 35 #include "gtest/gtest.h"
36 36
37 #include "../OrthancServer/Scheduler/ServerScheduler.h" 37 #include "../Core/JobsEngine/JobStepRetry.h"
38 #include "../Core/JobsEngine/JobsEngine.h"
39 #include "../Core/MultiThreading/Locker.h"
38 #include "../Core/OrthancException.h" 40 #include "../Core/OrthancException.h"
39 #include "../Core/SystemToolbox.h" 41 #include "../Core/SystemToolbox.h"
40 #include "../Core/Toolbox.h" 42 #include "../Core/Toolbox.h"
41 #include "../Core/MultiThreading/Locker.h" 43 #include "../OrthancServer/Scheduler/ServerScheduler.h"
42 44
43 using namespace Orthanc; 45 using namespace Orthanc;
44 46
45 namespace 47 namespace
46 { 48 {
237 } 239 }
238 } 240 }
239 241
240 242
241 243
242
243
244 #if !defined(ORTHANC_SANDBOXED)
245 # error The macro ORTHANC_SANDBOXED must be defined
246 #endif
247
248 #if ORTHANC_SANDBOXED == 1
249 # error The job engine cannot be used in sandboxed environments
250 #endif
251
252 #include "../Core/Logging.h"
253
254 #include <boost/math/special_functions/round.hpp>
255 #include <boost/date_time/posix_time/posix_time.hpp>
256 #include <queue>
257
258 namespace Orthanc
259 {
/**
 * Lifecycle state of a job stored in the jobs registry.
 **/
enum JobState
{
  JobState_Pending = 0,   // Waiting in the priority queue for a worker thread
  JobState_Running = 1,   // Currently executed by a worker thread
  JobState_Success = 2,   // Completed successfully
  JobState_Failure = 3,   // Completed with an error (can be resubmitted)
  JobState_Paused  = 4,   // Paused on request (can be resumed)
  JobState_Retry   = 5    // Waiting for its retry timeout before becoming pending again
};
269
270 static const char* EnumerationToString(JobState state)
271 {
272 switch (state)
273 {
274 case JobState_Pending:
275 return "Pending";
276
277 case JobState_Running:
278 return "Running";
279
280 case JobState_Success:
281 return "Success";
282
283 case JobState_Failure:
284 return "Failure";
285
286 case JobState_Paused:
287 return "Paused";
288
289 case JobState_Retry:
290 return "Retry";
291
292 default:
293 throw OrthancException(ErrorCode_ParameterOutOfRange);
294 }
295 }
296
/**
 * Outcome category of a single call to "IJob::ExecuteStep()".
 **/
enum JobStepCode
{
  JobStepCode_Success  = 0,   // The job has completed successfully
  JobStepCode_Failure  = 1,   // The job has failed
  JobStepCode_Continue = 2,   // More steps are needed, keep executing
  JobStepCode_Retry    = 3    // Retry later (see "JobStepRetry")
};
304
305 class JobStepResult
306 {
307 private:
308 JobStepCode code_;
309
310 public:
311 explicit JobStepResult(JobStepCode code) :
312 code_(code)
313 {
314 }
315
316 virtual ~JobStepResult()
317 {
318 }
319
320 JobStepCode GetCode() const
321 {
322 return code_;
323 }
324 };
325
326
327 class JobStepRetry : public JobStepResult
328 {
329 private:
330 unsigned int timeout_; // Retry after "timeout_" milliseconds
331
332 public:
333 JobStepRetry(unsigned int timeout) :
334 JobStepResult(JobStepCode_Retry),
335 timeout_(timeout)
336 {
337 }
338
339 unsigned int GetTimeout() const
340 {
341 return timeout_;
342 }
343 };
344
345
346 class IJob : public boost::noncopyable
347 {
348 public:
349 virtual ~IJob()
350 {
351 }
352
353 virtual JobStepResult* ExecuteStep() = 0;
354
355 virtual void ReleaseResources() = 0; // For pausing jobs
356
357 virtual float GetProgress() = 0;
358
359 virtual void GetDescription(Json::Value& value) = 0;
360 };
361
362
363 class JobStatus
364 {
365 private:
366 ErrorCode errorCode_;
367 float progress_;
368 Json::Value description_;
369
370 public:
371 JobStatus() :
372 errorCode_(ErrorCode_InternalError),
373 progress_(0),
374 description_(Json::objectValue)
375 {
376 }
377
378 JobStatus(ErrorCode code,
379 IJob& job) :
380 errorCode_(code),
381 progress_(job.GetProgress())
382 {
383 if (progress_ < 0)
384 {
385 progress_ = 0;
386 }
387
388 if (progress_ > 1)
389 {
390 progress_ = 1;
391 }
392
393 job.GetDescription(description_);
394 }
395
396 ErrorCode GetErrorCode() const
397 {
398 return errorCode_;
399 }
400
401 float GetProgress() const
402 {
403 return progress_;
404 }
405
406 const Json::Value& GetDescription() const
407 {
408 return description_;
409 }
410 };
411
412
413 class JobInfo
414 {
415 private:
416 std::string id_;
417 int priority_;
418 JobState state_;
419 boost::posix_time::ptime timestamp_;
420 boost::posix_time::ptime creationTime_;
421 boost::posix_time::ptime lastStateChangeTime_;
422 boost::posix_time::time_duration runtime_;
423 bool hasEta_;
424 boost::posix_time::ptime eta_;
425 JobStatus status_;
426
427 public:
428 JobInfo(const std::string& id,
429 int priority,
430 JobState state,
431 const JobStatus& status,
432 const boost::posix_time::ptime& creationTime,
433 const boost::posix_time::ptime& lastStateChangeTime,
434 const boost::posix_time::time_duration& runtime) :
435 id_(id),
436 priority_(priority),
437 state_(state),
438 timestamp_(boost::posix_time::microsec_clock::universal_time()),
439 creationTime_(creationTime),
440 lastStateChangeTime_(lastStateChangeTime),
441 runtime_(runtime),
442 hasEta_(false),
443 status_(status)
444 {
445 if (state_ == JobState_Running)
446 {
447 float ms = static_cast<float>(runtime_.total_milliseconds());
448
449 if (status_.GetProgress() > 0.01f &&
450 ms > 0.01f)
451 {
452 float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms;
453 eta_ = timestamp_ + boost::posix_time::milliseconds(remaining);
454 hasEta_ = true;
455 }
456 }
457 }
458
459 JobInfo() :
460 priority_(0),
461 state_(JobState_Failure),
462 timestamp_(boost::posix_time::microsec_clock::universal_time()),
463 creationTime_(timestamp_),
464 lastStateChangeTime_(timestamp_),
465 runtime_(boost::posix_time::milliseconds(0)),
466 hasEta_(false)
467 {
468 }
469
470 const std::string& GetIdentifier() const
471 {
472 return id_;
473 }
474
475 int GetPriority() const
476 {
477 return priority_;
478 }
479
480 JobState GetState() const
481 {
482 return state_;
483 }
484
485 const boost::posix_time::ptime& GetInfoTime() const
486 {
487 return timestamp_;
488 }
489
490 const boost::posix_time::ptime& GetCreationTime() const
491 {
492 return creationTime_;
493 }
494
495 const boost::posix_time::time_duration& GetRuntime() const
496 {
497 return runtime_;
498 }
499
500 bool HasEstimatedTimeOfArrival() const
501 {
502 return hasEta_;
503 }
504
505 bool HasCompletionTime() const
506 {
507 return (state_ == JobState_Success ||
508 state_ == JobState_Failure);
509 }
510
511 const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const
512 {
513 if (hasEta_)
514 {
515 return eta_;
516 }
517 else
518 {
519 throw OrthancException(ErrorCode_BadSequenceOfCalls);
520 }
521 }
522
523 const boost::posix_time::ptime& GetCompletionTime() const
524 {
525 if (HasCompletionTime())
526 {
527 return lastStateChangeTime_;
528 }
529 else
530 {
531 throw OrthancException(ErrorCode_BadSequenceOfCalls);
532 }
533 }
534
535 const JobStatus& GetStatus() const
536 {
537 return status_;
538 }
539
540 JobStatus& GetStatus()
541 {
542 return status_;
543 }
544
545 void Format(Json::Value& target) const
546 {
547 target = Json::objectValue;
548 target["ID"] = id_;
549 target["Priority"] = priority_;
550 target["ErrorCode"] = static_cast<int>(status_.GetErrorCode());
551 target["ErrorDescription"] = EnumerationToString(status_.GetErrorCode());
552 target["State"] = EnumerationToString(state_);
553 target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_);
554 target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_);
555 target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds());
556 target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f);
557 target["Description"] = status_.GetDescription();
558
559 if (HasEstimatedTimeOfArrival())
560 {
561 target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(GetEstimatedTimeOfArrival());
562 }
563
564 if (HasCompletionTime())
565 {
566 target["CompletionTime"] = boost::posix_time::to_iso_string(GetCompletionTime());
567 }
568 }
569 };
570
571
572
573
574 class JobsRegistry : public boost::noncopyable
575 {
576 private:
577 class JobHandler : public boost::noncopyable
578 {
579 private:
580 std::string id_;
581 JobState state_;
582 std::auto_ptr<IJob> job_;
583 int priority_; // "+inf()" means highest priority
584 boost::posix_time::ptime creationTime_;
585 boost::posix_time::ptime lastStateChangeTime_;
586 boost::posix_time::time_duration runtime_;
587 boost::posix_time::ptime retryTime_;
588 bool pauseScheduled_;
589 JobStatus lastStatus_;
590
591 void Touch()
592 {
593 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
594
595 if (state_ == JobState_Running)
596 {
597 runtime_ += (now - lastStateChangeTime_);
598 }
599
600 lastStateChangeTime_ = now;
601 }
602
603 void SetStateInternal(JobState state)
604 {
605 state_ = state;
606 pauseScheduled_ = false;
607 Touch();
608 }
609
610 public:
611 JobHandler(IJob* job,
612 int priority) :
613 id_(Toolbox::GenerateUuid()),
614 state_(JobState_Pending),
615 job_(job),
616 priority_(priority),
617 creationTime_(boost::posix_time::microsec_clock::universal_time()),
618 lastStateChangeTime_(creationTime_),
619 runtime_(boost::posix_time::milliseconds(0)),
620 retryTime_(creationTime_),
621 pauseScheduled_(false)
622 {
623 if (job == NULL)
624 {
625 throw OrthancException(ErrorCode_NullPointer);
626 }
627
628 lastStatus_ = JobStatus(ErrorCode_Success, *job);
629 }
630
631 const std::string& GetId() const
632 {
633 return id_;
634 }
635
636 IJob& GetJob() const
637 {
638 assert(job_.get() != NULL);
639 return *job_;
640 }
641
642 void SetPriority(int priority)
643 {
644 priority_ = priority;
645 }
646
647 int GetPriority() const
648 {
649 return priority_;
650 }
651
652 JobState GetState() const
653 {
654 return state_;
655 }
656
657 void SetState(JobState state)
658 {
659 if (state == JobState_Retry)
660 {
661 // Use "SetRetryState()"
662 throw OrthancException(ErrorCode_BadSequenceOfCalls);
663 }
664 else
665 {
666 SetStateInternal(state);
667 }
668 }
669
670 void SetRetryState(unsigned int timeout)
671 {
672 if (state_ == JobState_Running)
673 {
674 SetStateInternal(JobState_Retry);
675 retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
676 boost::posix_time::milliseconds(timeout));
677 }
678 else
679 {
680 // Only valid for running jobs
681 throw OrthancException(ErrorCode_BadSequenceOfCalls);
682 }
683 }
684
685 void SchedulePause()
686 {
687 if (state_ == JobState_Running)
688 {
689 pauseScheduled_ = true;
690 }
691 else
692 {
693 // Only valid for running jobs
694 throw OrthancException(ErrorCode_BadSequenceOfCalls);
695 }
696 }
697
698 bool IsPauseScheduled()
699 {
700 return pauseScheduled_;
701 }
702
703 bool IsRetryReady(const boost::posix_time::ptime& now) const
704 {
705 if (state_ != JobState_Retry)
706 {
707 throw OrthancException(ErrorCode_BadSequenceOfCalls);
708 }
709 else
710 {
711 return retryTime_ <= now;
712 }
713 }
714
715 const boost::posix_time::ptime& GetCreationTime() const
716 {
717 return creationTime_;
718 }
719
720 const boost::posix_time::ptime& GetLastStateChangeTime() const
721 {
722 return lastStateChangeTime_;
723 }
724
725 const boost::posix_time::time_duration& GetRuntime() const
726 {
727 return runtime_;
728 }
729
730 const JobStatus& GetLastStatus() const
731 {
732 return lastStatus_;
733 }
734
735 void SetLastStatus(const JobStatus& status)
736 {
737 lastStatus_ = status;
738 Touch();
739 }
740 };
741
742 struct PriorityComparator
743 {
744 bool operator() (JobHandler*& a,
745 JobHandler*& b) const
746 {
747 return a->GetPriority() < b->GetPriority();
748 }
749 };
750
751 typedef std::map<std::string, JobHandler*> JobsIndex;
752 typedef std::list<JobHandler*> CompletedJobs;
753 typedef std::set<JobHandler*> RetryJobs;
754 typedef std::priority_queue<JobHandler*,
755 std::vector<JobHandler*>, // Could be a "std::deque"
756 PriorityComparator> PendingJobs;
757
758 boost::mutex mutex_;
759 JobsIndex jobsIndex_;
760 PendingJobs pendingJobs_;
761 CompletedJobs completedJobs_;
762 RetryJobs retryJobs_;
763
764 boost::condition_variable pendingJobAvailable_;
765 size_t maxCompletedJobs_;
766
767
768 #ifndef NDEBUG
769 bool IsPendingJob(const JobHandler& job) const
770 {
771 PendingJobs copy = pendingJobs_;
772 while (!copy.empty())
773 {
774 if (copy.top() == &job)
775 {
776 return true;
777 }
778
779 copy.pop();
780 }
781
782 return false;
783 }
784
785 bool IsCompletedJob(JobHandler& job) const
786 {
787 for (CompletedJobs::const_iterator it = completedJobs_.begin();
788 it != completedJobs_.end(); ++it)
789 {
790 if (*it == &job)
791 {
792 return true;
793 }
794 }
795
796 return false;
797 }
798
799 bool IsRetryJob(JobHandler& job) const
800 {
801 return retryJobs_.find(&job) != retryJobs_.end();
802 }
803 #endif
804
805
806 void CheckInvariants() const
807 {
808 #ifndef NDEBUG
809 {
810 PendingJobs copy = pendingJobs_;
811 while (!copy.empty())
812 {
813 assert(copy.top()->GetState() == JobState_Pending);
814 copy.pop();
815 }
816 }
817
818 assert(completedJobs_.size() <= maxCompletedJobs_);
819
820 for (CompletedJobs::const_iterator it = completedJobs_.begin();
821 it != completedJobs_.end(); ++it)
822 {
823 assert((*it)->GetState() == JobState_Success ||
824 (*it)->GetState() == JobState_Failure);
825 }
826
827 for (RetryJobs::const_iterator it = retryJobs_.begin();
828 it != retryJobs_.end(); ++it)
829 {
830 assert((*it)->GetState() == JobState_Retry);
831 }
832
833 for (JobsIndex::const_iterator it = jobsIndex_.begin();
834 it != jobsIndex_.end(); ++it)
835 {
836 JobHandler& job = *it->second;
837
838 assert(job.GetId() == it->first);
839
840 switch (job.GetState())
841 {
842 case JobState_Pending:
843 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
844 break;
845
846 case JobState_Success:
847 case JobState_Failure:
848 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
849 break;
850
851 case JobState_Retry:
852 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
853 break;
854
855 case JobState_Running:
856 case JobState_Paused:
857 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
858 break;
859
860 default:
861 throw OrthancException(ErrorCode_InternalError);
862 }
863 }
864 #endif
865 }
866
867
868 void ForgetOldCompletedJobs()
869 {
870 if (maxCompletedJobs_ != 0)
871 {
872 while (completedJobs_.size() > maxCompletedJobs_)
873 {
874 assert(completedJobs_.front() != NULL);
875
876 std::string id = completedJobs_.front()->GetId();
877 assert(jobsIndex_.find(id) != jobsIndex_.end());
878
879 jobsIndex_.erase(id);
880 delete(completedJobs_.front());
881 completedJobs_.pop_front();
882 }
883 }
884 }
885
886
887 void MarkRunningAsCompleted(JobHandler& job,
888 bool success)
889 {
890 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
891 << ": " << job.GetId();
892
893 CheckInvariants();
894 assert(job.GetState() == JobState_Running);
895
896 job.SetState(success ? JobState_Success : JobState_Failure);
897
898 completedJobs_.push_back(&job);
899 ForgetOldCompletedJobs();
900
901 CheckInvariants();
902 }
903
904
905 void MarkRunningAsRetry(JobHandler& job,
906 unsigned int timeout)
907 {
908 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
909
910 CheckInvariants();
911
912 assert(job.GetState() == JobState_Running &&
913 retryJobs_.find(&job) == retryJobs_.end());
914
915 retryJobs_.insert(&job);
916 job.SetRetryState(timeout);
917
918 CheckInvariants();
919 }
920
921
922 void MarkRunningAsPaused(JobHandler& job)
923 {
924 LOG(INFO) << "Job paused: " << job.GetId();
925
926 CheckInvariants();
927 assert(job.GetState() == JobState_Running);
928
929 job.SetState(JobState_Paused);
930
931 CheckInvariants();
932 }
933
934
935 public:
936 JobsRegistry() :
937 maxCompletedJobs_(10)
938 {
939 }
940
941
942 ~JobsRegistry()
943 {
944 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
945 {
946 assert(it->second != NULL);
947 delete it->second;
948 }
949 }
950
951
952 void SetMaxCompletedJobs(size_t i)
953 {
954 boost::mutex::scoped_lock lock(mutex_);
955 CheckInvariants();
956
957 maxCompletedJobs_ = i;
958 ForgetOldCompletedJobs();
959
960 CheckInvariants();
961 }
962
963
964 void ListJobs(std::set<std::string>& target)
965 {
966 boost::mutex::scoped_lock lock(mutex_);
967 CheckInvariants();
968
969 for (JobsIndex::const_iterator it = jobsIndex_.begin();
970 it != jobsIndex_.end(); ++it)
971 {
972 target.insert(it->first);
973 }
974 }
975
976
977 bool GetJobInfo(JobInfo& target,
978 const std::string& id)
979 {
980 boost::mutex::scoped_lock lock(mutex_);
981 CheckInvariants();
982
983 JobsIndex::const_iterator found = jobsIndex_.find(id);
984
985 if (found == jobsIndex_.end())
986 {
987 return false;
988 }
989 else
990 {
991 const JobHandler& handler = *found->second;
992 target = JobInfo(handler.GetId(),
993 handler.GetPriority(),
994 handler.GetState(),
995 handler.GetLastStatus(),
996 handler.GetCreationTime(),
997 handler.GetLastStateChangeTime(),
998 handler.GetRuntime());
999 return true;
1000 }
1001 }
1002
1003
1004 void Submit(std::string& id,
1005 IJob* job, // Takes ownership
1006 int priority)
1007 {
1008 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
1009
1010 boost::mutex::scoped_lock lock(mutex_);
1011 CheckInvariants();
1012
1013 id = handler->GetId();
1014
1015 pendingJobs_.push(handler.get());
1016 pendingJobAvailable_.notify_one();
1017
1018 jobsIndex_.insert(std::make_pair(id, handler.release()));
1019
1020 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
1021
1022 CheckInvariants();
1023 }
1024
1025
1026 void Submit(IJob* job, // Takes ownership
1027 int priority)
1028 {
1029 std::string id;
1030 Submit(id, job, priority);
1031 }
1032
1033
1034 void SetPriority(const std::string& id,
1035 int priority)
1036 {
1037 LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
1038
1039 boost::mutex::scoped_lock lock(mutex_);
1040 CheckInvariants();
1041
1042 JobsIndex::iterator found = jobsIndex_.find(id);
1043
1044 if (found == jobsIndex_.end())
1045 {
1046 LOG(WARNING) << "Unknown job: " << id;
1047 }
1048 else
1049 {
1050 found->second->SetPriority(priority);
1051
1052 if (found->second->GetState() == JobState_Pending)
1053 {
1054 // If the job is pending, we need to reconstruct the
1055 // priority queue, as the heap condition has changed
1056
1057 PendingJobs copy;
1058 std::swap(copy, pendingJobs_);
1059
1060 assert(pendingJobs_.empty());
1061 while (!copy.empty())
1062 {
1063 pendingJobs_.push(copy.top());
1064 copy.pop();
1065 }
1066 }
1067 }
1068
1069 CheckInvariants();
1070 }
1071
1072
1073 void Pause(const std::string& id)
1074 {
1075 LOG(INFO) << "Pausing job: " << id;
1076
1077 boost::mutex::scoped_lock lock(mutex_);
1078 CheckInvariants();
1079
1080 JobsIndex::iterator found = jobsIndex_.find(id);
1081
1082 if (found == jobsIndex_.end())
1083 {
1084 LOG(WARNING) << "Unknown job: " << id;
1085 }
1086 else
1087 {
1088 switch (found->second->GetState())
1089 {
1090 case JobState_Pending:
1091 {
1092 // If the job is pending, we need to reconstruct the
1093 // priority queue to remove it
1094 PendingJobs copy;
1095 std::swap(copy, pendingJobs_);
1096
1097 assert(pendingJobs_.empty());
1098 while (!copy.empty())
1099 {
1100 if (copy.top()->GetId() != id)
1101 {
1102 pendingJobs_.push(copy.top());
1103 }
1104
1105 copy.pop();
1106 }
1107
1108 found->second->SetState(JobState_Paused);
1109
1110 break;
1111 }
1112
1113 case JobState_Retry:
1114 {
1115 RetryJobs::iterator item = retryJobs_.find(found->second);
1116 assert(item != retryJobs_.end());
1117 retryJobs_.erase(item);
1118
1119 found->second->SetState(JobState_Paused);
1120
1121 break;
1122 }
1123
1124 case JobState_Paused:
1125 case JobState_Success:
1126 case JobState_Failure:
1127 // Nothing to be done
1128 break;
1129
1130 case JobState_Running:
1131 found->second->SchedulePause();
1132 break;
1133
1134 default:
1135 throw OrthancException(ErrorCode_InternalError);
1136 }
1137 }
1138
1139 CheckInvariants();
1140 }
1141
1142
1143 void Resume(const std::string& id)
1144 {
1145 LOG(INFO) << "Resuming job: " << id;
1146
1147 boost::mutex::scoped_lock lock(mutex_);
1148 CheckInvariants();
1149
1150 JobsIndex::iterator found = jobsIndex_.find(id);
1151
1152 if (found == jobsIndex_.end())
1153 {
1154 LOG(WARNING) << "Unknown job: " << id;
1155 }
1156 else if (found->second->GetState() != JobState_Paused)
1157 {
1158 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
1159 }
1160 else
1161 {
1162 found->second->SetState(JobState_Pending);
1163 pendingJobs_.push(found->second);
1164 pendingJobAvailable_.notify_one();
1165 }
1166
1167 CheckInvariants();
1168 }
1169
1170
1171 void Resubmit(const std::string& id)
1172 {
1173 LOG(INFO) << "Resubmitting failed job: " << id;
1174
1175 boost::mutex::scoped_lock lock(mutex_);
1176 CheckInvariants();
1177
1178 JobsIndex::iterator found = jobsIndex_.find(id);
1179
1180 if (found == jobsIndex_.end())
1181 {
1182 LOG(WARNING) << "Unknown job: " << id;
1183 }
1184 else if (found->second->GetState() != JobState_Failure)
1185 {
1186 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
1187 }
1188 else
1189 {
1190 bool ok = false;
1191 for (CompletedJobs::iterator it = completedJobs_.begin();
1192 it != completedJobs_.end(); ++it)
1193 {
1194 if (*it == found->second)
1195 {
1196 ok = true;
1197 completedJobs_.erase(it);
1198 break;
1199 }
1200 }
1201
1202 assert(ok);
1203
1204 found->second->SetState(JobState_Pending);
1205 pendingJobs_.push(found->second);
1206 pendingJobAvailable_.notify_one();
1207 }
1208
1209 CheckInvariants();
1210 }
1211
1212
1213 void ScheduleRetries()
1214 {
1215 boost::mutex::scoped_lock lock(mutex_);
1216 CheckInvariants();
1217
1218 RetryJobs copy;
1219 std::swap(copy, retryJobs_);
1220
1221 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
1222
1223 assert(retryJobs_.empty());
1224 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
1225 {
1226 if ((*it)->IsRetryReady(now))
1227 {
1228 LOG(INFO) << "Retrying job: " << (*it)->GetId();
1229 (*it)->SetState(JobState_Pending);
1230 pendingJobs_.push(*it);
1231 pendingJobAvailable_.notify_one();
1232 }
1233 else
1234 {
1235 retryJobs_.insert(*it);
1236 }
1237 }
1238
1239 CheckInvariants();
1240 }
1241
1242
1243 bool GetState(JobState& state,
1244 const std::string& id)
1245 {
1246 boost::mutex::scoped_lock lock(mutex_);
1247 CheckInvariants();
1248
1249 JobsIndex::const_iterator it = jobsIndex_.find(id);
1250 if (it == jobsIndex_.end())
1251 {
1252 return false;
1253 }
1254 else
1255 {
1256 state = it->second->GetState();
1257 return true;
1258 }
1259 }
1260
1261
1262 class RunningJob : public boost::noncopyable
1263 {
1264 private:
1265 JobsRegistry& registry_;
1266 JobHandler* handler_; // Can only be accessed if the registry
1267 // mutex is locked!
1268 IJob* job_; // Will by design be in mutual exclusion,
1269 // because only one RunningJob can be
1270 // executed at a time on a JobHandler
1271
1272 std::string id_;
1273 int priority_;
1274 JobState targetState_;
1275 unsigned int targetRetryTimeout_;
1276
1277 public:
1278 RunningJob(JobsRegistry& registry,
1279 unsigned int timeout) :
1280 registry_(registry),
1281 handler_(NULL),
1282 targetState_(JobState_Failure),
1283 targetRetryTimeout_(0)
1284 {
1285 {
1286 boost::mutex::scoped_lock lock(registry_.mutex_);
1287
1288 while (registry_.pendingJobs_.empty())
1289 {
1290 if (timeout == 0)
1291 {
1292 registry_.pendingJobAvailable_.wait(lock);
1293 }
1294 else
1295 {
1296 bool success = registry_.pendingJobAvailable_.timed_wait
1297 (lock, boost::posix_time::milliseconds(timeout));
1298 if (!success)
1299 {
1300 // No pending job
1301 return;
1302 }
1303 }
1304 }
1305
1306 handler_ = registry_.pendingJobs_.top();
1307 registry_.pendingJobs_.pop();
1308
1309 assert(handler_->GetState() == JobState_Pending);
1310 handler_->SetState(JobState_Running);
1311
1312 job_ = &handler_->GetJob();
1313 id_ = handler_->GetId();
1314 priority_ = handler_->GetPriority();
1315 }
1316 }
1317
1318 ~RunningJob()
1319 {
1320 if (IsValid())
1321 {
1322 boost::mutex::scoped_lock lock(registry_.mutex_);
1323
1324 switch (targetState_)
1325 {
1326 case JobState_Failure:
1327 registry_.MarkRunningAsCompleted(*handler_, false);
1328 break;
1329
1330 case JobState_Success:
1331 registry_.MarkRunningAsCompleted(*handler_, true);
1332 break;
1333
1334 case JobState_Paused:
1335 registry_.MarkRunningAsPaused(*handler_);
1336 break;
1337
1338 case JobState_Retry:
1339 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
1340 break;
1341
1342 default:
1343 assert(0);
1344 }
1345 }
1346 }
1347
1348 bool IsValid() const
1349 {
1350 return (handler_ != NULL &&
1351 job_ != NULL);
1352 }
1353
1354 const std::string& GetId() const
1355 {
1356 if (!IsValid())
1357 {
1358 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1359 }
1360 else
1361 {
1362 return id_;
1363 }
1364 }
1365
1366 int GetPriority() const
1367 {
1368 if (!IsValid())
1369 {
1370 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1371 }
1372 else
1373 {
1374 return priority_;
1375 }
1376 }
1377
1378 IJob& GetJob()
1379 {
1380 if (!IsValid())
1381 {
1382 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1383 }
1384 else
1385 {
1386 return *job_;
1387 }
1388 }
1389
1390 bool IsPauseScheduled()
1391 {
1392 if (!IsValid())
1393 {
1394 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1395 }
1396 else
1397 {
1398 boost::mutex::scoped_lock lock(registry_.mutex_);
1399 registry_.CheckInvariants();
1400 assert(handler_->GetState() == JobState_Running);
1401
1402 return handler_->IsPauseScheduled();
1403 }
1404 }
1405
1406 void MarkSuccess()
1407 {
1408 if (!IsValid())
1409 {
1410 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1411 }
1412 else
1413 {
1414 targetState_ = JobState_Success;
1415 }
1416 }
1417
1418 void MarkFailure()
1419 {
1420 if (!IsValid())
1421 {
1422 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1423 }
1424 else
1425 {
1426 targetState_ = JobState_Failure;
1427 }
1428 }
1429
1430 void MarkPause()
1431 {
1432 if (!IsValid())
1433 {
1434 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1435 }
1436 else
1437 {
1438 targetState_ = JobState_Paused;
1439 }
1440 }
1441
1442 void MarkRetry(unsigned int timeout)
1443 {
1444 if (!IsValid())
1445 {
1446 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1447 }
1448 else
1449 {
1450 targetState_ = JobState_Retry;
1451 targetRetryTimeout_ = timeout;
1452 }
1453 }
1454
1455 void UpdateStatus(ErrorCode code)
1456 {
1457 if (!IsValid())
1458 {
1459 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1460 }
1461 else
1462 {
1463 JobStatus status(code, *job_);
1464
1465 boost::mutex::scoped_lock lock(registry_.mutex_);
1466 registry_.CheckInvariants();
1467 assert(handler_->GetState() == JobState_Running);
1468
1469 handler_->SetLastStatus(status);
1470 }
1471 }
1472 };
1473 };
1474
1475
1476
1477 class JobsEngine
1478 {
1479 private:
1480 enum State
1481 {
1482 State_Setup,
1483 State_Running,
1484 State_Stopping,
1485 State_Done
1486 };
1487
1488 boost::mutex stateMutex_;
1489 State state_;
1490 JobsRegistry registry_;
1491 boost::thread retryHandler_;
1492 std::vector<boost::thread> workers_;
1493
1494 bool ExecuteStep(JobsRegistry::RunningJob& running,
1495 size_t workerIndex)
1496 {
1497 assert(running.IsValid());
1498
1499 LOG(INFO) << "Executing job with priority " << running.GetPriority()
1500 << " in worker thread " << workerIndex << ": " << running.GetId();
1501
1502 if (running.IsPauseScheduled())
1503 {
1504 running.GetJob().ReleaseResources();
1505 running.MarkPause();
1506 return false;
1507 }
1508
1509 std::auto_ptr<JobStepResult> result;
1510
1511 {
1512 try
1513 {
1514 result.reset(running.GetJob().ExecuteStep());
1515
1516 if (result->GetCode() == JobStepCode_Failure)
1517 {
1518 running.UpdateStatus(ErrorCode_InternalError);
1519 }
1520 else
1521 {
1522 running.UpdateStatus(ErrorCode_Success);
1523 }
1524 }
1525 catch (OrthancException& e)
1526 {
1527 running.UpdateStatus(e.GetErrorCode());
1528 }
1529 catch (boost::bad_lexical_cast&)
1530 {
1531 running.UpdateStatus(ErrorCode_BadFileFormat);
1532 }
1533 catch (...)
1534 {
1535 running.UpdateStatus(ErrorCode_InternalError);
1536 }
1537
1538 if (result.get() == NULL)
1539 {
1540 result.reset(new JobStepResult(JobStepCode_Failure));
1541 }
1542 }
1543
1544 switch (result->GetCode())
1545 {
1546 case JobStepCode_Success:
1547 running.MarkSuccess();
1548 return false;
1549
1550 case JobStepCode_Failure:
1551 running.MarkFailure();
1552 return false;
1553
1554 case JobStepCode_Retry:
1555 running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout());
1556 return false;
1557
1558 case JobStepCode_Continue:
1559 return true;
1560
1561 default:
1562 throw OrthancException(ErrorCode_InternalError);
1563 }
1564 }
1565
1566 static void RetryHandler(JobsEngine* engine)
1567 {
1568 assert(engine != NULL);
1569
1570 for (;;)
1571 {
1572 boost::this_thread::sleep(boost::posix_time::milliseconds(200));
1573
1574 {
1575 boost::mutex::scoped_lock lock(engine->stateMutex_);
1576
1577 if (engine->state_ != State_Running)
1578 {
1579 return;
1580 }
1581 }
1582
1583 engine->GetRegistry().ScheduleRetries();
1584 }
1585 }
1586
1587 static void Worker(JobsEngine* engine,
1588 size_t workerIndex)
1589 {
1590 assert(engine != NULL);
1591
1592 LOG(INFO) << "Worker thread " << workerIndex << " has started";
1593
1594 for (;;)
1595 {
1596 {
1597 boost::mutex::scoped_lock lock(engine->stateMutex_);
1598
1599 if (engine->state_ != State_Running)
1600 {
1601 return;
1602 }
1603 }
1604
1605 JobsRegistry::RunningJob running(engine->GetRegistry(), 100);
1606
1607 if (running.IsValid())
1608 {
1609 for (;;)
1610 {
1611 if (!engine->ExecuteStep(running, workerIndex))
1612 {
1613 break;
1614 }
1615 }
1616 }
1617 }
1618 }
1619
1620 public:
1621 JobsEngine() :
1622 state_(State_Setup),
1623 workers_(1)
1624 {
1625 }
1626
1627 ~JobsEngine()
1628 {
1629 if (state_ != State_Setup &&
1630 state_ != State_Done)
1631 {
1632 LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!";
1633 Stop();
1634 }
1635 }
1636
1637 void SetWorkersCount(size_t count)
1638 {
1639 if (count == 0)
1640 {
1641 throw OrthancException(ErrorCode_ParameterOutOfRange);
1642 }
1643
1644 boost::mutex::scoped_lock lock(stateMutex_);
1645
1646 if (state_ != State_Setup)
1647 {
1648 // Can only be invoked before calling "Start()"
1649 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1650 }
1651
1652 workers_.resize(count);
1653 }
1654
1655 JobsRegistry& GetRegistry()
1656 {
1657 return registry_;
1658 }
1659
1660 void Start()
1661 {
1662 boost::mutex::scoped_lock lock(stateMutex_);
1663
1664 if (state_ != State_Setup)
1665 {
1666 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1667 }
1668
1669 retryHandler_ = boost::thread(RetryHandler, this);
1670
1671 for (size_t i = 0; i < workers_.size(); i++)
1672 {
1673 workers_[i] = boost::thread(Worker, this, i);
1674 }
1675
1676 state_ = State_Running;
1677
1678 LOG(WARNING) << "The jobs engine has started";
1679 }
1680
1681
1682 void Stop()
1683 {
1684 {
1685 boost::mutex::scoped_lock lock(stateMutex_);
1686
1687 if (state_ != State_Running)
1688 {
1689 return;
1690 }
1691
1692 state_ = State_Stopping;
1693 }
1694
1695 LOG(INFO) << "Stopping the jobs engine";
1696
1697 if (retryHandler_.joinable())
1698 {
1699 retryHandler_.join();
1700 }
1701
1702 for (size_t i = 0; i < workers_.size(); i++)
1703 {
1704 if (workers_[i].joinable())
1705 {
1706 workers_[i].join();
1707 }
1708 }
1709
1710 {
1711 boost::mutex::scoped_lock lock(stateMutex_);
1712 state_ = State_Done;
1713 }
1714
1715 LOG(WARNING) << "The jobs engine has stopped";
1716 }
1717 };
1718 }
1719
1720
1721
1722 class DummyJob : public Orthanc::IJob 244 class DummyJob : public Orthanc::IJob
1723 { 245 {
1724 private: 246 private:
1725 JobStepResult result_; 247 JobStepResult result_;
1726 unsigned int count_; 248 unsigned int count_;