Mercurial > hg > orthanc
comparison Core/JobsEngine/JobsRegistry.cpp @ 2950:dc18d5804746
support of JobsHistorySize set to zero
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 30 Nov 2018 17:19:57 +0100 |
parents | 577786f59252 |
children | d924f9bb61cc |
comparison
Legend: equal, deleted, inserted, replaced
2949:e6204cd21443 | 2950:dc18d5804746 |
---|---|
45 static const char* TYPE = "Type"; | 45 static const char* TYPE = "Type"; |
46 static const char* PRIORITY = "Priority"; | 46 static const char* PRIORITY = "Priority"; |
47 static const char* JOB = "Job"; | 47 static const char* JOB = "Job"; |
48 static const char* JOBS = "Jobs"; | 48 static const char* JOBS = "Jobs"; |
49 static const char* JOBS_REGISTRY = "JobsRegistry"; | 49 static const char* JOBS_REGISTRY = "JobsRegistry"; |
50 static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs"; | |
51 static const char* CREATION_TIME = "CreationTime"; | 50 static const char* CREATION_TIME = "CreationTime"; |
52 static const char* LAST_CHANGE_TIME = "LastChangeTime"; | 51 static const char* LAST_CHANGE_TIME = "LastChangeTime"; |
53 static const char* RUNTIME = "Runtime"; | 52 static const char* RUNTIME = "Runtime"; |
54 | 53 |
55 | 54 |
433 #endif | 432 #endif |
434 | 433 |
435 | 434 |
436 void JobsRegistry::ForgetOldCompletedJobs() | 435 void JobsRegistry::ForgetOldCompletedJobs() |
437 { | 436 { |
438 if (maxCompletedJobs_ != 0) | 437 while (completedJobs_.size() > maxCompletedJobs_) |
439 { | 438 { |
440 while (completedJobs_.size() > maxCompletedJobs_) | 439 assert(completedJobs_.front() != NULL); |
441 { | 440 |
442 assert(completedJobs_.front() != NULL); | 441 std::string id = completedJobs_.front()->GetId(); |
443 | 442 assert(jobsIndex_.find(id) != jobsIndex_.end()); |
444 std::string id = completedJobs_.front()->GetId(); | 443 |
445 assert(jobsIndex_.find(id) != jobsIndex_.end()); | 444 jobsIndex_.erase(id); |
446 | 445 delete(completedJobs_.front()); |
447 jobsIndex_.erase(id); | 446 completedJobs_.pop_front(); |
448 delete(completedJobs_.front()); | 447 } |
449 completedJobs_.pop_front(); | 448 |
450 } | 449 CheckInvariants(); |
451 } | |
452 } | 450 } |
453 | 451 |
454 | 452 |
455 void JobsRegistry::SetCompletedJob(JobHandler& job, | 453 void JobsRegistry::SetCompletedJob(JobHandler& job, |
456 bool success) | 454 bool success) |
457 { | 455 { |
458 job.SetState(success ? JobState_Success : JobState_Failure); | 456 job.SetState(success ? JobState_Success : JobState_Failure); |
459 | 457 |
460 completedJobs_.push_back(&job); | 458 completedJobs_.push_back(&job); |
459 someJobComplete_.notify_all(); | |
460 } | |
461 | |
462 | |
463 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, | |
464 CompletedReason reason) | |
465 { | |
466 const char* tmp; | |
467 | |
468 switch (reason) | |
469 { | |
470 case CompletedReason_Success: | |
471 tmp = "success"; | |
472 break; | |
473 | |
474 case CompletedReason_Failure: | |
475 tmp = "success"; | |
476 break; | |
477 | |
478 case CompletedReason_Canceled: | |
479 tmp = "cancel"; | |
480 break; | |
481 | |
482 default: | |
483 throw OrthancException(ErrorCode_InternalError); | |
484 } | |
485 | |
486 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId(); | |
487 | |
488 CheckInvariants(); | |
489 | |
490 assert(job.GetState() == JobState_Running); | |
491 SetCompletedJob(job, reason == CompletedReason_Success); | |
492 | |
493 if (reason == CompletedReason_Canceled) | |
494 { | |
495 job.SetLastErrorCode(ErrorCode_CanceledJob); | |
496 } | |
497 | |
498 if (observer_ != NULL) | |
499 { | |
500 if (reason == CompletedReason_Success) | |
501 { | |
502 observer_->SignalJobSuccess(job.GetId()); | |
503 } | |
504 else | |
505 { | |
506 observer_->SignalJobFailure(job.GetId()); | |
507 } | |
508 } | |
509 | |
510 // WARNING: The following call might make "job" invalid if the job | |
511 // history size is empty | |
461 ForgetOldCompletedJobs(); | 512 ForgetOldCompletedJobs(); |
462 | |
463 someJobComplete_.notify_all(); | |
464 } | |
465 | |
466 | |
467 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, | |
468 bool success) | |
469 { | |
470 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | |
471 << ": " << job.GetId(); | |
472 | |
473 CheckInvariants(); | |
474 | |
475 assert(job.GetState() == JobState_Running); | |
476 SetCompletedJob(job, success); | |
477 | |
478 if (observer_ != NULL) | |
479 { | |
480 if (success) | |
481 { | |
482 observer_->SignalJobSuccess(job.GetId()); | |
483 } | |
484 else | |
485 { | |
486 observer_->SignalJobFailure(job.GetId()); | |
487 } | |
488 } | |
489 | |
490 CheckInvariants(); | |
491 } | 513 } |
492 | 514 |
493 | 515 |
494 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, | 516 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, |
495 unsigned int timeout) | 517 unsigned int timeout) |
556 | 578 |
557 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)"; | 579 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)"; |
558 | 580 |
559 maxCompletedJobs_ = n; | 581 maxCompletedJobs_ = n; |
560 ForgetOldCompletedJobs(); | 582 ForgetOldCompletedJobs(); |
561 | 583 } |
562 CheckInvariants(); | 584 |
585 | |
586 size_t JobsRegistry::GetMaxCompletedJobs() | |
587 { | |
588 boost::mutex::scoped_lock lock(mutex_); | |
589 CheckInvariants(); | |
590 return maxCompletedJobs_; | |
563 } | 591 } |
564 | 592 |
565 | 593 |
566 void JobsRegistry::ListJobs(std::set<std::string>& target) | 594 void JobsRegistry::ListJobs(std::set<std::string>& target) |
567 { | 595 { |
602 } | 630 } |
603 } | 631 } |
604 | 632 |
605 | 633 |
606 void JobsRegistry::SubmitInternal(std::string& id, | 634 void JobsRegistry::SubmitInternal(std::string& id, |
607 JobHandler* handlerRaw, | 635 JobHandler* handler) |
608 bool keepLastChangeTime) | 636 { |
609 { | 637 if (handler == NULL) |
610 if (handlerRaw == NULL) | |
611 { | 638 { |
612 throw OrthancException(ErrorCode_NullPointer); | 639 throw OrthancException(ErrorCode_NullPointer); |
613 } | 640 } |
614 | 641 |
615 std::auto_ptr<JobHandler> handler(handlerRaw); | 642 std::auto_ptr<JobHandler> protection(handler); |
616 | |
617 boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime(); | |
618 | 643 |
619 { | 644 { |
620 boost::mutex::scoped_lock lock(mutex_); | 645 boost::mutex::scoped_lock lock(mutex_); |
621 CheckInvariants(); | 646 CheckInvariants(); |
622 | 647 |
623 id = handler->GetId(); | 648 id = handler->GetId(); |
624 int priority = handler->GetPriority(); | 649 int priority = handler->GetPriority(); |
650 | |
651 jobsIndex_.insert(std::make_pair(id, protection.release())); | |
625 | 652 |
626 switch (handler->GetState()) | 653 switch (handler->GetState()) |
627 { | 654 { |
628 case JobState_Pending: | 655 case JobState_Pending: |
629 case JobState_Retry: | 656 case JobState_Retry: |
630 case JobState_Running: | 657 case JobState_Running: |
631 handler->SetState(JobState_Pending); | 658 handler->SetState(JobState_Pending); |
632 pendingJobs_.push(handler.get()); | 659 pendingJobs_.push(handler); |
633 pendingJobAvailable_.notify_one(); | 660 pendingJobAvailable_.notify_one(); |
634 break; | 661 break; |
635 | 662 |
636 case JobState_Success: | 663 case JobState_Success: |
637 SetCompletedJob(*handler, true); | 664 SetCompletedJob(*handler, true); |
648 LOG(ERROR) << "A job should not be loaded from state: " | 675 LOG(ERROR) << "A job should not be loaded from state: " |
649 << EnumerationToString(handler->GetState()); | 676 << EnumerationToString(handler->GetState()); |
650 throw OrthancException(ErrorCode_InternalError); | 677 throw OrthancException(ErrorCode_InternalError); |
651 } | 678 } |
652 | 679 |
653 if (keepLastChangeTime) | |
654 { | |
655 handler->SetLastStateChangeTime(lastChangeTime); | |
656 } | |
657 | |
658 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
659 | |
660 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; | 680 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; |
661 | 681 |
662 if (observer_ != NULL) | 682 if (observer_ != NULL) |
663 { | 683 { |
664 observer_->SignalJobSubmitted(id); | 684 observer_->SignalJobSubmitted(id); |
665 } | 685 } |
666 | 686 |
667 CheckInvariants(); | 687 // WARNING: The following call might make "handler" invalid if |
688 // the job history size is empty | |
689 ForgetOldCompletedJobs(); | |
668 } | 690 } |
669 } | 691 } |
670 | 692 |
671 | 693 |
672 void JobsRegistry::Submit(std::string& id, | 694 void JobsRegistry::Submit(std::string& id, |
673 IJob* job, // Takes ownership | 695 IJob* job, // Takes ownership |
674 int priority) | 696 int priority) |
675 { | 697 { |
676 SubmitInternal(id, new JobHandler(job, priority), false); | 698 SubmitInternal(id, new JobHandler(job, priority)); |
677 } | 699 } |
678 | 700 |
679 | 701 |
680 void JobsRegistry::Submit(IJob* job, // Takes ownership | 702 void JobsRegistry::Submit(IJob* job, // Takes ownership |
681 int priority) | 703 int priority) |
682 { | 704 { |
683 std::string id; | 705 std::string id; |
684 SubmitInternal(id, new JobHandler(job, priority), false); | 706 SubmitInternal(id, new JobHandler(job, priority)); |
685 } | 707 } |
686 | 708 |
687 | 709 |
688 bool JobsRegistry::SubmitAndWait(Json::Value& successContent, | 710 bool JobsRegistry::SubmitAndWait(Json::Value& successContent, |
689 IJob* job, // Takes ownership | 711 IJob* job, // Takes ownership |
902 | 924 |
903 default: | 925 default: |
904 throw OrthancException(ErrorCode_InternalError); | 926 throw OrthancException(ErrorCode_InternalError); |
905 } | 927 } |
906 | 928 |
907 CheckInvariants(); | 929 // WARNING: The following call might make "handler" invalid if |
930 // the job history size is empty | |
931 ForgetOldCompletedJobs(); | |
932 | |
908 return true; | 933 return true; |
909 } | 934 } |
910 } | 935 } |
911 | 936 |
912 | 937 |
1089 boost::mutex::scoped_lock lock(registry_.mutex_); | 1114 boost::mutex::scoped_lock lock(registry_.mutex_); |
1090 | 1115 |
1091 switch (targetState_) | 1116 switch (targetState_) |
1092 { | 1117 { |
1093 case JobState_Failure: | 1118 case JobState_Failure: |
1094 registry_.MarkRunningAsCompleted(*handler_, false); | 1119 registry_.MarkRunningAsCompleted |
1095 | 1120 (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure); |
1096 if (canceled_) | |
1097 { | |
1098 handler_->SetLastErrorCode(ErrorCode_CanceledJob); | |
1099 } | |
1100 | |
1101 break; | 1121 break; |
1102 | 1122 |
1103 case JobState_Success: | 1123 case JobState_Success: |
1104 registry_.MarkRunningAsCompleted(*handler_, true); | 1124 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success); |
1105 break; | 1125 break; |
1106 | 1126 |
1107 case JobState_Paused: | 1127 case JobState_Paused: |
1108 registry_.MarkRunningAsPaused(*handler_); | 1128 registry_.MarkRunningAsPaused(*handler_); |
1109 break; | 1129 break; |
1291 boost::mutex::scoped_lock lock(mutex_); | 1311 boost::mutex::scoped_lock lock(mutex_); |
1292 CheckInvariants(); | 1312 CheckInvariants(); |
1293 | 1313 |
1294 target = Json::objectValue; | 1314 target = Json::objectValue; |
1295 target[TYPE] = JOBS_REGISTRY; | 1315 target[TYPE] = JOBS_REGISTRY; |
1296 target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_); | |
1297 target[JOBS] = Json::objectValue; | 1316 target[JOBS] = Json::objectValue; |
1298 | 1317 |
1299 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | 1318 for (JobsIndex::const_iterator it = jobsIndex_.begin(); |
1300 it != jobsIndex_.end(); ++it) | 1319 it != jobsIndex_.end(); ++it) |
1301 { | 1320 { |
1307 } | 1326 } |
1308 } | 1327 } |
1309 | 1328 |
1310 | 1329 |
1311 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, | 1330 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, |
1312 const Json::Value& s) : | 1331 const Json::Value& s, |
1332 size_t maxCompletedJobs) : | |
1333 maxCompletedJobs_(maxCompletedJobs), | |
1313 observer_(NULL) | 1334 observer_(NULL) |
1314 { | 1335 { |
1315 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || | 1336 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || |
1316 !s.isMember(JOBS) || | 1337 !s.isMember(JOBS) || |
1317 s[JOBS].type() != Json::objectValue) | 1338 s[JOBS].type() != Json::objectValue) |
1318 { | 1339 { |
1319 throw OrthancException(ErrorCode_BadFileFormat); | 1340 throw OrthancException(ErrorCode_BadFileFormat); |
1320 } | 1341 } |
1321 | 1342 |
1322 maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS); | |
1323 | |
1324 Json::Value::Members members = s[JOBS].getMemberNames(); | 1343 Json::Value::Members members = s[JOBS].getMemberNames(); |
1325 | 1344 |
1326 for (Json::Value::Members::const_iterator it = members.begin(); | 1345 for (Json::Value::Members::const_iterator it = members.begin(); |
1327 it != members.end(); ++it) | 1346 it != members.end(); ++it) |
1328 { | 1347 { |
1329 std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][*it], *it)); | 1348 std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][*it], *it)); |
1330 | 1349 |
1350 const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime(); | |
1351 | |
1331 std::string id; | 1352 std::string id; |
1332 SubmitInternal(id, job.release(), true); | 1353 SubmitInternal(id, job.release()); |
1354 | |
1355 // Check whether the job has not been removed (which could be | |
1356 // the case if the "maxCompletedJobs_" value gets smaller) | |
1357 JobsIndex::iterator found = jobsIndex_.find(id); | |
1358 if (found != jobsIndex_.end()) | |
1359 { | |
1360 // The job still lies in the history: Update the time of its | |
1361 // last change to the time that was serialized | |
1362 assert(found->second != NULL); | |
1363 found->second->SetLastStateChangeTime(lastChangeTime); | |
1364 } | |
1333 } | 1365 } |
1334 } | 1366 } |
1335 } | 1367 } |