changeset 2815:925d8dc03a23

unserialization of jobs from plugins
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 11 Sep 2018 16:34:21 +0200
parents 7d1d3136f6cf
children 567d1be0849e
files NEWS OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerJobs/OrthancJobUnserializer.cpp OrthancServer/main.cpp Plugins/Engine/OrthancPlugins.cpp Plugins/Engine/OrthancPlugins.h Plugins/Engine/PluginsJob.cpp Plugins/Engine/PluginsJob.h Plugins/Include/orthanc/OrthancCPlugin.h UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp
diffstat 12 files changed, 169 insertions(+), 78 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Fri Sep 07 10:09:17 2018 +0200
+++ b/NEWS	Tue Sep 11 16:34:21 2018 +0200
@@ -19,7 +19,7 @@
 
 * New primitives to access Orthanc peers from plugins
 * New events in change callbacks: "UpdatedPeers" and "UpdatedModalities"
-* New primitives to create jobs from plugins: "OrthancPluginSubmitJob()"
+* New primitives to handle jobs from plugins: "OrthancPluginSubmitJob()"
   and "OrthancPluginRegisterJobsUnserializer()"
 
 Maintenance
--- a/OrthancServer/ServerContext.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/OrthancServer/ServerContext.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -156,33 +156,17 @@
   void ServerContext::SetupJobsEngine(bool unitTesting,
                                       bool loadJobsFromDatabase)
   {
-    jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
-    jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
-
     if (loadJobsFromDatabase)
     {
       std::string serialized;
       if (index_.LookupGlobalProperty(serialized, GlobalProperty_JobsRegistry))
       {
         LOG(WARNING) << "Reloading the jobs from the last execution of Orthanc";
+        OrthancJobUnserializer unserializer(*this);
 
         try
         {
-          bool plugin = false;
-        
-#if ORTHANC_ENABLE_PLUGINS == 1
-          if (HasPlugins() &&
-              plugins_->UnserializeJob(serialized))
-          {
-            plugin = true;
-          }
-#endif
-
-          if (!plugin)
-          {
-            OrthancJobUnserializer unserializer(*this);
-            jobsEngine_.LoadRegistryFromString(unserializer, serialized);
-          }
+          jobsEngine_.LoadRegistryFromString(unserializer, serialized);
         }
         catch (OrthancException& e)
         {
@@ -204,6 +188,9 @@
 
     jobsEngine_.GetRegistry().SetObserver(*this);
     jobsEngine_.Start();
+    isJobsEngineUnserialized_ = true;
+
+    saveJobsThread_ = boost::thread(SaveJobsThread, this, (unitTesting ? 20 : 100));
   }
 
 
@@ -230,8 +217,7 @@
 
   ServerContext::ServerContext(IDatabaseWrapper& database,
                                IStorageArea& area,
-                               bool unitTesting,
-                               bool loadJobsFromDatabase) :
+                               bool unitTesting) :
     index_(*this, database, (unitTesting ? 20 : 500)),
     area_(area),
     compressionEnabled_(false),
@@ -246,15 +232,15 @@
 #endif
     done_(false),
     haveJobsChanged_(false),
+    isJobsEngineUnserialized_(false),
     queryRetrieveArchive_(Configuration::GetGlobalUnsignedIntegerParameter("QueryRetrieveSize", 10)),
     defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC"))
   {
-    listeners_.push_back(ServerListener(luaListener_, "Lua"));
+    jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
+    jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
 
-    SetupJobsEngine(unitTesting, loadJobsFromDatabase);
-
+    listeners_.push_back(ServerListener(luaListener_, "Lua"));
     changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100));
-    saveJobsThread_ = boost::thread(SaveJobsThread, this, (unitTesting ? 20 : 100));
   }
 
 
@@ -291,7 +277,12 @@
       }
 
       jobsEngine_.GetRegistry().ResetObserver();
-      SaveJobsEngine();
+
+      if (isJobsEngineUnserialized_)
+      {
+        // Avoid losing jobs if the JobsRegistry cannot be unserialized
+        SaveJobsEngine();
+      }
 
       // Do not change the order below!
       jobsEngine_.Stop();
--- a/OrthancServer/ServerContext.h	Fri Sep 07 10:09:17 2018 +0200
+++ b/OrthancServer/ServerContext.h	Tue Sep 11 16:34:21 2018 +0200
@@ -143,9 +143,6 @@
     void ReadDicomAsJsonInternal(std::string& result,
                                  const std::string& instancePublicId);
 
