Mercurial > hg > orthanc
changeset 2581:8da2cffc2378 jobs
JobsRegistry::Cancel()
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 11 May 2018 17:33:19 +0200 |
parents | 055d7d4a823f |
children | b3da733d984c |
files | Core/Enumerations.cpp Core/Enumerations.h Core/JobsEngine/JobStatus.h Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h OrthancServer/main.cpp Plugins/Include/orthanc/OrthancCPlugin.h Resources/ErrorCodes.json Resources/ImplementationNotes/JobsEngineStates.dot Resources/ImplementationNotes/JobsEngineStates.pdf UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 12 files changed, 316 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/Enumerations.cpp Fri May 11 09:35:17 2018 +0200 +++ b/Core/Enumerations.cpp Fri May 11 17:33:19 2018 +0200 @@ -164,6 +164,9 @@ case ErrorCode_DatabaseUnavailable: return "The database is currently not available (probably a transient situation)"; + case ErrorCode_CanceledJob: + return "This job was canceled"; + case ErrorCode_SQLiteNotOpened: return "SQLite: The database is not opened";
--- a/Core/Enumerations.h Fri May 11 09:35:17 2018 +0200 +++ b/Core/Enumerations.h Fri May 11 17:33:19 2018 +0200 @@ -96,6 +96,7 @@ ErrorCode_NotAcceptable = 34 /*!< Cannot send a response which is acceptable according to the Accept HTTP header */, ErrorCode_NullPointer = 35 /*!< Cannot handle a NULL pointer */, ErrorCode_DatabaseUnavailable = 36 /*!< The database is currently not available (probably a transient situation) */, + ErrorCode_CanceledJob = 37 /*!< This job was canceled */, ErrorCode_SQLiteNotOpened = 1000 /*!< SQLite: The database is not opened */, ErrorCode_SQLiteAlreadyOpened = 1001 /*!< SQLite: Connection is already open */, ErrorCode_SQLiteCannotOpen = 1002 /*!< SQLite: Unable to open the database */,
--- a/Core/JobsEngine/JobStatus.h Fri May 11 09:35:17 2018 +0200 +++ b/Core/JobsEngine/JobStatus.h Fri May 11 17:33:19 2018 +0200 @@ -57,6 +57,11 @@ return errorCode_; } + void SetErrorCode(ErrorCode error) + { + errorCode_ = error; + } + float GetProgress() const { return progress_;
--- a/Core/JobsEngine/JobsEngine.cpp Fri May 11 09:35:17 2018 +0200 +++ b/Core/JobsEngine/JobsEngine.cpp Fri May 11 17:33:19 2018 +0200 @@ -60,6 +60,13 @@ return false; } + if (running.IsCancelScheduled()) + { + running.GetJob().ReleaseResources(); + running.MarkCanceled(); + return false; + } + std::auto_ptr<JobStepResult> result; {
--- a/Core/JobsEngine/JobsRegistry.cpp Fri May 11 09:35:17 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Fri May 11 17:33:19 2018 +0200 @@ -52,6 +52,7 @@ boost::posix_time::time_duration runtime_; boost::posix_time::ptime retryTime_; bool pauseScheduled_; + bool cancelScheduled_; JobStatus lastStatus_; void Touch() @@ -70,6 +71,7 @@ { state_ = state; pauseScheduled_ = false; + cancelScheduled_ = false; Touch(); } @@ -84,7 +86,8 @@ lastStateChangeTime_(creationTime_), runtime_(boost::posix_time::milliseconds(0)), retryTime_(creationTime_), - pauseScheduled_(false) + pauseScheduled_(false), + cancelScheduled_(false) { if (job == NULL) { @@ -162,11 +165,29 @@ } } + void ScheduleCancel() + { + if (state_ == JobState_Running) + { + cancelScheduled_ = true; + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + bool IsPauseScheduled() { return pauseScheduled_; } + bool IsCancelScheduled() + { + return cancelScheduled_; + } + bool IsRetryReady(const boost::posix_time::ptime& now) const { if (state_ != JobState_Retry) @@ -204,6 +225,11 @@ lastStatus_ = status; Touch(); } + + void SetLastErrorCode(ErrorCode code) + { + lastStatus_.SetErrorCode(code); + } }; @@ -335,6 +361,18 @@ } + void JobsRegistry::SetCompletedJob(JobHandler& job, + bool success) + { + job.SetState(success ? JobState_Success : JobState_Failure); + + completedJobs_.push_back(&job); + ForgetOldCompletedJobs(); + + someJobComplete_.notify_all(); + } + + void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, bool success) { @@ -342,14 +380,9 @@ << ": " << job.GetId(); CheckInvariants(); + assert(job.GetState() == JobState_Running); - - job.SetState(success ? JobState_Success : JobState_Failure); - - completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - - someJobComplete_.notify_all(); + SetCompletedJob(job, success); CheckInvariants(); } @@ -501,8 +534,6 @@ std::string id; Submit(id, job, priority); - printf(">> %s\n", id.c_str()); fflush(stdout); - JobState state; { @@ -561,6 +592,34 @@ } + void JobsRegistry::RemovePendingJob(const std::string& id) + { + // If the job is pending, we need to reconstruct the priority + // queue to remove it + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + if (copy.top()->GetId() != id) + { + pendingJobs_.push(copy.top()); + } + + copy.pop(); + } + } + + + void JobsRegistry::RemoveRetryJob(JobHandler* handler) + { + RetryJobs::iterator item = retryJobs_.find(handler); + assert(item != retryJobs_.end()); + retryJobs_.erase(item); + } + + bool JobsRegistry::Pause(const std::string& id) { LOG(INFO) << "Pausing job: " << id; @@ -580,38 +639,14 @@ switch (found->second->GetState()) { case JobState_Pending: - { - // If the job is pending, we need to reconstruct the - // priority queue to remove it - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - if (copy.top()->GetId() != id) - { - pendingJobs_.push(copy.top()); - } - - copy.pop(); - } - + RemovePendingJob(id); found->second->SetState(JobState_Paused); - break; - } case JobState_Retry: - { - RetryJobs::iterator item = retryJobs_.find(found->second); - assert(item != retryJobs_.end()); - retryJobs_.erase(item); - + RemoveRetryJob(found->second); found->second->SetState(JobState_Paused); - break; - } case JobState_Paused: case JobState_Success: @@ -633,6 +668,60 @@ } + bool JobsRegistry::Cancel(const std::string& id) + { + LOG(INFO) << "Canceling job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + return false; + } + else + { + switch (found->second->GetState()) + { + case JobState_Pending: + RemovePendingJob(id); + SetCompletedJob(*found->second, false); + found->second->SetLastErrorCode(ErrorCode_CanceledJob); + break; + + case JobState_Retry: + RemoveRetryJob(found->second); + SetCompletedJob(*found->second, false); + found->second->SetLastErrorCode(ErrorCode_CanceledJob); + break; + + case JobState_Paused: + SetCompletedJob(*found->second, false); + found->second->SetLastErrorCode(ErrorCode_CanceledJob); + break; + + case JobState_Success: + case JobState_Failure: + // Nothing to be done + break; + + case JobState_Running: + found->second->ScheduleCancel(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + CheckInvariants(); + return true; + } + } + + bool JobsRegistry::Resume(const std::string& id) { LOG(INFO) << "Resuming job: " << id; @@ -751,7 +840,8 @@ registry_(registry), handler_(NULL), targetState_(JobState_Failure), - targetRetryTimeout_(0) + targetRetryTimeout_(0), + canceled_(false) { { boost::mutex::scoped_lock lock(registry_.mutex_); @@ -779,6 +869,7 @@ assert(handler_->GetState() == JobState_Pending); handler_->SetState(JobState_Running); + handler_->SetLastErrorCode(ErrorCode_Success); job_ = &handler_->GetJob(); id_ = handler_->GetId(); @@ -797,6 +888,12 @@ { case JobState_Failure: registry_.MarkRunningAsCompleted(*handler_, false); + + if (canceled_) + { + handler_->SetLastErrorCode(ErrorCode_CanceledJob); + } + break; case JobState_Success: @@ -881,6 +978,23 @@ } + bool JobsRegistry::RunningJob::IsCancelScheduled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsCancelScheduled(); + } + } + + void JobsRegistry::RunningJob::MarkSuccess() { if (!IsValid()) @@ -907,6 +1021,20 @@ } + void JobsRegistry::RunningJob::MarkCanceled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Failure; + canceled_ = true; + } + } + + void JobsRegistry::RunningJob::MarkPause() { if (!IsValid())
--- a/Core/JobsEngine/JobsRegistry.h Fri May 11 09:35:17 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.h Fri May 11 17:33:19 2018 +0200 @@ -93,6 +93,9 @@ void ForgetOldCompletedJobs(); + void SetCompletedJob(JobHandler& job, + bool success); + void MarkRunningAsCompleted(JobHandler& job, bool success); @@ -104,6 +107,10 @@ bool GetStateInternal(JobState& state, const std::string& id); + void RemovePendingJob(const std::string& id); + + void RemoveRetryJob(JobHandler* handler); + public: JobsRegistry() : maxCompletedJobs_(10) @@ -138,6 +145,8 @@ bool Resume(const std::string& id); bool Resubmit(const std::string& id); + + bool Cancel(const std::string& id); void ScheduleRetries(); @@ -158,6 +167,7 @@ int priority_; JobState targetState_; unsigned int targetRetryTimeout_; + bool canceled_; public: RunningJob(JobsRegistry& registry, @@ -175,12 +185,16 @@ bool IsPauseScheduled(); + bool IsCancelScheduled(); + void MarkSuccess(); void MarkFailure(); void MarkPause(); + void MarkCanceled(); + void MarkRetry(unsigned int timeout); void UpdateStatus(ErrorCode code);
--- a/OrthancServer/main.cpp Fri May 11 09:35:17 2018 +0200 +++ b/OrthancServer/main.cpp Fri May 11 17:33:19 2018 +0200 @@ -574,6 +574,7 @@ PrintErrorCode(ErrorCode_NotAcceptable, "Cannot send a response which is acceptable according to the Accept HTTP header"); PrintErrorCode(ErrorCode_NullPointer, "Cannot handle a NULL pointer"); PrintErrorCode(ErrorCode_DatabaseUnavailable, "The database is currently not available (probably a transient situation)"); + PrintErrorCode(ErrorCode_CanceledJob, "This job was canceled"); PrintErrorCode(ErrorCode_SQLiteNotOpened, "SQLite: The database is not opened"); PrintErrorCode(ErrorCode_SQLiteAlreadyOpened, "SQLite: Connection is already open"); PrintErrorCode(ErrorCode_SQLiteCannotOpen, "SQLite: Unable to open the database");
--- a/Plugins/Include/orthanc/OrthancCPlugin.h Fri May 11 09:35:17 2018 +0200 +++ b/Plugins/Include/orthanc/OrthancCPlugin.h Fri May 11 17:33:19 2018 +0200 @@ -235,6 +235,7 @@ OrthancPluginErrorCode_NotAcceptable = 34 /*!< Cannot send a response which is acceptable according to the Accept HTTP header */, OrthancPluginErrorCode_NullPointer = 35 /*!< Cannot handle a NULL pointer */, OrthancPluginErrorCode_DatabaseUnavailable = 36 /*!< The database is currently not available (probably a transient situation) */, + OrthancPluginErrorCode_CanceledJob = 37 /*!< This job was canceled */, OrthancPluginErrorCode_SQLiteNotOpened = 1000 /*!< SQLite: The database is not opened */, OrthancPluginErrorCode_SQLiteAlreadyOpened = 1001 /*!< SQLite: Connection is already open */, OrthancPluginErrorCode_SQLiteCannotOpen = 1002 /*!< SQLite: Unable to open the database */,
--- a/Resources/ErrorCodes.json Fri May 11 09:35:17 2018 +0200 +++ b/Resources/ErrorCodes.json Fri May 11 17:33:19 2018 +0200 @@ -207,6 +207,11 @@ "Name": "DatabaseUnavailable", "Description": "The database is currently not available (probably a transient situation)" }, + { + "Code": 37, + "Name": "CanceledJob", + "Description": "This job was canceled" + },
--- a/Resources/ImplementationNotes/JobsEngineStates.dot Fri May 11 09:35:17 2018 +0200 +++ b/Resources/ImplementationNotes/JobsEngineStates.dot Fri May 11 17:33:19 2018 +0200 @@ -20,4 +20,9 @@ pending -> paused [label="Pause()" fontcolor="red"]; retry -> paused [label="Pause()" fontcolor="red"]; running -> paused [label="Pause()" fontcolor="red"]; + + paused -> failure [label="Cancel()" fontcolor="red"]; + pending -> failure [label="Cancel()" fontcolor="red"]; + retry -> failure [label="Cancel()" fontcolor="red"]; + running -> failure [label="Cancel()" fontcolor="red"]; }
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri May 11 09:35:17 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri May 11 17:33:19 2018 +0200 @@ -323,6 +323,22 @@ } +static bool CheckErrorCode(Orthanc::JobsRegistry& registry, + const std::string& id, + Orthanc::ErrorCode code) +{ + Orthanc::JobInfo s; + if (registry.GetJobInfo(s, id)) + { + return code == s.GetStatus().GetErrorCode(); + } + else + { + return false; + } +} + + TEST(JobsRegistry, Priority) { JobsRegistry registry; @@ -611,6 +627,99 @@ } +TEST(JobsRegistry, Cancel) +{ + JobsRegistry registry; + + std::string id; + registry.Submit(id, new DummyJob(), 10); + + ASSERT_FALSE(registry.Cancel("nope")); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + + ASSERT_TRUE(registry.Cancel(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + ASSERT_TRUE(registry.Cancel(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + ASSERT_TRUE(registry.Resubmit(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + + job.MarkSuccess(); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + + ASSERT_TRUE(registry.Cancel(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + + registry.Submit(id, new DummyJob(), 10); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(id, job.GetId()); + + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + + job.MarkCanceled(); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + ASSERT_TRUE(registry.Resubmit(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + ASSERT_TRUE(registry.Pause(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + ASSERT_TRUE(registry.Cancel(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + ASSERT_TRUE(registry.Resubmit(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(id, job.GetId()); + + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + + job.MarkRetry(500); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success)); + + ASSERT_TRUE(registry.Cancel(id)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); +} + + TEST(JobsEngine, Basic)