Mercurial > hg > orthanc
changeset 2668:d26dd081df97 jobs
saving jobs engine on exit
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 08 Jun 2018 18:08:48 +0200 |
parents | 5fa2f2ce74f0 |
children | eaf10085ffa1 |
files | Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/JobsRegistry.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerEnumerations.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h |
diffstat | 8 files changed, 173 insertions(+), 26 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/JobsEngine/JobsEngine.cpp Fri Jun 08 15:48:35 2018 +0200 +++ b/Core/JobsEngine/JobsEngine.cpp Fri Jun 08 18:08:48 2018 +0200 @@ -37,6 +37,8 @@ #include "../Logging.h" #include "../OrthancException.h" +#include <json/reader.h> + namespace Orthanc { bool JobsEngine::IsRunning() @@ -156,6 +158,7 @@ JobsEngine::JobsEngine() : state_(State_Setup), + registry_(new JobsRegistry), threadSleep_(200), workers_(1) { @@ -172,7 +175,49 @@ } } - + + JobsRegistry& JobsEngine::GetRegistry() + { + if (registry_.get() == NULL) + { + throw OrthancException(ErrorCode_InternalError); + } + + return *registry_; + } + + + void JobsEngine::LoadRegistryFromJson(IJobUnserializer& unserializer, + const Json::Value& serialized) + { + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Setup) + { + // Can only be invoked before calling "Start()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + registry_.reset(new JobsRegistry(unserializer, serialized)); + } + + + void JobsEngine::LoadRegistryFromString(IJobUnserializer& unserializer, + const std::string& serialized) + { + Json::Value value; + Json::Reader reader; + if (reader.parse(serialized, value)) + { + LoadRegistryFromJson(unserializer, value); + } + else + { + throw OrthancException(ErrorCode_BadFileFormat); + } + } + + void JobsEngine::SetWorkersCount(size_t count) { boost::mutex::scoped_lock lock(stateMutex_);
--- a/Core/JobsEngine/JobsEngine.h Fri Jun 08 15:48:35 2018 +0200 +++ b/Core/JobsEngine/JobsEngine.h Fri Jun 08 18:08:48 2018 +0200 @@ -52,7 +52,7 @@ boost::mutex stateMutex_; State state_; - JobsRegistry registry_; + std::auto_ptr<JobsRegistry> registry_; boost::thread retryHandler_; unsigned int threadSleep_; std::vector<boost::thread*> workers_; @@ -72,14 +72,17 @@ ~JobsEngine(); + JobsRegistry& GetRegistry(); + + void LoadRegistryFromJson(IJobUnserializer& unserializer, + const Json::Value& serialized); + + void LoadRegistryFromString(IJobUnserializer& unserializer, + const std::string& serialized); + void SetWorkersCount(size_t count); void SetThreadSleep(unsigned int sleep); - - JobsRegistry& GetRegistry() - { - return registry_; - } void Start();
--- a/Core/JobsEngine/JobsRegistry.cpp Fri Jun 08 15:48:35 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Fri Jun 08 18:08:48 2018 +0200 @@ -109,6 +109,7 @@ } job->GetJobType(jobType_); + job->Start(); lastStatus_ = JobStatus(ErrorCode_Success, *job_); } @@ -592,6 +593,11 @@ void JobsRegistry::SubmitInternal(std::string& id, JobHandler* handlerRaw) { + if (handlerRaw == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + std::auto_ptr<JobHandler> handler(handlerRaw); boost::mutex::scoped_lock lock(mutex_); @@ -600,8 +606,32 @@ id = handler->GetId(); int priority = handler->GetPriority(); - pendingJobs_.push(handler.get()); - pendingJobAvailable_.notify_one(); + switch (handler->GetState()) + { + case JobState_Pending: + case JobState_Retry: + case JobState_Running: + handler->SetState(JobState_Pending); + pendingJobs_.push(handler.get()); + pendingJobAvailable_.notify_one(); + break; + + case JobState_Success: + SetCompletedJob(*handler, true); + break; + + case JobState_Failure: + SetCompletedJob(*handler, false); + break; + + case JobState_Paused: + break; + + default: + LOG(ERROR) << "A job should not be loaded from state: " + << EnumerationToString(handler->GetState()); + throw OrthancException(ErrorCode_InternalError); + } jobsIndex_.insert(std::make_pair(id, handler.release()));
--- a/OrthancServer/ServerContext.cpp Fri Jun 08 15:48:35 2018 +0200 +++ b/OrthancServer/ServerContext.cpp Fri Jun 08 18:08:48 2018 +0200 @@ -34,20 +34,21 @@ #include "PrecompiledHeadersServer.h" #include "ServerContext.h" +#include "../Core/DicomParsing/FromDcmtkBridge.h" #include "../Core/FileStorage/StorageAccessor.h" #include "../Core/HttpServer/FilesystemHttpSender.h" #include "../Core/HttpServer/HttpStreamTranscoder.h" #include "../Core/Logging.h" -#include "../Core/DicomParsing/FromDcmtkBridge.h" +#include "../Plugins/Engine/OrthancPlugins.h" +#include "OrthancInitialization.h" +#include "OrthancRestApi/OrthancRestApi.h" +#include "Search/LookupResource.h" +#include "ServerJobs/OrthancJobUnserializer.h" #include "ServerToolbox.h" -#include "OrthancInitialization.h" #include <EmbeddedResources.h> #include <dcmtk/dcmdata/dcfilefo.h> -#include "OrthancRestApi/OrthancRestApi.h" -#include "../Plugins/Engine/OrthancPlugins.h" -#include "Search/LookupResource.h" #define ENABLE_DICOM_CACHE 1 @@ -65,11 +66,12 @@ namespace Orthanc { - void ServerContext::ChangeThread(ServerContext* that) + void ServerContext::ChangeThread(ServerContext* that, + unsigned int sleepDelay) { while (!that->done_) { - std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(100)); + std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(sleepDelay)); if (obj.get() != NULL) { @@ -106,6 +108,58 @@ } + void ServerContext::SetupJobsEngine(bool unitTesting) + { + jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); + jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); + + std::string serialized; + if (index_.LookupGlobalProperty(serialized, GlobalProperty_JobsRegistry)) + { + LOG(WARNING) << "Reloading the jobs from the last execution of Orthanc"; + OrthancJobUnserializer unserializer(*this); + + try + { + jobsEngine_.LoadRegistryFromString(unserializer, serialized); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Cannot unserialize the jobs engine: " << e.What(); + throw; + } + } + else + { + LOG(INFO) << "The last execution of Orthanc has archived no job"; + //jobsEngine_.GetRegistry().SetMaxCompleted // TODO + } + + jobsEngine_.Start(); + } + + + void ServerContext::SaveJobsEngine() + { + LOG(INFO) << "Serializing the content of the jobs engine"; + + try + { + Json::Value value; + jobsEngine_.GetRegistry().Serialize(value); + + Json::FastWriter writer; + std::string serialized = writer.write(value); + + index_.SetGlobalProperty(GlobalProperty_JobsRegistry, serialized); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Cannot serialize the jobs engine: " << e.What(); + } + } + + ServerContext::ServerContext(IDatabaseWrapper& database, IStorageArea& area, bool unitTesting) : @@ -125,12 +179,9 @@ { listeners_.push_back(ServerListener(lua_, "Lua")); - jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); - //jobsEngine_.SetMaxCompleted // TODO - jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); - jobsEngine_.Start(); + SetupJobsEngine(unitTesting); - changeThread_ = boost::thread(ChangeThread, this); + changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100)); } @@ -161,6 +212,8 @@ changeThread_.join(); } + SaveJobsEngine(); + // Do not change the order below! jobsEngine_.Stop(); index_.Stop();
--- a/OrthancServer/ServerContext.h Fri Jun 08 15:48:35 2018 +0200 +++ b/OrthancServer/ServerContext.h Fri Jun 08 18:08:48 2018 +0200 @@ -104,11 +104,16 @@ typedef std::list<ServerListener> ServerListeners; - static void ChangeThread(ServerContext* that); + static void ChangeThread(ServerContext* that, + unsigned int sleepDelay); void ReadDicomAsJsonInternal(std::string& result, const std::string& instancePublicId); + void SetupJobsEngine(bool unitTesting); + + void SaveJobsEngine(); + ServerIndex index_; IStorageArea& area_;
--- a/OrthancServer/ServerEnumerations.h Fri Jun 08 15:48:35 2018 +0200 +++ b/OrthancServer/ServerEnumerations.h Fri Jun 08 18:08:48 2018 +0200 @@ -76,7 +76,8 @@ GlobalProperty_DatabaseSchemaVersion = 1, // Unused in the Orthanc core as of Orthanc 0.9.5 GlobalProperty_FlushSleep = 2, GlobalProperty_AnonymizationSequence = 3, - GlobalProperty_DatabasePatchLevel = 4 // Reserved for internal use of the database plugins + GlobalProperty_DatabasePatchLevel = 4, // Reserved for internal use of the database plugins + GlobalProperty_JobsRegistry = 5 }; enum MetadataType
--- a/OrthancServer/ServerIndex.cpp Fri Jun 08 15:48:35 2018 +0200 +++ b/OrthancServer/ServerIndex.cpp Fri Jun 08 18:08:48 2018 +0200 @@ -2096,13 +2096,20 @@ } + bool ServerIndex::LookupGlobalProperty(std::string& value, + GlobalProperty property) + { + boost::mutex::scoped_lock lock(mutex_); + return db_.LookupGlobalProperty(value, property); + } + + std::string ServerIndex::GetGlobalProperty(GlobalProperty property, const std::string& defaultValue) { - boost::mutex::scoped_lock lock(mutex_); - std::string value; - if (db_.LookupGlobalProperty(value, property)) + + if (LookupGlobalProperty(value, property)) { return value; }
--- a/OrthancServer/ServerIndex.h Fri Jun 08 15:48:35 2018 +0200 +++ b/OrthancServer/ServerIndex.h Fri Jun 08 18:08:48 2018 +0200 @@ -258,6 +258,9 @@ void SetGlobalProperty(GlobalProperty property, const std::string& value); + bool LookupGlobalProperty(std::string& value, + GlobalProperty property); + std::string GetGlobalProperty(GlobalProperty property, const std::string& defaultValue);