# HG changeset patch # User Sebastien Jodogne # Date 1525447727 -7200 # Node ID 1e66fe3ddf9fa3821bb517a83984a02010eca565 # Parent 7d4a3eca96afdd69a57d85dba5d479bbf7b1b7d1 refactoring diff -r 7d4a3eca96af -r 1e66fe3ddf9f UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 18:48:20 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri May 04 17:28:47 2018 +0200 @@ -290,22 +290,22 @@ JobState_Retry }; - enum JobStepStatus + enum JobStepCode { - JobStepStatus_Success, - JobStepStatus_Failure, - JobStepStatus_Continue, - JobStepStatus_Retry + JobStepCode_Success, + JobStepCode_Failure, + JobStepCode_Continue, + JobStepCode_Retry }; class JobStepResult { private: - JobStepStatus status_; + JobStepCode status_; public: - explicit JobStepResult(JobStepStatus status) : + explicit JobStepResult(JobStepCode status) : status_(status) { } @@ -314,7 +314,7 @@ { } - JobStepStatus GetStatus() const + JobStepCode GetCode() const { return status_; } @@ -328,7 +328,7 @@ public: RetryResult(unsigned int timeout) : - JobStepResult(JobStepStatus_Retry), + JobStepResult(JobStepCode_Retry), timeout_(timeout) { } @@ -357,6 +357,34 @@ }; + struct JobStatus + { + ErrorCode errorCode_; + float progress_; + Json::Value description_; + + JobStatus() : + errorCode_(ErrorCode_Success), + progress_(0), + description_(Json::objectValue) + { + } + + JobStatus(ErrorCode code, + float progress) : + errorCode_(code), + progress_(progress), + description_(Json::objectValue) + { + if (progress < 0 || + progress > 1) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } + }; + + class JobInfo { private: @@ -364,39 +392,30 @@ int priority_; ErrorCode errorCode_; JobState state_; - float progress_; boost::posix_time::ptime infoTime_; boost::posix_time::ptime creationTime_; boost::posix_time::time_duration runtime_; boost::posix_time::ptime eta_; - Json::Value status_; + JobStatus status_; public: JobInfo(const std::string& id, int priority, - ErrorCode errorCode, JobState state, - float progress, + const JobStatus& status, const boost::posix_time::ptime& creationTime, const boost::posix_time::time_duration& runtime) : id_(id), priority_(priority), - errorCode_(errorCode), state_(state), - progress_(progress), infoTime_(boost::posix_time::microsec_clock::universal_time()), creationTime_(creationTime), - runtime_(runtime) + runtime_(runtime), + status_(status) { - if (progress < 0 || - progress > 1) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - - float r = static_cast(runtime_.total_milliseconds()); - - eta_ = infoTime_ + boost::posix_time::milliseconds(boost::math::llround(1.0f - progress) * r); + float ms = static_cast(runtime_.total_milliseconds()); + float remaining = boost::math::llround(1.0f - status_.progress_) * ms; + eta_ = infoTime_ + boost::posix_time::milliseconds(remaining); } const std::string& GetIdentifier() const @@ -419,11 +438,6 @@ return state_; } - float GetProgress() const - { - return progress_; - } - const boost::posix_time::ptime& GetInfoTime() const { return infoTime_; @@ -444,12 +458,12 @@ return eta_; } - const Json::Value& GetStatus() const + const JobStatus& GetStatus() const { return status_; } - Json::Value& GetStatus() + JobStatus& GetStatus() { return status_; } @@ -457,21 +471,18 @@ class JobHandler : public boost::noncopyable - { + { private: std::string id_; JobState state_; - boost::mutex jobMutex_; std::auto_ptr job_; int priority_; // "+inf()" means highest priority boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastStateChangeTime; + boost::posix_time::ptime lastStateChangeTime_; + boost::posix_time::time_duration runtime_; boost::posix_time::ptime retryTime_; - boost::posix_time::time_duration runtime_; bool pauseScheduled_; - ErrorCode lastErrorCode_; - float lastProgress_; - Json::Value lastStatus_; + JobStatus lastStatus_; void SetStateInternal(JobState state) { @@ -479,11 +490,11 @@ if (state_ == JobState_Running) { - runtime_ += (now - lastStateChangeTime); + runtime_ += (now - lastStateChangeTime_); } state_ = state; - lastStateChangeTime = now; + lastStateChangeTime_ = now; pauseScheduled_ = false; } @@ -495,11 +506,10 @@ job_(job), priority_(priority), creationTime_(boost::posix_time::microsec_clock::universal_time()), - lastStateChangeTime(creationTime_), + lastStateChangeTime_(creationTime_), runtime_(boost::posix_time::milliseconds(0)), - pauseScheduled_(false), - lastErrorCode_(ErrorCode_Success), - lastProgress_(0) + retryTime_(creationTime_), + pauseScheduled_(false) { if (job == NULL) { @@ -512,6 +522,12 @@ return id_; } + IJob& GetJob() const + { + assert(job_.get() != NULL); + return *job_; + } + void SetPriority(int priority) { priority_ = priority; @@ -585,35 +601,15 @@ } } - class JobLock + JobStatus& GetLastStatus() { - private: - boost::mutex::scoped_lock lock_; - JobHandler& handler_; - - public: - JobLock(JobHandler& handler) : - lock_(handler.jobMutex_), - handler_(handler) - { - } + return lastStatus_; + } - IJob& GetJob() - { - return *handler_.job_; - } - - void UpdateStatus() - { - handler_.lastProgress_ = handler_.job_->GetProgress(); - handler_.job_->FormatStatus(handler_.lastStatus_); - } - - void SetLastErrorCode(ErrorCode code) - { - handler_.lastErrorCode_ = code; - } - }; + const JobStatus& GetLastStatus() const + { + return lastStatus_; + } }; @@ -771,7 +767,6 @@ LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") << ": " << job.GetId(); - boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); assert(job.GetState() == JobState_Running); @@ -789,7 +784,6 @@ { LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); - boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); assert(job.GetState() == JobState_Running && @@ -806,7 +800,6 @@ { LOG(INFO) << "Job paused: " << job.GetId(); - boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); assert(job.GetState() == JobState_Running); @@ -816,35 +809,6 @@ } - JobHandler* WaitPendingJob(unsigned int timeout) - { - boost::mutex::scoped_lock lock(mutex_); - - while (pendingJobs_.empty()) - { - if (timeout == 0) - { - pendingJobAvailable_.wait(lock); - } - else - { - bool success = pendingJobAvailable_.timed_wait - (lock, boost::posix_time::milliseconds(timeout)); - if (!success) - { - return NULL; - } - } - } - - JobHandler* job = pendingJobs_.top(); - pendingJobs_.pop(); - - job->SetState(JobState_Running); - return job; - } - - public: JobsRegistry() : maxCompletedJobs_(10) @@ -1148,42 +1112,81 @@ class RunningJob : public boost::noncopyable { private: - JobsRegistry& that_; - JobHandler* handler_; + JobsRegistry& registry_; + JobHandler* handler_; // Can only be accessed if the registry + // mutex is locked! + IJob* job_; // Will by design be in mutual exclusion, + // because only one RunningJob can be + // executed at a time on a JobHandler + + std::string id_; + int priority_; JobState targetState_; - unsigned int retryTimeout_; + unsigned int targetRetryTimeout_; public: - RunningJob(JobsRegistry& that, + RunningJob(JobsRegistry& registry, unsigned int timeout) : - that_(that), + registry_(registry), handler_(NULL), targetState_(JobState_Failure), - retryTimeout_(0) + targetRetryTimeout_(0) { - handler_ = that_.WaitPendingJob(timeout); + { + boost::mutex::scoped_lock lock(registry_.mutex_); + + while (registry_.pendingJobs_.empty()) + { + if (timeout == 0) + { + registry_.pendingJobAvailable_.wait(lock); + } + else + { + bool success = registry_.pendingJobAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + // No pending job + return; + } + } + } + + handler_ = registry_.pendingJobs_.top(); + registry_.pendingJobs_.pop(); + + assert(handler_->GetState() == JobState_Pending); + handler_->SetState(JobState_Running); + + job_ = &handler_->GetJob(); + id_ = handler_->GetId(); + priority_ = handler_->GetPriority(); + } } ~RunningJob() { if (IsValid()) { + boost::mutex::scoped_lock lock(registry_.mutex_); + switch (targetState_) { case JobState_Failure: - that_.MarkRunningAsCompleted(*handler_, false); + registry_.MarkRunningAsCompleted(*handler_, false); break; case JobState_Success: - that_.MarkRunningAsCompleted(*handler_, true); + registry_.MarkRunningAsCompleted(*handler_, true); break; case JobState_Paused: - that_.MarkRunningAsPaused(*handler_); + registry_.MarkRunningAsPaused(*handler_); break; case JobState_Retry: - that_.MarkRunningAsRetry(*handler_, retryTimeout_); + registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); break; default: @@ -1194,31 +1197,32 @@ bool IsValid() const { - return handler_ != NULL; + return (handler_ != NULL && + job_ != NULL); } const std::string& GetId() const { - if (IsValid()) + if (!IsValid()) { - return handler_->GetId(); + throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + return id_; + } } int GetPriority() const { - if (IsValid()) + if (!IsValid()) { - return handler_->GetPriority(); + throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + return priority_; + } } bool IsPauseScheduled() @@ -1227,12 +1231,14 @@ { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - - boost::mutex::scoped_lock lock(that_.mutex_); - that_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - return handler_->IsPauseScheduled(); + else + { + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsPauseScheduled(); + } } void MarkSuccess() @@ -1241,8 +1247,10 @@ { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - - targetState_ = JobState_Success; + else + { + targetState_ = JobState_Success; + } } void MarkFailure() @@ -1251,18 +1259,22 @@ { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - - targetState_ = JobState_Failure; + else + { + targetState_ = JobState_Failure; + } } - void SchedulePause() + void MarkPause() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - - targetState_ = JobState_Paused; + else + { + targetState_ = JobState_Paused; + } } void MarkRetry(unsigned int timeout) @@ -1271,83 +1283,88 @@ { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - - targetState_ = JobState_Retry; - retryTimeout_ = timeout; + else + { + targetState_ = JobState_Retry; + targetRetryTimeout_ = timeout; + } } - void ExecuteStep() + /*void ExecuteStep() { if (!IsValid()) { throw OrthancException(ErrorCode_BadSequenceOfCalls); } - if (handler_->IsPauseScheduled()) + if (IsPauseScheduled()) { targetState_ = JobState_Paused; return; } std::auto_ptr result; + ErrorCode code; { - JobHandler::JobLock lock(*handler_); - bool ok = false; try { - result.reset(lock.GetJob().ExecuteStep()); - lock.UpdateStatus(); + result.reset(job_->ExecuteStep()); ok = true; + + if (result->GetCode() == JobStepCode_Failure) + { + code = ErrorCode_InternalError; + } } catch (OrthancException& e) { - lock.SetLastErrorCode(e.GetErrorCode()); + code = e.GetErrorCode(); } catch (boost::bad_lexical_cast&) { - lock.SetLastErrorCode(ErrorCode_BadFileFormat); + code = ErrorCode_BadFileFormat; } catch (...) { - lock.SetLastErrorCode(ErrorCode_InternalError); + code = ErrorCode_InternalError; } if (ok) { - lock.SetLastErrorCode(ErrorCode_Success); + code = ErrorCode_Success; } else { - result.reset(new JobStepResult(JobStepStatus_Failure)); + result.reset(new JobStepResult(JobStepCode_Failure)); } } - switch (result->GetStatus()) + switch (result->GetCode()) { - case JobStepStatus_Success: + case JobStepCode_Success: targetState_ = JobState_Success; break; - case JobStepStatus_Failure: + case JobStepCode_Failure: targetState_ = JobState_Failure; break; - case JobStepStatus_Continue: + case JobStepCode_Continue: targetState_ = JobState_Running; break; - case JobStepStatus_Retry: + case JobStepCode_Retry: targetState_ = JobState_Retry; - retryTimeout_ = dynamic_cast(*result).GetTimeout(); + targetRetryTimeout_ = dynamic_cast(*result).GetTimeout(); break; default: throw OrthancException(ErrorCode_InternalError); } - } + }*/ }; }; } @@ -1361,7 +1378,7 @@ public: DummyJob() : - result_(Orthanc::JobStepStatus_Success) + result_(Orthanc::JobStepCode_Success) { } @@ -1633,7 +1650,7 @@ ASSERT_TRUE(job.IsValid()); registry.Resubmit(id); - job.SchedulePause(); + job.MarkPause(); ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); }