Mercurial > hg > orthanc
changeset 2815:925d8dc03a23
unserialization of jobs from plugins
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 11 Sep 2018 16:34:21 +0200 |
parents | 7d1d3136f6cf |
children | 567d1be0849e |
files | NEWS OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerJobs/OrthancJobUnserializer.cpp OrthancServer/main.cpp Plugins/Engine/OrthancPlugins.cpp Plugins/Engine/OrthancPlugins.h Plugins/Engine/PluginsJob.cpp Plugins/Engine/PluginsJob.h Plugins/Include/orthanc/OrthancCPlugin.h UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp |
diffstat | 12 files changed, 169 insertions(+), 78 deletions(-) [+] |
line wrap: on
line diff
--- a/NEWS Fri Sep 07 10:09:17 2018 +0200 +++ b/NEWS Tue Sep 11 16:34:21 2018 +0200 @@ -19,7 +19,7 @@ * New primitives to access Orthanc peers from plugins * New events in change callbacks: "UpdatedPeers" and "UpdatedModalities" -* New primitives to create jobs from plugins: "OrthancPluginSubmitJob()" +* New primitives to handle jobs from plugins: "OrthancPluginSubmitJob()" and "OrthancPluginRegisterJobsUnserializer()" Maintenance
--- a/OrthancServer/ServerContext.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/OrthancServer/ServerContext.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -156,33 +156,17 @@ void ServerContext::SetupJobsEngine(bool unitTesting, bool loadJobsFromDatabase) { - jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); - jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); - if (loadJobsFromDatabase) { std::string serialized; if (index_.LookupGlobalProperty(serialized, GlobalProperty_JobsRegistry)) { LOG(WARNING) << "Reloading the jobs from the last execution of Orthanc"; + OrthancJobUnserializer unserializer(*this); try { - bool plugin = false; - -#if ORTHANC_ENABLE_PLUGINS == 1 - if (HasPlugins() && - plugins_->UnserializeJob(serialized)) - { - plugin = true; - } -#endif - - if (!plugin) - { - OrthancJobUnserializer unserializer(*this); - jobsEngine_.LoadRegistryFromString(unserializer, serialized); - } + jobsEngine_.LoadRegistryFromString(unserializer, serialized); } catch (OrthancException& e) { @@ -204,6 +188,9 @@ jobsEngine_.GetRegistry().SetObserver(*this); jobsEngine_.Start(); + isJobsEngineUnserialized_ = true; + + saveJobsThread_ = boost::thread(SaveJobsThread, this, (unitTesting ? 20 : 100)); } @@ -230,8 +217,7 @@ ServerContext::ServerContext(IDatabaseWrapper& database, IStorageArea& area, - bool unitTesting, - bool loadJobsFromDatabase) : + bool unitTesting) : index_(*this, database, (unitTesting ? 20 : 500)), area_(area), compressionEnabled_(false), @@ -246,15 +232,15 @@ #endif done_(false), haveJobsChanged_(false), + isJobsEngineUnserialized_(false), queryRetrieveArchive_(Configuration::GetGlobalUnsignedIntegerParameter("QueryRetrieveSize", 10)), defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC")) { - listeners_.push_back(ServerListener(luaListener_, "Lua")); + jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); + jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); - SetupJobsEngine(unitTesting, loadJobsFromDatabase); - + listeners_.push_back(ServerListener(luaListener_, "Lua")); changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100)); - saveJobsThread_ = boost::thread(SaveJobsThread, this, (unitTesting ? 20 : 100)); } @@ -291,7 +277,12 @@ } jobsEngine_.GetRegistry().ResetObserver(); - SaveJobsEngine(); + + if (isJobsEngineUnserialized_) + { + // Avoid losing jobs if the JobsRegistry cannot be unserialized + SaveJobsEngine(); + } // Do not change the order below! jobsEngine_.Stop();
--- a/OrthancServer/ServerContext.h Fri Sep 07 10:09:17 2018 +0200 +++ b/OrthancServer/ServerContext.h Tue Sep 11 16:34:21 2018 +0200 @@ -143,9 +143,6 @@ void ReadDicomAsJsonInternal(std::string& result, const std::string& instancePublicId); - void SetupJobsEngine(bool unitTesting, - bool loadJobsFromDatabase); - void SaveJobsEngine(); virtual void SignalJobSubmitted(const std::string& jobId); @@ -178,6 +175,7 @@ bool done_; bool haveJobsChanged_; + bool isJobsEngineUnserialized_; SharedMessageQueue pendingChanges_; boost::thread changeThread_; boost::thread saveJobsThread_; @@ -208,11 +206,13 @@ ServerContext(IDatabaseWrapper& database, IStorageArea& area, - bool unitTesting, - bool loadJobsFromDatabase); + bool unitTesting); ~ServerContext(); + void SetupJobsEngine(bool unitTesting, + bool loadJobsFromDatabase); + ServerIndex& GetIndex() { return index_;
--- a/OrthancServer/ServerJobs/OrthancJobUnserializer.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/OrthancServer/ServerJobs/OrthancJobUnserializer.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -55,6 +55,17 @@ { const std::string type = SerializationToolbox::ReadString(source, "Type"); +#if ORTHANC_ENABLE_PLUGINS == 1 + if (context_.HasPlugins()) + { + std::auto_ptr<IJob> job(context_.GetPlugins().UnserializeJob(type, source)); + if (job.get() != NULL) + { + return job.release(); + } + } +#endif + if (type == "DicomModalityStore") { return new DicomModalityStoreJob(context_, source);
--- a/OrthancServer/main.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/OrthancServer/main.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -875,7 +875,8 @@ static bool ConfigureHttpHandler(ServerContext& context, - OrthancPlugins *plugins) + OrthancPlugins *plugins, + bool loadJobsFromDatabase) { #if ORTHANC_ENABLE_PLUGINS == 1 // By order of priority, first apply the "plugins" layer, so that @@ -900,7 +901,13 @@ OrthancRestApi restApi(context); context.GetHttpHandler().Register(restApi, true); - return StartDicomServer(context, restApi, plugins); + context.SetupJobsEngine(false /* not running unit tests */, loadJobsFromDatabase); + + bool restart = StartDicomServer(context, restApi, plugins); + + context.Stop(); + + return restart; } @@ -973,7 +980,7 @@ DicomUserConnection::SetDefaultTimeout(Configuration::GetGlobalUnsignedIntegerParameter("DicomScuTimeout", 10)); - ServerContext context(database, storageArea, false /* not running unit tests */, loadJobsFromDatabase); + ServerContext context(database, storageArea, false /* not running unit tests */); context.SetCompressionEnabled(Configuration::GetGlobalBoolParameter("StorageCompression", false)); context.SetStoreMD5ForAttachments(Configuration::GetGlobalBoolParameter("StoreMD5ForAttachments", true)); @@ -1012,15 +1019,13 @@ try { - restart = ConfigureHttpHandler(context, plugins); + restart = ConfigureHttpHandler(context, plugins, loadJobsFromDatabase); } catch (OrthancException& e) { error = e.GetErrorCode(); } - context.Stop(); - #if ORTHANC_ENABLE_PLUGINS == 1 if (plugins) {
--- a/Plugins/Engine/OrthancPlugins.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/Plugins/Engine/OrthancPlugins.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -2876,6 +2876,27 @@ CallPeerApi(parameters); return true; + case _OrthancPluginService_CreateJob: + { + const _OrthancPluginCreateJob& p = + *reinterpret_cast<const _OrthancPluginCreateJob*>(parameters); + *(p.target) = reinterpret_cast<OrthancPluginJob*>(new PluginsJob(p)); + return true; + } + + case _OrthancPluginService_FreeJob: + { + const _OrthancPluginFreeJob& p = + *reinterpret_cast<const _OrthancPluginFreeJob*>(parameters); + + if (p.job != NULL) + { + delete reinterpret_cast<PluginsJob*>(p.job); + } + + return true; + } + case _OrthancPluginService_SubmitJob: { const _OrthancPluginSubmitJob& p = @@ -2884,7 +2905,8 @@ std::string uuid; PImpl::ServerContextLock lock(*pimpl_); - lock.GetContext().GetJobsEngine().GetRegistry().Submit(uuid, new PluginsJob(p), p.priority); + lock.GetContext().GetJobsEngine().GetRegistry().Submit + (uuid, reinterpret_cast<PluginsJob*>(p.job), p.priority); *p.resultId = CopyString(uuid); @@ -3434,9 +3456,9 @@ } - bool OrthancPlugins::UnserializeJob(const Json::Value& value) + IJob* OrthancPlugins::UnserializeJob(const std::string& type, + const Json::Value& value) { - const std::string type = SerializationToolbox::ReadString(value, "Type"); const std::string serialized = value.toStyledString(); boost::mutex::scoped_lock lock(pimpl_->jobsUnserializersMutex_); @@ -3445,12 +3467,13 @@ unserializer = pimpl_->jobsUnserializers_.begin(); unserializer != pimpl_->jobsUnserializers_.end(); ++unserializer) { - if ((*unserializer) (type.c_str(), serialized.c_str()) == OrthancPluginErrorCode_Success) + OrthancPluginJob* job = (*unserializer) (type.c_str(), serialized.c_str()); + if (job != NULL) { - return true; + return reinterpret_cast<PluginsJob*>(job); } } - return false; + return NULL; } }
--- a/Plugins/Engine/OrthancPlugins.h Fri Sep 07 10:09:17 2018 +0200 +++ b/Plugins/Engine/OrthancPlugins.h Tue Sep 11 16:34:21 2018 +0200 @@ -59,6 +59,7 @@ #include "../../Core/FileStorage/IStorageArea.h" #include "../../Core/HttpServer/IHttpHandler.h" #include "../../Core/HttpServer/IIncomingHttpRequestFilter.h" +#include "../../Core/JobsEngine/IJob.h" #include "../../OrthancServer/IDicomImageDecoder.h" #include "../../OrthancServer/IServerListener.h" #include "OrthancPluginDatabase.h" @@ -308,7 +309,8 @@ virtual IMoveRequestHandler* ConstructMoveRequestHandler(); - bool UnserializeJob(const Json::Value& value); + IJob* UnserializeJob(const std::string& type, + const Json::Value& value); }; }
--- a/Plugins/Engine/PluginsJob.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/Plugins/Engine/PluginsJob.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -47,7 +47,7 @@ namespace Orthanc { - PluginsJob::PluginsJob(const _OrthancPluginSubmitJob& parameters) : + PluginsJob::PluginsJob(const _OrthancPluginCreateJob& parameters) : parameters_(parameters) { if (parameters_.job == NULL) @@ -55,8 +55,8 @@ throw OrthancException(ErrorCode_NullPointer); } - if (parameters_.resultId == NULL || - parameters_.freeJob == NULL || + if (parameters_.target == NULL || + parameters_.finalize == NULL || parameters_.type == NULL || parameters_.getProgress == NULL || parameters_.getContent == NULL || @@ -65,7 +65,7 @@ parameters_.stop == NULL || parameters_.reset == NULL) { - parameters_.freeJob(parameters.job); + parameters_.finalize(parameters.job); throw OrthancException(ErrorCode_NullPointer); } @@ -75,7 +75,7 @@ PluginsJob::~PluginsJob() { assert(parameters_.job != NULL); - parameters_.freeJob(parameters_.job); + parameters_.finalize(parameters_.job); } JobStepResult PluginsJob::Step()
--- a/Plugins/Engine/PluginsJob.h Fri Sep 07 10:09:17 2018 +0200 +++ b/Plugins/Engine/PluginsJob.h Tue Sep 11 16:34:21 2018 +0200 @@ -43,11 +43,11 @@ class PluginsJob : public IJob { private: - _OrthancPluginSubmitJob parameters_; + _OrthancPluginCreateJob parameters_; std::string type_; public: - PluginsJob(const _OrthancPluginSubmitJob& parameters); + PluginsJob(const _OrthancPluginCreateJob& parameters); virtual ~PluginsJob();
--- a/Plugins/Include/orthanc/OrthancCPlugin.h Fri Sep 07 10:09:17 2018 +0200 +++ b/Plugins/Include/orthanc/OrthancCPlugin.h Tue Sep 11 16:34:21 2018 +0200 @@ -423,7 +423,6 @@ _OrthancPluginService_CallHttpClient2 = 27, _OrthancPluginService_GenerateUuid = 28, _OrthancPluginService_RegisterPrivateDictionaryTag = 29, - _OrthancPluginService_SubmitJob = 30, /* Registration of callbacks */ _OrthancPluginService_RegisterRestCallback = 1000, @@ -437,7 +436,6 @@ _OrthancPluginService_RegisterFindCallback = 1008, _OrthancPluginService_RegisterMoveCallback = 1009, _OrthancPluginService_RegisterIncomingHttpRequestFilter2 = 1010, - _OrthancPluginService_RegisterJobsUnserializer = 1011, /* Sending answers to REST calls */ _OrthancPluginService_AnswerBuffer = 2000, @@ -529,6 +527,12 @@ _OrthancPluginService_GetPeerName = 8004, _OrthancPluginService_GetPeerUrl = 8005, _OrthancPluginService_CallPeerApi = 8006, + + /* Primitives for handling jobs (new in 1.4.2) */ + _OrthancPluginService_CreateJob = 9000, + _OrthancPluginService_FreeJob = 9001, + _OrthancPluginService_SubmitJob = 9002, + _OrthancPluginService_RegisterJobsUnserializer = 9003, _OrthancPluginService_INTERNAL = 0x7fffffff } _OrthancPluginService; @@ -1272,7 +1276,10 @@ - typedef void (*OrthancPluginJobFree) (void* job); + + + typedef struct _OrthancPluginJob_t OrthancPluginJob; + typedef void (*OrthancPluginJobFinalize) (void* job); typedef float (*OrthancPluginJobGetProgress) (void* job); typedef const char* (*OrthancPluginJobGetContent) (void* job); typedef const char* (*OrthancPluginJobGetSerialized) (void* job); @@ -1280,8 +1287,8 @@ typedef OrthancPluginErrorCode (*OrthancPluginJobStop) (void* job, OrthancPluginJobStopReason reason); typedef OrthancPluginErrorCode (*OrthancPluginJobReset) (void* job); - typedef OrthancPluginErrorCode (*OrthancPluginJobsUnserializer) (const char* jobType, - const char* serialized); + typedef OrthancPluginJob* (*OrthancPluginJobsUnserializer) (const char* jobType, + const char* serialized); @@ -6041,12 +6048,13 @@ + + typedef struct { - char** resultId; + OrthancPluginJob** target; void *job; - OrthancPluginJobFree freeJob; - int priority; + OrthancPluginJobFinalize finalize; const char *type; OrthancPluginJobGetProgress getProgress; OrthancPluginJobGetContent getContent; @@ -6054,13 +6062,12 @@ OrthancPluginJobStep step; OrthancPluginJobStop stop; OrthancPluginJobReset reset; - } _OrthancPluginSubmitJob; - - ORTHANC_PLUGIN_INLINE char *OrthancPluginSubmitJob( + } _OrthancPluginCreateJob; + + ORTHANC_PLUGIN_INLINE OrthancPluginJob *OrthancPluginCreateJob( OrthancPluginContext *context, void *job, - OrthancPluginJobFree freeJob, - int priority, + OrthancPluginJobFinalize finalize, const char *type, OrthancPluginJobGetProgress getProgress, OrthancPluginJobGetContent getContent, @@ -6069,6 +6076,65 @@ OrthancPluginJobStop stop, OrthancPluginJobReset reset) { + OrthancPluginJob* target = NULL; + + _OrthancPluginCreateJob params; + memset(¶ms, 0, sizeof(params)); + + params.target = ⌖ + params.job = job; + params.finalize = finalize; + params.type = type; + params.getProgress = getProgress; + params.getContent = getContent; + params.getSerialized = getSerialized; + params.step = step; + params.stop = stop; + params.reset = reset; + + if (context->InvokeService(context, _OrthancPluginService_CreateJob, ¶ms) != OrthancPluginErrorCode_Success || + target == NULL) + { + /* Error */ + return NULL; + } + else + { + return target; + } + } + + + typedef struct + { + OrthancPluginJob* job; + } _OrthancPluginFreeJob; + + ORTHANC_PLUGIN_INLINE void OrthancPluginFreeJob( + OrthancPluginContext* context, + OrthancPluginJob* job) + { + _OrthancPluginFreeJob params; + params.job = job; + + context->InvokeService(context, _OrthancPluginService_FreeJob, ¶ms); + } + + + + + typedef struct + { + char** resultId; + OrthancPluginJob *job; + int priority; + } _OrthancPluginSubmitJob; + + ORTHANC_PLUGIN_INLINE char *OrthancPluginSubmitJob( + OrthancPluginContext *context, + OrthancPluginJob *job, + int priority) + { char* resultId = NULL; _OrthancPluginSubmitJob params; @@ -6076,15 +6142,7 @@ params.resultId = &resultId; params.job = job; - params.freeJob = freeJob; params.priority = priority; - params.type = type; - params.getProgress = getProgress; - params.getContent = getContent; - params.getSerialized = getSerialized; - params.step = step; - params.stop = stop; - params.reset = reset; if (context->InvokeService(context, _OrthancPluginService_SubmitJob, ¶ms) != OrthancPluginErrorCode_Success || resultId == NULL)
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -1204,8 +1204,8 @@ OrthancJobsSerialization() { db_.Open(); - context_.reset(new ServerContext(db_, storage_, true /* running unit tests */, - false /* don't reload jobs */)); + context_.reset(new ServerContext(db_, storage_, true /* running unit tests */)); + context_->SetupJobsEngine(true, false); } virtual ~OrthancJobsSerialization()
--- a/UnitTestsSources/ServerIndexTests.cpp Fri Sep 07 10:09:17 2018 +0200 +++ b/UnitTestsSources/ServerIndexTests.cpp Tue Sep 11 16:34:21 2018 +0200 @@ -675,8 +675,9 @@ FilesystemStorage storage(path); DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage, true /* running unit tests */, - false /* don't reload jobs */); + ServerContext context(db, storage, true /* running unit tests */); + context.SetupJobsEngine(true, false); + ServerIndex& index = context.GetIndex(); ASSERT_EQ(1u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence)); @@ -774,8 +775,8 @@ FilesystemStorage storage(path); DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage, true /* running unit tests */, - false /* don't reload jobs */); + ServerContext context(db, storage, true /* running unit tests */); + context.SetupJobsEngine(true, false); ServerIndex& index = context.GetIndex(); index.SetMaximumStorageSize(10);