# HG changeset patch # User Sebastien Jodogne # Date 1525353873 -7200 # Node ID 57f81b9887136000f3fe621f985aed49dbf238fd # Parent b4516a6f214b91b7d99bde391742025dca9f962e cont diff -r b4516a6f214b -r 57f81b988713 Resources/ImplementationNotes/JobsEngineStates.dot --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Resources/ImplementationNotes/JobsEngineStates.dot Thu May 03 15:24:33 2018 +0200 @@ -0,0 +1,23 @@ +// dot -Tpdf JobsEngineStates.dot -o JobsEngineStates.pdf + +digraph G +{ + rankdir="LR"; + init [shape=point]; + failure, success [shape=doublecircle]; + + // Internal transitions + init -> pending; + pending -> running; + running -> success; + running -> failure; + running -> retry; + retry -> pending [label="timeout"]; + + // External actions + failure -> pending [label="Resubmit()" fontcolor="red"]; + paused -> pending [label="Resume()" fontcolor="red"]; + pending -> paused [label="Pause()" fontcolor="red"]; + retry -> paused [label="Pause()" fontcolor="red"]; + running -> paused [label="Pause()" fontcolor="red"]; + } diff -r b4516a6f214b -r 57f81b988713 UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 13:45:31 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Thu May 03 15:24:33 2018 +0200 @@ -481,13 +481,13 @@ } else { - return retryTime_ >= now; + return retryTime_ <= now; } } }; - class JobsMonitor : public boost::noncopyable + class JobsRegistry : public boost::noncopyable { private: struct PriorityComparator @@ -506,641 +506,661 @@ std::vector, // Could be a "std::deque" PriorityComparator> PendingJobs; - boost::mutex mutex_; - JobsIndex jobsIndex_; - PendingJobs pendingJobs_; - CompletedJobs completedJobs_; - RetryJobs retryJobs_; + boost::mutex mutex_; + JobsIndex jobsIndex_; + PendingJobs pendingJobs_; + CompletedJobs completedJobs_; + RetryJobs retryJobs_; - boost::condition_variable pendingJobAvailable_; - size_t maxCompletedJobs_; + boost::condition_variable pendingJobAvailable_; + size_t maxCompletedJobs_; #ifndef NDEBUG - bool IsPendingJob(const JobHandler& job) const + bool IsPendingJob(const JobHandler& job) const + { + PendingJobs copy = pendingJobs_; + while (!copy.empty()) + { + if (copy.top() == &job) + { + return true; + } + + copy.pop(); + } + + return false; + } + + bool IsCompletedJob(const JobHandler& job) const + { + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + if (*it == &job) + { + return true; + } + } + + return false; + } + + bool IsRetryJob(JobHandler& job) const + { + return retryJobs_.find(&job) != retryJobs_.end(); + } +#endif + + + void CheckInvariants() + { +#ifndef NDEBUG { PendingJobs copy = pendingJobs_; while (!copy.empty()) { - if (copy.top() == &job) - { - return true; - } - + assert(copy.top()->GetState() == JobState_Pending); copy.pop(); } + } - return false; + assert(completedJobs_.size() <= maxCompletedJobs_); + + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Success || + (*it)->GetState() == JobState_Failure); + } + + for (RetryJobs::const_iterator it = retryJobs_.begin(); + it != retryJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Retry); } - bool IsCompletedJob(const JobHandler& job) const + for (JobsIndex::iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) { - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) + JobHandler& job = *it->second; + + assert(job.GetId() == it->first); + + switch (job.GetState()) { - if (*it == &job) - { - return true; - } - } + case JobState_Pending: + assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Success: + case JobState_Failure: + assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); + break; + + case JobState_Retry: + assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Running: + case JobState_Paused: + assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; - return false; - } - - bool IsRetryJob(JobHandler& job) const - { - return retryJobs_.find(&job) != retryJobs_.end(); + default: + throw OrthancException(ErrorCode_InternalError); + } } #endif + } - void CheckInvariants() + void ForgetOldCompletedJobs() + { + if (maxCompletedJobs_ != 0) { -#ifndef NDEBUG - { - PendingJobs copy = pendingJobs_; - while (!copy.empty()) - { - assert(copy.top()->GetState() == JobState_Pending); - copy.pop(); - } - } - - assert(completedJobs_.size() <= maxCompletedJobs_); - - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Success || - (*it)->GetState() == JobState_Failure); - } - - for (RetryJobs::const_iterator it = retryJobs_.begin(); - it != retryJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Retry); - } - - for (JobsIndex::iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) + while (completedJobs_.size() > maxCompletedJobs_) { - JobHandler& job = *it->second; + assert(completedJobs_.front() != NULL); + + std::string id = completedJobs_.front()->GetId(); + assert(jobsIndex_.find(id) != jobsIndex_.end()); - assert(job.GetId() == it->first); + jobsIndex_.erase(id); + delete(completedJobs_.front()); + completedJobs_.pop_front(); + } + } + } + - switch (job.GetState()) - { - case JobState_Pending: - assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Success: - case JobState_Failure: - assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); - break; - - case JobState_Retry: - assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Running: - case JobState_Paused: - assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; + void MarkRunningAsCompleted(JobHandler& job, + bool success) + { + LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") + << ": " << job.GetId(); - default: - throw OrthancException(ErrorCode_InternalError); - } - } -#endif - } + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(success ? JobState_Success : JobState_Failure); + + completedJobs_.push_back(&job); + ForgetOldCompletedJobs(); + + CheckInvariants(); + } - void ForgetOldCompletedJobs() - { - if (maxCompletedJobs_ != 0) - { - while (completedJobs_.size() > maxCompletedJobs_) - { - assert(completedJobs_.front() != NULL); + void MarkRunningAsRetry(JobHandler& job, + unsigned int timeout) + { + LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + assert(job.GetState() == JobState_Running && + retryJobs_.find(&job) == retryJobs_.end()); + + retryJobs_.insert(&job); + job.SetRetryState(timeout); + + CheckInvariants(); + } + + + void MarkRunningAsPaused(JobHandler& job) + { + LOG(INFO) << "Job paused: " << job.GetId(); - std::string id = completedJobs_.front()->GetId(); + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(JobState_Paused); - assert(jobsIndex_.find(id) != jobsIndex_.end()); + CheckInvariants(); + } + + + JobHandler* WaitPendingJob(unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); - jobsIndex_.erase(id); - delete(completedJobs_.front()); - completedJobs_.pop_front(); + while (pendingJobs_.empty()) + { + if (timeout == 0) + { + pendingJobAvailable_.wait(lock); + } + else + { + bool success = pendingJobAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + return NULL; } } } - - void MarkRunningAsCompleted(JobHandler& job, - bool success) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - assert(job.GetState() == JobState_Running); + JobHandler* job = pendingJobs_.top(); + pendingJobs_.pop(); + + job->SetState(JobState_Running); + return job; + } - job.SetState(success ? JobState_Success : JobState_Failure); - completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - - CheckInvariants(); - } +public: + JobsRegistry() : + maxCompletedJobs_(10) + { + } - void MarkRunningAsRetry(JobHandler& job, - unsigned int timeout) + ~JobsRegistry() + { + for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); + assert(it->second != NULL); + delete it->second; + } + } + + + void SetMaxCompletedJobs(size_t i) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); - assert(job.GetState() == JobState_Running && - retryJobs_.find(&job) == retryJobs_.end()); + maxCompletedJobs_ = i; + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + - retryJobs_.insert(&job); - job.SetRetryState(timeout); + void ListJobs(std::set& target) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); - CheckInvariants(); + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + target.insert(it->first); } + } - void MarkRunningAsPaused(JobHandler& job) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - assert(job.GetState() == JobState_Running); + void Submit(std::string& id, + IJob* job, // Takes ownership + int priority) + { + std::auto_ptr handler(new JobHandler(job, priority)); + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + id = handler->GetId(); + + pendingJobs_.push(handler.get()); + pendingJobAvailable_.notify_one(); - job.SetState(JobState_Paused); + jobsIndex_.insert(std::make_pair(id, handler.release())); + + LOG(INFO) << "New job submitted: " << id; + + CheckInvariants(); + } - CheckInvariants(); - } + + void Submit(IJob* job, // Takes ownership + int priority) + { + std::string id; + Submit(id, job, priority); + } - JobHandler* WaitPendingJob(unsigned int timeout) - { - boost::mutex::scoped_lock lock(mutex_); + void SetPriority(const std::string& id, + int priority) + { + LOG(INFO) << "Changing priority to " << priority << " for job: " << id; - while (pendingJobs_.empty()) - { - if (timeout == 0) - { - pendingJobAvailable_.wait(lock); - } - else - { - bool success = pendingJobAvailable_.timed_wait - (lock, boost::posix_time::milliseconds(timeout)); - if (!success) - { - return NULL; - } - } - } + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); - JobHandler* job = pendingJobs_.top(); - pendingJobs_.pop(); - - job->SetState(JobState_Running); - return job; + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; } - + else + { + found->second->SetPriority(priority); - public: - JobsMonitor() : - maxCompletedJobs_(10) - { - } - + if (found->second->GetState() == JobState_Pending) + { + // If the job is pending, we need to reconstruct the + // priority queue, as the heap condition has changed - ~JobsMonitor() - { - for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) - { - assert(it->second != NULL); - delete it->second; + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + pendingJobs_.push(copy.top()); + copy.pop(); + } } } - - void SetMaxCompletedJobs(size_t i) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - maxCompletedJobs_ = i; - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void ListJobs(std::set& target) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - target.insert(it->first); - } - } + CheckInvariants(); + } - void Submit(std::string& id, - IJob* job, // Takes ownership - int priority) - { - std::auto_ptr handler(new JobHandler(job, priority)); + void Pause(const std::string& id) + { + LOG(INFO) << "Pausing job: " << id; - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - id = handler->GetId(); - pendingJobs_.push(handler.get()); - jobsIndex_.insert(std::make_pair(id, handler.release())); + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); - pendingJobAvailable_.notify_one(); + JobsIndex::iterator found = jobsIndex_.find(id); - CheckInvariants(); - } - - - void Submit(IJob* job, // Takes ownership - int priority) + if (found == jobsIndex_.end()) { - std::string id; - Submit(id, job, priority); + LOG(WARNING) << "Unknown job: " << id; } - - - void SetPriority(const std::string& id, - int priority) + else { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) + switch (found->second->GetState()) { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - found->second->SetPriority(priority); - - if (found->second->GetState() == JobState_Pending) + case JobState_Pending: { // If the job is pending, we need to reconstruct the - // priority queue, as the heap condition has changed - + // priority queue to remove it PendingJobs copy; std::swap(copy, pendingJobs_); assert(pendingJobs_.empty()); while (!copy.empty()) { - pendingJobs_.push(copy.top()); + if (copy.top()->GetId() != id) + { + pendingJobs_.push(copy.top()); + } + copy.pop(); } + + found->second->SetState(JobState_Paused); + + break; } + + case JobState_Retry: + { + RetryJobs::iterator item = retryJobs_.find(found->second); + assert(item != retryJobs_.end()); + retryJobs_.erase(item); + + found->second->SetState(JobState_Paused); + + break; + } + + case JobState_Paused: + case JobState_Success: + case JobState_Failure: + // Nothing to be done + break; + + case JobState_Running: + found->second->SchedulePause(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); } - - CheckInvariants(); } - - void Pause(const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - switch (found->second->GetState()) - { - case JobState_Pending: - { - // If the job is pending, we need to reconstruct the - // priority queue to remove it - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - if (copy.top()->GetId() != id) - { - pendingJobs_.push(copy.top()); - } - - copy.pop(); - } - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Retry: - { - RetryJobs::iterator item = retryJobs_.find(found->second); - assert(item != retryJobs_.end()); - retryJobs_.erase(item); - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Paused: - case JobState_Success: - case JobState_Failure: - // Nothing to be done - break; - - case JobState_Running: - found->second->SchedulePause(); - break; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } - - CheckInvariants(); - } + CheckInvariants(); + } - void Resume(const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); + void Resume(const std::string& id) + { + LOG(INFO) << "Resuming job: " << id; - JobsIndex::iterator found = jobsIndex_.find(id); + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Paused) - { - LOG(WARNING) << "Cannot resume a job that is not paused: " << id; - } - else - { - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Paused) + { + LOG(WARNING) << "Cannot resume a job that is not paused: " << id; + } + else + { + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } - CheckInvariants(); - } + CheckInvariants(); + } - void Resubmit(const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); + void Resubmit(const std::string& id) + { + LOG(INFO) << "Resubmitting failed job: " << id; - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Failure) - { - LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; - } - else - { - bool ok = false; - for (CompletedJobs::iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == found->second) - { - ok = true; - completedJobs_.erase(it); - break; - } - } + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); - assert(ok); - - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } - - CheckInvariants(); + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; } - - - void ScheduleRetries() + else if (found->second->GetState() != JobState_Failure) + { + LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; + } + else { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - RetryJobs copy; - std::swap(copy, retryJobs_); - - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - - assert(retryJobs_.empty()); - for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) + bool ok = false; + for (CompletedJobs::iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) { - if ((*it)->IsRetryReady(now)) + if (*it == found->second) { - (*it)->SetState(JobState_Pending); - } - else - { - retryJobs_.insert(*it); + ok = true; + completedJobs_.erase(it); + break; } } - CheckInvariants(); + assert(ok); + + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); } + CheckInvariants(); + } + - bool GetState(JobState& state, - const std::string& id) + void ScheduleRetries() + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + RetryJobs copy; + std::swap(copy, retryJobs_); + + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + + assert(retryJobs_.empty()); + for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::const_iterator it = jobsIndex_.find(id); - if (it == jobsIndex_.end()) + if ((*it)->IsRetryReady(now)) { - return false; + LOG(INFO) << "Retrying job: " << (*it)->GetId(); + (*it)->SetState(JobState_Pending); + pendingJobs_.push(*it); + pendingJobAvailable_.notify_one(); } else { - state = it->second->GetState(); - return true; + retryJobs_.insert(*it); } } + CheckInvariants(); + } - class RunningJob : public boost::noncopyable + + bool GetState(JobState& state, + const std::string& id) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::const_iterator it = jobsIndex_.find(id); + if (it == jobsIndex_.end()) { - private: - JobsMonitor& that_; - JobHandler* handler_; - JobState targetState_; - unsigned int retryTimeout_; + return false; + } + else + { + state = it->second->GetState(); + return true; + } + } + + + class RunningJob : public boost::noncopyable + { + private: + JobsRegistry& that_; + JobHandler* handler_; + JobState targetState_; + unsigned int retryTimeout_; - public: - RunningJob(JobsMonitor& that, - unsigned int timeout) : - that_(that), - handler_(NULL), - targetState_(JobState_Failure), - retryTimeout_(0) - { - handler_ = that_.WaitPendingJob(timeout); - } + public: + RunningJob(JobsRegistry& that, + unsigned int timeout) : + that_(that), + handler_(NULL), + targetState_(JobState_Failure), + retryTimeout_(0) + { + handler_ = that_.WaitPendingJob(timeout); + } - ~RunningJob() + ~RunningJob() + { + if (IsValid()) { - if (IsValid()) + switch (targetState_) { - switch (targetState_) - { - case JobState_Failure: - that_.MarkRunningAsCompleted(*handler_, false); - break; + case JobState_Failure: + that_.MarkRunningAsCompleted(*handler_, false); + break; - case JobState_Success: - that_.MarkRunningAsCompleted(*handler_, true); - break; + case JobState_Success: + that_.MarkRunningAsCompleted(*handler_, true); + break; - case JobState_Paused: - that_.MarkRunningAsPaused(*handler_); - break; + case JobState_Paused: + that_.MarkRunningAsPaused(*handler_); + break; - case JobState_Retry: - that_.MarkRunningAsRetry(*handler_, retryTimeout_); - break; + case JobState_Retry: + that_.MarkRunningAsRetry(*handler_, retryTimeout_); + break; - default: - assert(0); - } + default: + assert(0); } } + } - bool IsValid() const + bool IsValid() const + { + return handler_ != NULL; + } + + const std::string& GetId() const + { + if (IsValid()) { - return handler_ != NULL; + return handler_->GetId(); } - - const std::string& GetId() const + else { - if (IsValid()) - { - return handler_->GetId(); - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + int GetPriority() const + { + if (IsValid()) + { + return handler_->GetPriority(); + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + bool IsPauseScheduled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); } - int GetPriority() const + boost::mutex::scoped_lock lock(that_.mutex_); + that_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsPauseScheduled(); + } + + IJob& GetJob() + { + if (!IsValid()) { - if (IsValid()) - { - return handler_->GetPriority(); - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + throw OrthancException(ErrorCode_BadSequenceOfCalls); } - bool IsPauseScheduled() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - boost::mutex::scoped_lock lock(that_.mutex_); - that_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); + boost::mutex::scoped_lock lock(that_.mutex_); + that_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); - return handler_->IsPauseScheduled(); - } + return handler_->GetJob(); + } - IJob& GetJob() + void MarkSuccess() + { + if (!IsValid()) { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - boost::mutex::scoped_lock lock(that_.mutex_); - that_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - return handler_->GetJob(); + throw OrthancException(ErrorCode_BadSequenceOfCalls); } - void MarkSuccess() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + targetState_ = JobState_Success; + } - targetState_ = JobState_Success; - } - - void MarkFailure() + void MarkFailure() + { + if (!IsValid()) { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - targetState_ = JobState_Failure; + throw OrthancException(ErrorCode_BadSequenceOfCalls); } - void MarkPause() + targetState_ = JobState_Failure; + } + + void SchedulePause() + { + if (!IsValid()) { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - targetState_ = JobState_Paused; + throw OrthancException(ErrorCode_BadSequenceOfCalls); } - void MarkRetry(unsigned int timeout) + targetState_ = JobState_Paused; + } + + void MarkRetry(unsigned int timeout) + { + if (!IsValid()) { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } - targetState_ = JobState_Retry; - retryTimeout_ = timeout; - } - }; + targetState_ = JobState_Retry; + retryTimeout_ = timeout; + } }; +}; } @@ -1181,12 +1201,12 @@ }; -static bool CheckState(Orthanc::JobsMonitor& monitor, +static bool CheckState(Orthanc::JobsRegistry& registry, const std::string& id, Orthanc::JobState state) { Orthanc::JobState s; - if (monitor.GetState(s, id)) + if (registry.GetState(s, id)) { return state == s; } @@ -1197,20 +1217,20 @@ } -TEST(JobsMonitor, Priority) +TEST(JobsRegistry, Priority) { - JobsMonitor monitor; + JobsRegistry registry; std::string i1, i2, i3, i4; - monitor.Submit(i1, new DummyJob(), 10); - monitor.Submit(i2, new DummyJob(), 30); - monitor.Submit(i3, new DummyJob(), 20); - monitor.Submit(i4, new DummyJob(), 5); + registry.Submit(i1, new DummyJob(), 10); + registry.Submit(i2, new DummyJob(), 30); + registry.Submit(i3, new DummyJob(), 20); + registry.Submit(i4, new DummyJob(), 5); - monitor.SetMaxCompletedJobs(2); + registry.SetMaxCompletedJobs(2); std::set id; - monitor.ListJobs(id); + registry.ListJobs(id); ASSERT_EQ(4u, id.size()); ASSERT_TRUE(id.find(i1) != id.end()); @@ -1218,100 +1238,268 @@ ASSERT_TRUE(id.find(i3) != id.end()); ASSERT_TRUE(id.find(i4) != id.end()); - ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending)); { - JobsMonitor::RunningJob job(monitor, 0); + JobsRegistry::RunningJob job(registry, 0); ASSERT_TRUE(job.IsValid()); ASSERT_EQ(30, job.GetPriority()); ASSERT_EQ(i2, job.GetId()); - ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running)); + ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running)); } - ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure)); - ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Pending)); { - JobsMonitor::RunningJob job(monitor, 0); + JobsRegistry::RunningJob job(registry, 0); ASSERT_TRUE(job.IsValid()); ASSERT_EQ(20, job.GetPriority()); ASSERT_EQ(i3, job.GetId()); job.MarkSuccess(); - ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running)); + ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Running)); } - ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success)); + ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Success)); { - JobsMonitor::RunningJob job(monitor, 0); + JobsRegistry::RunningJob job(registry, 0); ASSERT_TRUE(job.IsValid()); ASSERT_EQ(10, job.GetPriority()); ASSERT_EQ(i1, job.GetId()); } { - JobsMonitor::RunningJob job(monitor, 0); + JobsRegistry::RunningJob job(registry, 0); ASSERT_TRUE(job.IsValid()); ASSERT_EQ(5, job.GetPriority()); ASSERT_EQ(i4, job.GetId()); } { - JobsMonitor::RunningJob job(monitor, 1); + JobsRegistry::RunningJob job(registry, 1); ASSERT_FALSE(job.IsValid()); } Orthanc::JobState s; - ASSERT_TRUE(monitor.GetState(s, i1)); - ASSERT_FALSE(monitor.GetState(s, i2)); // Removed because oldest - ASSERT_FALSE(monitor.GetState(s, i3)); // Removed because second oldest - ASSERT_TRUE(monitor.GetState(s, i4)); + ASSERT_TRUE(registry.GetState(s, i1)); + ASSERT_FALSE(registry.GetState(s, i2)); // Removed because oldest + ASSERT_FALSE(registry.GetState(s, i3)); // Removed because second oldest + ASSERT_TRUE(registry.GetState(s, i4)); - monitor.SetMaxCompletedJobs(1); // (*) - ASSERT_FALSE(monitor.GetState(s, i1)); // Just discarded by (*) - ASSERT_TRUE(monitor.GetState(s, i4)); + registry.SetMaxCompletedJobs(1); // (*) + ASSERT_FALSE(registry.GetState(s, i1)); // Just discarded by (*) + ASSERT_TRUE(registry.GetState(s, i4)); } -TEST(JobsMonitor, Resubmit) +TEST(JobsRegistry, Simultaneous) { - JobsMonitor monitor; + JobsRegistry registry; + + std::string i1, i2; + registry.Submit(i1, new DummyJob(), 20); + registry.Submit(i2, new DummyJob(), 10); + + ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job1(registry, 0); + JobsRegistry::RunningJob job2(registry, 0); + + ASSERT_TRUE(job1.IsValid()); + ASSERT_TRUE(job2.IsValid()); + + job1.MarkFailure(); + job2.MarkSuccess(); + + ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Running)); + ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Success)); +} + + +TEST(JobsRegistry, Resubmit) +{ + JobsRegistry registry; std::string id; - monitor.Submit(id, new DummyJob(), 10); + registry.Submit(id, new DummyJob(), 10); - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); - monitor.Resubmit(id); - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); { - JobsMonitor::RunningJob job(monitor, 0); + JobsRegistry::RunningJob job(registry, 0); ASSERT_TRUE(job.IsValid()); job.MarkFailure(); - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); - monitor.Resubmit(id); - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); } - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); - monitor.Resubmit(id); - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending)); + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); { - JobsMonitor::RunningJob job(monitor, 0); + JobsRegistry::RunningJob job(registry, 0); ASSERT_TRUE(job.IsValid()); ASSERT_EQ(id, job.GetId()); job.MarkSuccess(); - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); + + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); +} + + +TEST(JobsRegistry, Retry) +{ + JobsRegistry registry; + + std::string id; + registry.Submit(id, new DummyJob(), 10); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + job.MarkRetry(0); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); + + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); + + registry.ScheduleRetries(); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + job.MarkSuccess(); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); } - ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success)); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); +} + + +TEST(JobsRegistry, PausePending) +{ + JobsRegistry registry; + + std::string id; + registry.Submit(id, new DummyJob(), 10); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + registry.Pause(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + + registry.Pause(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + + registry.Resume(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); } + + +TEST(JobsRegistry, PauseRunning) +{ + JobsRegistry registry; + + std::string id; + registry.Submit(id, new DummyJob(), 10); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + + registry.Resubmit(id); + job.SchedulePause(); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + + registry.Resubmit(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + + registry.Resume(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + + job.MarkSuccess(); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); +} + + +TEST(JobsRegistry, PauseRetry) +{ + JobsRegistry registry; + + std::string id; + registry.Submit(id, new DummyJob(), 10); + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + + job.MarkRetry(0); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); + + registry.Pause(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); + + registry.Resume(id); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); + + { + JobsRegistry::RunningJob job(registry, 0); + ASSERT_TRUE(job.IsValid()); + + job.MarkSuccess(); + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); + } + + ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); +}