Mercurial > hg > orthanc
changeset 2568:a46094602346 jobs
improvements
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 07 May 2018 15:02:34 +0200 |
parents | 3caca43371f5 |
children | 2af17cd5eb1f |
files | UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 1 files changed, 214 insertions(+), 197 deletions(-) [+] |
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 14:26:31 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 15:02:34 2018 +0200 @@ -300,17 +300,16 @@ JobStepCode_Failure, JobStepCode_Continue, JobStepCode_Retry - }; - - + }; + class JobStepResult { private: - JobStepCode status_; + JobStepCode code_; public: - explicit JobStepResult(JobStepCode status) : - status_(status) + explicit JobStepResult(JobStepCode code) : + code_(code) { } @@ -320,18 +319,18 @@ JobStepCode GetCode() const { - return status_; + return code_; } }; - class RetryResult : public JobStepResult + class JobStepRetry : public JobStepResult { private: unsigned int timeout_; // Retry after "timeout_" milliseconds public: - RetryResult(unsigned int timeout) : + JobStepRetry(unsigned int timeout) : JobStepResult(JobStepCode_Retry), timeout_(timeout) { @@ -570,175 +569,176 @@ }; - class JobHandler : public boost::noncopyable - { - private: - std::string id_; - JobState state_; - std::auto_ptr<IJob> job_; - int priority_; // "+inf()" means highest priority - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastStateChangeTime_; - boost::posix_time::time_duration runtime_; - boost::posix_time::ptime retryTime_; - bool pauseScheduled_; - JobStatus lastStatus_; - - void Touch() - { - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - - if (state_ == JobState_Running) - { - runtime_ += (now - lastStateChangeTime_); - } - - lastStateChangeTime_ = now; - } - - void SetStateInternal(JobState state) - { - state_ = state; - pauseScheduled_ = false; - Touch(); - } - - public: - JobHandler(IJob* job, - int priority) : - id_(Toolbox::GenerateUuid()), - state_(JobState_Pending), - job_(job), - priority_(priority), - creationTime_(boost::posix_time::microsec_clock::universal_time()), - lastStateChangeTime_(creationTime_), - runtime_(boost::posix_time::milliseconds(0)), - retryTime_(creationTime_), - pauseScheduled_(false) - { - if (job == NULL) - { - throw OrthancException(ErrorCode_NullPointer); - } - - lastStatus_ = JobStatus(ErrorCode_Success, *job); - } - - const std::string& GetId() const - { - return id_; - } - - IJob& GetJob() const - { - assert(job_.get() != NULL); - return *job_; - } - - void SetPriority(int priority) - { - priority_ = priority; - } - - int GetPriority() const - { - return priority_; - } - - JobState GetState() const - { - return state_; - } - - void SetState(JobState state) - { - if (state == JobState_Retry) - { - // Use "SetRetryState()" - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - SetStateInternal(state); - } - } - - void SetRetryState(unsigned int timeout) - { - if (state_ == JobState_Running) - { - SetStateInternal(JobState_Retry); - retryTime_ = (boost::posix_time::microsec_clock::universal_time() + - boost::posix_time::milliseconds(timeout)); - } - else - { - // Only valid for running jobs - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - void SchedulePause() - { - if (state_ == JobState_Running) - { - pauseScheduled_ = true; - } - else - { - // Only valid for running jobs - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - bool IsPauseScheduled() - { - return pauseScheduled_; - } - - bool IsRetryReady(const boost::posix_time::ptime& now) const - { - if (state_ != JobState_Retry) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return retryTime_ <= now; - } - } - - const boost::posix_time::ptime& GetCreationTime() const - { - return creationTime_; - } - - const boost::posix_time::ptime& GetLastStateChangeTime() const - { - return lastStateChangeTime_; - } - - const boost::posix_time::time_duration& GetRuntime() const - { - return runtime_; - } - - const JobStatus& GetLastStatus() const - { - return lastStatus_; - } - - void SetLastStatus(const JobStatus& status) - { - lastStatus_ = status; - Touch(); - } - }; class JobsRegistry : public boost::noncopyable { private: + class JobHandler : public boost::noncopyable + { + private: + std::string id_; + JobState state_; + std::auto_ptr<IJob> job_; + int priority_; // "+inf()" means highest priority + boost::posix_time::ptime creationTime_; + boost::posix_time::ptime lastStateChangeTime_; + boost::posix_time::time_duration runtime_; + boost::posix_time::ptime retryTime_; + bool pauseScheduled_; + JobStatus lastStatus_; + + void Touch() + { + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + + if (state_ == JobState_Running) + { + runtime_ += (now - lastStateChangeTime_); + } + + lastStateChangeTime_ = now; + } + + void SetStateInternal(JobState state) + { + state_ = state; + pauseScheduled_ = false; + Touch(); + } + + public: + JobHandler(IJob* job, + int priority) : + id_(Toolbox::GenerateUuid()), + state_(JobState_Pending), + job_(job), + priority_(priority), + creationTime_(boost::posix_time::microsec_clock::universal_time()), + lastStateChangeTime_(creationTime_), + runtime_(boost::posix_time::milliseconds(0)), + retryTime_(creationTime_), + pauseScheduled_(false) + { + if (job == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + + lastStatus_ = JobStatus(ErrorCode_Success, *job); + } + + const std::string& GetId() const + { + return id_; + } + + IJob& GetJob() const + { + assert(job_.get() != NULL); + return *job_; + } + + void SetPriority(int priority) + { + priority_ = priority; + } + + int GetPriority() const + { + return priority_; + } + + JobState GetState() const + { + return state_; + } + + void SetState(JobState state) + { + if (state == JobState_Retry) + { + // Use "SetRetryState()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + SetStateInternal(state); + } + } + + void SetRetryState(unsigned int timeout) + { + if (state_ == JobState_Running) + { + SetStateInternal(JobState_Retry); + retryTime_ = (boost::posix_time::microsec_clock::universal_time() + + boost::posix_time::milliseconds(timeout)); + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + void SchedulePause() + { + if (state_ == JobState_Running) + { + pauseScheduled_ = true; + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + bool IsPauseScheduled() + { + return pauseScheduled_; + } + + bool IsRetryReady(const boost::posix_time::ptime& now) const + { + if (state_ != JobState_Retry) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return retryTime_ <= now; + } + } + + const boost::posix_time::ptime& GetCreationTime() const + { + return creationTime_; + } + + const boost::posix_time::ptime& GetLastStateChangeTime() const + { + return lastStateChangeTime_; + } + + const boost::posix_time::time_duration& GetRuntime() const + { + return runtime_; + } + + const JobStatus& GetLastStatus() const + { + return lastStatus_; + } + + void SetLastStatus(const JobStatus& status) + { + lastStatus_ = status; + Touch(); + } + }; + struct PriorityComparator { bool operator() (JobHandler*& a, @@ -974,22 +974,29 @@ } - void GetJobsInfo(std::map<std::string, JobInfo>& target) + bool GetJobInfo(JobInfo& target, + const std::string& id) { boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) + JobsIndex::const_iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + return false; + } + else { - const JobHandler& handler = *it->second; - target[it->first] = JobInfo(handler.GetId(), - handler.GetPriority(), - handler.GetState(), - handler.GetLastStatus(), - handler.GetCreationTime(), - handler.GetLastStateChangeTime(), - handler.GetRuntime()); + const JobHandler& handler = *found->second; + target = JobInfo(handler.GetId(), + handler.GetPriority(), + handler.GetState(), + handler.GetLastStatus(), + handler.GetCreationTime(), + handler.GetLastStateChangeTime(), + handler.GetRuntime()); + return true; } } @@ -1545,7 +1552,7 @@ return false; case JobStepCode_Retry: - running.MarkRetry(dynamic_cast<RetryResult&>(*result).GetTimeout()); + running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout()); return false; case JobStepCode_Continue: @@ -2084,17 +2091,22 @@ boost::this_thread::sleep(boost::posix_time::milliseconds(100)); { - typedef std::map<std::string, JobInfo> Jobs; + typedef std::set<std::string> Jobs; Jobs jobs; - engine.GetRegistry().GetJobsInfo(jobs); + engine.GetRegistry().ListJobs(jobs); - Json::Value v; + Json::Value v = Json::arrayValue; for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) { - Json::Value vv; - it->second.Format(vv); - v[it->first] = vv; + JobInfo info; + + if (engine.GetRegistry().GetJobInfo(info, *it)) + { + Json::Value vv; + info.Format(vv); + v.append(vv); + } } std::cout << v << std::endl; @@ -2107,17 +2119,22 @@ { - typedef std::map<std::string, JobInfo> Jobs; + typedef std::set<std::string> Jobs; Jobs jobs; - engine.GetRegistry().GetJobsInfo(jobs); + engine.GetRegistry().ListJobs(jobs); - Json::Value v; + Json::Value v = Json::arrayValue; for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) { - Json::Value vv; - it->second.Format(vv); - v[it->first] = vv; + JobInfo info; + + if (engine.GetRegistry().GetJobInfo(info, *it)) + { + Json::Value vv; + info.Format(vv); + v.append(vv); + } } std::cout << v << std::endl;