comparison Core/JobsEngine/JobsRegistry.cpp @ 2673:8e0bc055d18c jobs

JobsRegistry::IObserver
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 11 Jun 2018 16:29:33 +0200
parents c5646f766b3e
children ea7aea6f6a95
comparison
equal deleted inserted replaced
2672:3efc44fac209 2673:8e0bc055d18c
290 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds()); 290 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds());
291 return true; 291 return true;
292 } 292 }
293 else 293 else
294 { 294 {
295 LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_; 295 LOG(INFO) << "Job backup is not supported for job of type: " << jobType_;
296 return false; 296 return false;
297 } 297 }
298 } 298 }
299 299
300 JobHandler(IJobUnserializer& unserializer, 300 JobHandler(IJobUnserializer& unserializer,
472 472
473 CheckInvariants(); 473 CheckInvariants();
474 474
475 assert(job.GetState() == JobState_Running); 475 assert(job.GetState() == JobState_Running);
476 SetCompletedJob(job, success); 476 SetCompletedJob(job, success);
477
478 if (observer_ != NULL)
479 {
480 if (success)
481 {
482 observer_->SignalJobSuccess(job.GetId());
483 }
484 else
485 {
486 observer_->SignalJobFailure(job.GetId());
487 }
488 }
477 489
478 CheckInvariants(); 490 CheckInvariants();
479 } 491 }
480 492
481 493
602 614
603 std::auto_ptr<JobHandler> handler(handlerRaw); 615 std::auto_ptr<JobHandler> handler(handlerRaw);
604 616
605 boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime(); 617 boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime();
606 618
607 boost::mutex::scoped_lock lock(mutex_); 619 {
608 CheckInvariants(); 620 boost::mutex::scoped_lock lock(mutex_);
609 621 CheckInvariants();
610 id = handler->GetId(); 622
611 int priority = handler->GetPriority(); 623 id = handler->GetId();
612 624 int priority = handler->GetPriority();
613 switch (handler->GetState()) 625
614 { 626 switch (handler->GetState())
615 case JobState_Pending: 627 {
616 case JobState_Retry: 628 case JobState_Pending:
617 case JobState_Running: 629 case JobState_Retry:
618 handler->SetState(JobState_Pending); 630 case JobState_Running:
619 pendingJobs_.push(handler.get()); 631 handler->SetState(JobState_Pending);
620 pendingJobAvailable_.notify_one(); 632 pendingJobs_.push(handler.get());
621 break; 633 pendingJobAvailable_.notify_one();
634 break;
622 635
623 case JobState_Success: 636 case JobState_Success:
624 SetCompletedJob(*handler, true); 637 SetCompletedJob(*handler, true);
625 break; 638 break;
626 639
627 case JobState_Failure: 640 case JobState_Failure:
628 SetCompletedJob(*handler, false); 641 SetCompletedJob(*handler, false);
629 break; 642 break;
630 643
631 case JobState_Paused: 644 case JobState_Paused:
632 break; 645 break;
633 646
634 default: 647 default:
635 LOG(ERROR) << "A job should not be loaded from state: " 648 LOG(ERROR) << "A job should not be loaded from state: "
636 << EnumerationToString(handler->GetState()); 649 << EnumerationToString(handler->GetState());
637 throw OrthancException(ErrorCode_InternalError); 650 throw OrthancException(ErrorCode_InternalError);
638 } 651 }
639 652
640 if (keepLastChangeTime) 653 if (keepLastChangeTime)
641 { 654 {
642 handler->SetLastStateChangeTime(lastChangeTime); 655 handler->SetLastStateChangeTime(lastChangeTime);
643 } 656 }
644 657
645 jobsIndex_.insert(std::make_pair(id, handler.release())); 658 jobsIndex_.insert(std::make_pair(id, handler.release()));
646 659
647 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; 660 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
648 661
649 CheckInvariants(); 662 if (observer_ != NULL)
663 {
664 observer_->SignalJobSubmitted(id);
665 }
666
667 CheckInvariants();
668 }
650 } 669 }
651 670
652 671
653 void JobsRegistry::Submit(std::string& id, 672 void JobsRegistry::Submit(std::string& id,
654 IJob* job, // Takes ownership 673 IJob* job, // Takes ownership
970 bool JobsRegistry::GetState(JobState& state, 989 bool JobsRegistry::GetState(JobState& state,
971 const std::string& id) 990 const std::string& id)
972 { 991 {
973 boost::mutex::scoped_lock lock(mutex_); 992 boost::mutex::scoped_lock lock(mutex_);
974 return GetStateInternal(state, id); 993 return GetStateInternal(state, id);
994 }
995
996
997 void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer)
998 {
999 boost::mutex::scoped_lock lock(mutex_);
1000 observer_ = &observer;
1001 }
1002
1003
1004 void JobsRegistry::ResetObserver()
1005 {
1006 boost::mutex::scoped_lock lock(mutex_);
1007 observer_ = NULL;
975 } 1008 }
976 1009
977 1010
978 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, 1011 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
979 unsigned int timeout) : 1012 unsigned int timeout) :
1243 } 1276 }
1244 } 1277 }
1245 1278
1246 1279
1247 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, 1280 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1248 const Json::Value& s) 1281 const Json::Value& s) :
1282 observer_(NULL)
1249 { 1283 {
1250 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || 1284 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1251 !s.isMember(JOBS) || 1285 !s.isMember(JOBS) ||
1252 s[JOBS].type() != Json::objectValue) 1286 s[JOBS].type() != Json::objectValue)
1253 { 1287 {