Mercurial > hg > orthanc
changeset 2559:9b7680dee75d jobs
cont
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 03 May 2018 18:45:47 +0200 |
parents | 57f81b988713 |
children | 7d4a3eca96af |
files | Resources/ImplementationNotes/JobsEngineStates.dot UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 2 files changed, 227 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- a/Resources/ImplementationNotes/JobsEngineStates.dot Thu May 03 15:24:33 2018 +0200 +++ b/Resources/ImplementationNotes/JobsEngineStates.dot Thu May 03 18:45:47 2018 +0200 @@ -20,4 +20,4 @@ pending -> paused [label="Pause()" fontcolor="red"]; retry -> paused [label="Pause()" fontcolor="red"]; running -> paused [label="Pause()" fontcolor="red"]; - } +}
--- a/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 15:24:33 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 18:45:47 2018 +0200 @@ -274,6 +274,7 @@ #include "../Core/Logging.h" +#include <boost/math/special_functions/round.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <queue> @@ -356,18 +357,121 @@ }; + class JobInfo + { + private: + std::string id_; + int priority_; + ErrorCode errorCode_; + JobState state_; + float progress_; + boost::posix_time::ptime infoTime_; + boost::posix_time::ptime creationTime_; + boost::posix_time::time_duration runtime_; + boost::posix_time::ptime eta_; + Json::Value status_; + + public: + JobInfo(const std::string& id, + int priority, + ErrorCode errorCode, + JobState state, + float progress, + const boost::posix_time::ptime& creationTime, + const boost::posix_time::time_duration& runtime) : + id_(id), + priority_(priority), + errorCode_(errorCode), + state_(state), + progress_(progress), + infoTime_(boost::posix_time::microsec_clock::universal_time()), + creationTime_(creationTime), + runtime_(runtime) + { + if (progress < 0 || + progress > 1) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + float r = static_cast<float>(runtime_.total_milliseconds()); + + eta_ = infoTime_ + boost::posix_time::milliseconds(boost::math::llround(1.0f - progress) * r); + } + + const std::string& GetIdentifier() const + { + return id_; + } + + int GetPriority() const + { + return priority_; + } + + ErrorCode GetErrorCode() const + { + return errorCode_; + } + + JobState GetState() const + { + return state_; + } + + float GetProgress() const + { + return progress_; + } + + const boost::posix_time::ptime& GetInfoTime() const + { + return infoTime_; + } + + const boost::posix_time::ptime& GetCreationTime() const + { + return creationTime_; + } + + const boost::posix_time::time_duration& GetRuntime() const + { + return runtime_; + } + + const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const + { + return eta_; + } + + const Json::Value& GetStatus() const + { + return status_; + } + + Json::Value& GetStatus() + { + return status_; + } + }; + + class JobHandler : public boost::noncopyable { private: - std::string id_; - JobState state_; - std::auto_ptr<IJob> job_; - int priority_; // "+inf()" means highest priority - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastUpdateTime_; - boost::posix_time::ptime retryTime_; - uint64_t runtime_; // In milliseconds - bool pauseScheduled_; + std::string id_; + JobState state_; + boost::mutex jobMutex_; + std::auto_ptr<IJob> job_; + int priority_; // "+inf()" means highest priority + boost::posix_time::ptime creationTime_; + boost::posix_time::ptime lastStateChangeTime; + boost::posix_time::ptime retryTime_; + boost::posix_time::time_duration runtime_; + bool pauseScheduled_; + ErrorCode lastErrorCode_; + float lastProgress_; + Json::Value lastStatus_; void SetStateInternal(JobState state) { @@ -375,11 +479,11 @@ if (state_ == JobState_Running) { - runtime_ += (now - lastUpdateTime_).total_milliseconds(); + runtime_ += (now - lastStateChangeTime); } state_ = state; - lastUpdateTime_ = now; + lastStateChangeTime = now; pauseScheduled_ = false; } @@ -391,9 +495,11 @@ job_(job), priority_(priority), creationTime_(boost::posix_time::microsec_clock::universal_time()), - lastUpdateTime_(creationTime_), - runtime_(0), - pauseScheduled_(false) + lastStateChangeTime(creationTime_), + runtime_(boost::posix_time::milliseconds(0)), + pauseScheduled_(false), + lastErrorCode_(ErrorCode_Success), + lastProgress_(0) { if (job == NULL) { @@ -406,12 +512,6 @@ return id_; } - IJob& GetJob() const - { - assert(job_.get() != NULL); - return *job_; - } - void SetPriority(int priority) { priority_ = priority; @@ -484,6 +584,36 @@ return retryTime_ <= now; } } + + class JobLock + { + private: + boost::mutex::scoped_lock lock_; + JobHandler& handler_; + + public: + JobLock(JobHandler& handler) : + lock_(handler.jobMutex_), + handler_(handler) + { + } + + IJob& GetJob() + { + return *handler_.job_; + } + + void UpdateStatus() + { + handler_.lastProgress_ = handler_.job_->GetProgress(); + handler_.job_->FormatStatus(handler_.lastStatus_); + } + + void SetLastErrorCode(ErrorCode code) + { + handler_.lastErrorCode_ = code; + } + }; }; @@ -1019,9 +1149,9 @@ { private: JobsRegistry& that_; - JobHandler* handler_; - JobState targetState_; - unsigned int retryTimeout_; + JobHandler* handler_; + JobState targetState_; + unsigned int retryTimeout_; public: RunningJob(JobsRegistry& that, @@ -1105,20 +1235,6 @@ return handler_->IsPauseScheduled(); } - IJob& GetJob() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - boost::mutex::scoped_lock lock(that_.mutex_); - that_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - return handler_->GetJob(); - } - void MarkSuccess() { if (!IsValid()) @@ -1159,6 +1275,79 @@ targetState_ = JobState_Retry; retryTimeout_ = timeout; } + + void ExecuteStep() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + if (handler_->IsPauseScheduled()) + { + targetState_ = JobState_Paused; + return; + } + + std::auto_ptr<JobStepResult> result; + + { + JobHandler::JobLock lock(*handler_); + + bool ok = false; + + try + { + result.reset(lock.GetJob().ExecuteStep()); + lock.UpdateStatus(); + ok = true; + } + catch (OrthancException& e) + { + lock.SetLastErrorCode(e.GetErrorCode()); + } + catch (boost::bad_lexical_cast&) + { + lock.SetLastErrorCode(ErrorCode_BadFileFormat); + } + catch (...) + { + lock.SetLastErrorCode(ErrorCode_InternalError); + } + + if (ok) + { + lock.SetLastErrorCode(ErrorCode_Success); + } + else + { + result.reset(new JobStepResult(JobStepStatus_Failure)); + } + } + + switch (result->GetStatus()) + { + case JobStepStatus_Success: + targetState_ = JobState_Success; + break; + + case JobStepStatus_Failure: + targetState_ = JobState_Failure; + break; + + case JobStepStatus_Continue: + targetState_ = JobState_Running; + break; + + case JobStepStatus_Retry: + targetState_ = JobState_Retry; + retryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } }; }; }