# HG changeset patch # User Sebastien Jodogne # Date 1525448822 -7200 # Node ID 98dfc1948d00948a90f9391ea95263a0cab0931b # Parent 1e66fe3ddf9fa3821bb517a83984a02010eca565 RunningJob::ExecuteStep() diff -r 1e66fe3ddf9f -r 98dfc1948d00 UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Fri May 04 17:28:47 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri May 04 17:47:02 2018 +0200 @@ -306,7 +306,7 @@ public: explicit JobStepResult(JobStepCode status) : - status_(status) + status_(status) { } @@ -328,8 +328,8 @@ public: RetryResult(unsigned int timeout) : - JobStepResult(JobStepCode_Retry), - timeout_(timeout) + JobStepResult(JobStepCode_Retry), + timeout_(timeout) { } @@ -353,16 +353,18 @@ virtual float GetProgress() = 0; - virtual void FormatStatus(Json::Value& value) = 0; + virtual void GetDescription(Json::Value& value) = 0; }; - struct JobStatus + class JobStatus { + private: ErrorCode errorCode_; float progress_; Json::Value description_; + public: JobStatus() : errorCode_(ErrorCode_Success), progress_(0), @@ -371,16 +373,32 @@ } JobStatus(ErrorCode code, - float progress) : + IJob& job) : errorCode_(code), - progress_(progress), - description_(Json::objectValue) + progress_(job.GetProgress()) { - if (progress < 0 || - progress > 1) + if (progress_ < 0 || + progress_ > 1) { throw OrthancException(ErrorCode_ParameterOutOfRange); } + + job.GetDescription(description_); + } + + ErrorCode GetErrorCode() const + { + return errorCode_; + } + + float GetProgress() const + { + return progress_; + } + + const Json::Value& GetDescription() const + { + return description_; } }; @@ -414,7 +432,7 @@ status_(status) { float ms = static_cast(runtime_.total_milliseconds()); - float remaining = boost::math::llround(1.0f - status_.progress_) * ms; + float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; eta_ = infoTime_ + boost::posix_time::milliseconds(remaining); } @@ -601,14 +619,14 @@ } } - JobStatus& GetLastStatus() + const JobStatus& GetLastStatus() const { return lastStatus_; } - const JobStatus& GetLastStatus() const + void SetLastStatus(const JobStatus& status) { - return lastStatus_; + lastStatus_ = status; } }; @@ -626,747 +644,767 @@ }; typedef std::map JobsIndex; - typedef std::list CompletedJobs; + typedef std::list CompletedJobs; typedef std::set RetryJobs; typedef std::priority_queue, // Could be a "std::deque" PriorityComparator> PendingJobs; - boost::mutex mutex_; - JobsIndex jobsIndex_; - PendingJobs pendingJobs_; - CompletedJobs completedJobs_; - RetryJobs retryJobs_; + boost::mutex mutex_; + JobsIndex jobsIndex_; + PendingJobs pendingJobs_; + CompletedJobs completedJobs_; + RetryJobs retryJobs_; - boost::condition_variable pendingJobAvailable_; - size_t maxCompletedJobs_; + boost::condition_variable pendingJobAvailable_; + size_t maxCompletedJobs_; #ifndef NDEBUG - bool IsPendingJob(const JobHandler& job) const - { - PendingJobs copy = pendingJobs_; - while (!copy.empty()) - { - if (copy.top() == &job) - { - return true; - } - - copy.pop(); - } - - return false; - } - - bool IsCompletedJob(const JobHandler& job) const - { - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == &job) - { - return true; - } - } - - return false; - } - - bool IsRetryJob(JobHandler& job) const - { - return retryJobs_.find(&job) != retryJobs_.end(); - } -#endif - - - void CheckInvariants() - { -#ifndef NDEBUG + bool IsPendingJob(const JobHandler& job) const { PendingJobs copy = pendingJobs_; while (!copy.empty()) { - assert(copy.top()->GetState() == JobState_Pending); + if (copy.top() == &job) + { + return true; + } + copy.pop(); } - } - assert(completedJobs_.size() <= maxCompletedJobs_); - - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Success || - (*it)->GetState() == JobState_Failure); - } - - for (RetryJobs::const_iterator it = retryJobs_.begin(); - it != retryJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Retry); + return false; } - for (JobsIndex::iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) + bool IsCompletedJob(JobHandler& job) const { - JobHandler& job = *it->second; - - assert(job.GetId() == it->first); - - switch (job.GetState()) + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) { - case JobState_Pending: - assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Success: - case JobState_Failure: - assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); - break; - - case JobState_Retry: - assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Running: - case JobState_Paused: - assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; + if (*it == &job) + { + return true; + } + } - default: - throw OrthancException(ErrorCode_InternalError); - } + return false; + } + + bool IsRetryJob(JobHandler& job) const + { + return retryJobs_.find(&job) != retryJobs_.end(); } #endif - } - - - void ForgetOldCompletedJobs() - { - if (maxCompletedJobs_ != 0) - { - while (completedJobs_.size() > maxCompletedJobs_) - { - assert(completedJobs_.front() != NULL); - - std::string id = completedJobs_.front()->GetId(); - assert(jobsIndex_.find(id) != jobsIndex_.end()); - - jobsIndex_.erase(id); - delete(completedJobs_.front()); - completedJobs_.pop_front(); - } - } - } - - - void MarkRunningAsCompleted(JobHandler& job, - bool success) - { - LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") - << ": " << job.GetId(); - - CheckInvariants(); - assert(job.GetState() == JobState_Running); - - job.SetState(success ? JobState_Success : JobState_Failure); - - completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void MarkRunningAsRetry(JobHandler& job, - unsigned int timeout) - { - LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); - - CheckInvariants(); - - assert(job.GetState() == JobState_Running && - retryJobs_.find(&job) == retryJobs_.end()); - - retryJobs_.insert(&job); - job.SetRetryState(timeout); - - CheckInvariants(); - } - void MarkRunningAsPaused(JobHandler& job) - { - LOG(INFO) << "Job paused: " << job.GetId(); + void CheckInvariants() const + { +#ifndef NDEBUG + { + PendingJobs copy = pendingJobs_; + while (!copy.empty()) + { + assert(copy.top()->GetState() == JobState_Pending); + copy.pop(); + } + } - CheckInvariants(); - assert(job.GetState() == JobState_Running); + assert(completedJobs_.size() <= maxCompletedJobs_); - job.SetState(JobState_Paused); + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Success || + (*it)->GetState() == JobState_Failure); + } - CheckInvariants(); - } - + for (RetryJobs::const_iterator it = retryJobs_.begin(); + it != retryJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Retry); + } -public: - JobsRegistry() : - maxCompletedJobs_(10) - { - } + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + JobHandler& job = *it->second; + assert(job.GetId() == it->first); - ~JobsRegistry() - { - for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) - { - assert(it->second != NULL); - delete it->second; + switch (job.GetState()) + { + case JobState_Pending: + assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Success: + case JobState_Failure: + assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); + break; + + case JobState_Retry: + assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Running: + case JobState_Paused: + assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } +#endif } - } - - - void SetMaxCompletedJobs(size_t i) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - maxCompletedJobs_ = i; - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void ListJobs(std::set& target) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - target.insert(it->first); - } - } - void Submit(std::string& id, - IJob* job, // Takes ownership - int priority) - { - std::auto_ptr handler(new JobHandler(job, priority)); - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - id = handler->GetId(); - - pendingJobs_.push(handler.get()); - pendingJobAvailable_.notify_one(); - - jobsIndex_.insert(std::make_pair(id, handler.release())); - - LOG(INFO) << "New job submitted: " << id; - - CheckInvariants(); - } - - - void Submit(IJob* job, // Takes ownership - int priority) - { - std::string id; - Submit(id, job, priority); - } - + void ForgetOldCompletedJobs() + { + if (maxCompletedJobs_ != 0) + { + while (completedJobs_.size() > maxCompletedJobs_) + { + assert(completedJobs_.front() != NULL); - void SetPriority(const std::string& id, - int priority) - { - LOG(INFO) << "Changing priority to " << priority << " for job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); + std::string id = completedJobs_.front()->GetId(); + assert(jobsIndex_.find(id) != jobsIndex_.end()); - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - found->second->SetPriority(priority); - - if (found->second->GetState() == JobState_Pending) - { - // If the job is pending, we need to reconstruct the - // priority queue, as the heap condition has changed - - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - pendingJobs_.push(copy.top()); - copy.pop(); + jobsIndex_.erase(id); + delete(completedJobs_.front()); + completedJobs_.pop_front(); } } } - CheckInvariants(); - } + + void MarkRunningAsCompleted(JobHandler& job, + bool success) + { + LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") + << ": " << job.GetId(); + + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(success ? JobState_Success : JobState_Failure); + + completedJobs_.push_back(&job); + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + + + void MarkRunningAsRetry(JobHandler& job, + unsigned int timeout) + { + LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); + + CheckInvariants(); + + assert(job.GetState() == JobState_Running && + retryJobs_.find(&job) == retryJobs_.end()); + + retryJobs_.insert(&job); + job.SetRetryState(timeout); + + CheckInvariants(); + } + + + void MarkRunningAsPaused(JobHandler& job) + { + LOG(INFO) << "Job paused: " << job.GetId(); + + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(JobState_Paused); + + CheckInvariants(); + } + + + public: + JobsRegistry() : + maxCompletedJobs_(10) + { + } + + + ~JobsRegistry() + { + for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) + { + assert(it->second != NULL); + delete it->second; + } + } - void Pause(const std::string& id) - { - LOG(INFO) << "Pausing job: " << id; + void SetMaxCompletedJobs(size_t i) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + maxCompletedJobs_ = i; + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + + + void ListJobs(std::set& target) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + target.insert(it->first); + } + } + - JobsIndex::iterator found = jobsIndex_.find(id); + void Submit(std::string& id, + IJob* job, // Takes ownership + int priority) + { + std::auto_ptr handler(new JobHandler(job, priority)); + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + id = handler->GetId(); - if (found == jobsIndex_.end()) + pendingJobs_.push(handler.get()); + pendingJobAvailable_.notify_one(); + + jobsIndex_.insert(std::make_pair(id, handler.release())); + + LOG(INFO) << "New job submitted: " << id; + + CheckInvariants(); + } + + + void Submit(IJob* job, // Takes ownership + int priority) { - LOG(WARNING) << "Unknown job: " << id; + std::string id; + Submit(id, job, priority); } - else + + + void SetPriority(const std::string& id, + int priority) { - switch (found->second->GetState()) + LOG(INFO) << "Changing priority to " << priority << " for job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) { - case JobState_Pending: + LOG(WARNING) << "Unknown job: " << id; + } + else + { + found->second->SetPriority(priority); + + if (found->second->GetState() == JobState_Pending) { // If the job is pending, we need to reconstruct the - // priority queue to remove it + // priority queue, as the heap condition has changed + PendingJobs copy; std::swap(copy, pendingJobs_); assert(pendingJobs_.empty()); while (!copy.empty()) { - if (copy.top()->GetId() != id) - { - pendingJobs_.push(copy.top()); - } - + pendingJobs_.push(copy.top()); copy.pop(); } - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Retry: - { - RetryJobs::iterator item = retryJobs_.find(found->second); - assert(item != retryJobs_.end()); - retryJobs_.erase(item); - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Paused: - case JobState_Success: - case JobState_Failure: - // Nothing to be done - break; - - case JobState_Running: - found->second->SchedulePause(); - break; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } - - CheckInvariants(); - } - - - void Resume(const std::string& id) - { - LOG(INFO) << "Resuming job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Paused) - { - LOG(WARNING) << "Cannot resume a job that is not paused: " << id; - } - else - { - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } - - CheckInvariants(); - } - - - void Resubmit(const std::string& id) - { - LOG(INFO) << "Resubmitting failed job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Failure) - { - LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; - } - else - { - bool ok = false; - for (CompletedJobs::iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == found->second) - { - ok = true; - completedJobs_.erase(it); - break; } } - assert(ok); - - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); + CheckInvariants(); } - CheckInvariants(); - } - - void ScheduleRetries() - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - RetryJobs copy; - std::swap(copy, retryJobs_); - - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + void Pause(const std::string& id) + { + LOG(INFO) << "Pausing job: " << id; - assert(retryJobs_.empty()); - for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) - { - if ((*it)->IsRetryReady(now)) + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) { - LOG(INFO) << "Retrying job: " << (*it)->GetId(); - (*it)->SetState(JobState_Pending); - pendingJobs_.push(*it); - pendingJobAvailable_.notify_one(); + LOG(WARNING) << "Unknown job: " << id; } else { - retryJobs_.insert(*it); - } - } + switch (found->second->GetState()) + { + case JobState_Pending: + { + // If the job is pending, we need to reconstruct the + // priority queue to remove it + PendingJobs copy; + std::swap(copy, pendingJobs_); - CheckInvariants(); - } + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + if (copy.top()->GetId() != id) + { + pendingJobs_.push(copy.top()); + } + copy.pop(); + } + + found->second->SetState(JobState_Paused); + + break; + } - bool GetState(JobState& state, - const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); + case JobState_Retry: + { + RetryJobs::iterator item = retryJobs_.find(found->second); + assert(item != retryJobs_.end()); + retryJobs_.erase(item); + + found->second->SetState(JobState_Paused); + + break; + } - JobsIndex::const_iterator it = jobsIndex_.find(id); - if (it == jobsIndex_.end()) - { - return false; + case JobState_Paused: + case JobState_Success: + case JobState_Failure: + // Nothing to be done + break; + + case JobState_Running: + found->second->SchedulePause(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } + + CheckInvariants(); } - else - { - state = it->second->GetState(); - return true; - } - } - class RunningJob : public boost::noncopyable - { - private: - JobsRegistry& registry_; - JobHandler* handler_; // Can only be accessed if the registry - // mutex is locked! - IJob* job_; // Will by design be in mutual exclusion, - // because only one RunningJob can be - // executed at a time on a JobHandler + void Resume(const std::string& id) + { + LOG(INFO) << "Resuming job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Paused) + { + LOG(WARNING) << "Cannot resume a job that is not paused: " << id; + } + else + { + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } + + CheckInvariants(); + } + - std::string id_; - int priority_; - JobState targetState_; - unsigned int targetRetryTimeout_; - - public: - RunningJob(JobsRegistry& registry, - unsigned int timeout) : - registry_(registry), - handler_(NULL), - targetState_(JobState_Failure), - targetRetryTimeout_(0) + void Resubmit(const std::string& id) { - { - boost::mutex::scoped_lock lock(registry_.mutex_); + LOG(INFO) << "Resubmitting failed job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); - while (registry_.pendingJobs_.empty()) + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Failure) + { + LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; + } + else + { + bool ok = false; + for (CompletedJobs::iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) { - if (timeout == 0) - { - registry_.pendingJobAvailable_.wait(lock); - } - else + if (*it == found->second) { - bool success = registry_.pendingJobAvailable_.timed_wait - (lock, boost::posix_time::milliseconds(timeout)); - if (!success) - { - // No pending job - return; - } + ok = true; + completedJobs_.erase(it); + break; } } - handler_ = registry_.pendingJobs_.top(); - registry_.pendingJobs_.pop(); + assert(ok); - assert(handler_->GetState() == JobState_Pending); - handler_->SetState(JobState_Running); + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } - job_ = &handler_->GetJob(); - id_ = handler_->GetId(); - priority_ = handler_->GetPriority(); - } + CheckInvariants(); } - ~RunningJob() - { - if (IsValid()) - { - boost::mutex::scoped_lock lock(registry_.mutex_); - switch (targetState_) - { - case JobState_Failure: - registry_.MarkRunningAsCompleted(*handler_, false); - break; + void ScheduleRetries() + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + RetryJobs copy; + std::swap(copy, retryJobs_); + + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - case JobState_Success: - registry_.MarkRunningAsCompleted(*handler_, true); - break; - - case JobState_Paused: - registry_.MarkRunningAsPaused(*handler_); - break; - - case JobState_Retry: - registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); - break; - - default: - assert(0); + assert(retryJobs_.empty()); + for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) + { + if ((*it)->IsRetryReady(now)) + { + LOG(INFO) << "Retrying job: " << (*it)->GetId(); + (*it)->SetState(JobState_Pending); + pendingJobs_.push(*it); + pendingJobAvailable_.notify_one(); + } + else + { + retryJobs_.insert(*it); } } + + CheckInvariants(); } - bool IsValid() const - { - return (handler_ != NULL && - job_ != NULL); - } - const std::string& GetId() const + bool GetState(JobState& state, + const std::string& id) { - if (!IsValid()) + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::const_iterator it = jobsIndex_.find(id); + if (it == jobsIndex_.end()) { - throw OrthancException(ErrorCode_BadSequenceOfCalls); + return false; } else { - return id_; - } - } - - int GetPriority() const - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return priority_; - } - } - - bool IsPauseScheduled() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - boost::mutex::scoped_lock lock(registry_.mutex_); - registry_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - return handler_->IsPauseScheduled(); - } - } - - void MarkSuccess() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Success; + state = it->second->GetState(); + return true; } } - void MarkFailure() + + class RunningJob : public boost::noncopyable { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Failure; - } - } + private: + JobsRegistry& registry_; + JobHandler* handler_; // Can only be accessed if the registry + // mutex is locked! + IJob* job_; // Will by design be in mutual exclusion, + // because only one RunningJob can be + // executed at a time on a JobHandler - void MarkPause() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else + std::string id_; + int priority_; + JobState targetState_; + unsigned int targetRetryTimeout_; + + public: + RunningJob(JobsRegistry& registry, + unsigned int timeout) : + registry_(registry), + handler_(NULL), + targetState_(JobState_Failure), + targetRetryTimeout_(0) { - targetState_ = JobState_Paused; - } - } + { + boost::mutex::scoped_lock lock(registry_.mutex_); - void MarkRetry(unsigned int timeout) - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Retry; - targetRetryTimeout_ = timeout; - } - } + while (registry_.pendingJobs_.empty()) + { + if (timeout == 0) + { + registry_.pendingJobAvailable_.wait(lock); + } + else + { + bool success = registry_.pendingJobAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + // No pending job + return; + } + } + } - /*void ExecuteStep() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); + handler_ = registry_.pendingJobs_.top(); + registry_.pendingJobs_.pop(); + + assert(handler_->GetState() == JobState_Pending); + handler_->SetState(JobState_Running); + + job_ = &handler_->GetJob(); + id_ = handler_->GetId(); + priority_ = handler_->GetPriority(); + } } - if (IsPauseScheduled()) + ~RunningJob() { - targetState_ = JobState_Paused; - return; + if (IsValid()) + { + boost::mutex::scoped_lock lock(registry_.mutex_); + + switch (targetState_) + { + case JobState_Failure: + registry_.MarkRunningAsCompleted(*handler_, false); + break; + + case JobState_Success: + registry_.MarkRunningAsCompleted(*handler_, true); + break; + + case JobState_Paused: + registry_.MarkRunningAsPaused(*handler_); + break; + + case JobState_Retry: + registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); + break; + + default: + assert(0); + } + } } - std::auto_ptr result; - ErrorCode code; - + bool IsValid() const { - bool ok = false; + return (handler_ != NULL && + job_ != NULL); + } - try + const std::string& GetId() const + { + if (!IsValid()) { - result.reset(job_->ExecuteStep()); - ok = true; - - if (result->GetCode() == JobStepCode_Failure) - { - code = ErrorCode_InternalError; - } + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return id_; } - catch (OrthancException& e) + } + + int GetPriority() const + { + if (!IsValid()) { - code = e.GetErrorCode(); - } - catch (boost::bad_lexical_cast&) - { - code = ErrorCode_BadFileFormat; + throw OrthancException(ErrorCode_BadSequenceOfCalls); } - catch (...) + else { - code = ErrorCode_InternalError; + return priority_; } + } - if (ok) + bool IsPauseScheduled() + { + if (!IsValid()) { - code = ErrorCode_Success; + throw OrthancException(ErrorCode_BadSequenceOfCalls); } else { - result.reset(new JobStepResult(JobStepCode_Failure)); + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsPauseScheduled(); + } + } + + void MarkSuccess() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Success; + } + } + + void MarkFailure() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Failure; + } + } + + void MarkPause() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Paused; + } + } + + void MarkRetry(unsigned int timeout) + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Retry; + targetRetryTimeout_ = timeout; + } + } + + void UpdateStatus(const JobStatus& status) + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + handler_->SetLastStatus(status); } } - switch (result->GetCode()) + bool ExecuteStep() { - case JobStepCode_Success: - targetState_ = JobState_Success; - break; + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + if (IsPauseScheduled()) + { + targetState_ = JobState_Paused; + return false; + } - case JobStepCode_Failure: - targetState_ = JobState_Failure; - break; + std::auto_ptr result; + ErrorCode code; + + { + bool ok = false; + + try + { + result.reset(job_->ExecuteStep()); + ok = true; - case JobStepCode_Continue: - targetState_ = JobState_Running; - break; + if (result->GetCode() == JobStepCode_Failure) + { + code = ErrorCode_InternalError; + } + } + catch (OrthancException& e) + { + code = e.GetErrorCode(); + } + catch (boost::bad_lexical_cast&) + { + code = ErrorCode_BadFileFormat; + } + catch (...) + { + code = ErrorCode_InternalError; + } + + if (ok) + { + code = ErrorCode_Success; + } + else + { + result.reset(new JobStepResult(JobStepCode_Failure)); + } + } - case JobStepCode_Retry: - targetState_ = JobState_Retry; - targetRetryTimeout_ = dynamic_cast(*result).GetTimeout(); - break; + { + JobStatus status(code, *job_); + UpdateStatus(status); + } + + switch (result->GetCode()) + { + case JobStepCode_Success: + targetState_ = JobState_Success; + return false; + + case JobStepCode_Failure: + targetState_ = JobState_Failure; + return false; - default: - throw OrthancException(ErrorCode_InternalError); + case JobStepCode_Retry: + targetState_ = JobState_Retry; + targetRetryTimeout_ = dynamic_cast(*result).GetTimeout(); + return false; + + case JobStepCode_Continue: + return true; + + default: + throw OrthancException(ErrorCode_InternalError); + } } - }*/ + }; }; -}; } @@ -1383,7 +1421,7 @@ } explicit DummyJob(JobStepResult result) : - result_(result) + result_(result) { } @@ -1401,7 +1439,7 @@ return 0; } - virtual void FormatStatus(Json::Value& value) + virtual void GetDescription(Json::Value& value) { } };