-    void SetupJobsEngine(bool unitTesting,
-                         bool loadJobsFromDatabase);
-
     void SaveJobsEngine();
 
     virtual void SignalJobSubmitted(const std::string& jobId);
@@ -178,6 +175,7 @@
 
     bool done_;
     bool haveJobsChanged_;
+    bool isJobsEngineUnserialized_;
     SharedMessageQueue  pendingChanges_;
     boost::thread  changeThread_;
     boost::thread  saveJobsThread_;
@@ -208,11 +206,13 @@
 
     ServerContext(IDatabaseWrapper& database,
                   IStorageArea& area,
-                  bool unitTesting,
-                  bool loadJobsFromDatabase);
+                  bool unitTesting);
 
     ~ServerContext();
 
+    void SetupJobsEngine(bool unitTesting,
+                         bool loadJobsFromDatabase);
+
     ServerIndex& GetIndex()
     {
       return index_;
--- a/OrthancServer/ServerJobs/OrthancJobUnserializer.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/OrthancServer/ServerJobs/OrthancJobUnserializer.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -55,6 +55,17 @@
   {
     const std::string type = SerializationToolbox::ReadString(source, "Type");
 
+#if ORTHANC_ENABLE_PLUGINS == 1
+    if (context_.HasPlugins())
+    {
+      std::auto_ptr<IJob> job(context_.GetPlugins().UnserializeJob(type, source));
+      if (job.get() != NULL)
+      {
+        return job.release();
+      }
+    }
+#endif
+
     if (type == "DicomModalityStore")
     {
       return new DicomModalityStoreJob(context_, source);
--- a/OrthancServer/main.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/OrthancServer/main.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -875,7 +875,8 @@
 
 
 static bool ConfigureHttpHandler(ServerContext& context,
-                                 OrthancPlugins *plugins)
+                                 OrthancPlugins *plugins,
+                                 bool loadJobsFromDatabase)
 {
 #if ORTHANC_ENABLE_PLUGINS == 1
   // By order of priority, first apply the "plugins" layer, so that
@@ -900,7 +901,13 @@
   OrthancRestApi restApi(context);
   context.GetHttpHandler().Register(restApi, true);
 
-  return StartDicomServer(context, restApi, plugins);
+  context.SetupJobsEngine(false /* not running unit tests */, loadJobsFromDatabase);
+
+  bool restart = StartDicomServer(context, restApi, plugins);
+
+  context.Stop();
+
+  return restart;
 }
 
 
@@ -973,7 +980,7 @@
 
   DicomUserConnection::SetDefaultTimeout(Configuration::GetGlobalUnsignedIntegerParameter("DicomScuTimeout", 10));
 
-  ServerContext context(database, storageArea, false /* not running unit tests */, loadJobsFromDatabase);
+  ServerContext context(database, storageArea, false /* not running unit tests */);
   context.SetCompressionEnabled(Configuration::GetGlobalBoolParameter("StorageCompression", false));
   context.SetStoreMD5ForAttachments(Configuration::GetGlobalBoolParameter("StoreMD5ForAttachments", true));
 
@@ -1012,15 +1019,13 @@
 
   try
   {
-    restart = ConfigureHttpHandler(context, plugins);
+    restart = ConfigureHttpHandler(context, plugins, loadJobsFromDatabase);
   }
   catch (OrthancException& e)
   {
     error = e.GetErrorCode();
   }
 
-  context.Stop();
-
 #if ORTHANC_ENABLE_PLUGINS == 1
   if (plugins)
   {
--- a/Plugins/Engine/OrthancPlugins.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/Plugins/Engine/OrthancPlugins.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -2876,6 +2876,27 @@
         CallPeerApi(parameters);
         return true;
 
+      case _OrthancPluginService_CreateJob:
+      {
+        const _OrthancPluginCreateJob& p =
+          *reinterpret_cast<const _OrthancPluginCreateJob*>(parameters);
+        *(p.target) = reinterpret_cast<OrthancPluginJob*>(new PluginsJob(p));
+        return true;
+      }
+
+      case _OrthancPluginService_FreeJob:
+      {
+        const _OrthancPluginFreeJob& p =
+          *reinterpret_cast<const _OrthancPluginFreeJob*>(parameters);
+
+        if (p.job != NULL)
+        {
+          delete reinterpret_cast<PluginsJob*>(p.job);
+        }
+
+        return true;
+      }
+
       case _OrthancPluginService_SubmitJob:
       {
         const _OrthancPluginSubmitJob& p =
@@ -2884,7 +2905,8 @@
         std::string uuid;
 
         PImpl::ServerContextLock lock(*pimpl_);
-        lock.GetContext().GetJobsEngine().GetRegistry().Submit(uuid, new PluginsJob(p), p.priority);
+        lock.GetContext().GetJobsEngine().GetRegistry().Submit
+          (uuid, reinterpret_cast<PluginsJob*>(p.job), p.priority);
         
         *p.resultId = CopyString(uuid);
 
@@ -3434,9 +3456,9 @@
   }
 
 
-  bool OrthancPlugins::UnserializeJob(const Json::Value& value)
+  IJob* OrthancPlugins::UnserializeJob(const std::string& type,
+                                       const Json::Value& value)
   {
-    const std::string type = SerializationToolbox::ReadString(value, "Type");
     const std::string serialized = value.toStyledString();
 
     boost::mutex::scoped_lock lock(pimpl_->jobsUnserializersMutex_);
@@ -3445,12 +3467,13 @@
            unserializer = pimpl_->jobsUnserializers_.begin();
          unserializer != pimpl_->jobsUnserializers_.end(); ++unserializer)
     {
-      if ((*unserializer) (type.c_str(), serialized.c_str()) == OrthancPluginErrorCode_Success)
+      OrthancPluginJob* job = (*unserializer) (type.c_str(), serialized.c_str());
+      if (job != NULL)
       {
-        return true;
+        return reinterpret_cast<PluginsJob*>(job);
       }
     }
 
-    return false;
+    return NULL;
   }
 }
--- a/Plugins/Engine/OrthancPlugins.h	Fri Sep 07 10:09:17 2018 +0200
+++ b/Plugins/Engine/OrthancPlugins.h	Tue Sep 11 16:34:21 2018 +0200
@@ -59,6 +59,7 @@
 #include "../../Core/FileStorage/IStorageArea.h"
 #include "../../Core/HttpServer/IHttpHandler.h"
 #include "../../Core/HttpServer/IIncomingHttpRequestFilter.h"
+#include "../../Core/JobsEngine/IJob.h"
 #include "../../OrthancServer/IDicomImageDecoder.h"
 #include "../../OrthancServer/IServerListener.h"
 #include "OrthancPluginDatabase.h"
@@ -308,7 +309,8 @@
 
     virtual IMoveRequestHandler* ConstructMoveRequestHandler();
 
-    bool UnserializeJob(const Json::Value& value);
+    IJob* UnserializeJob(const std::string& type,
+                         const Json::Value& value);
   };
 }
 
