# HG changeset patch # User Sebastien Jodogne # Date 1525692029 -7200 # Node ID 0f06b4d5b3d02d7e1ae846660d4cb156b03f34db # Parent f8681f251caa7d636fb5c27c02cbea7b54974730 JobsEngine diff -r f8681f251caa -r 0f06b4d5b3d0 UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Fri May 04 17:49:44 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 13:20:29 2018 +0200 @@ -289,6 +289,33 @@ JobState_Paused, JobState_Retry }; + + static const char* EnumerationToString(JobState state) + { + switch (state) + { + case JobState_Pending: + return "Pending"; + + case JobState_Running: + return "Running"; + + case JobState_Success: + return "Success"; + + case JobState_Failure: + return "Failure"; + + case JobState_Paused: + return "Paused"; + + case JobState_Retry: + return "Retry"; + + default: + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } enum JobStepCode { @@ -377,10 +404,14 @@ errorCode_(code), progress_(job.GetProgress()) { - if (progress_ < 0 || - progress_ > 1) + if (progress_ < 0) { - throw OrthancException(ErrorCode_ParameterOutOfRange); + progress_ = 0; + } + + if (progress_ > 1) + { + progress_ = 1; } job.GetDescription(description_); @@ -408,9 +439,8 @@ private: std::string id_; int priority_; - ErrorCode errorCode_; JobState state_; - boost::posix_time::ptime infoTime_; + boost::posix_time::ptime timestamp_; boost::posix_time::ptime creationTime_; boost::posix_time::time_duration runtime_; boost::posix_time::ptime eta_; @@ -426,14 +456,24 @@ id_(id), priority_(priority), state_(state), - infoTime_(boost::posix_time::microsec_clock::universal_time()), + timestamp_(boost::posix_time::microsec_clock::universal_time()), creationTime_(creationTime), runtime_(runtime), status_(status) { float ms = static_cast(runtime_.total_milliseconds()); float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; - eta_ = infoTime_ + boost::posix_time::milliseconds(remaining); + eta_ = timestamp_ + boost::posix_time::milliseconds(remaining); + } + + JobInfo() : + priority_(0), + state_(JobState_Failure), + timestamp_(boost::posix_time::microsec_clock::universal_time()), + creationTime_(timestamp_), + runtime_(boost::posix_time::milliseconds(0)), + eta_(timestamp_) + { } const std::string& GetIdentifier() const @@ -446,11 +486,6 @@ return priority_; } - ErrorCode GetErrorCode() const - { - return errorCode_; - } - JobState GetState() const { return state_; @@ -458,7 +493,7 @@ const boost::posix_time::ptime& GetInfoTime() const { - return infoTime_; + return timestamp_; } const boost::posix_time::ptime& GetCreationTime() const @@ -485,6 +520,21 @@ { return status_; } + + void Format(Json::Value& target) const + { + target = Json::objectValue; + target["ID"] = id_; + target["Priority"] = priority_; + target["ErrorCode"] = EnumerationToString(status_.GetErrorCode()); + target["State"] = EnumerationToString(state_); + target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_); + target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); + target["Runtime"] = static_cast(runtime_.total_milliseconds()); + target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(eta_); + target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f); + target["Description"] = status_.GetDescription(); + } }; @@ -619,6 +669,16 @@ } } + const boost::posix_time::ptime& GetCreationTime() const + { + return creationTime_; + } + + const boost::posix_time::time_duration& GetRuntime() const + { + return runtime_; + } + const JobStatus& GetLastStatus() const { return lastStatus_; @@ -869,6 +929,25 @@ } + void GetJobsInfo(std::map& target) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + const JobHandler& handler = *it->second; + target[it->first] = JobInfo(handler.GetId(), + handler.GetPriority(), + handler.GetState(), + handler.GetLastStatus(), + handler.GetCreationTime(), + handler.GetRuntime()); + } + } + + void Submit(std::string& id, IJob* job, // Takes ownership int priority) @@ -885,7 +964,7 @@ jobsIndex_.insert(std::make_pair(id, handler.release())); - LOG(INFO) << "New job submitted: " << id; + LOG(INFO) << "New job submitted with priority " << priority << ": " << id; CheckInvariants(); } @@ -1243,6 +1322,18 @@ } } + IJob& GetJob() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return *job_; + } + } + bool IsPauseScheduled() { if (!IsValid()) @@ -1291,7 +1382,6 @@ } else { - job_->ReleaseResources(); targetState_ = JobState_Paused; } } @@ -1309,7 +1399,7 @@ } } - void UpdateStatus(const JobStatus& status) + void UpdateStatus(ErrorCode code) { if (!IsValid()) { @@ -1317,6 +1407,8 @@ } else { + JobStatus status(code, *job_); + boost::mutex::scoped_lock lock(registry_.mutex_); registry_.CheckInvariants(); assert(handler_->GetState() == JobState_Running); @@ -1324,87 +1416,251 @@ handler_->SetLastStatus(status); } } + }; + }; - bool ExecuteStep() + + + class JobsEngine + { + private: + enum State + { + State_Setup, + State_Running, + State_Stopping, + State_Done + }; + + boost::mutex stateMutex_; + State state_; + JobsRegistry registry_; + boost::thread retryHandler_; + std::vector workers_; + + bool ExecuteStep(JobsRegistry::RunningJob& running, + size_t workerIndex) + { + assert(running.IsValid()); + + LOG(INFO) << "Executing job with priority " << running.GetPriority() + << " in worker thread " << workerIndex << ": " << running.GetId(); + + if (running.IsPauseScheduled()) { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } + running.GetJob().ReleaseResources(); + running.MarkPause(); + return false; + } - if (IsPauseScheduled()) + std::auto_ptr result; + + { + try { - targetState_ = JobState_Paused; - return false; - } + result.reset(running.GetJob().ExecuteStep()); - std::auto_ptr result; - ErrorCode code; - - { - bool ok = false; - - try + if (result->GetCode() == JobStepCode_Failure) { - result.reset(job_->ExecuteStep()); - ok = true; - - if (result->GetCode() == JobStepCode_Failure) - { - code = ErrorCode_InternalError; - } - } - catch (OrthancException& e) - { - code = e.GetErrorCode(); - } - catch (boost::bad_lexical_cast&) - { - code = ErrorCode_BadFileFormat; - } - catch (...) - { - code = ErrorCode_InternalError; - } - - if (ok) - { - code = ErrorCode_Success; + running.UpdateStatus(ErrorCode_InternalError); } else { - result.reset(new JobStepResult(JobStepCode_Failure)); + running.UpdateStatus(ErrorCode_Success); + } + } + catch (OrthancException& e) + { + running.UpdateStatus(e.GetErrorCode()); + } + catch (boost::bad_lexical_cast&) + { + running.UpdateStatus(ErrorCode_BadFileFormat); + } + catch (...) + { + running.UpdateStatus(ErrorCode_InternalError); + } + + if (result.get() == NULL) + { + result.reset(new JobStepResult(JobStepCode_Failure)); + } + } + + switch (result->GetCode()) + { + case JobStepCode_Success: + running.MarkSuccess(); + return false; + + case JobStepCode_Failure: + running.MarkFailure(); + return false; + + case JobStepCode_Retry: + running.MarkRetry(dynamic_cast(*result).GetTimeout()); + return false; + + case JobStepCode_Continue: + return true; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } + + static void RetryHandler(JobsEngine* engine) + { + assert(engine != NULL); + + for (;;) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(200)); + + { + boost::mutex::scoped_lock lock(engine->stateMutex_); + + if (engine->state_ != State_Running) + { + return; + } + } + + engine->GetRegistry().ScheduleRetries(); + } + } + + static void Worker(JobsEngine* engine, + size_t workerIndex) + { + assert(engine != NULL); + + LOG(INFO) << "Worker thread " << workerIndex << " has started"; + + for (;;) + { + { + boost::mutex::scoped_lock lock(engine->stateMutex_); + + if (engine->state_ != State_Running) + { + return; } } + JobsRegistry::RunningJob running(engine->GetRegistry(), 100); + + if (running.IsValid()) { - JobStatus status(code, *job_); - UpdateStatus(status); + for (;;) + { + if (!engine->ExecuteStep(running, workerIndex)) + { + break; + } + } } + } + } + + public: + JobsEngine() : + state_(State_Setup), + workers_(1) + { + } - switch (result->GetCode()) - { - case JobStepCode_Success: - targetState_ = JobState_Success; - return false; + ~JobsEngine() + { + if (state_ != State_Setup && + state_ != State_Done) + { + LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!"; + Stop(); + } + } + + void SetWorkersCount(size_t count) + { + if (count == 0) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Setup) + { + // Can only be invoked before calling "Start()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } - case JobStepCode_Failure: - targetState_ = JobState_Failure; - return false; + workers_.resize(count); + } + + JobsRegistry& GetRegistry() + { + return registry_; + } + + void Start() + { + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Setup) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + retryHandler_ = boost::thread(RetryHandler, this); + + for (size_t i = 0; i < workers_.size(); i++) + { + workers_[i] = boost::thread(Worker, this, i); + } + + state_ = State_Running; - case JobStepCode_Retry: - targetState_ = JobState_Retry; - targetRetryTimeout_ = dynamic_cast(*result).GetTimeout(); - return false; + LOG(WARNING) << "The jobs engine has started"; + } + + + void Stop() + { + { + boost::mutex::scoped_lock lock(stateMutex_); - case JobStepCode_Continue: - return true; - - default: - throw OrthancException(ErrorCode_InternalError); + if (state_ != State_Running) + { + return; + } + + state_ = State_Stopping; + } + + LOG(INFO) << "Stopping the jobs engine"; + + if (retryHandler_.joinable()) + { + retryHandler_.join(); + } + + for (size_t i = 0; i < workers_.size(); i++) + { + if (workers_[i].joinable()) + { + workers_[i].join(); } } - }; + + { + boost::mutex::scoped_lock lock(stateMutex_); + state_ = State_Done; + } + + LOG(WARNING) << "The jobs engine has stopped"; + } }; } @@ -1414,21 +1670,37 @@ { private: JobStepResult result_; + unsigned int count_; + unsigned int steps_; public: DummyJob() : - result_(Orthanc::JobStepCode_Success) + result_(Orthanc::JobStepCode_Success), + count_(0), + steps_(4) { } explicit DummyJob(JobStepResult result) : - result_(result) + result_(result), + count_(0), + steps_(4) { } virtual JobStepResult* ExecuteStep() { - return new JobStepResult(result_); + boost::this_thread::sleep(boost::posix_time::milliseconds(50)); + + if (count_ == steps_ - 1) + { + return new JobStepResult(result_); + } + else + { + count_++; + return new JobStepResult(JobStepCode_Continue); + } } virtual void ReleaseResources() @@ -1437,11 +1709,12 @@ virtual float GetProgress() { - return 0; + return static_cast(count_) / static_cast(steps_ - 1); } virtual void GetDescription(Json::Value& value) { + value["hello"] = "world"; } }; @@ -1748,3 +2021,36 @@ ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); } + + +TEST(JobsEngine, Basic) +{ + JobsEngine engine; + + std::string s; + + for (size_t i = 0; i < 20; i++) + engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10); + + engine.SetWorkersCount(3); + engine.Start(); + + boost::this_thread::sleep(boost::posix_time::milliseconds(200)); + + engine.Stop(); + + typedef std::map Jobs; + + Jobs jobs; + engine.GetRegistry().GetJobsInfo(jobs); + + Json::Value v; + for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) + { + Json::Value vv; + it->second.Format(vv); + v[it->first] = vv; + } + + std::cout << v << std::endl; +}