changeset 2668:d26dd081df97 jobs

saving jobs engine on exit
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 18:08:48 +0200
parents 5fa2f2ce74f0
children eaf10085ffa1
files Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/JobsRegistry.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerEnumerations.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h
diffstat 8 files changed, 173 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/JobsEngine.cpp	Fri Jun 08 15:48:35 2018 +0200
+++ b/Core/JobsEngine/JobsEngine.cpp	Fri Jun 08 18:08:48 2018 +0200
@@ -37,6 +37,8 @@
 #include "../Logging.h"
 #include "../OrthancException.h"
 
+#include <json/reader.h>
+
 namespace Orthanc
 {
   bool JobsEngine::IsRunning()
@@ -156,6 +158,7 @@
 
   JobsEngine::JobsEngine() :
     state_(State_Setup),
+    registry_(new JobsRegistry),
     threadSleep_(200),
     workers_(1)
   {
@@ -172,7 +175,49 @@
     }
   }
 
-    
+ 
+  JobsRegistry& JobsEngine::GetRegistry()
+  {
+    if (registry_.get() == NULL)
+    {
+      throw OrthancException(ErrorCode_InternalError);
+    }
+
+    return *registry_;
+  }
+  
+   
+  void JobsEngine::LoadRegistryFromJson(IJobUnserializer& unserializer,
+                                        const Json::Value& serialized)
+  {
+    boost::mutex::scoped_lock lock(stateMutex_);
+      
+    if (state_ != State_Setup)
+    {
+      // Can only be invoked before calling "Start()"
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    registry_.reset(new JobsRegistry(unserializer, serialized));
+  }
+
+
+  void JobsEngine::LoadRegistryFromString(IJobUnserializer& unserializer,
+                                          const std::string& serialized)
+  {
+    Json::Value value;
+    Json::Reader reader;
+    if (reader.parse(serialized, value))
+    {
+      LoadRegistryFromJson(unserializer, value);
+    }
+    else
+    {
+      throw OrthancException(ErrorCode_BadFileFormat);
+    }
+  }
+
+
   void JobsEngine::SetWorkersCount(size_t count)
   {
     boost::mutex::scoped_lock lock(stateMutex_);
--- a/Core/JobsEngine/JobsEngine.h	Fri Jun 08 15:48:35 2018 +0200
+++ b/Core/JobsEngine/JobsEngine.h	Fri Jun 08 18:08:48 2018 +0200
@@ -52,7 +52,7 @@
 
     boost::mutex                 stateMutex_;
     State                        state_;
-    JobsRegistry                 registry_;
+    std::auto_ptr<JobsRegistry>  registry_;
     boost::thread                retryHandler_;
     unsigned int                 threadSleep_;
     std::vector<boost::thread*>  workers_;
@@ -72,14 +72,17 @@
 
     ~JobsEngine();
 
+    JobsRegistry& GetRegistry();
+
+    void LoadRegistryFromJson(IJobUnserializer& unserializer,
+                              const Json::Value& serialized);
+
+    void LoadRegistryFromString(IJobUnserializer& unserializer,
+                                const std::string& serialized);
+
     void SetWorkersCount(size_t count);
 
     void SetThreadSleep(unsigned int sleep);
-    
-    JobsRegistry& GetRegistry()
-    {
-      return registry_;
-    }
 
     void Start();
 
--- a/Core/JobsEngine/JobsRegistry.cpp	Fri Jun 08 15:48:35 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Fri Jun 08 18:08:48 2018 +0200
@@ -109,6 +109,7 @@
       }
 
       job->GetJobType(jobType_);
+      job->Start();
 
       lastStatus_ = JobStatus(ErrorCode_Success, *job_);
     }
