Mercurial > hg > orthanc
diff UnitTestsSources/MultiThreadingTests.cpp @ 2569:2af17cd5eb1f jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 07 May 2018 15:37:20 +0200 |
parents | a46094602346 |
children | 2e879c796ec7 |
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 15:02:34 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 15:37:20 2018 +0200 @@ -34,11 +34,13 @@ #include "PrecompiledHeadersUnitTests.h" #include "gtest/gtest.h" -#include "../OrthancServer/Scheduler/ServerScheduler.h" +#include "../Core/JobsEngine/JobStepRetry.h" +#include "../Core/JobsEngine/JobsEngine.h" +#include "../Core/MultiThreading/Locker.h" #include "../Core/OrthancException.h" #include "../Core/SystemToolbox.h" #include "../Core/Toolbox.h" -#include "../Core/MultiThreading/Locker.h" +#include "../OrthancServer/Scheduler/ServerScheduler.h" using namespace Orthanc; @@ -239,1486 +241,6 @@ - - -#if !defined(ORTHANC_SANDBOXED) -# error The macro ORTHANC_SANDBOXED must be defined -#endif - -#if ORTHANC_SANDBOXED == 1 -# error The job engine cannot be used in sandboxed environments -#endif - -#include "../Core/Logging.h" - -#include <boost/math/special_functions/round.hpp> -#include <boost/date_time/posix_time/posix_time.hpp> -#include <queue> - -namespace Orthanc -{ - enum JobState - { - JobState_Pending, - JobState_Running, - JobState_Success, - JobState_Failure, - JobState_Paused, - JobState_Retry - }; - - static const char* EnumerationToString(JobState state) - { - switch (state) - { - case JobState_Pending: - return "Pending"; - - case JobState_Running: - return "Running"; - - case JobState_Success: - return "Success"; - - case JobState_Failure: - return "Failure"; - - case JobState_Paused: - return "Paused"; - - case JobState_Retry: - return "Retry"; - - default: - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - } - - enum JobStepCode - { - JobStepCode_Success, - JobStepCode_Failure, - JobStepCode_Continue, - JobStepCode_Retry - }; - - class JobStepResult - { - private: - JobStepCode code_; - - public: - explicit JobStepResult(JobStepCode code) : - code_(code) - { - } - - virtual ~JobStepResult() - { - } - - JobStepCode GetCode() const - { - return code_; - } - }; - - - class JobStepRetry : public JobStepResult - { - private: - unsigned int timeout_; // Retry after "timeout_" milliseconds - - public: - JobStepRetry(unsigned int timeout) : - JobStepResult(JobStepCode_Retry), - timeout_(timeout) - { - } - - unsigned int GetTimeout() const - { - return timeout_; - } - }; - - - class IJob : public boost::noncopyable - { - public: - virtual ~IJob() - { - } - - virtual JobStepResult* ExecuteStep() = 0; - - virtual void ReleaseResources() = 0; // For pausing jobs - - virtual float GetProgress() = 0; - - virtual void GetDescription(Json::Value& value) = 0; - }; - - - class JobStatus - { - private: - ErrorCode errorCode_; - float progress_; - Json::Value description_; - - public: - JobStatus() : - errorCode_(ErrorCode_InternalError), - progress_(0), - description_(Json::objectValue) - { - } - - JobStatus(ErrorCode code, - IJob& job) : - errorCode_(code), - progress_(job.GetProgress()) - { - if (progress_ < 0) - { - progress_ = 0; - } - - if (progress_ > 1) - { - progress_ = 1; - } - - job.GetDescription(description_); - } - - ErrorCode GetErrorCode() const - { - return errorCode_; - } - - float GetProgress() const - { - return progress_; - } - - const Json::Value& GetDescription() const - { - return description_; - } - }; - - - class JobInfo - { - private: - std::string id_; - int priority_; - JobState state_; - boost::posix_time::ptime timestamp_; - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastStateChangeTime_; - boost::posix_time::time_duration runtime_; - bool hasEta_; - boost::posix_time::ptime eta_; - JobStatus status_; - - public: - JobInfo(const std::string& id, - int priority, - JobState state, - const JobStatus& status, - const boost::posix_time::ptime& creationTime, - const boost::posix_time::ptime& lastStateChangeTime, - const boost::posix_time::time_duration& runtime) : - id_(id), - priority_(priority), - state_(state), - timestamp_(boost::posix_time::microsec_clock::universal_time()), - creationTime_(creationTime), - lastStateChangeTime_(lastStateChangeTime), - runtime_(runtime), - hasEta_(false), - status_(status) - { - if (state_ == JobState_Running) - { - float ms = static_cast<float>(runtime_.total_milliseconds()); - - if (status_.GetProgress() > 0.01f && - ms > 0.01f) - { - float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; - eta_ = timestamp_ + boost::posix_time::milliseconds(remaining); - hasEta_ = true; - } - } - } - - JobInfo() : - priority_(0), - state_(JobState_Failure), - timestamp_(boost::posix_time::microsec_clock::universal_time()), - creationTime_(timestamp_), - lastStateChangeTime_(timestamp_), - runtime_(boost::posix_time::milliseconds(0)), - hasEta_(false) - { - } - - const std::string& GetIdentifier() const - { - return id_; - } - - int GetPriority() const - { - return priority_; - } - - JobState GetState() const - { - return state_; - } - - const boost::posix_time::ptime& GetInfoTime() const - { - return timestamp_; - } - - const boost::posix_time::ptime& GetCreationTime() const - { - return creationTime_; - } - - const boost::posix_time::time_duration& GetRuntime() const - { - return runtime_; - } - - bool HasEstimatedTimeOfArrival() const - { - return hasEta_; - } - - bool HasCompletionTime() const - { - return (state_ == JobState_Success || - state_ == JobState_Failure); - } - - const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const - { - if (hasEta_) - { - return eta_; - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - const boost::posix_time::ptime& GetCompletionTime() const - { - if (HasCompletionTime()) - { - return lastStateChangeTime_; - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - const JobStatus& GetStatus() const - { - return status_; - } - - JobStatus& GetStatus() - { - return status_; - } - - void Format(Json::Value& target) const - { - target = Json::objectValue; - target["ID"] = id_; - target["Priority"] = priority_; - target["ErrorCode"] = static_cast<int>(status_.GetErrorCode()); - target["ErrorDescription"] = EnumerationToString(status_.GetErrorCode()); - target["State"] = EnumerationToString(state_); - target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_); - target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); - target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds()); - target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f); - target["Description"] = status_.GetDescription(); - - if (HasEstimatedTimeOfArrival()) - { - target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(GetEstimatedTimeOfArrival()); - } - - if (HasCompletionTime()) - { - target["CompletionTime"] = boost::posix_time::to_iso_string(GetCompletionTime()); - } - } - }; - - - - - class JobsRegistry : public boost::noncopyable - { - private: - class JobHandler : public boost::noncopyable - { - private: - std::string id_; - JobState state_; - std::auto_ptr<IJob> job_; - int priority_; // "+inf()" means highest priority - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastStateChangeTime_; - boost::posix_time::time_duration runtime_; - boost::posix_time::ptime retryTime_; - bool pauseScheduled_; - JobStatus lastStatus_; - - void Touch() - { - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - - if (state_ == JobState_Running) - { - runtime_ += (now - lastStateChangeTime_); - } - - lastStateChangeTime_ = now; - } - - void SetStateInternal(JobState state) - { - state_ = state; - pauseScheduled_ = false; - Touch(); - } - - public: - JobHandler(IJob* job, - int priority) : - id_(Toolbox::GenerateUuid()), - state_(JobState_Pending), - job_(job), - priority_(priority), - creationTime_(boost::posix_time::microsec_clock::universal_time()), - lastStateChangeTime_(creationTime_), - runtime_(boost::posix_time::milliseconds(0)), - retryTime_(creationTime_), - pauseScheduled_(false) - { - if (job == NULL) - { - throw OrthancException(ErrorCode_NullPointer); - } - - lastStatus_ = JobStatus(ErrorCode_Success, *job); - } - - const std::string& GetId() const - { - return id_; - } - - IJob& GetJob() const - { - assert(job_.get() != NULL); - return *job_; - } - - void SetPriority(int priority) - { - priority_ = priority; - } - - int GetPriority() const - { - return priority_; - } - - JobState GetState() const - { - return state_; - } - - void SetState(JobState state) - { - if (state == JobState_Retry) - { - // Use "SetRetryState()" - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - SetStateInternal(state); - } - } - - void SetRetryState(unsigned int timeout) - { - if (state_ == JobState_Running) - { - SetStateInternal(JobState_Retry); - retryTime_ = (boost::posix_time::microsec_clock::universal_time() + - boost::posix_time::milliseconds(timeout)); - } - else - { - // Only valid for running jobs - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - void SchedulePause() - { - if (state_ == JobState_Running) - { - pauseScheduled_ = true; - } - else - { - // Only valid for running jobs - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - bool IsPauseScheduled() - { - return pauseScheduled_; - } - - bool IsRetryReady(const boost::posix_time::ptime& now) const - { - if (state_ != JobState_Retry) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return retryTime_ <= now; - } - } - - const boost::posix_time::ptime& GetCreationTime() const - { - return creationTime_; - } - - const boost::posix_time::ptime& GetLastStateChangeTime() const - { - return lastStateChangeTime_; - } - - const boost::posix_time::time_duration& GetRuntime() const - { - return runtime_; - } - - const JobStatus& GetLastStatus() const - { - return lastStatus_; - } - - void SetLastStatus(const JobStatus& status) - { - lastStatus_ = status; - Touch(); - } - }; - - struct PriorityComparator - { - bool operator() (JobHandler*& a, - JobHandler*& b) const - { - return a->GetPriority() < b->GetPriority(); - } - }; - - typedef std::map<std::string, JobHandler*> JobsIndex; - typedef std::list<JobHandler*> CompletedJobs; - typedef std::set<JobHandler*> RetryJobs; - typedef std::priority_queue<JobHandler*, - std::vector<JobHandler*>, // Could be a "std::deque" - PriorityComparator> PendingJobs; - - boost::mutex mutex_; - JobsIndex jobsIndex_; - PendingJobs pendingJobs_; - CompletedJobs completedJobs_; - RetryJobs retryJobs_; - - boost::condition_variable pendingJobAvailable_; - size_t maxCompletedJobs_; - - -#ifndef NDEBUG - bool IsPendingJob(const JobHandler& job) const - { - PendingJobs copy = pendingJobs_; - while (!copy.empty()) - { - if (copy.top() == &job) - { - return true; - } - - copy.pop(); - } - - return false; - } - - bool IsCompletedJob(JobHandler& job) const - { - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == &job) - { - return true; - } - } - - return false; - } - - bool IsRetryJob(JobHandler& job) const - { - return retryJobs_.find(&job) != retryJobs_.end(); - } -#endif - - - void CheckInvariants() const - { -#ifndef NDEBUG - { - PendingJobs copy = pendingJobs_; - while (!copy.empty()) - { - assert(copy.top()->GetState() == JobState_Pending); - copy.pop(); - } - } - - assert(completedJobs_.size() <= maxCompletedJobs_); - - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Success || - (*it)->GetState() == JobState_Failure); - } - - for (RetryJobs::const_iterator it = retryJobs_.begin(); - it != retryJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Retry); - } - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - JobHandler& job = *it->second; - - assert(job.GetId() == it->first); - - switch (job.GetState()) - { - case JobState_Pending: - assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Success: - case JobState_Failure: - assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); - break; - - case JobState_Retry: - assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Running: - case JobState_Paused: - assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } -#endif - } - - - void ForgetOldCompletedJobs() - { - if (maxCompletedJobs_ != 0) - { - while (completedJobs_.size() > maxCompletedJobs_) - { - assert(completedJobs_.front() != NULL); - - std::string id = completedJobs_.front()->GetId(); - assert(jobsIndex_.find(id) != jobsIndex_.end()); - - jobsIndex_.erase(id); - delete(completedJobs_.front()); - completedJobs_.pop_front(); - } - } - } - - - void MarkRunningAsCompleted(JobHandler& job, - bool success) - { - LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") - << ": " << job.GetId(); - - CheckInvariants(); - assert(job.GetState() == JobState_Running); - - job.SetState(success ? JobState_Success : JobState_Failure); - - completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void MarkRunningAsRetry(JobHandler& job, - unsigned int timeout) - { - LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); - - CheckInvariants(); - - assert(job.GetState() == JobState_Running && - retryJobs_.find(&job) == retryJobs_.end()); - - retryJobs_.insert(&job); - job.SetRetryState(timeout); - - CheckInvariants(); - } - - - void MarkRunningAsPaused(JobHandler& job) - { - LOG(INFO) << "Job paused: " << job.GetId(); - - CheckInvariants(); - assert(job.GetState() == JobState_Running); - - job.SetState(JobState_Paused); - - CheckInvariants(); - } - - - public: - JobsRegistry() : - maxCompletedJobs_(10) - { - } - - - ~JobsRegistry() - { - for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) - { - assert(it->second != NULL); - delete it->second; - } - } - - - void SetMaxCompletedJobs(size_t i) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - maxCompletedJobs_ = i; - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void ListJobs(std::set<std::string>& target) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - target.insert(it->first); - } - } - - - bool GetJobInfo(JobInfo& target, - const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::const_iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - return false; - } - else - { - const JobHandler& handler = *found->second; - target = JobInfo(handler.GetId(), - handler.GetPriority(), - handler.GetState(), - handler.GetLastStatus(), - handler.GetCreationTime(), - handler.GetLastStateChangeTime(), - handler.GetRuntime()); - return true; - } - } - - - void Submit(std::string& id, - IJob* job, // Takes ownership - int priority) - { - std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - id = handler->GetId(); - - pendingJobs_.push(handler.get()); - pendingJobAvailable_.notify_one(); - - jobsIndex_.insert(std::make_pair(id, handler.release())); - - LOG(INFO) << "New job submitted with priority " << priority << ": " << id; - - CheckInvariants(); - } - - - void Submit(IJob* job, // Takes ownership - int priority) - { - std::string id; - Submit(id, job, priority); - } - - - void SetPriority(const std::string& id, - int priority) - { - LOG(INFO) << "Changing priority to " << priority << " for job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - found->second->SetPriority(priority); - - if (found->second->GetState() == JobState_Pending) - { - // If the job is pending, we need to reconstruct the - // priority queue, as the heap condition has changed - - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - pendingJobs_.push(copy.top()); - copy.pop(); - } - } - } - - CheckInvariants(); - } - - - void Pause(const std::string& id) - { - LOG(INFO) << "Pausing job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - switch (found->second->GetState()) - { - case JobState_Pending: - { - // If the job is pending, we need to reconstruct the - // priority queue to remove it - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - if (copy.top()->GetId() != id) - { - pendingJobs_.push(copy.top()); - } - - copy.pop(); - } - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Retry: - { - RetryJobs::iterator item = retryJobs_.find(found->second); - assert(item != retryJobs_.end()); - retryJobs_.erase(item); - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Paused: - case JobState_Success: - case JobState_Failure: - // Nothing to be done - break; - - case JobState_Running: - found->second->SchedulePause(); - break; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } - - CheckInvariants(); - } - - - void Resume(const std::string& id) - { - LOG(INFO) << "Resuming job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Paused) - { - LOG(WARNING) << "Cannot resume a job that is not paused: " << id; - } - else - { - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } - - CheckInvariants(); - } - - - void Resubmit(const std::string& id) - { - LOG(INFO) << "Resubmitting failed job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Failure) - { - LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; - } - else - { - bool ok = false; - for (CompletedJobs::iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == found->second) - { - ok = true; - completedJobs_.erase(it); - break; - } - } - - assert(ok); - - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } - - CheckInvariants(); - } - - - void ScheduleRetries() - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - RetryJobs copy; - std::swap(copy, retryJobs_); - - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - - assert(retryJobs_.empty()); - for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) - { - if ((*it)->IsRetryReady(now)) - { - LOG(INFO) << "Retrying job: " << (*it)->GetId(); - (*it)->SetState(JobState_Pending); - pendingJobs_.push(*it); - pendingJobAvailable_.notify_one(); - } - else - { - retryJobs_.insert(*it); - } - } - - CheckInvariants(); - } - - - bool GetState(JobState& state, - const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::const_iterator it = jobsIndex_.find(id); - if (it == jobsIndex_.end()) - { - return false; - } - else - { - state = it->second->GetState(); - return true; - } - } - - - class RunningJob : public boost::noncopyable - { - private: - JobsRegistry& registry_; - JobHandler* handler_; // Can only be accessed if the registry - // mutex is locked! - IJob* job_; // Will by design be in mutual exclusion, - // because only one RunningJob can be - // executed at a time on a JobHandler - - std::string id_; - int priority_; - JobState targetState_; - unsigned int targetRetryTimeout_; - - public: - RunningJob(JobsRegistry& registry, - unsigned int timeout) : - registry_(registry), - handler_(NULL), - targetState_(JobState_Failure), - targetRetryTimeout_(0) - { - { - boost::mutex::scoped_lock lock(registry_.mutex_); - - while (registry_.pendingJobs_.empty()) - { - if (timeout == 0) - { - registry_.pendingJobAvailable_.wait(lock); - } - else - { - bool success = registry_.pendingJobAvailable_.timed_wait - (lock, boost::posix_time::milliseconds(timeout)); - if (!success) - { - // No pending job - return; - } - } - } - - handler_ = registry_.pendingJobs_.top(); - registry_.pendingJobs_.pop(); - - assert(handler_->GetState() == JobState_Pending); - handler_->SetState(JobState_Running); - - job_ = &handler_->GetJob(); - id_ = handler_->GetId(); - priority_ = handler_->GetPriority(); - } - } - - ~RunningJob() - { - if (IsValid()) - { - boost::mutex::scoped_lock lock(registry_.mutex_); - - switch (targetState_) - { - case JobState_Failure: - registry_.MarkRunningAsCompleted(*handler_, false); - break; - - case JobState_Success: - registry_.MarkRunningAsCompleted(*handler_, true); - break; - - case JobState_Paused: - registry_.MarkRunningAsPaused(*handler_); - break; - - case JobState_Retry: - registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); - break; - - default: - assert(0); - } - } - } - - bool IsValid() const - { - return (handler_ != NULL && - job_ != NULL); - } - - const std::string& GetId() const - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return id_; - } - } - - int GetPriority() const - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return priority_; - } - } - - IJob& GetJob() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return *job_; - } - } - - bool IsPauseScheduled() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - boost::mutex::scoped_lock lock(registry_.mutex_); - registry_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - return handler_->IsPauseScheduled(); - } - } - - void MarkSuccess() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Success; - } - } - - void MarkFailure() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Failure; - } - } - - void MarkPause() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Paused; - } - } - - void MarkRetry(unsigned int timeout) - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Retry; - targetRetryTimeout_ = timeout; - } - } - - void UpdateStatus(ErrorCode code) - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - JobStatus status(code, *job_); - - boost::mutex::scoped_lock lock(registry_.mutex_); - registry_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - handler_->SetLastStatus(status); - } - } - }; - }; - - - - class JobsEngine - { - private: - enum State - { - State_Setup, - State_Running, - State_Stopping, - State_Done - }; - - boost::mutex stateMutex_; - State state_; - JobsRegistry registry_; - boost::thread retryHandler_; - std::vector<boost::thread> workers_; - - bool ExecuteStep(JobsRegistry::RunningJob& running, - size_t workerIndex) - { - assert(running.IsValid()); - - LOG(INFO) << "Executing job with priority " << running.GetPriority() - << " in worker thread " << workerIndex << ": " << running.GetId(); - - if (running.IsPauseScheduled()) - { - running.GetJob().ReleaseResources(); - running.MarkPause(); - return false; - } - - std::auto_ptr<JobStepResult> result; - - { - try - { - result.reset(running.GetJob().ExecuteStep()); - - if (result->GetCode() == JobStepCode_Failure) - { - running.UpdateStatus(ErrorCode_InternalError); - } - else - { - running.UpdateStatus(ErrorCode_Success); - } - } - catch (OrthancException& e) - { - running.UpdateStatus(e.GetErrorCode()); - } - catch (boost::bad_lexical_cast&) - { - running.UpdateStatus(ErrorCode_BadFileFormat); - } - catch (...) - { - running.UpdateStatus(ErrorCode_InternalError); - } - - if (result.get() == NULL) - { - result.reset(new JobStepResult(JobStepCode_Failure)); - } - } - - switch (result->GetCode()) - { - case JobStepCode_Success: - running.MarkSuccess(); - return false; - - case JobStepCode_Failure: - running.MarkFailure(); - return false; - - case JobStepCode_Retry: - running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout()); - return false; - - case JobStepCode_Continue: - return true; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } - - static void RetryHandler(JobsEngine* engine) - { - assert(engine != NULL); - - for (;;) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(200)); - - { - boost::mutex::scoped_lock lock(engine->stateMutex_); - - if (engine->state_ != State_Running) - { - return; - } - } - - engine->GetRegistry().ScheduleRetries(); - } - } - - static void Worker(JobsEngine* engine, - size_t workerIndex) - { - assert(engine != NULL); - - LOG(INFO) << "Worker thread " << workerIndex << " has started"; - - for (;;) - { - { - boost::mutex::scoped_lock lock(engine->stateMutex_); - - if (engine->state_ != State_Running) - { - return; - } - } - - JobsRegistry::RunningJob running(engine->GetRegistry(), 100); - - if (running.IsValid()) - { - for (;;) - { - if (!engine->ExecuteStep(running, workerIndex)) - { - break; - } - } - } - } - } - - public: - JobsEngine() : - state_(State_Setup), - workers_(1) - { - } - - ~JobsEngine() - { - if (state_ != State_Setup && - state_ != State_Done) - { - LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!"; - Stop(); - } - } - - void SetWorkersCount(size_t count) - { - if (count == 0) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - - boost::mutex::scoped_lock lock(stateMutex_); - - if (state_ != State_Setup) - { - // Can only be invoked before calling "Start()" - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - workers_.resize(count); - } - - JobsRegistry& GetRegistry() - { - return registry_; - } - - void Start() - { - boost::mutex::scoped_lock lock(stateMutex_); - - if (state_ != State_Setup) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - retryHandler_ = boost::thread(RetryHandler, this); - - for (size_t i = 0; i < workers_.size(); i++) - { - workers_[i] = boost::thread(Worker, this, i); - } - - state_ = State_Running; - - LOG(WARNING) << "The jobs engine has started"; - } - - - void Stop() - { - { - boost::mutex::scoped_lock lock(stateMutex_); - - if (state_ != State_Running) - { - return; - } - - state_ = State_Stopping; - } - - LOG(INFO) << "Stopping the jobs engine"; - - if (retryHandler_.joinable()) - { - retryHandler_.join(); - } - - for (size_t i = 0; i < workers_.size(); i++) - { - if (workers_[i].joinable()) - { - workers_[i].join(); - } - } - - { - boost::mutex::scoped_lock lock(stateMutex_); - state_ = State_Done; - } - - LOG(WARNING) << "The jobs engine has stopped"; - } - }; -} - - - class DummyJob : public Orthanc::IJob { private: