# HG changeset patch # User Sebastien Jodogne # Date 1528727373 -7200 # Node ID 8e0bc055d18c483dbc12df0e16a0e9442802ac42 # Parent 3efc44fac20948b5f0eec98af3da0bc468a05c10 JobsRegistry::IObserver diff -r 3efc44fac209 -r 8e0bc055d18c Core/JobsEngine/JobsRegistry.cpp --- a/Core/JobsEngine/JobsRegistry.cpp Mon Jun 11 15:57:25 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Mon Jun 11 16:29:33 2018 +0200 @@ -292,7 +292,7 @@ } else { - LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_; + LOG(INFO) << "Job backup is not supported for job of type: " << jobType_; return false; } } @@ -475,6 +475,18 @@ assert(job.GetState() == JobState_Running); SetCompletedJob(job, success); + if (observer_ != NULL) + { + if (success) + { + observer_->SignalJobSuccess(job.GetId()); + } + else + { + observer_->SignalJobFailure(job.GetId()); + } + } + CheckInvariants(); } @@ -604,49 +616,56 @@ boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime(); - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); - id = handler->GetId(); - int priority = handler->GetPriority(); + id = handler->GetId(); + int priority = handler->GetPriority(); - switch (handler->GetState()) - { - case JobState_Pending: - case JobState_Retry: - case JobState_Running: - handler->SetState(JobState_Pending); - pendingJobs_.push(handler.get()); - pendingJobAvailable_.notify_one(); - break; + switch (handler->GetState()) + { + case JobState_Pending: + case JobState_Retry: + case JobState_Running: + handler->SetState(JobState_Pending); + pendingJobs_.push(handler.get()); + pendingJobAvailable_.notify_one(); + break; - case JobState_Success: - SetCompletedJob(*handler, true); - break; + case JobState_Success: + SetCompletedJob(*handler, true); + break; - case JobState_Failure: - SetCompletedJob(*handler, false); - break; + case JobState_Failure: + SetCompletedJob(*handler, false); + break; + + case JobState_Paused: + break; + + default: + LOG(ERROR) << "A job should not be loaded from state: " + << EnumerationToString(handler->GetState()); + throw OrthancException(ErrorCode_InternalError); + } - case JobState_Paused: - break; - - default: - LOG(ERROR) << "A job should not be loaded from state: " - << EnumerationToString(handler->GetState()); - throw OrthancException(ErrorCode_InternalError); + if (keepLastChangeTime) + { + handler->SetLastStateChangeTime(lastChangeTime); + } + + jobsIndex_.insert(std::make_pair(id, handler.release())); + + LOG(INFO) << "New job submitted with priority " << priority << ": " << id; + + if (observer_ != NULL) + { + observer_->SignalJobSubmitted(id); + } + + CheckInvariants(); } - - if (keepLastChangeTime) - { - handler->SetLastStateChangeTime(lastChangeTime); - } - - jobsIndex_.insert(std::make_pair(id, handler.release())); - - LOG(INFO) << "New job submitted with priority " << priority << ": " << id; - - CheckInvariants(); } @@ -974,6 +993,20 @@ return GetStateInternal(state, id); } + + void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer) + { + boost::mutex::scoped_lock lock(mutex_); + observer_ = &observer; + } + + + void JobsRegistry::ResetObserver() + { + boost::mutex::scoped_lock lock(mutex_); + observer_ = NULL; + } + JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, unsigned int timeout) : @@ -1245,7 +1278,8 @@ JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, - const Json::Value& s) + const Json::Value& s) : + observer_(NULL) { if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || !s.isMember(JOBS) || diff -r 3efc44fac209 -r 8e0bc055d18c Core/JobsEngine/JobsRegistry.h --- a/Core/JobsEngine/JobsRegistry.h Mon Jun 11 15:57:25 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.h Mon Jun 11 16:29:33 2018 +0200 @@ -55,6 +55,21 @@ // This class handles the state machine of the jobs engine class JobsRegistry : public boost::noncopyable { + public: + class IObserver : public boost::noncopyable + { + public: + virtual ~IObserver() + { + } + + virtual void SignalJobSubmitted(const std::string& jobId) = 0; + + virtual void SignalJobSuccess(const std::string& jobId) = 0; + + virtual void SignalJobFailure(const std::string& jobId) = 0; + }; + private: class JobHandler; @@ -81,6 +96,8 @@ boost::condition_variable someJobComplete_; size_t maxCompletedJobs_; + IObserver* observer_; + #ifndef NDEBUG bool IsPendingJob(const JobHandler& job) const; @@ -118,7 +135,8 @@ public: JobsRegistry() : - maxCompletedJobs_(10) + maxCompletedJobs_(10), + observer_(NULL) { } @@ -162,6 +180,10 @@ bool GetState(JobState& state, const std::string& id); + void SetObserver(IObserver& observer); + + void ResetObserver(); + class RunningJob : public boost::noncopyable { private: diff -r 3efc44fac209 -r 8e0bc055d18c OrthancServer/ServerContext.cpp --- a/OrthancServer/ServerContext.cpp Mon Jun 11 15:57:25 2018 +0200 +++ b/OrthancServer/ServerContext.cpp Mon Jun 11 16:29:33 2018 +0200 @@ -121,15 +121,41 @@ { boost::this_thread::sleep(boost::posix_time::milliseconds(sleepDelay)); - if (boost::posix_time::microsec_clock::universal_time() >= next) + if (that->haveJobsChanged_ || + boost::posix_time::microsec_clock::universal_time() >= next) { - that->SaveJobsEngine(); + that->haveJobsChanged_ = false; + that->SaveJobsEngine(); next = boost::posix_time::microsec_clock::universal_time() + PERIODICITY; } } } + + + void ServerContext::SignalJobSubmitted(const std::string& jobId) + { + haveJobsChanged_ = true; + + // TODO: Call Lua + } + + + void ServerContext::SignalJobSuccess(const std::string& jobId) + { + haveJobsChanged_ = true; + + // TODO: Call Lua + } + void ServerContext::SignalJobFailure(const std::string& jobId) + { + haveJobsChanged_ = true; + + // TODO: Call Lua + } + + void ServerContext::SetupJobsEngine(bool unitTesting, bool loadJobsFromDatabase) { @@ -166,6 +192,7 @@ //jobsEngine_.GetRegistry().SetMaxCompleted // TODO + jobsEngine_.GetRegistry().SetObserver(*this); jobsEngine_.Start(); } @@ -206,6 +233,7 @@ plugins_(NULL), #endif done_(false), + haveJobsChanged_(false), queryRetrieveArchive_(Configuration::GetGlobalUnsignedIntegerParameter("QueryRetrieveSize", 10)), defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC")) { @@ -250,6 +278,7 @@ saveJobsThread_.join(); } + jobsEngine_.GetRegistry().ResetObserver(); SaveJobsEngine(); // Do not change the order below! diff -r 3efc44fac209 -r 8e0bc055d18c OrthancServer/ServerContext.h --- a/OrthancServer/ServerContext.h Mon Jun 11 15:57:25 2018 +0200 +++ b/OrthancServer/ServerContext.h Mon Jun 11 16:29:33 2018 +0200 @@ -60,7 +60,7 @@ * filesystem (including compression), as well as the index of the * DICOM store. It implements the required locking mechanisms. **/ - class ServerContext + class ServerContext : private JobsRegistry::IObserver { private: class DicomCacheProvider : public ICachePageProvider @@ -118,6 +118,12 @@ void SaveJobsEngine(); + virtual void SignalJobSubmitted(const std::string& jobId); + + virtual void SignalJobSuccess(const std::string& jobId); + + virtual void SignalJobFailure(const std::string& jobId); + ServerIndex index_; IStorageArea& area_; @@ -139,6 +145,7 @@ boost::recursive_mutex listenersMutex_; bool done_; + bool haveJobsChanged_; SharedMessageQueue pendingChanges_; boost::thread changeThread_; boost::thread saveJobsThread_;