--- a/Plugins/Engine/PluginsJob.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/Plugins/Engine/PluginsJob.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -47,7 +47,7 @@
 
 namespace Orthanc
 {
-  PluginsJob::PluginsJob(const _OrthancPluginSubmitJob& parameters) :
+  PluginsJob::PluginsJob(const _OrthancPluginCreateJob& parameters) :
     parameters_(parameters)
   {
     if (parameters_.job == NULL)
@@ -55,8 +55,8 @@
       throw OrthancException(ErrorCode_NullPointer);
     }
     
-    if (parameters_.resultId == NULL ||
-        parameters_.freeJob == NULL ||
+    if (parameters_.target == NULL ||
+        parameters_.finalize == NULL ||
         parameters_.type == NULL ||
         parameters_.getProgress == NULL ||
         parameters_.getContent == NULL ||
@@ -65,7 +65,7 @@
         parameters_.stop == NULL ||
         parameters_.reset == NULL)
     {
-      parameters_.freeJob(parameters.job);
+      parameters_.finalize(parameters.job);
       throw OrthancException(ErrorCode_NullPointer);
     }
 
@@ -75,7 +75,7 @@
   PluginsJob::~PluginsJob()
   {
     assert(parameters_.job != NULL);
-    parameters_.freeJob(parameters_.job);
+    parameters_.finalize(parameters_.job);
   }
 
   JobStepResult PluginsJob::Step()
--- a/Plugins/Engine/PluginsJob.h	Fri Sep 07 10:09:17 2018 +0200
+++ b/Plugins/Engine/PluginsJob.h	Tue Sep 11 16:34:21 2018 +0200
@@ -43,11 +43,11 @@
   class PluginsJob : public IJob
   {
   private:
-    _OrthancPluginSubmitJob  parameters_;
+    _OrthancPluginCreateJob  parameters_;
     std::string              type_;
 
   public:
-    PluginsJob(const _OrthancPluginSubmitJob& parameters);
+    PluginsJob(const _OrthancPluginCreateJob& parameters);
 
     virtual ~PluginsJob();
 
--- a/Plugins/Include/orthanc/OrthancCPlugin.h	Fri Sep 07 10:09:17 2018 +0200
+++ b/Plugins/Include/orthanc/OrthancCPlugin.h	Tue Sep 11 16:34:21 2018 +0200
@@ -423,7 +423,6 @@
     _OrthancPluginService_CallHttpClient2 = 27,
     _OrthancPluginService_GenerateUuid = 28,
     _OrthancPluginService_RegisterPrivateDictionaryTag = 29,
-    _OrthancPluginService_SubmitJob = 30,
 
     /* Registration of callbacks */
     _OrthancPluginService_RegisterRestCallback = 1000,
@@ -437,7 +436,6 @@
     _OrthancPluginService_RegisterFindCallback = 1008,
     _OrthancPluginService_RegisterMoveCallback = 1009,
     _OrthancPluginService_RegisterIncomingHttpRequestFilter2 = 1010,
-    _OrthancPluginService_RegisterJobsUnserializer = 1011,
 
     /* Sending answers to REST calls */
     _OrthancPluginService_AnswerBuffer = 2000,
@@ -529,6 +527,12 @@
     _OrthancPluginService_GetPeerName = 8004,
     _OrthancPluginService_GetPeerUrl = 8005,
     _OrthancPluginService_CallPeerApi = 8006,
+
+    /* Primitives for handling jobs (new in 1.4.2) */
+    _OrthancPluginService_CreateJob = 9000,
+    _OrthancPluginService_FreeJob = 9001,
+    _OrthancPluginService_SubmitJob = 9002,
+    _OrthancPluginService_RegisterJobsUnserializer = 9003,
     
     _OrthancPluginService_INTERNAL = 0x7fffffff
   } _OrthancPluginService;
@@ -1272,7 +1276,10 @@
 
 
 
-  typedef void (*OrthancPluginJobFree) (void* job);
+
+  
+  typedef struct _OrthancPluginJob_t OrthancPluginJob;
+  typedef void (*OrthancPluginJobFinalize) (void* job);
   typedef float (*OrthancPluginJobGetProgress) (void* job);
   typedef const char* (*OrthancPluginJobGetContent) (void* job);
   typedef const char* (*OrthancPluginJobGetSerialized) (void* job);
@@ -1280,8 +1287,8 @@
   typedef OrthancPluginErrorCode (*OrthancPluginJobStop) (void* job, 
                                                           OrthancPluginJobStopReason reason);
   typedef OrthancPluginErrorCode (*OrthancPluginJobReset) (void* job);
-  typedef OrthancPluginErrorCode (*OrthancPluginJobsUnserializer) (const char* jobType,
-                                                                   const char* serialized);
+  typedef OrthancPluginJob* (*OrthancPluginJobsUnserializer) (const char* jobType,
+                                                              const char* serialized);
   
 
 
@@ -6041,12 +6048,13 @@
 
 
 
+
+
   typedef struct
   {
-    char**                          resultId;
+    OrthancPluginJob**              target;
     void                           *job;
-    OrthancPluginJobFree            freeJob;
-    int                             priority;
+    OrthancPluginJobFinalize        finalize;
     const char                     *type;
     OrthancPluginJobGetProgress     getProgress;
     OrthancPluginJobGetContent      getContent;
@@ -6054,13 +6062,12 @@
     OrthancPluginJobStep            step;
     OrthancPluginJobStop            stop;
     OrthancPluginJobReset           reset;
-  } _OrthancPluginSubmitJob;
-
-  ORTHANC_PLUGIN_INLINE char *OrthancPluginSubmitJob(
+  } _OrthancPluginCreateJob;
+
+  ORTHANC_PLUGIN_INLINE OrthancPluginJob *OrthancPluginCreateJob(
     OrthancPluginContext           *context,
     void                           *job,
-    OrthancPluginJobFree            freeJob,
-    int                             priority,
+    OrthancPluginJobFinalize        finalize,
     const char                     *type,
     OrthancPluginJobGetProgress     getProgress,
     OrthancPluginJobGetContent      getContent,
@@ -6069,6 +6076,65 @@
     OrthancPluginJobStop            stop,
     OrthancPluginJobReset           reset)
   {
+    OrthancPluginJob* target = NULL;
+
+    _OrthancPluginCreateJob params;
+    memset(&params, 0, sizeof(params));
+
+    params.target = &target;
+    params.job = job;
+    params.finalize = finalize;
+    params.type = type;
+    params.getProgress = getProgress;
+    params.getContent = getContent;
+    params.getSerialized = getSerialized;
+    params.step = step;
+    params.stop = stop;
+    params.reset = reset;
+
+    if (context->InvokeService(context, _OrthancPluginService_CreateJob, &params) != OrthancPluginErrorCode_Success ||
+        target == NULL)
+    {
+      /* Error */
+      return NULL;
+    }
+    else
+    {
+      return target;
+    }
+  }
+
+
+  typedef struct
+  {
+    OrthancPluginJob*   job;
+  } _OrthancPluginFreeJob;
+
+  ORTHANC_PLUGIN_INLINE void  OrthancPluginFreeJob(
+    OrthancPluginContext* context, 
+    OrthancPluginJob*     job)
+  {
+    _OrthancPluginFreeJob params;
+    params.job = job;
+
+    context->InvokeService(context, _OrthancPluginService_FreeJob, &params);
+  }
+
+
+  
+
+  typedef struct
+  {
+    char**             resultId;
+    OrthancPluginJob  *job;
+    int                priority;
+  } _OrthancPluginSubmitJob;
+
+  ORTHANC_PLUGIN_INLINE char *OrthancPluginSubmitJob(
+    OrthancPluginContext   *context,
+    OrthancPluginJob       *job,
+    int                     priority)
+  {
     char* resultId = NULL;
 
     _OrthancPluginSubmitJob params;
@@ -6076,15 +6142,7 @@
 
     params.resultId = &resultId;
     params.job = job;
-    params.freeJob = freeJob;
     params.priority = priority;
-    params.type = type;
-    params.getProgress = getProgress;
-    params.getContent = getContent;
-    params.getSerialized = getSerialized;
-    params.step = step;
-    params.stop = stop;
-    params.reset = reset;
 
     if (context->InvokeService(context, _OrthancPluginService_SubmitJob, &params) != OrthancPluginErrorCode_Success ||
         resultId == NULL)
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -1204,8 +1204,8 @@
     OrthancJobsSerialization()
     {
       db_.Open();
-      context_.reset(new ServerContext(db_, storage_, true /* running unit tests */,
-                                       false /* don't reload jobs */));
+      context_.reset(new ServerContext(db_, storage_, true /* running unit tests */));
+      context_->SetupJobsEngine(true, false);
     }
 
     virtual ~OrthancJobsSerialization()
--- a/UnitTestsSources/ServerIndexTests.cpp	Fri Sep 07 10:09:17 2018 +0200
+++ b/UnitTestsSources/ServerIndexTests.cpp	Tue Sep 11 16:34:21 2018 +0200
@@ -675,8 +675,9 @@
   FilesystemStorage storage(path);
   DatabaseWrapper db;   // The SQLite DB is in memory
   db.Open();
-  ServerContext context(db, storage, true /* running unit tests */,
-                        false /* don't reload jobs */);
+  ServerContext context(db, storage, true /* running unit tests */);
+  context.SetupJobsEngine(true, false);
+
   ServerIndex& index = context.GetIndex();
 
   ASSERT_EQ(1u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence));
@@ -774,8 +775,8 @@
   FilesystemStorage storage(path);
   DatabaseWrapper db;   // The SQLite DB is in memory
   db.Open();
-  ServerContext context(db, storage, true /* running unit tests */,
-                        false /* don't reload jobs */);
+  ServerContext context(db, storage, true /* running unit tests */);
+  context.SetupJobsEngine(true, false);
   ServerIndex& index = context.GetIndex();
 
   index.SetMaximumStorageSize(10);