@@ -592,6 +593,11 @@
   void JobsRegistry::SubmitInternal(std::string& id,
                                     JobHandler* handlerRaw)
   {
+    if (handlerRaw == NULL)
+    {
+      throw OrthancException(ErrorCode_NullPointer);
+    }
+    
     std::auto_ptr<JobHandler>  handler(handlerRaw);
 
     boost::mutex::scoped_lock lock(mutex_);
@@ -600,8 +606,32 @@
     id = handler->GetId();
     int priority = handler->GetPriority();
 
-    pendingJobs_.push(handler.get());
-    pendingJobAvailable_.notify_one();
+    switch (handler->GetState())
+    {
+      case JobState_Pending:
+      case JobState_Retry:
+      case JobState_Running:
+        handler->SetState(JobState_Pending);
+        pendingJobs_.push(handler.get());
+        pendingJobAvailable_.notify_one();
+        break;
+ 
+      case JobState_Success:
+        SetCompletedJob(*handler, true);
+        break;
+        
+      case JobState_Failure:
+        SetCompletedJob(*handler, false);
+        break;
+
+      case JobState_Paused:
+        break;
+        
+      default:
+        LOG(ERROR) << "A job should not be loaded from state: "
+                   << EnumerationToString(handler->GetState());
+        throw OrthancException(ErrorCode_InternalError);
+    }
 
     jobsIndex_.insert(std::make_pair(id, handler.release()));
 
--- a/OrthancServer/ServerContext.cpp	Fri Jun 08 15:48:35 2018 +0200
+++ b/OrthancServer/ServerContext.cpp	Fri Jun 08 18:08:48 2018 +0200
@@ -34,20 +34,21 @@
 #include "PrecompiledHeadersServer.h"
 #include "ServerContext.h"
 
+#include "../Core/DicomParsing/FromDcmtkBridge.h"
 #include "../Core/FileStorage/StorageAccessor.h"
 #include "../Core/HttpServer/FilesystemHttpSender.h"
 #include "../Core/HttpServer/HttpStreamTranscoder.h"
 #include "../Core/Logging.h"
-#include "../Core/DicomParsing/FromDcmtkBridge.h"
+#include "../Plugins/Engine/OrthancPlugins.h"
+#include "OrthancInitialization.h"
+#include "OrthancRestApi/OrthancRestApi.h"
+#include "Search/LookupResource.h"
+#include "ServerJobs/OrthancJobUnserializer.h"
 #include "ServerToolbox.h"
-#include "OrthancInitialization.h"
 
 #include <EmbeddedResources.h>
 #include <dcmtk/dcmdata/dcfilefo.h>
 
-#include "OrthancRestApi/OrthancRestApi.h"
-#include "../Plugins/Engine/OrthancPlugins.h"
-#include "Search/LookupResource.h"
 
 
 #define ENABLE_DICOM_CACHE  1
@@ -65,11 +66,12 @@
 
 namespace Orthanc
 {
-  void ServerContext::ChangeThread(ServerContext* that)
+  void ServerContext::ChangeThread(ServerContext* that,
+                                   unsigned int sleepDelay)
   {
     while (!that->done_)
     {
-      std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(100));
+      std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(sleepDelay));
         
       if (obj.get() != NULL)
       {
@@ -106,6 +108,58 @@
   }
 
 
+  void ServerContext::SetupJobsEngine(bool unitTesting)
+  {
+    jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
+    jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
+
+    std::string serialized;
+    if (index_.LookupGlobalProperty(serialized, GlobalProperty_JobsRegistry))
+    {
+      LOG(WARNING) << "Reloading the jobs from the last execution of Orthanc";
+      OrthancJobUnserializer unserializer(*this);
+
+      try
+      {
+        jobsEngine_.LoadRegistryFromString(unserializer, serialized);
+      }
+      catch (OrthancException& e)
+      {
+        LOG(ERROR) << "Cannot unserialize the jobs engine: " << e.What();
+        throw;
+      }
+    }
+    else
+    {
+      LOG(INFO) << "The last execution of Orthanc has archived no job";
+      //jobsEngine_.GetRegistry().SetMaxCompleted   // TODO
+    }
+
+    jobsEngine_.Start();
+  }
+
+
+  void ServerContext::SaveJobsEngine()
+  {
+    LOG(INFO) << "Serializing the content of the jobs engine";
+    
+    try
+    {
+      Json::Value value;
+      jobsEngine_.GetRegistry().Serialize(value);
+
+      Json::FastWriter writer;
+      std::string serialized = writer.write(value);
+
+      index_.SetGlobalProperty(GlobalProperty_JobsRegistry, serialized);
+    }
+    catch (OrthancException& e)
+    {
+      LOG(ERROR) << "Cannot serialize the jobs engine: " << e.What();
+    }
+  }
+
+
   ServerContext::ServerContext(IDatabaseWrapper& database,
                                IStorageArea& area,
                                bool unitTesting) :
@@ -125,12 +179,9 @@
   {
     listeners_.push_back(ServerListener(lua_, "Lua"));
 
-    jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
-    //jobsEngine_.SetMaxCompleted   // TODO
-    jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
-    jobsEngine_.Start();
+    SetupJobsEngine(unitTesting);
 
-    changeThread_ = boost::thread(ChangeThread, this);
+    changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100));
   }
 
 
