Mercurial > hg > orthanc
diff Core/JobsEngine/JobsRegistry.cpp @ 2581:8da2cffc2378 jobs
JobsRegistry::Cancel()
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 11 May 2018 17:33:19 +0200 |
parents | 3372c5255333 |
children | 1b6a6d80b6f2 |
line wrap: on
line diff
--- a/Core/JobsEngine/JobsRegistry.cpp Fri May 11 09:35:17 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Fri May 11 17:33:19 2018 +0200 @@ -52,6 +52,7 @@ boost::posix_time::time_duration runtime_; boost::posix_time::ptime retryTime_; bool pauseScheduled_; + bool cancelScheduled_; JobStatus lastStatus_; void Touch() @@ -70,6 +71,7 @@ { state_ = state; pauseScheduled_ = false; + cancelScheduled_ = false; Touch(); } @@ -84,7 +86,8 @@ lastStateChangeTime_(creationTime_), runtime_(boost::posix_time::milliseconds(0)), retryTime_(creationTime_), - pauseScheduled_(false) + pauseScheduled_(false), + cancelScheduled_(false) { if (job == NULL) { @@ -162,11 +165,29 @@ } } + void ScheduleCancel() + { + if (state_ == JobState_Running) + { + cancelScheduled_ = true; + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + bool IsPauseScheduled() { return pauseScheduled_; } + bool IsCancelScheduled() + { + return cancelScheduled_; + } + bool IsRetryReady(const boost::posix_time::ptime& now) const { if (state_ != JobState_Retry) @@ -204,6 +225,11 @@ lastStatus_ = status; Touch(); } + + void SetLastErrorCode(ErrorCode code) + { + lastStatus_.SetErrorCode(code); + } }; @@ -335,6 +361,18 @@ } + void JobsRegistry::SetCompletedJob(JobHandler& job, + bool success) + { + job.SetState(success ? JobState_Success : JobState_Failure); + + completedJobs_.push_back(&job); + ForgetOldCompletedJobs(); + + someJobComplete_.notify_all(); + } + + void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, bool success) { @@ -342,14 +380,9 @@ << ": " << job.GetId(); CheckInvariants(); + assert(job.GetState() == JobState_Running); - - job.SetState(success ? JobState_Success : JobState_Failure); - - completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - - someJobComplete_.notify_all(); + SetCompletedJob(job, success); CheckInvariants(); } @@ -501,8 +534,6 @@ std::string id; Submit(id, job, priority); - printf(">> %s\n", id.c_str()); fflush(stdout); - JobState state; { @@ -561,6 +592,34 @@ } + void JobsRegistry::RemovePendingJob(const std::string& id) + { + // If the job is pending, we need to reconstruct the priority + // queue to remove it + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + if (copy.top()->GetId() != id) + { + pendingJobs_.push(copy.top()); + } + + copy.pop(); + } + } + + + void JobsRegistry::RemoveRetryJob(JobHandler* handler) + { + RetryJobs::iterator item = retryJobs_.find(handler); + assert(item != retryJobs_.end()); + retryJobs_.erase(item); + } + + bool JobsRegistry::Pause(const std::string& id) { LOG(INFO) << "Pausing job: " << id; @@ -580,38 +639,14 @@ switch (found->second->GetState()) { case JobState_Pending: - { - // If the job is pending, we need to reconstruct the - // priority queue to remove it - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - if (copy.top()->GetId() != id) - { - pendingJobs_.push(copy.top()); - } - - copy.pop(); - } - + RemovePendingJob(id); found->second->SetState(JobState_Paused); - break; - } case JobState_Retry: - { - RetryJobs::iterator item = retryJobs_.find(found->second); - assert(item != retryJobs_.end()); - retryJobs_.erase(item); - + RemoveRetryJob(found->second); found->second->SetState(JobState_Paused); - break; - } case JobState_Paused: case JobState_Success: @@ -633,6 +668,60 @@ } + bool JobsRegistry::Cancel(const std::string& id) + { + LOG(INFO) << "Canceling job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + return false; + } + else + { + switch (found->second->GetState()) + { + case JobState_Pending: + RemovePendingJob(id); + SetCompletedJob(*found->second, false); + found->second->SetLastErrorCode(ErrorCode_CanceledJob); + break; + + case JobState_Retry: + RemoveRetryJob(found->second); + SetCompletedJob(*found->second, false); + found->second->SetLastErrorCode(ErrorCode_CanceledJob); + break; + + case JobState_Paused: + SetCompletedJob(*found->second, false); + found->second->SetLastErrorCode(ErrorCode_CanceledJob); + break; + + case JobState_Success: + case JobState_Failure: + // Nothing to be done + break; + + case JobState_Running: + found->second->ScheduleCancel(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + CheckInvariants(); + return true; + } + } + + bool JobsRegistry::Resume(const std::string& id) { LOG(INFO) << "Resuming job: " << id; @@ -751,7 +840,8 @@ registry_(registry), handler_(NULL), targetState_(JobState_Failure), - targetRetryTimeout_(0) + targetRetryTimeout_(0), + canceled_(false) { { boost::mutex::scoped_lock lock(registry_.mutex_); @@ -779,6 +869,7 @@ assert(handler_->GetState() == JobState_Pending); handler_->SetState(JobState_Running); + handler_->SetLastErrorCode(ErrorCode_Success); job_ = &handler_->GetJob(); id_ = handler_->GetId(); @@ -797,6 +888,12 @@ { case JobState_Failure: registry_.MarkRunningAsCompleted(*handler_, false); + + if (canceled_) + { + handler_->SetLastErrorCode(ErrorCode_CanceledJob); + } + break; case JobState_Success: @@ -881,6 +978,23 @@ } + bool JobsRegistry::RunningJob::IsCancelScheduled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsCancelScheduled(); + } + } + + void JobsRegistry::RunningJob::MarkSuccess() { if (!IsValid()) @@ -907,6 +1021,20 @@ } + void JobsRegistry::RunningJob::MarkCanceled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Failure; + canceled_ = true; + } + } + + void JobsRegistry::RunningJob::MarkPause() { if (!IsValid())