comparison Core/JobsEngine/JobsRegistry.cpp @ 4022:a2e4edc7b9aa more-changes

wip: adding job changes
author Alain Mazy <alain@mazy.be>
date Tue, 09 Jun 2020 08:46:52 +0200
parents 2a170a8f1faf
children 1d2b31fc782f
comparison
equal deleted inserted replaced
4013:1e9f6d706237 4022:a2e4edc7b9aa
493 if (reason == CompletedReason_Canceled) 493 if (reason == CompletedReason_Canceled)
494 { 494 {
495 job.SetLastErrorCode(ErrorCode_CanceledJob); 495 job.SetLastErrorCode(ErrorCode_CanceledJob);
496 } 496 }
497 497
498 if (observer_ != NULL) 498 {
499 { 499 boost::shared_lock<boost::shared_mutex> lock(observersMutex_);
500 if (reason == CompletedReason_Success) 500
501 { 501 for (Observers::iterator it = observers_.begin(); it != observers_.end(); ++it)
502 observer_->SignalJobSuccess(job.GetId()); 502 {
503 } 503 if (reason == CompletedReason_Success)
504 else 504 {
505 { 505 (*it)->SignalJobSuccess(job.GetId());
506 observer_->SignalJobFailure(job.GetId()); 506 }
507 else
508 {
509 (*it)->SignalJobFailure(job.GetId());
510 }
511
507 } 512 }
508 } 513 }
509 514
510 // WARNING: The following call might make "job" invalid if the job 515 // WARNING: The following call might make "job" invalid if the job
511 // history size is empty 516 // history size is empty
709 } 714 }
710 } 715 }
711 716
712 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; 717 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
713 718
714 if (observer_ != NULL) 719 {
715 { 720 boost::shared_lock<boost::shared_mutex> lock(observersMutex_);
716 observer_->SignalJobSubmitted(id); 721
722 for (Observers::iterator it = observers_.begin(); it != observers_.end(); ++it)
723 {
724 (*it)->SignalJobSubmitted(id);
725 }
717 } 726 }
718 727
719 // WARNING: The following call might make "handler" invalid if 728 // WARNING: The following call might make "handler" invalid if
720 // the job history size is empty 729 // the job history size is empty
721 ForgetOldCompletedJobs(); 730 ForgetOldCompletedJobs();
1098 boost::mutex::scoped_lock lock(mutex_); 1107 boost::mutex::scoped_lock lock(mutex_);
1099 return GetStateInternal(state, id); 1108 return GetStateInternal(state, id);
1100 } 1109 }
1101 1110
1102 1111
1103 void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer) 1112 void JobsRegistry::AddObserver(JobsRegistry::IObserver& observer)
1104 { 1113 {
1105 boost::mutex::scoped_lock lock(mutex_); 1114 boost::unique_lock<boost::shared_mutex> lock(observersMutex_);
1106 observer_ = &observer; 1115 observers_.insert(&observer);
1107 } 1116 }
1108 1117
1109 1118
1110 void JobsRegistry::ResetObserver() 1119 void JobsRegistry::ResetObserver(JobsRegistry::IObserver& observer)
1111 { 1120 {
1112 boost::mutex::scoped_lock lock(mutex_); 1121 boost::unique_lock<boost::shared_mutex> lock(observersMutex_);
1113 observer_ = NULL; 1122 Observers::iterator it = observers_.find(&observer);
1123 if (it != observers_.end())
1124 {
1125 observers_.erase(it);
1126 }
1114 } 1127 }
1115 1128
1116 1129
1117 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, 1130 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
1118 unsigned int timeout) : 1131 unsigned int timeout) :
1379 1392
1380 1393
1381 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, 1394 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1382 const Json::Value& s, 1395 const Json::Value& s,
1383 size_t maxCompletedJobs) : 1396 size_t maxCompletedJobs) :
1384 maxCompletedJobs_(maxCompletedJobs), 1397 maxCompletedJobs_(maxCompletedJobs)
1385 observer_(NULL)
1386 { 1398 {
1387 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || 1399 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1388 !s.isMember(JOBS) || 1400 !s.isMember(JOBS) ||
1389 s[JOBS].type() != Json::objectValue) 1401 s[JOBS].type() != Json::objectValue)
1390 { 1402 {