@@ -161,6 +212,8 @@
         changeThread_.join();
       }
 
+      SaveJobsEngine();
+
       // Do not change the order below!
       jobsEngine_.Stop();
       index_.Stop();
--- a/OrthancServer/ServerContext.h	Fri Jun 08 15:48:35 2018 +0200
+++ b/OrthancServer/ServerContext.h	Fri Jun 08 18:08:48 2018 +0200
@@ -104,11 +104,16 @@
     typedef std::list<ServerListener>  ServerListeners;
 
 
-    static void ChangeThread(ServerContext* that);
+    static void ChangeThread(ServerContext* that,
+                             unsigned int sleepDelay);
 
     void ReadDicomAsJsonInternal(std::string& result,
                                  const std::string& instancePublicId);
 
+    void SetupJobsEngine(bool unitTesting);
+
+    void SaveJobsEngine();
+
     ServerIndex index_;
     IStorageArea& area_;
 
--- a/OrthancServer/ServerEnumerations.h	Fri Jun 08 15:48:35 2018 +0200
+++ b/OrthancServer/ServerEnumerations.h	Fri Jun 08 18:08:48 2018 +0200
@@ -76,7 +76,8 @@
     GlobalProperty_DatabaseSchemaVersion = 1,   // Unused in the Orthanc core as of Orthanc 0.9.5
     GlobalProperty_FlushSleep = 2,
     GlobalProperty_AnonymizationSequence = 3,
-    GlobalProperty_DatabasePatchLevel = 4       // Reserved for internal use of the database plugins
+    GlobalProperty_DatabasePatchLevel = 4,      // Reserved for internal use of the database plugins
+    GlobalProperty_JobsRegistry = 5
   };
 
   enum MetadataType
--- a/OrthancServer/ServerIndex.cpp	Fri Jun 08 15:48:35 2018 +0200
+++ b/OrthancServer/ServerIndex.cpp	Fri Jun 08 18:08:48 2018 +0200
@@ -2096,13 +2096,20 @@
   }
 
 
+  bool ServerIndex::LookupGlobalProperty(std::string& value,
+                                         GlobalProperty property)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    return db_.LookupGlobalProperty(value, property);
+  }
+  
+
   std::string ServerIndex::GetGlobalProperty(GlobalProperty property,
                                              const std::string& defaultValue)
   {
-    boost::mutex::scoped_lock lock(mutex_);
-
     std::string value;
-    if (db_.LookupGlobalProperty(value, property))
+
+    if (LookupGlobalProperty(value, property))
     {
       return value;
     }
--- a/OrthancServer/ServerIndex.h	Fri Jun 08 15:48:35 2018 +0200
+++ b/OrthancServer/ServerIndex.h	Fri Jun 08 18:08:48 2018 +0200
@@ -258,6 +258,9 @@
     void SetGlobalProperty(GlobalProperty property,
                            const std::string& value);
 
+    bool LookupGlobalProperty(std::string& value,
+                              GlobalProperty property);
+
     std::string GetGlobalProperty(GlobalProperty property,
                                   const std::string& defaultValue);