Mercurial > hg > orthanc
changeset 2569:2af17cd5eb1f jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 07 May 2018 15:37:20 +0200 |
parents | a46094602346 |
children | 2e879c796ec7 |
files | Core/Enumerations.cpp Core/Enumerations.h Core/JobsEngine/IJob.h Core/JobsEngine/JobInfo.cpp Core/JobsEngine/JobInfo.h Core/JobsEngine/JobStatus.cpp Core/JobsEngine/JobStatus.h Core/JobsEngine/JobStepResult.h Core/JobsEngine/JobStepRetry.h Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h Resources/CMake/OrthancFrameworkConfiguration.cmake UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 15 files changed, 2067 insertions(+), 1482 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/Enumerations.cpp Mon May 07 15:02:34 2018 +0200 +++ b/Core/Enumerations.cpp Mon May 07 15:37:20 2018 +0200 @@ -984,6 +984,34 @@ } + const char* EnumerationToString(JobState state) + { + switch (state) + { + case JobState_Pending: + return "Pending"; + + case JobState_Running: + return "Running"; + + case JobState_Success: + return "Success"; + + case JobState_Failure: + return "Failure"; + + case JobState_Paused: + return "Paused"; + + case JobState_Retry: + return "Retry"; + + default: + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } + + Encoding StringToEncoding(const char* encoding) { std::string s(encoding);
--- a/Core/Enumerations.h Mon May 07 15:02:34 2018 +0200 +++ b/Core/Enumerations.h Mon May 07 15:37:20 2018 +0200 @@ -542,6 +542,24 @@ TransferSyntax_Rle }; + enum JobState + { + JobState_Pending, + JobState_Running, + JobState_Success, + JobState_Failure, + JobState_Paused, + JobState_Retry + }; + + enum JobStepCode + { + JobStepCode_Success, + JobStepCode_Failure, + JobStepCode_Continue, + JobStepCode_Retry + }; + /** * WARNING: Do not change the explicit values in the enumerations @@ -622,6 +640,8 @@ const char* EnumerationToString(ValueRepresentation vr); + const char* EnumerationToString(JobState state); + Encoding StringToEncoding(const char* encoding); ResourceType StringToResourceType(const char* type);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/IJob.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,58 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "JobStepResult.h" + +#include <boost/noncopyable.hpp> +#include <json/value.h> + +namespace Orthanc +{ + class IJob : public boost::noncopyable + { + public: + virtual ~IJob() + { + } + + virtual JobStepResult* ExecuteStep() = 0; + + virtual void ReleaseResources() = 0; // For pausing jobs + + virtual float GetProgress() = 0; + + virtual void GetDescription(Json::Value& value) = 0; + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobInfo.cpp Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,142 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeaders.h" +#include "JobInfo.h" + +#include "../OrthancException.h" + +namespace Orthanc +{ + JobInfo::JobInfo(const std::string& id, + int priority, + JobState state, + const JobStatus& status, + const boost::posix_time::ptime& creationTime, + const boost::posix_time::ptime& lastStateChangeTime, + const boost::posix_time::time_duration& runtime) : + id_(id), + priority_(priority), + state_(state), + timestamp_(boost::posix_time::microsec_clock::universal_time()), + creationTime_(creationTime), + lastStateChangeTime_(lastStateChangeTime), + runtime_(runtime), + hasEta_(false), + status_(status) + { + if (state_ == JobState_Running) + { + float ms = static_cast<float>(runtime_.total_milliseconds()); + + if (status_.GetProgress() > 0.01f && + ms > 0.01f) + { + float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; + eta_ = timestamp_ + boost::posix_time::milliseconds(remaining); + hasEta_ = true; + } + } + } + + + JobInfo::JobInfo() : + priority_(0), + state_(JobState_Failure), + timestamp_(boost::posix_time::microsec_clock::universal_time()), + creationTime_(timestamp_), + lastStateChangeTime_(timestamp_), + runtime_(boost::posix_time::milliseconds(0)), + hasEta_(false) + { + } + + + bool JobInfo::HasCompletionTime() const + { + return (state_ == JobState_Success || + state_ == JobState_Failure); + } + + + const boost::posix_time::ptime& JobInfo::GetEstimatedTimeOfArrival() const + { + if (hasEta_) + { + return eta_; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + + const boost::posix_time::ptime& JobInfo::GetCompletionTime() const + { + if (HasCompletionTime()) + { + return lastStateChangeTime_; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + + void JobInfo::Format(Json::Value& target) const + { + target = Json::objectValue; + target["ID"] = id_; + target["Priority"] = priority_; + target["ErrorCode"] = static_cast<int>(status_.GetErrorCode()); + target["ErrorDescription"] = EnumerationToString(status_.GetErrorCode()); + target["State"] = EnumerationToString(state_); + target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_); + target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); + target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds()); + target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f); + target["Description"] = status_.GetDescription(); + + if (HasEstimatedTimeOfArrival()) + { + target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(GetEstimatedTimeOfArrival()); + } + + if (HasCompletionTime()) + { + target["CompletionTime"] = boost::posix_time::to_iso_string(GetCompletionTime()); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobInfo.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,120 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "JobStatus.h" + +#include <boost/date_time/posix_time/posix_time.hpp> + +namespace Orthanc +{ + class JobInfo + { + private: + std::string id_; + int priority_; + JobState state_; + boost::posix_time::ptime timestamp_; + boost::posix_time::ptime creationTime_; + boost::posix_time::ptime lastStateChangeTime_; + boost::posix_time::time_duration runtime_; + bool hasEta_; + boost::posix_time::ptime eta_; + JobStatus status_; + + public: + JobInfo(const std::string& id, + int priority, + JobState state, + const JobStatus& status, + const boost::posix_time::ptime& creationTime, + const boost::posix_time::ptime& lastStateChangeTime, + const boost::posix_time::time_duration& runtime); + + JobInfo(); + + const std::string& GetIdentifier() const + { + return id_; + } + + int GetPriority() const + { + return priority_; + } + + JobState GetState() const + { + return state_; + } + + const boost::posix_time::ptime& GetInfoTime() const + { + return timestamp_; + } + + const boost::posix_time::ptime& GetCreationTime() const + { + return creationTime_; + } + + const boost::posix_time::time_duration& GetRuntime() const + { + return runtime_; + } + + bool HasEstimatedTimeOfArrival() const + { + return hasEta_; + } + + bool HasCompletionTime() const; + + const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const; + + const boost::posix_time::ptime& GetCompletionTime() const; + + const JobStatus& GetStatus() const + { + return status_; + } + + JobStatus& GetStatus() + { + return status_; + } + + void Format(Json::Value& target) const; + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobStatus.cpp Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,64 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeaders.h" +#include "JobStatus.h" + +namespace Orthanc +{ + JobStatus::JobStatus() : + errorCode_(ErrorCode_InternalError), + progress_(0), + description_(Json::objectValue) + { + } + + + JobStatus::JobStatus(ErrorCode code, + IJob& job) : + errorCode_(code), + progress_(job.GetProgress()) + { + if (progress_ < 0) + { + progress_ = 0; + } + + if (progress_ > 1) + { + progress_ = 1; + } + + job.GetDescription(description_); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobStatus.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,68 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "IJob.h" + +namespace Orthanc +{ + class JobStatus + { + private: + ErrorCode errorCode_; + float progress_; + Json::Value description_; + + public: + JobStatus(); + + JobStatus(ErrorCode code, + IJob& job); + + ErrorCode GetErrorCode() const + { + return errorCode_; + } + + float GetProgress() const + { + return progress_; + } + + const Json::Value& GetDescription() const + { + return description_; + } + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobStepResult.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,60 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../Enumerations.h" + +namespace Orthanc +{ + class JobStepResult + { + private: + JobStepCode code_; + + public: + explicit JobStepResult(JobStepCode code) : + code_(code) + { + } + + virtual ~JobStepResult() + { + } + + JobStepCode GetCode() const + { + return code_; + } + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobStepRetry.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,57 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "JobStepResult.h" + +namespace Orthanc +{ + class JobStepRetry : public JobStepResult + { + private: + unsigned int timeout_; // Retry after "timeout_" milliseconds + + public: + JobStepRetry(unsigned int timeout) : + JobStepResult(JobStepCode_Retry), + timeout_(timeout) + { + } + + unsigned int GetTimeout() const + { + return timeout_; + } + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobsEngine.cpp Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,267 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeaders.h" +#include "JobsEngine.h" + +#include "JobStepRetry.h" + +#include "../Logging.h" +#include "../OrthancException.h" + +namespace Orthanc +{ + bool JobsEngine::ExecuteStep(JobsRegistry::RunningJob& running, + size_t workerIndex) + { + assert(running.IsValid()); + + LOG(INFO) << "Executing job with priority " << running.GetPriority() + << " in worker thread " << workerIndex << ": " << running.GetId(); + + if (running.IsPauseScheduled()) + { + running.GetJob().ReleaseResources(); + running.MarkPause(); + return false; + } + + std::auto_ptr<JobStepResult> result; + + { + try + { + result.reset(running.GetJob().ExecuteStep()); + + if (result->GetCode() == JobStepCode_Failure) + { + running.UpdateStatus(ErrorCode_InternalError); + } + else + { + running.UpdateStatus(ErrorCode_Success); + } + } + catch (OrthancException& e) + { + running.UpdateStatus(e.GetErrorCode()); + } + catch (boost::bad_lexical_cast&) + { + running.UpdateStatus(ErrorCode_BadFileFormat); + } + catch (...) + { + running.UpdateStatus(ErrorCode_InternalError); + } + + if (result.get() == NULL) + { + result.reset(new JobStepResult(JobStepCode_Failure)); + } + } + + switch (result->GetCode()) + { + case JobStepCode_Success: + running.MarkSuccess(); + return false; + + case JobStepCode_Failure: + running.MarkFailure(); + return false; + + case JobStepCode_Retry: + running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout()); + return false; + + case JobStepCode_Continue: + return true; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } + + + void JobsEngine::RetryHandler(JobsEngine* engine) + { + assert(engine != NULL); + + for (;;) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(200)); + + { + boost::mutex::scoped_lock lock(engine->stateMutex_); + + if (engine->state_ != State_Running) + { + return; + } + } + + engine->GetRegistry().ScheduleRetries(); + } + } + + + void JobsEngine::Worker(JobsEngine* engine, + size_t workerIndex) + { + assert(engine != NULL); + + LOG(INFO) << "Worker thread " << workerIndex << " has started"; + + for (;;) + { + { + boost::mutex::scoped_lock lock(engine->stateMutex_); + + if (engine->state_ != State_Running) + { + return; + } + } + + JobsRegistry::RunningJob running(engine->GetRegistry(), 100); + + if (running.IsValid()) + { + for (;;) + { + if (!engine->ExecuteStep(running, workerIndex)) + { + break; + } + } + } + } + } + + + JobsEngine::JobsEngine() : + state_(State_Setup), + workers_(1) + { + } + + + JobsEngine::~JobsEngine() + { + if (state_ != State_Setup && + state_ != State_Done) + { + LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!"; + Stop(); + } + } + + + void JobsEngine::SetWorkersCount(size_t count) + { + if (count == 0) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Setup) + { + // Can only be invoked before calling "Start()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + workers_.resize(count); + } + + + void JobsEngine::Start() + { + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Setup) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + retryHandler_ = boost::thread(RetryHandler, this); + + for (size_t i = 0; i < workers_.size(); i++) + { + workers_[i] = boost::thread(Worker, this, i); + } + + state_ = State_Running; + + LOG(WARNING) << "The jobs engine has started"; + } + + + void JobsEngine::Stop() + { + { + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Running) + { + return; + } + + state_ = State_Stopping; + } + + LOG(INFO) << "Stopping the jobs engine"; + + if (retryHandler_.joinable()) + { + retryHandler_.join(); + } + + for (size_t i = 0; i < workers_.size(); i++) + { + if (workers_[i].joinable()) + { + workers_[i].join(); + } + } + + { + boost::mutex::scoped_lock lock(stateMutex_); + state_ = State_Done; + } + + LOG(WARNING) << "The jobs engine has stopped"; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobsEngine.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,83 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "JobsRegistry.h" + +#include <boost/thread.hpp> + +namespace Orthanc +{ + class JobsEngine + { + private: + enum State + { + State_Setup, + State_Running, + State_Stopping, + State_Done + }; + + boost::mutex stateMutex_; + State state_; + JobsRegistry registry_; + boost::thread retryHandler_; + std::vector<boost::thread> workers_; + + bool ExecuteStep(JobsRegistry::RunningJob& running, + size_t workerIndex); + + static void RetryHandler(JobsEngine* engine); + + static void Worker(JobsEngine* engine, + size_t workerIndex); + + public: + JobsEngine(); + + ~JobsEngine(); + + void SetWorkersCount(size_t count); + + JobsRegistry& GetRegistry() + { + return registry_; + } + + void Start(); + + void Stop(); + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobsRegistry.cpp Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,910 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeaders.h" +#include "JobsRegistry.h" + +#include "../Logging.h" +#include "../OrthancException.h" +#include "../Toolbox.h" + +namespace Orthanc +{ + class JobsRegistry::JobHandler : public boost::noncopyable + { + private: + std::string id_; + JobState state_; + std::auto_ptr<IJob> job_; + int priority_; // "+inf()" means highest priority + boost::posix_time::ptime creationTime_; + boost::posix_time::ptime lastStateChangeTime_; + boost::posix_time::time_duration runtime_; + boost::posix_time::ptime retryTime_; + bool pauseScheduled_; + JobStatus lastStatus_; + + void Touch() + { + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + + if (state_ == JobState_Running) + { + runtime_ += (now - lastStateChangeTime_); + } + + lastStateChangeTime_ = now; + } + + void SetStateInternal(JobState state) + { + state_ = state; + pauseScheduled_ = false; + Touch(); + } + + public: + JobHandler(IJob* job, + int priority) : + id_(Toolbox::GenerateUuid()), + state_(JobState_Pending), + job_(job), + priority_(priority), + creationTime_(boost::posix_time::microsec_clock::universal_time()), + lastStateChangeTime_(creationTime_), + runtime_(boost::posix_time::milliseconds(0)), + retryTime_(creationTime_), + pauseScheduled_(false) + { + if (job == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + + lastStatus_ = JobStatus(ErrorCode_Success, *job); + } + + const std::string& GetId() const + { + return id_; + } + + IJob& GetJob() const + { + assert(job_.get() != NULL); + return *job_; + } + + void SetPriority(int priority) + { + priority_ = priority; + } + + int GetPriority() const + { + return priority_; + } + + JobState GetState() const + { + return state_; + } + + void SetState(JobState state) + { + if (state == JobState_Retry) + { + // Use "SetRetryState()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + SetStateInternal(state); + } + } + + void SetRetryState(unsigned int timeout) + { + if (state_ == JobState_Running) + { + SetStateInternal(JobState_Retry); + retryTime_ = (boost::posix_time::microsec_clock::universal_time() + + boost::posix_time::milliseconds(timeout)); + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + void SchedulePause() + { + if (state_ == JobState_Running) + { + pauseScheduled_ = true; + } + else + { + // Only valid for running jobs + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + bool IsPauseScheduled() + { + return pauseScheduled_; + } + + bool IsRetryReady(const boost::posix_time::ptime& now) const + { + if (state_ != JobState_Retry) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return retryTime_ <= now; + } + } + + const boost::posix_time::ptime& GetCreationTime() const + { + return creationTime_; + } + + const boost::posix_time::ptime& GetLastStateChangeTime() const + { + return lastStateChangeTime_; + } + + const boost::posix_time::time_duration& GetRuntime() const + { + return runtime_; + } + + const JobStatus& GetLastStatus() const + { + return lastStatus_; + } + + void SetLastStatus(const JobStatus& status) + { + lastStatus_ = status; + Touch(); + } + }; + + + bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, + JobHandler*& b) const + { + return a->GetPriority() < b->GetPriority(); + } + + +#if defined(NDEBUG) + void JobsRegistry::CheckInvariants() const + { + } + +#else + bool JobsRegistry::IsPendingJob(const JobHandler& job) const + { + PendingJobs copy = pendingJobs_; + while (!copy.empty()) + { + if (copy.top() == &job) + { + return true; + } + + copy.pop(); + } + + return false; + } + + bool JobsRegistry::IsCompletedJob(JobHandler& job) const + { + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + if (*it == &job) + { + return true; + } + } + + return false; + } + + bool JobsRegistry::IsRetryJob(JobHandler& job) const + { + return retryJobs_.find(&job) != retryJobs_.end(); + } + + void JobsRegistry::CheckInvariants() const + { + { + PendingJobs copy = pendingJobs_; + while (!copy.empty()) + { + assert(copy.top()->GetState() == JobState_Pending); + copy.pop(); + } + } + + assert(completedJobs_.size() <= maxCompletedJobs_); + + for (CompletedJobs::const_iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Success || + (*it)->GetState() == JobState_Failure); + } + + for (RetryJobs::const_iterator it = retryJobs_.begin(); + it != retryJobs_.end(); ++it) + { + assert((*it)->GetState() == JobState_Retry); + } + + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + JobHandler& job = *it->second; + + assert(job.GetId() == it->first); + + switch (job.GetState()) + { + case JobState_Pending: + assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Success: + case JobState_Failure: + assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); + break; + + case JobState_Retry: + assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + case JobState_Running: + case JobState_Paused: + assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } + } +#endif + + + void JobsRegistry::ForgetOldCompletedJobs() + { + if (maxCompletedJobs_ != 0) + { + while (completedJobs_.size() > maxCompletedJobs_) + { + assert(completedJobs_.front() != NULL); + + std::string id = completedJobs_.front()->GetId(); + assert(jobsIndex_.find(id) != jobsIndex_.end()); + + jobsIndex_.erase(id); + delete(completedJobs_.front()); + completedJobs_.pop_front(); + } + } + } + + + void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, + bool success) + { + LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") + << ": " << job.GetId(); + + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(success ? JobState_Success : JobState_Failure); + + completedJobs_.push_back(&job); + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + + + void JobsRegistry::MarkRunningAsRetry(JobHandler& job, + unsigned int timeout) + { + LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); + + CheckInvariants(); + + assert(job.GetState() == JobState_Running && + retryJobs_.find(&job) == retryJobs_.end()); + + retryJobs_.insert(&job); + job.SetRetryState(timeout); + + CheckInvariants(); + } + + + void JobsRegistry::MarkRunningAsPaused(JobHandler& job) + { + LOG(INFO) << "Job paused: " << job.GetId(); + + CheckInvariants(); + assert(job.GetState() == JobState_Running); + + job.SetState(JobState_Paused); + + CheckInvariants(); + } + + + JobsRegistry::~JobsRegistry() + { + for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) + { + assert(it->second != NULL); + delete it->second; + } + } + + + void JobsRegistry::SetMaxCompletedJobs(size_t i) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + maxCompletedJobs_ = i; + ForgetOldCompletedJobs(); + + CheckInvariants(); + } + + + void JobsRegistry::ListJobs(std::set<std::string>& target) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + target.insert(it->first); + } + } + + + bool JobsRegistry::GetJobInfo(JobInfo& target, + const std::string& id) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::const_iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + return false; + } + else + { + const JobHandler& handler = *found->second; + target = JobInfo(handler.GetId(), + handler.GetPriority(), + handler.GetState(), + handler.GetLastStatus(), + handler.GetCreationTime(), + handler.GetLastStateChangeTime(), + handler.GetRuntime()); + return true; + } + } + + + void JobsRegistry::Submit(std::string& id, + IJob* job, // Takes ownership + int priority) + { + std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + id = handler->GetId(); + + pendingJobs_.push(handler.get()); + pendingJobAvailable_.notify_one(); + + jobsIndex_.insert(std::make_pair(id, handler.release())); + + LOG(INFO) << "New job submitted with priority " << priority << ": " << id; + + CheckInvariants(); + } + + + void JobsRegistry::Submit(IJob* job, // Takes ownership + int priority) + { + std::string id; + Submit(id, job, priority); + } + + + void JobsRegistry::SetPriority(const std::string& id, + int priority) + { + LOG(INFO) << "Changing priority to " << priority << " for job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else + { + found->second->SetPriority(priority); + + if (found->second->GetState() == JobState_Pending) + { + // If the job is pending, we need to reconstruct the + // priority queue, as the heap condition has changed + + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + pendingJobs_.push(copy.top()); + copy.pop(); + } + } + } + + CheckInvariants(); + } + + + void JobsRegistry::Pause(const std::string& id) + { + LOG(INFO) << "Pausing job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else + { + switch (found->second->GetState()) + { + case JobState_Pending: + { + // If the job is pending, we need to reconstruct the + // priority queue to remove it + PendingJobs copy; + std::swap(copy, pendingJobs_); + + assert(pendingJobs_.empty()); + while (!copy.empty()) + { + if (copy.top()->GetId() != id) + { + pendingJobs_.push(copy.top()); + } + + copy.pop(); + } + + found->second->SetState(JobState_Paused); + + break; + } + + case JobState_Retry: + { + RetryJobs::iterator item = retryJobs_.find(found->second); + assert(item != retryJobs_.end()); + retryJobs_.erase(item); + + found->second->SetState(JobState_Paused); + + break; + } + + case JobState_Paused: + case JobState_Success: + case JobState_Failure: + // Nothing to be done + break; + + case JobState_Running: + found->second->SchedulePause(); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + } + + CheckInvariants(); + } + + + void JobsRegistry::Resume(const std::string& id) + { + LOG(INFO) << "Resuming job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Paused) + { + LOG(WARNING) << "Cannot resume a job that is not paused: " << id; + } + else + { + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } + + CheckInvariants(); + } + + + void JobsRegistry::Resubmit(const std::string& id) + { + LOG(INFO) << "Resubmitting failed job: " << id; + + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::iterator found = jobsIndex_.find(id); + + if (found == jobsIndex_.end()) + { + LOG(WARNING) << "Unknown job: " << id; + } + else if (found->second->GetState() != JobState_Failure) + { + LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; + } + else + { + bool ok = false; + for (CompletedJobs::iterator it = completedJobs_.begin(); + it != completedJobs_.end(); ++it) + { + if (*it == found->second) + { + ok = true; + completedJobs_.erase(it); + break; + } + } + + assert(ok); + + found->second->SetState(JobState_Pending); + pendingJobs_.push(found->second); + pendingJobAvailable_.notify_one(); + } + + CheckInvariants(); + } + + + void JobsRegistry::ScheduleRetries() + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + RetryJobs copy; + std::swap(copy, retryJobs_); + + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + + assert(retryJobs_.empty()); + for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) + { + if ((*it)->IsRetryReady(now)) + { + LOG(INFO) << "Retrying job: " << (*it)->GetId(); + (*it)->SetState(JobState_Pending); + pendingJobs_.push(*it); + pendingJobAvailable_.notify_one(); + } + else + { + retryJobs_.insert(*it); + } + } + + CheckInvariants(); + } + + + bool JobsRegistry::GetState(JobState& state, + const std::string& id) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + JobsIndex::const_iterator it = jobsIndex_.find(id); + if (it == jobsIndex_.end()) + { + return false; + } + else + { + state = it->second->GetState(); + return true; + } + } + + + JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, + unsigned int timeout) : + registry_(registry), + handler_(NULL), + targetState_(JobState_Failure), + targetRetryTimeout_(0) + { + { + boost::mutex::scoped_lock lock(registry_.mutex_); + + while (registry_.pendingJobs_.empty()) + { + if (timeout == 0) + { + registry_.pendingJobAvailable_.wait(lock); + } + else + { + bool success = registry_.pendingJobAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + // No pending job + return; + } + } + } + + handler_ = registry_.pendingJobs_.top(); + registry_.pendingJobs_.pop(); + + assert(handler_->GetState() == JobState_Pending); + handler_->SetState(JobState_Running); + + job_ = &handler_->GetJob(); + id_ = handler_->GetId(); + priority_ = handler_->GetPriority(); + } + } + + + JobsRegistry::RunningJob::~RunningJob() + { + if (IsValid()) + { + boost::mutex::scoped_lock lock(registry_.mutex_); + + switch (targetState_) + { + case JobState_Failure: + registry_.MarkRunningAsCompleted(*handler_, false); + break; + + case JobState_Success: + registry_.MarkRunningAsCompleted(*handler_, true); + break; + + case JobState_Paused: + registry_.MarkRunningAsPaused(*handler_); + break; + + case JobState_Retry: + registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); + break; + + default: + assert(0); + } + } + } + + + bool JobsRegistry::RunningJob::IsValid() const + { + return (handler_ != NULL && + job_ != NULL); + } + + + const std::string& JobsRegistry::RunningJob::GetId() const + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return id_; + } + } + + + int JobsRegistry::RunningJob::GetPriority() const + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return priority_; + } + } + + + IJob& JobsRegistry::RunningJob::GetJob() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return *job_; + } + } + + + bool JobsRegistry::RunningJob::IsPauseScheduled() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + return handler_->IsPauseScheduled(); + } + } + + + void JobsRegistry::RunningJob::MarkSuccess() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Success; + } + } + + + void JobsRegistry::RunningJob::MarkFailure() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Failure; + } + } + + + void JobsRegistry::RunningJob::MarkPause() + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Paused; + } + } + + + void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + targetState_ = JobState_Retry; + targetRetryTimeout_ = timeout; + } + } + + + void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code) + { + if (!IsValid()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + JobStatus status(code, *job_); + + boost::mutex::scoped_lock lock(registry_.mutex_); + registry_.CheckInvariants(); + assert(handler_->GetState() == JobState_Running); + + handler_->SetLastStatus(status); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/JobsRegistry.h Mon May 07 15:37:20 2018 +0200 @@ -0,0 +1,182 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#if !defined(ORTHANC_SANDBOXED) +# error The macro ORTHANC_SANDBOXED must be defined +#endif + +#if ORTHANC_SANDBOXED == 1 +# error The job engine cannot be used in sandboxed environments +#endif + +#include "JobInfo.h" + +#include <list> +#include <set> +#include <queue> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> + +namespace Orthanc +{ + // This class handles the state machine of the jobs engine + class JobsRegistry : public boost::noncopyable + { + private: + class JobHandler; + + struct PriorityComparator + { + bool operator() (JobHandler*& a, + JobHandler*& b) const; + }; + + typedef std::map<std::string, JobHandler*> JobsIndex; + typedef std::list<JobHandler*> CompletedJobs; + typedef std::set<JobHandler*> RetryJobs; + typedef std::priority_queue<JobHandler*, + std::vector<JobHandler*>, // Could be a "std::deque" + PriorityComparator> PendingJobs; + + boost::mutex mutex_; + JobsIndex jobsIndex_; + PendingJobs pendingJobs_; + CompletedJobs completedJobs_; + RetryJobs retryJobs_; + + boost::condition_variable pendingJobAvailable_; + size_t maxCompletedJobs_; + + +#ifndef NDEBUG + bool IsPendingJob(const JobHandler& job) const; + + bool IsCompletedJob(JobHandler& job) const; + + bool IsRetryJob(JobHandler& job) const; +#endif + + void CheckInvariants() const; + + void ForgetOldCompletedJobs(); + + void MarkRunningAsCompleted(JobHandler& job, + bool success); + + void MarkRunningAsRetry(JobHandler& job, + unsigned int timeout); + + void MarkRunningAsPaused(JobHandler& job); + + public: + JobsRegistry() : + maxCompletedJobs_(10) + { + } + + + ~JobsRegistry(); + + void SetMaxCompletedJobs(size_t i); + + void ListJobs(std::set<std::string>& target); + + bool GetJobInfo(JobInfo& target, + const std::string& id); + + void Submit(std::string& id, + IJob* job, // Takes ownership + int priority); + + void Submit(IJob* job, // Takes ownership + int priority); + + void SetPriority(const std::string& id, + int priority); + + void Pause(const std::string& id); + + void Resume(const std::string& id); + + void Resubmit(const std::string& id); + + void ScheduleRetries(); + + bool GetState(JobState& state, + const std::string& id); + + class RunningJob : public boost::noncopyable + { + private: + JobsRegistry& registry_; + JobHandler* handler_; // Can only be accessed if the + // registry mutex is locked! + IJob* job_; // Will by design be in mutual exclusion, + // because only one RunningJob can be + // executed at a time on a JobHandler + + std::string id_; + int priority_; + JobState targetState_; + unsigned int targetRetryTimeout_; + + public: + RunningJob(JobsRegistry& registry, + unsigned int timeout); + + ~RunningJob(); + + bool IsValid() const; + + const std::string& GetId() const; + + int GetPriority() const; + + IJob& GetJob(); + + bool IsPauseScheduled(); + + void MarkSuccess(); + + void MarkFailure(); + + void MarkPause(); + + void MarkRetry(unsigned int timeout); + + void UpdateStatus(ErrorCode code); + }; + }; +}
--- a/Resources/CMake/OrthancFrameworkConfiguration.cmake Mon May 07 15:02:34 2018 +0200 +++ b/Resources/CMake/OrthancFrameworkConfiguration.cmake Mon May 07 15:37:20 2018 +0200 @@ -498,6 +498,10 @@ list(APPEND ORTHANC_CORE_SOURCES_INTERNAL ${ORTHANC_ROOT}/Core/Cache/SharedArchive.cpp ${ORTHANC_ROOT}/Core/FileStorage/FilesystemStorage.cpp + ${ORTHANC_ROOT}/Core/JobsEngine/JobInfo.cpp + ${ORTHANC_ROOT}/Core/JobsEngine/JobStatus.cpp + ${ORTHANC_ROOT}/Core/JobsEngine/JobsEngine.cpp + ${ORTHANC_ROOT}/Core/JobsEngine/JobsRegistry.cpp ${ORTHANC_ROOT}/Core/MultiThreading/RunnableWorkersPool.cpp ${ORTHANC_ROOT}/Core/MultiThreading/Semaphore.cpp ${ORTHANC_ROOT}/Core/MultiThreading/SharedMessageQueue.cpp
--- a/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 15:02:34 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 15:37:20 2018 +0200 @@ -34,11 +34,13 @@ #include "PrecompiledHeadersUnitTests.h" #include "gtest/gtest.h" -#include "../OrthancServer/Scheduler/ServerScheduler.h" +#include "../Core/JobsEngine/JobStepRetry.h" +#include "../Core/JobsEngine/JobsEngine.h" +#include "../Core/MultiThreading/Locker.h" #include "../Core/OrthancException.h" #include "../Core/SystemToolbox.h" #include "../Core/Toolbox.h" -#include "../Core/MultiThreading/Locker.h" +#include "../OrthancServer/Scheduler/ServerScheduler.h" using namespace Orthanc; @@ -239,1486 +241,6 @@ - - -#if !defined(ORTHANC_SANDBOXED) -# error The macro ORTHANC_SANDBOXED must be defined -#endif - -#if ORTHANC_SANDBOXED == 1 -# error The job engine cannot be used in sandboxed environments -#endif - -#include "../Core/Logging.h" - -#include <boost/math/special_functions/round.hpp> -#include <boost/date_time/posix_time/posix_time.hpp> -#include <queue> - -namespace Orthanc -{ - enum JobState - { - JobState_Pending, - JobState_Running, - JobState_Success, - JobState_Failure, - JobState_Paused, - JobState_Retry - }; - - static const char* EnumerationToString(JobState state) - { - switch (state) - { - case JobState_Pending: - return "Pending"; - - case JobState_Running: - return "Running"; - - case JobState_Success: - return "Success"; - - case JobState_Failure: - return "Failure"; - - case JobState_Paused: - return "Paused"; - - case JobState_Retry: - return "Retry"; - - default: - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - } - - enum JobStepCode - { - JobStepCode_Success, - JobStepCode_Failure, - JobStepCode_Continue, - JobStepCode_Retry - }; - - class JobStepResult - { - private: - JobStepCode code_; - - public: - explicit JobStepResult(JobStepCode code) : - code_(code) - { - } - - virtual ~JobStepResult() - { - } - - JobStepCode GetCode() const - { - return code_; - } - }; - - - class JobStepRetry : public JobStepResult - { - private: - unsigned int timeout_; // Retry after "timeout_" milliseconds - - public: - JobStepRetry(unsigned int timeout) : - JobStepResult(JobStepCode_Retry), - timeout_(timeout) - { - } - - unsigned int GetTimeout() const - { - return timeout_; - } - }; - - - class IJob : public boost::noncopyable - { - public: - virtual ~IJob() - { - } - - virtual JobStepResult* ExecuteStep() = 0; - - virtual void ReleaseResources() = 0; // For pausing jobs - - virtual float GetProgress() = 0; - - virtual void GetDescription(Json::Value& value) = 0; - }; - - - class JobStatus - { - private: - ErrorCode errorCode_; - float progress_; - Json::Value description_; - - public: - JobStatus() : - errorCode_(ErrorCode_InternalError), - progress_(0), - description_(Json::objectValue) - { - } - - JobStatus(ErrorCode code, - IJob& job) : - errorCode_(code), - progress_(job.GetProgress()) - { - if (progress_ < 0) - { - progress_ = 0; - } - - if (progress_ > 1) - { - progress_ = 1; - } - - job.GetDescription(description_); - } - - ErrorCode GetErrorCode() const - { - return errorCode_; - } - - float GetProgress() const - { - return progress_; - } - - const Json::Value& GetDescription() const - { - return description_; - } - }; - - - class JobInfo - { - private: - std::string id_; - int priority_; - JobState state_; - boost::posix_time::ptime timestamp_; - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastStateChangeTime_; - boost::posix_time::time_duration runtime_; - bool hasEta_; - boost::posix_time::ptime eta_; - JobStatus status_; - - public: - JobInfo(const std::string& id, - int priority, - JobState state, - const JobStatus& status, - const boost::posix_time::ptime& creationTime, - const boost::posix_time::ptime& lastStateChangeTime, - const boost::posix_time::time_duration& runtime) : - id_(id), - priority_(priority), - state_(state), - timestamp_(boost::posix_time::microsec_clock::universal_time()), - creationTime_(creationTime), - lastStateChangeTime_(lastStateChangeTime), - runtime_(runtime), - hasEta_(false), - status_(status) - { - if (state_ == JobState_Running) - { - float ms = static_cast<float>(runtime_.total_milliseconds()); - - if (status_.GetProgress() > 0.01f && - ms > 0.01f) - { - float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; - eta_ = timestamp_ + boost::posix_time::milliseconds(remaining); - hasEta_ = true; - } - } - } - - JobInfo() : - priority_(0), - state_(JobState_Failure), - timestamp_(boost::posix_time::microsec_clock::universal_time()), - creationTime_(timestamp_), - lastStateChangeTime_(timestamp_), - runtime_(boost::posix_time::milliseconds(0)), - hasEta_(false) - { - } - - const std::string& GetIdentifier() const - { - return id_; - } - - int GetPriority() const - { - return priority_; - } - - JobState GetState() const - { - return state_; - } - - const boost::posix_time::ptime& GetInfoTime() const - { - return timestamp_; - } - - const boost::posix_time::ptime& GetCreationTime() const - { - return creationTime_; - } - - const boost::posix_time::time_duration& GetRuntime() const - { - return runtime_; - } - - bool HasEstimatedTimeOfArrival() const - { - return hasEta_; - } - - bool HasCompletionTime() const - { - return (state_ == JobState_Success || - state_ == JobState_Failure); - } - - const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const - { - if (hasEta_) - { - return eta_; - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - const boost::posix_time::ptime& GetCompletionTime() const - { - if (HasCompletionTime()) - { - return lastStateChangeTime_; - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - const JobStatus& GetStatus() const - { - return status_; - } - - JobStatus& GetStatus() - { - return status_; - } - - void Format(Json::Value& target) const - { - target = Json::objectValue; - target["ID"] = id_; - target["Priority"] = priority_; - target["ErrorCode"] = static_cast<int>(status_.GetErrorCode()); - target["ErrorDescription"] = EnumerationToString(status_.GetErrorCode()); - target["State"] = EnumerationToString(state_); - target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_); - target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); - target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds()); - target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f); - target["Description"] = status_.GetDescription(); - - if (HasEstimatedTimeOfArrival()) - { - target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(GetEstimatedTimeOfArrival()); - } - - if (HasCompletionTime()) - { - target["CompletionTime"] = boost::posix_time::to_iso_string(GetCompletionTime()); - } - } - }; - - - - - class JobsRegistry : public boost::noncopyable - { - private: - class JobHandler : public boost::noncopyable - { - private: - std::string id_; - JobState state_; - std::auto_ptr<IJob> job_; - int priority_; // "+inf()" means highest priority - boost::posix_time::ptime creationTime_; - boost::posix_time::ptime lastStateChangeTime_; - boost::posix_time::time_duration runtime_; - boost::posix_time::ptime retryTime_; - bool pauseScheduled_; - JobStatus lastStatus_; - - void Touch() - { - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - - if (state_ == JobState_Running) - { - runtime_ += (now - lastStateChangeTime_); - } - - lastStateChangeTime_ = now; - } - - void SetStateInternal(JobState state) - { - state_ = state; - pauseScheduled_ = false; - Touch(); - } - - public: - JobHandler(IJob* job, - int priority) : - id_(Toolbox::GenerateUuid()), - state_(JobState_Pending), - job_(job), - priority_(priority), - creationTime_(boost::posix_time::microsec_clock::universal_time()), - lastStateChangeTime_(creationTime_), - runtime_(boost::posix_time::milliseconds(0)), - retryTime_(creationTime_), - pauseScheduled_(false) - { - if (job == NULL) - { - throw OrthancException(ErrorCode_NullPointer); - } - - lastStatus_ = JobStatus(ErrorCode_Success, *job); - } - - const std::string& GetId() const - { - return id_; - } - - IJob& GetJob() const - { - assert(job_.get() != NULL); - return *job_; - } - - void SetPriority(int priority) - { - priority_ = priority; - } - - int GetPriority() const - { - return priority_; - } - - JobState GetState() const - { - return state_; - } - - void SetState(JobState state) - { - if (state == JobState_Retry) - { - // Use "SetRetryState()" - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - SetStateInternal(state); - } - } - - void SetRetryState(unsigned int timeout) - { - if (state_ == JobState_Running) - { - SetStateInternal(JobState_Retry); - retryTime_ = (boost::posix_time::microsec_clock::universal_time() + - boost::posix_time::milliseconds(timeout)); - } - else - { - // Only valid for running jobs - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - void SchedulePause() - { - if (state_ == JobState_Running) - { - pauseScheduled_ = true; - } - else - { - // Only valid for running jobs - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } - - bool IsPauseScheduled() - { - return pauseScheduled_; - } - - bool IsRetryReady(const boost::posix_time::ptime& now) const - { - if (state_ != JobState_Retry) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return retryTime_ <= now; - } - } - - const boost::posix_time::ptime& GetCreationTime() const - { - return creationTime_; - } - - const boost::posix_time::ptime& GetLastStateChangeTime() const - { - return lastStateChangeTime_; - } - - const boost::posix_time::time_duration& GetRuntime() const - { - return runtime_; - } - - const JobStatus& GetLastStatus() const - { - return lastStatus_; - } - - void SetLastStatus(const JobStatus& status) - { - lastStatus_ = status; - Touch(); - } - }; - - struct PriorityComparator - { - bool operator() (JobHandler*& a, - JobHandler*& b) const - { - return a->GetPriority() < b->GetPriority(); - } - }; - - typedef std::map<std::string, JobHandler*> JobsIndex; - typedef std::list<JobHandler*> CompletedJobs; - typedef std::set<JobHandler*> RetryJobs; - typedef std::priority_queue<JobHandler*, - std::vector<JobHandler*>, // Could be a "std::deque" - PriorityComparator> PendingJobs; - - boost::mutex mutex_; - JobsIndex jobsIndex_; - PendingJobs pendingJobs_; - CompletedJobs completedJobs_; - RetryJobs retryJobs_; - - boost::condition_variable pendingJobAvailable_; - size_t maxCompletedJobs_; - - -#ifndef NDEBUG - bool IsPendingJob(const JobHandler& job) const - { - PendingJobs copy = pendingJobs_; - while (!copy.empty()) - { - if (copy.top() == &job) - { - return true; - } - - copy.pop(); - } - - return false; - } - - bool IsCompletedJob(JobHandler& job) const - { - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == &job) - { - return true; - } - } - - return false; - } - - bool IsRetryJob(JobHandler& job) const - { - return retryJobs_.find(&job) != retryJobs_.end(); - } -#endif - - - void CheckInvariants() const - { -#ifndef NDEBUG - { - PendingJobs copy = pendingJobs_; - while (!copy.empty()) - { - assert(copy.top()->GetState() == JobState_Pending); - copy.pop(); - } - } - - assert(completedJobs_.size() <= maxCompletedJobs_); - - for (CompletedJobs::const_iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Success || - (*it)->GetState() == JobState_Failure); - } - - for (RetryJobs::const_iterator it = retryJobs_.begin(); - it != retryJobs_.end(); ++it) - { - assert((*it)->GetState() == JobState_Retry); - } - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - JobHandler& job = *it->second; - - assert(job.GetId() == it->first); - - switch (job.GetState()) - { - case JobState_Pending: - assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Success: - case JobState_Failure: - assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); - break; - - case JobState_Retry: - assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; - - case JobState_Running: - case JobState_Paused: - assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); - break; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } -#endif - } - - - void ForgetOldCompletedJobs() - { - if (maxCompletedJobs_ != 0) - { - while (completedJobs_.size() > maxCompletedJobs_) - { - assert(completedJobs_.front() != NULL); - - std::string id = completedJobs_.front()->GetId(); - assert(jobsIndex_.find(id) != jobsIndex_.end()); - - jobsIndex_.erase(id); - delete(completedJobs_.front()); - completedJobs_.pop_front(); - } - } - } - - - void MarkRunningAsCompleted(JobHandler& job, - bool success) - { - LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") - << ": " << job.GetId(); - - CheckInvariants(); - assert(job.GetState() == JobState_Running); - - job.SetState(success ? JobState_Success : JobState_Failure); - - completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void MarkRunningAsRetry(JobHandler& job, - unsigned int timeout) - { - LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); - - CheckInvariants(); - - assert(job.GetState() == JobState_Running && - retryJobs_.find(&job) == retryJobs_.end()); - - retryJobs_.insert(&job); - job.SetRetryState(timeout); - - CheckInvariants(); - } - - - void MarkRunningAsPaused(JobHandler& job) - { - LOG(INFO) << "Job paused: " << job.GetId(); - - CheckInvariants(); - assert(job.GetState() == JobState_Running); - - job.SetState(JobState_Paused); - - CheckInvariants(); - } - - - public: - JobsRegistry() : - maxCompletedJobs_(10) - { - } - - - ~JobsRegistry() - { - for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) - { - assert(it->second != NULL); - delete it->second; - } - } - - - void SetMaxCompletedJobs(size_t i) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - maxCompletedJobs_ = i; - ForgetOldCompletedJobs(); - - CheckInvariants(); - } - - - void ListJobs(std::set<std::string>& target) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - target.insert(it->first); - } - } - - - bool GetJobInfo(JobInfo& target, - const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::const_iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - return false; - } - else - { - const JobHandler& handler = *found->second; - target = JobInfo(handler.GetId(), - handler.GetPriority(), - handler.GetState(), - handler.GetLastStatus(), - handler.GetCreationTime(), - handler.GetLastStateChangeTime(), - handler.GetRuntime()); - return true; - } - } - - - void Submit(std::string& id, - IJob* job, // Takes ownership - int priority) - { - std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - id = handler->GetId(); - - pendingJobs_.push(handler.get()); - pendingJobAvailable_.notify_one(); - - jobsIndex_.insert(std::make_pair(id, handler.release())); - - LOG(INFO) << "New job submitted with priority " << priority << ": " << id; - - CheckInvariants(); - } - - - void Submit(IJob* job, // Takes ownership - int priority) - { - std::string id; - Submit(id, job, priority); - } - - - void SetPriority(const std::string& id, - int priority) - { - LOG(INFO) << "Changing priority to " << priority << " for job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - found->second->SetPriority(priority); - - if (found->second->GetState() == JobState_Pending) - { - // If the job is pending, we need to reconstruct the - // priority queue, as the heap condition has changed - - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - pendingJobs_.push(copy.top()); - copy.pop(); - } - } - } - - CheckInvariants(); - } - - - void Pause(const std::string& id) - { - LOG(INFO) << "Pausing job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else - { - switch (found->second->GetState()) - { - case JobState_Pending: - { - // If the job is pending, we need to reconstruct the - // priority queue to remove it - PendingJobs copy; - std::swap(copy, pendingJobs_); - - assert(pendingJobs_.empty()); - while (!copy.empty()) - { - if (copy.top()->GetId() != id) - { - pendingJobs_.push(copy.top()); - } - - copy.pop(); - } - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Retry: - { - RetryJobs::iterator item = retryJobs_.find(found->second); - assert(item != retryJobs_.end()); - retryJobs_.erase(item); - - found->second->SetState(JobState_Paused); - - break; - } - - case JobState_Paused: - case JobState_Success: - case JobState_Failure: - // Nothing to be done - break; - - case JobState_Running: - found->second->SchedulePause(); - break; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } - - CheckInvariants(); - } - - - void Resume(const std::string& id) - { - LOG(INFO) << "Resuming job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Paused) - { - LOG(WARNING) << "Cannot resume a job that is not paused: " << id; - } - else - { - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } - - CheckInvariants(); - } - - - void Resubmit(const std::string& id) - { - LOG(INFO) << "Resubmitting failed job: " << id; - - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::iterator found = jobsIndex_.find(id); - - if (found == jobsIndex_.end()) - { - LOG(WARNING) << "Unknown job: " << id; - } - else if (found->second->GetState() != JobState_Failure) - { - LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; - } - else - { - bool ok = false; - for (CompletedJobs::iterator it = completedJobs_.begin(); - it != completedJobs_.end(); ++it) - { - if (*it == found->second) - { - ok = true; - completedJobs_.erase(it); - break; - } - } - - assert(ok); - - found->second->SetState(JobState_Pending); - pendingJobs_.push(found->second); - pendingJobAvailable_.notify_one(); - } - - CheckInvariants(); - } - - - void ScheduleRetries() - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - RetryJobs copy; - std::swap(copy, retryJobs_); - - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - - assert(retryJobs_.empty()); - for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) - { - if ((*it)->IsRetryReady(now)) - { - LOG(INFO) << "Retrying job: " << (*it)->GetId(); - (*it)->SetState(JobState_Pending); - pendingJobs_.push(*it); - pendingJobAvailable_.notify_one(); - } - else - { - retryJobs_.insert(*it); - } - } - - CheckInvariants(); - } - - - bool GetState(JobState& state, - const std::string& id) - { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::const_iterator it = jobsIndex_.find(id); - if (it == jobsIndex_.end()) - { - return false; - } - else - { - state = it->second->GetState(); - return true; - } - } - - - class RunningJob : public boost::noncopyable - { - private: - JobsRegistry& registry_; - JobHandler* handler_; // Can only be accessed if the registry - // mutex is locked! - IJob* job_; // Will by design be in mutual exclusion, - // because only one RunningJob can be - // executed at a time on a JobHandler - - std::string id_; - int priority_; - JobState targetState_; - unsigned int targetRetryTimeout_; - - public: - RunningJob(JobsRegistry& registry, - unsigned int timeout) : - registry_(registry), - handler_(NULL), - targetState_(JobState_Failure), - targetRetryTimeout_(0) - { - { - boost::mutex::scoped_lock lock(registry_.mutex_); - - while (registry_.pendingJobs_.empty()) - { - if (timeout == 0) - { - registry_.pendingJobAvailable_.wait(lock); - } - else - { - bool success = registry_.pendingJobAvailable_.timed_wait - (lock, boost::posix_time::milliseconds(timeout)); - if (!success) - { - // No pending job - return; - } - } - } - - handler_ = registry_.pendingJobs_.top(); - registry_.pendingJobs_.pop(); - - assert(handler_->GetState() == JobState_Pending); - handler_->SetState(JobState_Running); - - job_ = &handler_->GetJob(); - id_ = handler_->GetId(); - priority_ = handler_->GetPriority(); - } - } - - ~RunningJob() - { - if (IsValid()) - { - boost::mutex::scoped_lock lock(registry_.mutex_); - - switch (targetState_) - { - case JobState_Failure: - registry_.MarkRunningAsCompleted(*handler_, false); - break; - - case JobState_Success: - registry_.MarkRunningAsCompleted(*handler_, true); - break; - - case JobState_Paused: - registry_.MarkRunningAsPaused(*handler_); - break; - - case JobState_Retry: - registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); - break; - - default: - assert(0); - } - } - } - - bool IsValid() const - { - return (handler_ != NULL && - job_ != NULL); - } - - const std::string& GetId() const - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return id_; - } - } - - int GetPriority() const - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return priority_; - } - } - - IJob& GetJob() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - return *job_; - } - } - - bool IsPauseScheduled() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - boost::mutex::scoped_lock lock(registry_.mutex_); - registry_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - return handler_->IsPauseScheduled(); - } - } - - void MarkSuccess() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Success; - } - } - - void MarkFailure() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Failure; - } - } - - void MarkPause() - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Paused; - } - } - - void MarkRetry(unsigned int timeout) - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - targetState_ = JobState_Retry; - targetRetryTimeout_ = timeout; - } - } - - void UpdateStatus(ErrorCode code) - { - if (!IsValid()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - JobStatus status(code, *job_); - - boost::mutex::scoped_lock lock(registry_.mutex_); - registry_.CheckInvariants(); - assert(handler_->GetState() == JobState_Running); - - handler_->SetLastStatus(status); - } - } - }; - }; - - - - class JobsEngine - { - private: - enum State - { - State_Setup, - State_Running, - State_Stopping, - State_Done - }; - - boost::mutex stateMutex_; - State state_; - JobsRegistry registry_; - boost::thread retryHandler_; - std::vector<boost::thread> workers_; - - bool ExecuteStep(JobsRegistry::RunningJob& running, - size_t workerIndex) - { - assert(running.IsValid()); - - LOG(INFO) << "Executing job with priority " << running.GetPriority() - << " in worker thread " << workerIndex << ": " << running.GetId(); - - if (running.IsPauseScheduled()) - { - running.GetJob().ReleaseResources(); - running.MarkPause(); - return false; - } - - std::auto_ptr<JobStepResult> result; - - { - try - { - result.reset(running.GetJob().ExecuteStep()); - - if (result->GetCode() == JobStepCode_Failure) - { - running.UpdateStatus(ErrorCode_InternalError); - } - else - { - running.UpdateStatus(ErrorCode_Success); - } - } - catch (OrthancException& e) - { - running.UpdateStatus(e.GetErrorCode()); - } - catch (boost::bad_lexical_cast&) - { - running.UpdateStatus(ErrorCode_BadFileFormat); - } - catch (...) - { - running.UpdateStatus(ErrorCode_InternalError); - } - - if (result.get() == NULL) - { - result.reset(new JobStepResult(JobStepCode_Failure)); - } - } - - switch (result->GetCode()) - { - case JobStepCode_Success: - running.MarkSuccess(); - return false; - - case JobStepCode_Failure: - running.MarkFailure(); - return false; - - case JobStepCode_Retry: - running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout()); - return false; - - case JobStepCode_Continue: - return true; - - default: - throw OrthancException(ErrorCode_InternalError); - } - } - - static void RetryHandler(JobsEngine* engine) - { - assert(engine != NULL); - - for (;;) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(200)); - - { - boost::mutex::scoped_lock lock(engine->stateMutex_); - - if (engine->state_ != State_Running) - { - return; - } - } - - engine->GetRegistry().ScheduleRetries(); - } - } - - static void Worker(JobsEngine* engine, - size_t workerIndex) - { - assert(engine != NULL); - - LOG(INFO) << "Worker thread " << workerIndex << " has started"; - - for (;;) - { - { - boost::mutex::scoped_lock lock(engine->stateMutex_); - - if (engine->state_ != State_Running) - { - return; - } - } - - JobsRegistry::RunningJob running(engine->GetRegistry(), 100); - - if (running.IsValid()) - { - for (;;) - { - if (!engine->ExecuteStep(running, workerIndex)) - { - break; - } - } - } - } - } - - public: - JobsEngine() : - state_(State_Setup), - workers_(1) - { - } - - ~JobsEngine() - { - if (state_ != State_Setup && - state_ != State_Done) - { - LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!"; - Stop(); - } - } - - void SetWorkersCount(size_t count) - { - if (count == 0) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - - boost::mutex::scoped_lock lock(stateMutex_); - - if (state_ != State_Setup) - { - // Can only be invoked before calling "Start()" - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - workers_.resize(count); - } - - JobsRegistry& GetRegistry() - { - return registry_; - } - - void Start() - { - boost::mutex::scoped_lock lock(stateMutex_); - - if (state_ != State_Setup) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - retryHandler_ = boost::thread(RetryHandler, this); - - for (size_t i = 0; i < workers_.size(); i++) - { - workers_[i] = boost::thread(Worker, this, i); - } - - state_ = State_Running; - - LOG(WARNING) << "The jobs engine has started"; - } - - - void Stop() - { - { - boost::mutex::scoped_lock lock(stateMutex_); - - if (state_ != State_Running) - { - return; - } - - state_ = State_Stopping; - } - - LOG(INFO) << "Stopping the jobs engine"; - - if (retryHandler_.joinable()) - { - retryHandler_.join(); - } - - for (size_t i = 0; i < workers_.size(); i++) - { - if (workers_[i].joinable()) - { - workers_[i].join(); - } - } - - { - boost::mutex::scoped_lock lock(stateMutex_); - state_ = State_Done; - } - - LOG(WARNING) << "The jobs engine has stopped"; - } - }; -} - - - class DummyJob : public Orthanc::IJob { private: