Mercurial > hg > orthanc
view OrthancFramework/Sources/JobsEngine/JobsRegistry.cpp @ 5682:fd4c5e064cbe find-refactoring
started refactoring of ServerContext::Apply()
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 09 Jul 2024 11:17:34 +0200 |
parents | f7adfb22e20e |
children | 4d932683049d |
line wrap: on
line source
/** * Orthanc - A Lightweight, RESTful DICOM Store * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics * Department, University Hospital of Liege, Belgium * Copyright (C) 2017-2023 Osimis S.A., Belgium * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this program. If not, see * <http://www.gnu.org/licenses/>. **/ #include "../PrecompiledHeaders.h" #include "JobsRegistry.h" #include "../Logging.h" #include "../OrthancException.h" #include "../Toolbox.h" #include "../SerializationToolbox.h" namespace Orthanc { static const char* STATE = "State"; static const char* TYPE = "Type"; static const char* PRIORITY = "Priority"; static const char* JOB = "Job"; static const char* JOBS = "Jobs"; static const char* JOBS_REGISTRY = "JobsRegistry"; static const char* CREATION_TIME = "CreationTime"; static const char* LAST_CHANGE_TIME = "LastChangeTime"; static const char* RUNTIME = "Runtime"; static const char* ERROR_CODE = "ErrorCode"; static const char* ERROR_DETAILS = "ErrorDetails"; class JobsRegistry::JobHandler : public boost::noncopyable { private: std::string id_; JobState state_; std::string jobType_; std::unique_ptr<IJob> job_; int priority_; // "+inf()" means highest priority boost::posix_time::ptime creationTime_; boost::posix_time::ptime lastStateChangeTime_; boost::posix_time::time_duration runtime_; boost::posix_time::ptime retryTime_; bool pauseScheduled_; bool cancelScheduled_; JobStatus lastStatus_; void Touch() { const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); if (state_ == JobState_Running) { runtime_ += (now - lastStateChangeTime_); } lastStateChangeTime_ = now; } void SetStateInternal(JobState state) { state_ = state; pauseScheduled_ = false; cancelScheduled_ = false; Touch(); } public: JobHandler(IJob* job, int priority) : id_(Toolbox::GenerateUuid()), state_(JobState_Pending), job_(job), priority_(priority), creationTime_(boost::posix_time::microsec_clock::universal_time()), lastStateChangeTime_(creationTime_), runtime_(boost::posix_time::milliseconds(0)), retryTime_(creationTime_), pauseScheduled_(false), cancelScheduled_(false) { if (job == NULL) { throw OrthancException(ErrorCode_NullPointer); } job->GetJobType(jobType_); job->Start(); lastStatus_ = JobStatus(ErrorCode_Success, "", *job_); } const std::string& GetId() const { return id_; } IJob& GetJob() const { assert(job_.get() != NULL); return *job_; } void SetPriority(int priority) { priority_ = priority; } int GetPriority() const { return priority_; } JobState GetState() const { return state_; } void SetState(JobState state) { if (state == JobState_Retry) { // Use "SetRetryState()" throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { SetStateInternal(state); } } void SetRetryState(unsigned int timeout) { if (state_ == JobState_Running) { SetStateInternal(JobState_Retry); retryTime_ = (boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout)); } else { // Only valid for running jobs throw OrthancException(ErrorCode_BadSequenceOfCalls); } } void SchedulePause() { if (state_ == JobState_Running) { pauseScheduled_ = true; } else { // Only valid for running jobs throw OrthancException(ErrorCode_BadSequenceOfCalls); } } void ScheduleCancel() { if (state_ == JobState_Running) { cancelScheduled_ = true; } else { // Only valid for running jobs throw OrthancException(ErrorCode_BadSequenceOfCalls); } } bool IsPauseScheduled() { return pauseScheduled_; } bool IsCancelScheduled() { return cancelScheduled_; } bool IsRetryReady(const boost::posix_time::ptime& now) const { if (state_ != JobState_Retry) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { return retryTime_ <= now; } } const boost::posix_time::ptime& GetCreationTime() const { return creationTime_; } const boost::posix_time::ptime& GetLastStateChangeTime() const { return lastStateChangeTime_; } void SetLastStateChangeTime(const boost::posix_time::ptime& time) { lastStateChangeTime_ = time; } const boost::posix_time::time_duration& GetRuntime() const { return runtime_; } void ResetRuntime() { runtime_ = boost::posix_time::milliseconds(0); } const JobStatus& GetLastStatus() const { return lastStatus_; } void SetLastStatus(const JobStatus& status) { lastStatus_ = status; Touch(); } void SetLastErrorCode(ErrorCode code) { lastStatus_.SetErrorCode(code); } bool Serialize(Json::Value& target) const { target = Json::objectValue; bool ok; if (state_ == JobState_Running) { // WARNING: Cannot directly access the "job_" member, as long // as a "RunningJob" instance is running. We do not use a // mutex at the "JobHandler" level, as serialization would be // blocked while a step in the job is running. Instead, we // save a snapshot of the serialized job. (*) if (lastStatus_.HasSerialized()) { target[JOB] = lastStatus_.GetSerialized(); ok = true; } else { ok = false; } } else { ok = job_->Serialize(target[JOB]); } if (ok) { target[STATE] = EnumerationToString(state_); target[PRIORITY] = priority_; target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_); target[LAST_CHANGE_TIME] = boost::posix_time::to_iso_string(lastStateChangeTime_); target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds()); // New in Orthanc 1.9.5 target[ERROR_CODE] = static_cast<int>(lastStatus_.GetErrorCode()); target[ERROR_DETAILS] = lastStatus_.GetDetails(); return true; } else { LOG(TRACE) << "Job backup is not supported for job of type: " << jobType_; return false; } } JobHandler(IJobUnserializer& unserializer, const Json::Value& serialized, const std::string& id) : id_(id), pauseScheduled_(false), cancelScheduled_(false) { state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE)); priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY); creationTime_ = boost::posix_time::from_iso_string (SerializationToolbox::ReadString(serialized, CREATION_TIME)); lastStateChangeTime_ = boost::posix_time::from_iso_string (SerializationToolbox::ReadString(serialized, LAST_CHANGE_TIME)); runtime_ = boost::posix_time::milliseconds (SerializationToolbox::ReadInteger(serialized, RUNTIME)); retryTime_ = creationTime_; job_.reset(unserializer.UnserializeJob(serialized[JOB])); job_->GetJobType(jobType_); job_->Start(); ErrorCode errorCode; if (serialized.isMember(ERROR_CODE)) { errorCode = static_cast<ErrorCode>(SerializationToolbox::ReadInteger(serialized, ERROR_CODE)); } else { errorCode = ErrorCode_Success; // Backward compatibility with Orthanc <= 1.9.4 } std::string details; if (serialized.isMember(ERROR_DETAILS)) // Backward compatibility with Orthanc <= 1.9.4 { details = SerializationToolbox::ReadString(serialized, ERROR_DETAILS); } lastStatus_ = JobStatus(errorCode, details, *job_); } }; bool JobsRegistry::PriorityComparator::operator() (JobHandler* const& a, JobHandler* const& b) const { return a->GetPriority() < b->GetPriority(); } #if defined(NDEBUG) void JobsRegistry::CheckInvariants() const { } #else bool JobsRegistry::IsPendingJob(const JobHandler& job) const { PendingJobs copy = pendingJobs_; while (!copy.empty()) { if (copy.top() == &job) { return true; } copy.pop(); } return false; } bool JobsRegistry::IsCompletedJob(JobHandler& job) const { for (CompletedJobs::const_iterator it = completedJobs_.begin(); it != completedJobs_.end(); ++it) { if (*it == &job) { return true; } } return false; } bool JobsRegistry::IsRetryJob(JobHandler& job) const { return retryJobs_.find(&job) != retryJobs_.end(); } void JobsRegistry::CheckInvariants() const { { PendingJobs copy = pendingJobs_; while (!copy.empty()) { assert(copy.top()->GetState() == JobState_Pending); copy.pop(); } } assert(completedJobs_.size() <= maxCompletedJobs_); for (CompletedJobs::const_iterator it = completedJobs_.begin(); it != completedJobs_.end(); ++it) { assert((*it)->GetState() == JobState_Success || (*it)->GetState() == JobState_Failure); } for (RetryJobs::const_iterator it = retryJobs_.begin(); it != retryJobs_.end(); ++it) { assert((*it)->GetState() == JobState_Retry); } for (JobsIndex::const_iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) { JobHandler& job = *it->second; assert(job.GetId() == it->first); switch (job.GetState()) { case JobState_Pending: assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); break; case JobState_Success: case JobState_Failure: assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); break; case JobState_Retry: assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); break; case JobState_Running: case JobState_Paused: assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); break; default: throw OrthancException(ErrorCode_InternalError); } } } #endif void JobsRegistry::ForgetOldCompletedJobs() { while (completedJobs_.size() > maxCompletedJobs_) { assert(completedJobs_.front() != NULL); std::string id = completedJobs_.front()->GetId(); assert(jobsIndex_.find(id) != jobsIndex_.end()); jobsIndex_.erase(id); delete(completedJobs_.front()); completedJobs_.pop_front(); } CheckInvariants(); } void JobsRegistry::SetCompletedJob(JobHandler& job, bool success) { job.SetState(success ? JobState_Success : JobState_Failure); completedJobs_.push_back(&job); someJobComplete_.notify_all(); } void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, CompletedReason reason) { const char* tmp; switch (reason) { case CompletedReason_Success: tmp = "success"; break; case CompletedReason_Failure: tmp = "failure"; break; case CompletedReason_Canceled: tmp = "cancel"; break; default: throw OrthancException(ErrorCode_InternalError); } LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId(); CheckInvariants(); assert(job.GetState() == JobState_Running); SetCompletedJob(job, reason == CompletedReason_Success); if (reason == CompletedReason_Canceled) { job.SetLastErrorCode(ErrorCode_CanceledJob); } if (observer_ != NULL) { if (reason == CompletedReason_Success) { observer_->SignalJobSuccess(job.GetId()); } else { observer_->SignalJobFailure(job.GetId()); } } // WARNING: The following call might make "job" invalid if the job // history size is empty ForgetOldCompletedJobs(); } void JobsRegistry::MarkRunningAsRetry(JobHandler& job, unsigned int timeout) { LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); CheckInvariants(); assert(job.GetState() == JobState_Running && retryJobs_.find(&job) == retryJobs_.end()); retryJobs_.insert(&job); job.SetRetryState(timeout); CheckInvariants(); } void JobsRegistry::MarkRunningAsPaused(JobHandler& job) { LOG(INFO) << "Job paused: " << job.GetId(); CheckInvariants(); assert(job.GetState() == JobState_Running); job.SetState(JobState_Paused); CheckInvariants(); } bool JobsRegistry::GetStateInternal(JobState& state, const std::string& id) { CheckInvariants(); JobsIndex::const_iterator it = jobsIndex_.find(id); if (it == jobsIndex_.end()) { return false; } else { state = it->second->GetState(); return true; } } JobsRegistry::~JobsRegistry() { for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) { assert(it->second != NULL); delete it->second; } } void JobsRegistry::SetMaxCompletedJobs(size_t n) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)"; maxCompletedJobs_ = n; ForgetOldCompletedJobs(); } size_t JobsRegistry::GetMaxCompletedJobs() { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); return maxCompletedJobs_; } void JobsRegistry::ListJobs(std::set<std::string>& target) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); for (JobsIndex::const_iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) { target.insert(it->first); } } bool JobsRegistry::GetJobInfo(JobInfo& target, const std::string& id) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::const_iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { return false; } else { const JobHandler& handler = *found->second; target = JobInfo(handler.GetId(), handler.GetPriority(), handler.GetState(), handler.GetLastStatus(), handler.GetCreationTime(), handler.GetLastStateChangeTime(), handler.GetRuntime()); return true; } } bool JobsRegistry::DeleteJobInfo(const std::string& id) { LOG(INFO) << "Deleting job: " << id; boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { LOG(WARNING) << "Unknown job to delete: " << id; return false; } else { for (CompletedJobs::iterator it = completedJobs_.begin(); it != completedJobs_.end(); ++it) { if (*it == found->second) { found->second->GetJob().DeleteAllOutputs(); delete found->second; completedJobs_.erase(it); jobsIndex_.erase(id); return true; } } LOG(WARNING) << "Can not delete a job that is not complete: " << id; return false; } } bool JobsRegistry::GetJobOutput(std::string& output, MimeType& mime, std::string& filename, const std::string& job, const std::string& key) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::const_iterator found = jobsIndex_.find(job); if (found == jobsIndex_.end()) { return false; } else { const JobHandler& handler = *found->second; if (handler.GetState() == JobState_Success) { return handler.GetJob().GetOutput(output, mime, filename, key); } else { return false; } } } bool JobsRegistry::DeleteJobOutput(const std::string& job, const std::string& key) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::const_iterator found = jobsIndex_.find(job); if (found == jobsIndex_.end()) { return false; } else { const JobHandler& handler = *found->second; if (handler.GetState() == JobState_Success) { return handler.GetJob().DeleteOutput(key); } else { return false; } } } void JobsRegistry::SubmitInternal(std::string& id, JobHandler* handler) { if (handler == NULL) { throw OrthancException(ErrorCode_NullPointer); } std::unique_ptr<JobHandler> protection(handler); { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); id = handler->GetId(); int priority = handler->GetPriority(); jobsIndex_.insert(std::make_pair(id, protection.release())); switch (handler->GetState()) { case JobState_Pending: case JobState_Retry: case JobState_Running: handler->SetState(JobState_Pending); pendingJobs_.push(handler); pendingJobAvailable_.notify_one(); break; case JobState_Success: SetCompletedJob(*handler, true); break; case JobState_Failure: SetCompletedJob(*handler, false); break; case JobState_Paused: break; default: { std::string details = ("A job should not be loaded from state: " + std::string(EnumerationToString(handler->GetState()))); throw OrthancException(ErrorCode_InternalError, details); } } LOG(INFO) << "New job submitted with priority " << priority << ": " << id; if (observer_ != NULL) { observer_->SignalJobSubmitted(id); } // WARNING: The following call might make "handler" invalid if // the job history size is empty ForgetOldCompletedJobs(); } } JobsRegistry::JobsRegistry(size_t maxCompletedJobs) : maxCompletedJobs_(maxCompletedJobs), observer_(NULL) { } void JobsRegistry::Submit(std::string& id, IJob* job, // Takes ownership int priority) { SubmitInternal(id, new JobHandler(job, priority)); } void JobsRegistry::Submit(IJob* job, // Takes ownership int priority) { std::string id; SubmitInternal(id, new JobHandler(job, priority)); } void JobsRegistry::SubmitAndWait(Json::Value& successContent, IJob* job, // Takes ownership int priority) { std::string id; Submit(id, job, priority); JobState state = JobState_Pending; // Dummy initialization { boost::mutex::scoped_lock lock(mutex_); for (;;) { if (!GetStateInternal(state, id)) { // Job has finished and has been lost (typically happens if // "JobsHistorySize" is 0) throw OrthancException(ErrorCode_InexistentItem, "Cannot retrieve the status of the job, " "make sure that \"JobsHistorySize\" is not 0"); } else if (state == JobState_Failure) { // Failure JobsIndex::const_iterator it = jobsIndex_.find(id); if (it != jobsIndex_.end()) // Should always be true, already tested in GetStateInternal() { ErrorCode code = it->second->GetLastStatus().GetErrorCode(); const std::string& details = it->second->GetLastStatus().GetDetails(); if (details.empty()) { throw OrthancException(code); } else { throw OrthancException(code, details); } } else { throw OrthancException(ErrorCode_InternalError); } } else if (state == JobState_Success) { // Success, try and retrieve the status of the job JobsIndex::const_iterator it = jobsIndex_.find(id); if (it == jobsIndex_.end()) { // Should not happen state = JobState_Failure; } else { const JobStatus& status = it->second->GetLastStatus(); successContent = status.GetPublicContent(); } return; } else { // This job has not finished yet, wait for new completion someJobComplete_.wait(lock); } } } } bool JobsRegistry::SetPriority(const std::string& id, int priority) { LOG(INFO) << "Changing priority to " << priority << " for job: " << id; boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { LOG(WARNING) << "Unknown job: " << id; return false; } else { found->second->SetPriority(priority); if (found->second->GetState() == JobState_Pending) { // If the job is pending, we need to reconstruct the // priority queue, as the heap condition has changed PendingJobs copy; std::swap(copy, pendingJobs_); assert(pendingJobs_.empty()); while (!copy.empty()) { pendingJobs_.push(copy.top()); copy.pop(); } } CheckInvariants(); return true; } } void JobsRegistry::RemovePendingJob(const std::string& id) { // If the job is pending, we need to reconstruct the priority // queue to remove it PendingJobs copy; std::swap(copy, pendingJobs_); assert(pendingJobs_.empty()); while (!copy.empty()) { if (copy.top()->GetId() != id) { pendingJobs_.push(copy.top()); } copy.pop(); } } void JobsRegistry::RemoveRetryJob(JobHandler* handler) { RetryJobs::iterator item = retryJobs_.find(handler); assert(item != retryJobs_.end()); retryJobs_.erase(item); } bool JobsRegistry::Pause(const std::string& id) { LOG(INFO) << "Pausing job: " << id; boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { LOG(WARNING) << "Unknown job: " << id; return false; } else { switch (found->second->GetState()) { case JobState_Pending: RemovePendingJob(id); found->second->SetState(JobState_Paused); break; case JobState_Retry: RemoveRetryJob(found->second); found->second->SetState(JobState_Paused); break; case JobState_Paused: case JobState_Success: case JobState_Failure: // Nothing to be done break; case JobState_Running: found->second->SchedulePause(); break; default: throw OrthancException(ErrorCode_InternalError); } CheckInvariants(); return true; } } bool JobsRegistry::Cancel(const std::string& id) { LOG(INFO) << "Canceling job: " << id; boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { LOG(WARNING) << "Unknown job: " << id; return false; } else { switch (found->second->GetState()) { case JobState_Pending: RemovePendingJob(id); SetCompletedJob(*found->second, false); found->second->SetLastErrorCode(ErrorCode_CanceledJob); break; case JobState_Retry: RemoveRetryJob(found->second); SetCompletedJob(*found->second, false); found->second->SetLastErrorCode(ErrorCode_CanceledJob); break; case JobState_Paused: SetCompletedJob(*found->second, false); found->second->SetLastErrorCode(ErrorCode_CanceledJob); break; case JobState_Success: case JobState_Failure: // Nothing to be done break; case JobState_Running: found->second->ScheduleCancel(); break; default: throw OrthancException(ErrorCode_InternalError); } // WARNING: The following call might make "handler" invalid if // the job history size is empty ForgetOldCompletedJobs(); return true; } } bool JobsRegistry::Resume(const std::string& id) { LOG(INFO) << "Resuming job: " << id; boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { LOG(WARNING) << "Unknown job: " << id; return false; } else if (found->second->GetState() != JobState_Paused) { LOG(WARNING) << "Cannot resume a job that is not paused: " << id; return false; } else { found->second->SetState(JobState_Pending); pendingJobs_.push(found->second); pendingJobAvailable_.notify_one(); CheckInvariants(); return true; } } bool JobsRegistry::Resubmit(const std::string& id) { LOG(INFO) << "Resubmitting failed job: " << id; boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); JobsIndex::iterator found = jobsIndex_.find(id); if (found == jobsIndex_.end()) { LOG(WARNING) << "Unknown job: " << id; return false; } else if (found->second->GetState() != JobState_Failure) { LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; return false; } else { found->second->GetJob().Reset(); bool ok = false; for (CompletedJobs::iterator it = completedJobs_.begin(); it != completedJobs_.end(); ++it) { if (*it == found->second) { ok = true; completedJobs_.erase(it); break; } } (void) ok; // Remove warning about unused variable in release builds assert(ok); found->second->ResetRuntime(); found->second->SetState(JobState_Pending); pendingJobs_.push(found->second); pendingJobAvailable_.notify_one(); CheckInvariants(); return true; } } void JobsRegistry::ScheduleRetries() { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); RetryJobs copy; std::swap(copy, retryJobs_); const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); assert(retryJobs_.empty()); for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) { if ((*it)->IsRetryReady(now)) { LOG(INFO) << "Retrying job: " << (*it)->GetId(); (*it)->SetState(JobState_Pending); pendingJobs_.push(*it); pendingJobAvailable_.notify_one(); } else { retryJobs_.insert(*it); } } CheckInvariants(); } bool JobsRegistry::GetState(JobState& state, const std::string& id) { boost::mutex::scoped_lock lock(mutex_); return GetStateInternal(state, id); } void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer) { boost::mutex::scoped_lock lock(mutex_); observer_ = &observer; } void JobsRegistry::ResetObserver() { boost::mutex::scoped_lock lock(mutex_); observer_ = NULL; } JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, unsigned int timeout) : registry_(registry), handler_(NULL), targetState_(JobState_Failure), targetRetryTimeout_(0), canceled_(false) { { boost::mutex::scoped_lock lock(registry_.mutex_); while (registry_.pendingJobs_.empty()) { if (timeout == 0) { registry_.pendingJobAvailable_.wait(lock); } else { bool success = registry_.pendingJobAvailable_.timed_wait (lock, boost::posix_time::milliseconds(timeout)); if (!success) { // No pending job return; } } } handler_ = registry_.pendingJobs_.top(); registry_.pendingJobs_.pop(); assert(handler_->GetState() == JobState_Pending); handler_->SetState(JobState_Running); handler_->SetLastErrorCode(ErrorCode_Success); job_ = &handler_->GetJob(); id_ = handler_->GetId(); priority_ = handler_->GetPriority(); } } JobsRegistry::RunningJob::~RunningJob() { if (IsValid()) { boost::mutex::scoped_lock lock(registry_.mutex_); switch (targetState_) { case JobState_Failure: registry_.MarkRunningAsCompleted (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure); break; case JobState_Success: registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success); break; case JobState_Paused: registry_.MarkRunningAsPaused(*handler_); break; case JobState_Retry: registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); break; default: assert(0); } } } bool JobsRegistry::RunningJob::IsValid() const { return (handler_ != NULL && job_ != NULL); } const std::string& JobsRegistry::RunningJob::GetId() const { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { return id_; } } int JobsRegistry::RunningJob::GetPriority() const { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { return priority_; } } IJob& JobsRegistry::RunningJob::GetJob() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { return *job_; } } bool JobsRegistry::RunningJob::IsPauseScheduled() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { boost::mutex::scoped_lock lock(registry_.mutex_); registry_.CheckInvariants(); assert(handler_->GetState() == JobState_Running); return handler_->IsPauseScheduled(); } } bool JobsRegistry::RunningJob::IsCancelScheduled() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { boost::mutex::scoped_lock lock(registry_.mutex_); registry_.CheckInvariants(); assert(handler_->GetState() == JobState_Running); return handler_->IsCancelScheduled(); } } void JobsRegistry::RunningJob::MarkSuccess() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { targetState_ = JobState_Success; } } void JobsRegistry::RunningJob::MarkFailure() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { targetState_ = JobState_Failure; } } void JobsRegistry::RunningJob::MarkCanceled() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { targetState_ = JobState_Failure; canceled_ = true; } } void JobsRegistry::RunningJob::MarkPause() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { targetState_ = JobState_Paused; } } void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { targetState_ = JobState_Retry; targetRetryTimeout_ = timeout; } } void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code, const std::string& details) { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { JobStatus status(code, details, *job_); boost::mutex::scoped_lock lock(registry_.mutex_); registry_.CheckInvariants(); assert(handler_->GetState() == JobState_Running); handler_->SetLastStatus(status); } } void JobsRegistry::Serialize(Json::Value& target) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); target = Json::objectValue; target[TYPE] = JOBS_REGISTRY; target[JOBS] = Json::objectValue; for (JobsIndex::const_iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) { Json::Value v; if (it->second->Serialize(v)) { target[JOBS][it->first] = v; } } } JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, const Json::Value& s, size_t maxCompletedJobs) : maxCompletedJobs_(maxCompletedJobs), observer_(NULL) { if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || !s.isMember(JOBS) || s[JOBS].type() != Json::objectValue) { throw OrthancException(ErrorCode_BadFileFormat); } Json::Value::Members members = s[JOBS].getMemberNames(); for (Json::Value::Members::const_iterator it = members.begin(); it != members.end(); ++it) { std::unique_ptr<JobHandler> job; try { job.reset(new JobHandler(unserializer, s[JOBS][*it], *it)); } catch (OrthancException& e) { LOG(WARNING) << "Cannot unserialize one job from previous execution, " << "skipping it: " << e.What(); continue; } const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime(); std::string id; SubmitInternal(id, job.release()); // Check whether the job has not been removed (which could be // the case if the "maxCompletedJobs_" value gets smaller) JobsIndex::iterator found = jobsIndex_.find(id); if (found != jobsIndex_.end()) { // The job still lies in the history: Update the time of its // last change to the time that was serialized assert(found->second != NULL); found->second->SetLastStateChangeTime(lastChangeTime); } } } void JobsRegistry::GetStatistics(unsigned int& pending, unsigned int& running, unsigned int& success, unsigned int& failed) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); pending = 0; running = 0; success = 0; failed = 0; for (JobsIndex::const_iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) { JobHandler& job = *it->second; switch (job.GetState()) { case JobState_Retry: case JobState_Pending: pending ++; break; case JobState_Paused: case JobState_Running: running ++; break; case JobState_Success: success ++; break; case JobState_Failure: failed ++; break; default: throw OrthancException(ErrorCode_InternalError); } } } }