comparison Core/JobsEngine/JobsRegistry.cpp @ 2950:dc18d5804746

support of JobsHistorySize set to zero
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 30 Nov 2018 17:19:57 +0100
parents 577786f59252
children d924f9bb61cc
comparison
equal deleted inserted replaced
2949:e6204cd21443 2950:dc18d5804746
45 static const char* TYPE = "Type"; 45 static const char* TYPE = "Type";
46 static const char* PRIORITY = "Priority"; 46 static const char* PRIORITY = "Priority";
47 static const char* JOB = "Job"; 47 static const char* JOB = "Job";
48 static const char* JOBS = "Jobs"; 48 static const char* JOBS = "Jobs";
49 static const char* JOBS_REGISTRY = "JobsRegistry"; 49 static const char* JOBS_REGISTRY = "JobsRegistry";
50 static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs";
51 static const char* CREATION_TIME = "CreationTime"; 50 static const char* CREATION_TIME = "CreationTime";
52 static const char* LAST_CHANGE_TIME = "LastChangeTime"; 51 static const char* LAST_CHANGE_TIME = "LastChangeTime";
53 static const char* RUNTIME = "Runtime"; 52 static const char* RUNTIME = "Runtime";
54 53
55 54
433 #endif 432 #endif
434 433
435 434
436 void JobsRegistry::ForgetOldCompletedJobs() 435 void JobsRegistry::ForgetOldCompletedJobs()
437 { 436 {
438 if (maxCompletedJobs_ != 0) 437 while (completedJobs_.size() > maxCompletedJobs_)
439 { 438 {
440 while (completedJobs_.size() > maxCompletedJobs_) 439 assert(completedJobs_.front() != NULL);
441 { 440
442 assert(completedJobs_.front() != NULL); 441 std::string id = completedJobs_.front()->GetId();
443 442 assert(jobsIndex_.find(id) != jobsIndex_.end());
444 std::string id = completedJobs_.front()->GetId(); 443
445 assert(jobsIndex_.find(id) != jobsIndex_.end()); 444 jobsIndex_.erase(id);
446 445 delete(completedJobs_.front());
447 jobsIndex_.erase(id); 446 completedJobs_.pop_front();
448 delete(completedJobs_.front()); 447 }
449 completedJobs_.pop_front(); 448
450 } 449 CheckInvariants();
451 }
452 } 450 }
453 451
454 452
455 void JobsRegistry::SetCompletedJob(JobHandler& job, 453 void JobsRegistry::SetCompletedJob(JobHandler& job,
456 bool success) 454 bool success)
457 { 455 {
458 job.SetState(success ? JobState_Success : JobState_Failure); 456 job.SetState(success ? JobState_Success : JobState_Failure);
459 457
460 completedJobs_.push_back(&job); 458 completedJobs_.push_back(&job);
459 someJobComplete_.notify_all();
460 }
461
462
463 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
464 CompletedReason reason)
465 {
466 const char* tmp;
467
468 switch (reason)
469 {
470 case CompletedReason_Success:
471 tmp = "success";
472 break;
473
474 case CompletedReason_Failure:
475 tmp = "success";
476 break;
477
478 case CompletedReason_Canceled:
479 tmp = "cancel";
480 break;
481
482 default:
483 throw OrthancException(ErrorCode_InternalError);
484 }
485
486 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId();
487
488 CheckInvariants();
489
490 assert(job.GetState() == JobState_Running);
491 SetCompletedJob(job, reason == CompletedReason_Success);
492
493 if (reason == CompletedReason_Canceled)
494 {
495 job.SetLastErrorCode(ErrorCode_CanceledJob);
496 }
497
498 if (observer_ != NULL)
499 {
500 if (reason == CompletedReason_Success)
501 {
502 observer_->SignalJobSuccess(job.GetId());
503 }
504 else
505 {
506 observer_->SignalJobFailure(job.GetId());
507 }
508 }
509
510 // WARNING: The following call might make "job" invalid if the job
511 // history size is empty
461 ForgetOldCompletedJobs(); 512 ForgetOldCompletedJobs();
462
463 someJobComplete_.notify_all();
464 }
465
466
467 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
468 bool success)
469 {
470 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
471 << ": " << job.GetId();
472
473 CheckInvariants();
474
475 assert(job.GetState() == JobState_Running);
476 SetCompletedJob(job, success);
477
478 if (observer_ != NULL)
479 {
480 if (success)
481 {
482 observer_->SignalJobSuccess(job.GetId());
483 }
484 else
485 {
486 observer_->SignalJobFailure(job.GetId());
487 }
488 }
489
490 CheckInvariants();
491 } 513 }
492 514
493 515
494 void JobsRegistry::MarkRunningAsRetry(JobHandler& job, 516 void JobsRegistry::MarkRunningAsRetry(JobHandler& job,
495 unsigned int timeout) 517 unsigned int timeout)
556 578
557 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)"; 579 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)";
558 580
559 maxCompletedJobs_ = n; 581 maxCompletedJobs_ = n;
560 ForgetOldCompletedJobs(); 582 ForgetOldCompletedJobs();
561 583 }
562 CheckInvariants(); 584
585
586 size_t JobsRegistry::GetMaxCompletedJobs()
587 {
588 boost::mutex::scoped_lock lock(mutex_);
589 CheckInvariants();
590 return maxCompletedJobs_;
563 } 591 }
564 592
565 593
566 void JobsRegistry::ListJobs(std::set<std::string>& target) 594 void JobsRegistry::ListJobs(std::set<std::string>& target)
567 { 595 {
602 } 630 }
603 } 631 }
604 632
605 633
606 void JobsRegistry::SubmitInternal(std::string& id, 634 void JobsRegistry::SubmitInternal(std::string& id,
607 JobHandler* handlerRaw, 635 JobHandler* handler)
608 bool keepLastChangeTime) 636 {
609 { 637 if (handler == NULL)
610 if (handlerRaw == NULL)
611 { 638 {
612 throw OrthancException(ErrorCode_NullPointer); 639 throw OrthancException(ErrorCode_NullPointer);
613 } 640 }
614 641
615 std::auto_ptr<JobHandler> handler(handlerRaw); 642 std::auto_ptr<JobHandler> protection(handler);
616
617 boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime();
618 643
619 { 644 {
620 boost::mutex::scoped_lock lock(mutex_); 645 boost::mutex::scoped_lock lock(mutex_);
621 CheckInvariants(); 646 CheckInvariants();
622 647
623 id = handler->GetId(); 648 id = handler->GetId();
624 int priority = handler->GetPriority(); 649 int priority = handler->GetPriority();
650
651 jobsIndex_.insert(std::make_pair(id, protection.release()));
625 652
626 switch (handler->GetState()) 653 switch (handler->GetState())
627 { 654 {
628 case JobState_Pending: 655 case JobState_Pending:
629 case JobState_Retry: 656 case JobState_Retry:
630 case JobState_Running: 657 case JobState_Running:
631 handler->SetState(JobState_Pending); 658 handler->SetState(JobState_Pending);
632 pendingJobs_.push(handler.get()); 659 pendingJobs_.push(handler);
633 pendingJobAvailable_.notify_one(); 660 pendingJobAvailable_.notify_one();
634 break; 661 break;
635 662
636 case JobState_Success: 663 case JobState_Success:
637 SetCompletedJob(*handler, true); 664 SetCompletedJob(*handler, true);
648 LOG(ERROR) << "A job should not be loaded from state: " 675 LOG(ERROR) << "A job should not be loaded from state: "
649 << EnumerationToString(handler->GetState()); 676 << EnumerationToString(handler->GetState());
650 throw OrthancException(ErrorCode_InternalError); 677 throw OrthancException(ErrorCode_InternalError);
651 } 678 }
652 679
653 if (keepLastChangeTime)
654 {
655 handler->SetLastStateChangeTime(lastChangeTime);
656 }
657
658 jobsIndex_.insert(std::make_pair(id, handler.release()));
659
660 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; 680 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
661 681
662 if (observer_ != NULL) 682 if (observer_ != NULL)
663 { 683 {
664 observer_->SignalJobSubmitted(id); 684 observer_->SignalJobSubmitted(id);
665 } 685 }
666 686
667 CheckInvariants(); 687 // WARNING: The following call might make "handler" invalid if
688 // the job history size is empty
689 ForgetOldCompletedJobs();
668 } 690 }
669 } 691 }
670 692
671 693
672 void JobsRegistry::Submit(std::string& id, 694 void JobsRegistry::Submit(std::string& id,
673 IJob* job, // Takes ownership 695 IJob* job, // Takes ownership
674 int priority) 696 int priority)
675 { 697 {
676 SubmitInternal(id, new JobHandler(job, priority), false); 698 SubmitInternal(id, new JobHandler(job, priority));
677 } 699 }
678 700
679 701
680 void JobsRegistry::Submit(IJob* job, // Takes ownership 702 void JobsRegistry::Submit(IJob* job, // Takes ownership
681 int priority) 703 int priority)
682 { 704 {
683 std::string id; 705 std::string id;
684 SubmitInternal(id, new JobHandler(job, priority), false); 706 SubmitInternal(id, new JobHandler(job, priority));
685 } 707 }
686 708
687 709
688 bool JobsRegistry::SubmitAndWait(Json::Value& successContent, 710 bool JobsRegistry::SubmitAndWait(Json::Value& successContent,
689 IJob* job, // Takes ownership 711 IJob* job, // Takes ownership
902 924
903 default: 925 default:
904 throw OrthancException(ErrorCode_InternalError); 926 throw OrthancException(ErrorCode_InternalError);
905 } 927 }
906 928
907 CheckInvariants(); 929 // WARNING: The following call might make "handler" invalid if
930 // the job history size is empty
931 ForgetOldCompletedJobs();
932
908 return true; 933 return true;
909 } 934 }
910 } 935 }
911 936
912 937
1089 boost::mutex::scoped_lock lock(registry_.mutex_); 1114 boost::mutex::scoped_lock lock(registry_.mutex_);
1090 1115
1091 switch (targetState_) 1116 switch (targetState_)
1092 { 1117 {
1093 case JobState_Failure: 1118 case JobState_Failure:
1094 registry_.MarkRunningAsCompleted(*handler_, false); 1119 registry_.MarkRunningAsCompleted
1095 1120 (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure);
1096 if (canceled_)
1097 {
1098 handler_->SetLastErrorCode(ErrorCode_CanceledJob);
1099 }
1100
1101 break; 1121 break;
1102 1122
1103 case JobState_Success: 1123 case JobState_Success:
1104 registry_.MarkRunningAsCompleted(*handler_, true); 1124 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success);
1105 break; 1125 break;
1106 1126
1107 case JobState_Paused: 1127 case JobState_Paused:
1108 registry_.MarkRunningAsPaused(*handler_); 1128 registry_.MarkRunningAsPaused(*handler_);
1109 break; 1129 break;
1291 boost::mutex::scoped_lock lock(mutex_); 1311 boost::mutex::scoped_lock lock(mutex_);
1292 CheckInvariants(); 1312 CheckInvariants();
1293 1313
1294 target = Json::objectValue; 1314 target = Json::objectValue;
1295 target[TYPE] = JOBS_REGISTRY; 1315 target[TYPE] = JOBS_REGISTRY;
1296 target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_);
1297 target[JOBS] = Json::objectValue; 1316 target[JOBS] = Json::objectValue;
1298 1317
1299 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 1318 for (JobsIndex::const_iterator it = jobsIndex_.begin();
1300 it != jobsIndex_.end(); ++it) 1319 it != jobsIndex_.end(); ++it)
1301 { 1320 {
1307 } 1326 }
1308 } 1327 }
1309 1328
1310 1329
1311 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, 1330 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1312 const Json::Value& s) : 1331 const Json::Value& s,
1332 size_t maxCompletedJobs) :
1333 maxCompletedJobs_(maxCompletedJobs),
1313 observer_(NULL) 1334 observer_(NULL)
1314 { 1335 {
1315 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || 1336 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1316 !s.isMember(JOBS) || 1337 !s.isMember(JOBS) ||
1317 s[JOBS].type() != Json::objectValue) 1338 s[JOBS].type() != Json::objectValue)
1318 { 1339 {
1319 throw OrthancException(ErrorCode_BadFileFormat); 1340 throw OrthancException(ErrorCode_BadFileFormat);
1320 } 1341 }
1321 1342
1322 maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS);
1323
1324 Json::Value::Members members = s[JOBS].getMemberNames(); 1343 Json::Value::Members members = s[JOBS].getMemberNames();
1325 1344
1326 for (Json::Value::Members::const_iterator it = members.begin(); 1345 for (Json::Value::Members::const_iterator it = members.begin();
1327 it != members.end(); ++it) 1346 it != members.end(); ++it)
1328 { 1347 {
1329 std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][*it], *it)); 1348 std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][*it], *it));
1330 1349
1350 const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime();
1351
1331 std::string id; 1352 std::string id;
1332 SubmitInternal(id, job.release(), true); 1353 SubmitInternal(id, job.release());
1354
1355 // Check whether the job has not been removed (which could be
1356 // the case if the "maxCompletedJobs_" value gets smaller)
1357 JobsIndex::iterator found = jobsIndex_.find(id);
1358 if (found != jobsIndex_.end())
1359 {
1360 // The job still lies in the history: Update the time of its
1361 // last change to the time that was serialized
1362 assert(found->second != NULL);
1363 found->second->SetLastStateChangeTime(lastChangeTime);
1364 }
1333 } 1365 }
1334 } 1366 }
1335 } 1367 }