# HG changeset patch # User Sebastien Jodogne # Date 1525347931 -7200 # Node ID b4516a6f214b91b7d99bde391742025dca9f962e # Parent 91e944c8389b908a9db0977a56360f714ab058de state machine diff -r 91e944c8389b -r b4516a6f214b UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 10:27:39 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 13:45:31 2018 +0200 @@ -272,7 +272,10 @@ # error The job engine cannot be used in sandboxed environments #endif +#include "../Core/Logging.h" + #include +#include namespace Orthanc { @@ -289,24 +292,24 @@ enum JobStepStatus { JobStepStatus_Success, - JobStepStatus_Error, + JobStepStatus_Failure, JobStepStatus_Continue, JobStepStatus_Retry }; - class IJobStepResult : public boost::noncopyable + class JobStepResult { private: JobStepStatus status_; public: - explicit IJobStepResult(JobStepStatus status) : - status_(status) + explicit JobStepResult(JobStepStatus status) : + status_(status) { } - virtual ~IJobStepResult() + virtual ~JobStepResult() { } @@ -317,15 +320,15 @@ }; - class RetryResult : public IJobStepResult + class RetryResult : public JobStepResult { private: unsigned int timeout_; // Retry after "timeout_" milliseconds public: RetryResult(unsigned int timeout) : - IJobStepResult(JobStepStatus_Retry), - timeout_(timeout) + JobStepResult(JobStepStatus_Retry), + timeout_(timeout) { } @@ -343,7 +346,7 @@ { } - virtual IJobStepResult* ExecuteStep() = 0; + virtual JobStepResult* ExecuteStep() = 0; virtual void ReleaseResources() = 0; // For pausing jobs @@ -353,83 +356,690 @@ }; + class JobHandler : public boost::noncopyable + { + private: + std::string id_; + JobState state_; + std::auto_ptr job_; + int priority_; // "+inf()" means highest priority + boost::posix_time::ptime creationTime_; + boost::posix_time::ptime lastUpdateTime_; + boost::posix_time::ptime retryTime_; + uint64_t runtime_; // In milliseconds + bool pauseScheduled_; + + void SetStateInternal(JobState state) + { + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + + if (state_ == JobState_Running) + { + runtime_ += (now - lastUpdateTime_).total_milliseconds(); + } + + state_ = state; + lastUpdateTime_ = now; + pauseScheduled_ = false; + } + + public: + JobHandler(IJob* job, + int priority) : + id_(Toolbox::GenerateUuid()), + state_(JobState_Pending), + job_(job), + priority_(priority), + creationTime_(boost::posix_time::microsec_clock::universal_time()), + lastUpdateTime_(creationTime_), + runtime_(0), + pauseScheduled_(false) + { + if (job == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + } + + const std::string& GetId() const + { + return id_; + } + + IJob& GetJob() const + { + assert(job_.get() != NULL); + return *job_; + } + + void SetPriority(int priority) + { + priority_ = priority; + } + + int GetPriority() const + { + return priority_; + } + + JobState GetState() const + { + return state_; + } + + void SetState(JobState state) + { + if (state == JobState_Retry) + { + // Use "SetRetryState()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + SetStateInternal(state); + } + } + + void SetRetryState(unsigned int timeout) + { + if (state_ == JobState_Running) + { + SetStateInternal(JobState_Retry); + retryTime_ = (boost::posix_time::microsec_clock::universal_time() + + boost::posix_time::milliseconds(timeout)); + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + void SchedulePause() + { + if (state_ == JobState_Running) + { + pauseScheduled_ = true; + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + bool IsPauseScheduled() + { + return pauseScheduled_; + } + + bool IsRetryReady(const boost::posix_time::ptime& now) const + { + if (state_ != JobState_Retry) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return retryTime_ >= now; + } + } + }; + + class JobsMonitor : public boost::noncopyable { private: - class JobHandler : public boost::noncopyable + struct PriorityComparator { - private: - std::string id_; - JobState state_; - std::auto_ptr job_; - int priority_; // "+inf()" means highest priority - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastUpdateTime_; - uint64_t runtime_; // In milliseconds + bool operator() (JobHandler*& a, + JobHandler*& b) const + { + return a->GetPriority() < b->GetPriority(); + } + }; + + typedef std::map JobsIndex; + typedef std::list CompletedJobs; + typedef std::set RetryJobs; + typedef std::priority_queue, // Could be a "std::deque" + PriorityComparator> PendingJobs; + + boost::mutex mutex_; + JobsIndex jobsIndex_; + PendingJobs pendingJobs_; + CompletedJobs completedJobs_; + RetryJobs retryJobs_; + + boost::condition_variable pendingJobAvailable_; + size_t maxCompletedJobs_; + + +#ifndef NDEBUG + bool IsPendingJob(const JobHandler& job) const + { + PendingJobs copy = pendingJobs_; + while (!copy.empty()) + { + if (copy.top() == &job) + { + return true; + } - public: - JobHandler(IJob* job, - int priority) : - id_(Toolbox::GenerateUuid()), - state_(JobState_Pending), - job_(job), - priority_(priority), - creationTime_(boost::posix_time::microsec_clock::universal_time()), - lastUpdateTime_(creationTime_), - runtime_(0) + copy.pop(); + } + + return false; + } + + bool IsCompletedJob(const JobHandler& job) const + { + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) { - if (job == NULL) + if (*it == &job) { - throw OrthancException(ErrorCode_NullPointer); + return true; + } + } + + return false; + } + + bool IsRetryJob(JobHandler& job) const + { + return retryJobs_.find(&job) != retryJobs_.end(); + } +#endif + + + void CheckInvariants() + { +#ifndef NDEBUG + { + PendingJobs copy = pendingJobs_; + while (!copy.empty()) + { + assert(copy.top()->GetState() == JobState_Pending); + copy.pop(); } } - const std::string& GetId() const + assert(completedJobs_.size() <= maxCompletedJobs_); + + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Success || + (*it)->GetState() == JobState_Failure); + } + + for (RetryJobs::const_iterator it = retryJobs_.begin(); + it != retryJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Retry); + } + + for (JobsIndex::iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + JobHandler& job = *it->second; + + assert(job.GetId() == it->first); + + switch (job.GetState()) + { + case JobState_Pending: + assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Success: + case JobState_Failure: + assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); + break; + + case JobState_Retry: + assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Running: + case JobState_Paused: + assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } +#endif + } + + + void ForgetOldCompletedJobs() + { + if (maxCompletedJobs_ != 0) { - return id_; + while (completedJobs_.size() > maxCompletedJobs_) + { + assert(completedJobs_.front() != NULL); + + std::string id = completedJobs_.front()->GetId(); + + assert(jobsIndex_.find(id) != jobsIndex_.end()); + + jobsIndex_.erase(id); + delete(completedJobs_.front()); + completedJobs_.pop_front(); + } } - }; + } + + + void MarkRunningAsCompleted(JobHandler& job, + bool success) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(success ? JobState_Success : JobState_Failure); + + completedJobs_.push_back(&job); + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + + + void MarkRunningAsRetry(JobHandler& job, + unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + assert(job.GetState() == JobState_Running && + retryJobs_.find(&job) == retryJobs_.end()); + + retryJobs_.insert(&job); + job.SetRetryState(timeout); + + CheckInvariants(); + } + + + void MarkRunningAsPaused(JobHandler& job) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(JobState_Paused); + + CheckInvariants(); + } + + + JobHandler* WaitPendingJob(unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); + + while (pendingJobs_.empty()) + { + if (timeout == 0) + { + pendingJobAvailable_.wait(lock); + } + else + { + bool success = pendingJobAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + return NULL; + } + } + } + + JobHandler* job = pendingJobs_.top(); + pendingJobs_.pop(); + + job->SetState(JobState_Running); + return job; + } + public: + JobsMonitor() : + maxCompletedJobs_(10) + { + } + + + ~JobsMonitor() + { + for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) + { + assert(it->second != NULL); + delete it->second; + } + } + + + void SetMaxCompletedJobs(size_t i) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + maxCompletedJobs_ = i; + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + + + void ListJobs(std::set& target) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + target.insert(it->first); + } + } + + void Submit(std::string& id, - IJob* job, + IJob* job, // Takes ownership int priority) { std::auto_ptr handler(new JobHandler(job, priority)); + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + id = handler->GetId(); + pendingJobs_.push(handler.get()); + jobsIndex_.insert(std::make_pair(id, handler.release())); + + pendingJobAvailable_.notify_one(); + + CheckInvariants(); } + + void Submit(IJob* job, // Takes ownership + int priority) + { + std::string id; + Submit(id, job, priority); + } + + void SetPriority(const std::string& id, int priority) { - // TODO + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else + { + found->second->SetPriority(priority); + + if (found->second->GetState() == JobState_Pending) + { + // If the job is pending, we need to reconstruct the + // priority queue, as the heap condition has changed + + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + pendingJobs_.push(copy.top()); + copy.pop(); + } + } + } + + CheckInvariants(); } + void Pause(const std::string& id) { - // TODO + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else + { + switch (found->second->GetState()) + { + case JobState_Pending: + { + // If the job is pending, we need to reconstruct the + // priority queue to remove it + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + if (copy.top()->GetId() != id) + { + pendingJobs_.push(copy.top()); + } + + copy.pop(); + } + + found->second->SetState(JobState_Paused); + + break; + } + + case JobState_Retry: + { + RetryJobs::iterator item = retryJobs_.find(found->second); + assert(item != retryJobs_.end()); + retryJobs_.erase(item); + + found->second->SetState(JobState_Paused); + + break; + } + + case JobState_Paused: + case JobState_Success: + case JobState_Failure: + // Nothing to be done + break; + + case JobState_Running: + found->second->SchedulePause(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } + + CheckInvariants(); } + void Resume(const std::string& id) { - // TODO + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Paused) + { + LOG(WARNING) << "Cannot resume a job that is not paused: " << id; + } + else + { + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } + + CheckInvariants(); } + void Resubmit(const std::string& id) { - // TODO + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Failure) + { + LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; + } + else + { + bool ok = false; + for (CompletedJobs::iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + if (*it == found->second) + { + ok = true; + completedJobs_.erase(it); + break; + } + } + + assert(ok); + + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } + + CheckInvariants(); } - class JobToRun : public boost::noncopyable + + void ScheduleRetries() + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + RetryJobs copy; + std::swap(copy, retryJobs_); + + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + + assert(retryJobs_.empty()); + for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) + { + if ((*it)->IsRetryReady(now)) + { + (*it)->SetState(JobState_Pending); + } + else + { + retryJobs_.insert(*it); + } + } + + CheckInvariants(); + } + + + bool GetState(JobState& state, + const std::string& id) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::const_iterator it = jobsIndex_.find(id); + if (it == jobsIndex_.end()) + { + return false; + } + else + { + state = it->second->GetState(); + return true; + } + } + + + class RunningJob : public boost::noncopyable { private: - JobHandler* handler_; + JobsMonitor& that_; + JobHandler* handler_; + JobState targetState_; + unsigned int retryTimeout_; public: - JobToRun(JobsMonitor& that, - unsigned int timeout) : - handler_(NULL) + RunningJob(JobsMonitor& that, + unsigned int timeout) : + that_(that), + handler_(NULL), + targetState_(JobState_Failure), + retryTimeout_(0) + { + handler_ = that_.WaitPendingJob(timeout); + } + + ~RunningJob() { + if (IsValid()) + { + switch (targetState_) + { + case JobState_Failure: + that_.MarkRunningAsCompleted(*handler_, false); + break; + + case JobState_Success: + that_.MarkRunningAsCompleted(*handler_, true); + break; + + case JobState_Paused: + that_.MarkRunningAsPaused(*handler_); + break; + + case JobState_Retry: + that_.MarkRunningAsRetry(*handler_, retryTimeout_); + break; + + default: + assert(0); + } + } } bool IsValid() const @@ -437,7 +1047,271 @@ return handler_ != NULL; } - + const std::string& GetId() const + { + if (IsValid()) + { + return handler_->GetId(); + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + int GetPriority() const + { + if (IsValid()) + { + return handler_->GetPriority(); + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + bool IsPauseScheduled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + boost::mutex::scoped_lock lock(that_.mutex_); + that_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsPauseScheduled(); + } + + IJob& GetJob() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + boost::mutex::scoped_lock lock(that_.mutex_); + that_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->GetJob(); + } + + void MarkSuccess() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + targetState_ = JobState_Success; + } + + void MarkFailure() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + targetState_ = JobState_Failure; + } + + void MarkPause() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + targetState_ = JobState_Paused; + } + + void MarkRetry(unsigned int timeout) + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + targetState_ = JobState_Retry; + retryTimeout_ = timeout; + } }; }; } + + + +class DummyJob : public Orthanc::IJob +{ +private: + JobStepResult result_; + +public: + DummyJob() : + result_(Orthanc::JobStepStatus_Success) + { + } + + explicit DummyJob(JobStepResult result) : + result_(result) + { + } + + virtual JobStepResult* ExecuteStep() + { + return new JobStepResult(result_); + } + + virtual void ReleaseResources() + { + } + + virtual float GetProgress() + { + return 0; + } + + virtual void FormatStatus(Json::Value& value) + { + } +}; + + +static bool CheckState(Orthanc::JobsMonitor& monitor, + const std::string& id, + Orthanc::JobState state) +{ + Orthanc::JobState s; + if (monitor.GetState(s, id)) + { + return state == s; + } + else + { + return false; + } +} + + +TEST(JobsMonitor, Priority) +{ + JobsMonitor monitor; + + std::string i1, i2, i3, i4; + monitor.Submit(i1, new DummyJob(), 10); + monitor.Submit(i2, new DummyJob(), 30); + monitor.Submit(i3, new DummyJob(), 20); + monitor.Submit(i4, new DummyJob(), 5); + + monitor.SetMaxCompletedJobs(2); + + std::set id; + monitor.ListJobs(id); + + ASSERT_EQ(4u, id.size()); + ASSERT_TRUE(id.find(i1) != id.end()); + ASSERT_TRUE(id.find(i2) != id.end()); + ASSERT_TRUE(id.find(i3) != id.end()); + ASSERT_TRUE(id.find(i4) != id.end()); + + ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending)); + + { + JobsMonitor::RunningJob job(monitor, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(30, job.GetPriority()); + ASSERT_EQ(i2, job.GetId()); + + ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending)); + + { + JobsMonitor::RunningJob job(monitor, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(20, job.GetPriority()); + ASSERT_EQ(i3, job.GetId()); + + job.MarkSuccess(); + + ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success)); + + { + JobsMonitor::RunningJob job(monitor, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(10, job.GetPriority()); + ASSERT_EQ(i1, job.GetId()); + } + + { + JobsMonitor::RunningJob job(monitor, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(5, job.GetPriority()); + ASSERT_EQ(i4, job.GetId()); + } + + { + JobsMonitor::RunningJob job(monitor, 1); + ASSERT_FALSE(job.IsValid()); + } + + Orthanc::JobState s; + ASSERT_TRUE(monitor.GetState(s, i1)); + ASSERT_FALSE(monitor.GetState(s, i2)); // Removed because oldest + ASSERT_FALSE(monitor.GetState(s, i3)); // Removed because second oldest + ASSERT_TRUE(monitor.GetState(s, i4)); + + monitor.SetMaxCompletedJobs(1); // (*) + ASSERT_FALSE(monitor.GetState(s, i1)); // Just discarded by (*) + ASSERT_TRUE(monitor.GetState(s, i4)); +} + + +TEST(JobsMonitor, Resubmit) +{ + JobsMonitor monitor; + + std::string id; + monitor.Submit(id, new DummyJob(), 10); + + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); + + monitor.Resubmit(id); + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); + + { + JobsMonitor::RunningJob job(monitor, 0); + ASSERT_TRUE(job.IsValid()); + job.MarkFailure(); + + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); + + monitor.Resubmit(id); + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure)); + + monitor.Resubmit(id); + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); + + { + JobsMonitor::RunningJob job(monitor, 0); + ASSERT_TRUE(job.IsValid()); + ASSERT_EQ(id, job.GetId()); + + job.MarkSuccess(); + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success)); +}