changeset 2673:8e0bc055d18c jobs

JobsRegistry::IObserver
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 11 Jun 2018 16:29:33 +0200
parents 3efc44fac209
children 373b44af938f
files Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h
diffstat 4 files changed, 135 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/JobsRegistry.cpp	Mon Jun 11 15:57:25 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Mon Jun 11 16:29:33 2018 +0200
@@ -292,7 +292,7 @@
       }
       else
       {
-        LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_;
+        LOG(INFO) << "Job backup is not supported for job of type: " << jobType_;
         return false;
       }
     }
@@ -475,6 +475,18 @@
     assert(job.GetState() == JobState_Running);
     SetCompletedJob(job, success);
 
+    if (observer_ != NULL)
+    {
+      if (success)
+      {
+        observer_->SignalJobSuccess(job.GetId());
+      }
+      else
+      {
+        observer_->SignalJobFailure(job.GetId());
+      }
+    }
+
     CheckInvariants();
   }
 
@@ -604,49 +616,56 @@
 
     boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime();
 
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
       
-    id = handler->GetId();
-    int priority = handler->GetPriority();
+      id = handler->GetId();
+      int priority = handler->GetPriority();
 
-    switch (handler->GetState())
-    {
-      case JobState_Pending:
-      case JobState_Retry:
-      case JobState_Running:
-        handler->SetState(JobState_Pending);
-        pendingJobs_.push(handler.get());
-        pendingJobAvailable_.notify_one();
-        break;
+      switch (handler->GetState())
+      {
+        case JobState_Pending:
+        case JobState_Retry:
+        case JobState_Running:
+          handler->SetState(JobState_Pending);
+          pendingJobs_.push(handler.get());
+          pendingJobAvailable_.notify_one();
+          break;
  
-      case JobState_Success:
-        SetCompletedJob(*handler, true);
-        break;
+        case JobState_Success:
+          SetCompletedJob(*handler, true);
+          break;
         
-      case JobState_Failure:
-        SetCompletedJob(*handler, false);
-        break;
+        case JobState_Failure:
+          SetCompletedJob(*handler, false);
+          break;
+
+        case JobState_Paused:
+          break;
+        
+        default:
+          LOG(ERROR) << "A job should not be loaded from state: "
+                     << EnumerationToString(handler->GetState());
+          throw OrthancException(ErrorCode_InternalError);
+      }
 
-      case JobState_Paused:
-        break;
-        
-      default:
-        LOG(ERROR) << "A job should not be loaded from state: "
-                   << EnumerationToString(handler->GetState());
-        throw OrthancException(ErrorCode_InternalError);
+      if (keepLastChangeTime)
+      {
+        handler->SetLastStateChangeTime(lastChangeTime);
+      }
+    
+      jobsIndex_.insert(std::make_pair(id, handler.release()));
+
+      LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
+
+      if (observer_ != NULL)
+      {
+        observer_->SignalJobSubmitted(id);
+      }
+
+      CheckInvariants();
     }
-
-    if (keepLastChangeTime)
-    {
-      handler->SetLastStateChangeTime(lastChangeTime);
-    }
-    
-    jobsIndex_.insert(std::make_pair(id, handler.release()));
-
-    LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
-
-    CheckInvariants();
   }
 
 
@@ -974,6 +993,20 @@
     return GetStateInternal(state, id);
   }
 
+
+  void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    observer_ = &observer;
+  }
+
+  
+  void JobsRegistry::ResetObserver()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    observer_ = NULL;
+  }
+
   
   JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
                                        unsigned int timeout) :
@@ -1245,7 +1278,8 @@
 
 
   JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
-                             const Json::Value& s)
+                             const Json::Value& s) :
+    observer_(NULL)
   {
     if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
         !s.isMember(JOBS) ||
--- a/Core/JobsEngine/JobsRegistry.h	Mon Jun 11 15:57:25 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.h	Mon Jun 11 16:29:33 2018 +0200
@@ -55,6 +55,21 @@
   // This class handles the state machine of the jobs engine
   class JobsRegistry : public boost::noncopyable
   {
+  public:
+    class IObserver : public boost::noncopyable
+    {
+    public:
+      virtual ~IObserver()
+      {
+      }
+
+      virtual void SignalJobSubmitted(const std::string& jobId) = 0;
+
+      virtual void SignalJobSuccess(const std::string& jobId) = 0;
+
+      virtual void SignalJobFailure(const std::string& jobId) = 0;
+    };
+    
   private:
     class JobHandler;
 
@@ -81,6 +96,8 @@
     boost::condition_variable  someJobComplete_;
     size_t                     maxCompletedJobs_;
 
+    IObserver*                 observer_;
+
 
 #ifndef NDEBUG
     bool IsPendingJob(const JobHandler& job) const;
@@ -118,7 +135,8 @@
     
   public:
     JobsRegistry() :
-      maxCompletedJobs_(10)
+      maxCompletedJobs_(10),
+      observer_(NULL)
     {
     }
 
@@ -162,6 +180,10 @@
     bool GetState(JobState& state,
                   const std::string& id);
 
+    void SetObserver(IObserver& observer);
+
+    void ResetObserver();
+
     class RunningJob : public boost::noncopyable
     {
     private:
--- a/OrthancServer/ServerContext.cpp	Mon Jun 11 15:57:25 2018 +0200
+++ b/OrthancServer/ServerContext.cpp	Mon Jun 11 16:29:33 2018 +0200
@@ -121,15 +121,41 @@
     {
       boost::this_thread::sleep(boost::posix_time::milliseconds(sleepDelay));
 
-      if (boost::posix_time::microsec_clock::universal_time() >= next)
+      if (that->haveJobsChanged_ ||
+          boost::posix_time::microsec_clock::universal_time() >= next)
       {
-        that->SaveJobsEngine();        
+        that->haveJobsChanged_ = false;
+        that->SaveJobsEngine();
         next = boost::posix_time::microsec_clock::universal_time() + PERIODICITY;
       }
     }
   }
+  
+
+  void ServerContext::SignalJobSubmitted(const std::string& jobId)
+  {
+    haveJobsChanged_ = true;
+    
+    // TODO: Call Lua
+  }
+  
+
+  void ServerContext::SignalJobSuccess(const std::string& jobId)
+  {
+    haveJobsChanged_ = true;
+    
+    // TODO: Call Lua
+  }
 
   
+  void ServerContext::SignalJobFailure(const std::string& jobId)
+  {
+    haveJobsChanged_ = true;
+    
+    // TODO: Call Lua
+  }
+
+
   void ServerContext::SetupJobsEngine(bool unitTesting,
                                       bool loadJobsFromDatabase)
   {
@@ -166,6 +192,7 @@
 
     //jobsEngine_.GetRegistry().SetMaxCompleted   // TODO
 
+    jobsEngine_.GetRegistry().SetObserver(*this);
     jobsEngine_.Start();
   }
 
@@ -206,6 +233,7 @@
     plugins_(NULL),
 #endif
     done_(false),
+    haveJobsChanged_(false),
     queryRetrieveArchive_(Configuration::GetGlobalUnsignedIntegerParameter("QueryRetrieveSize", 10)),
     defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC"))
   {
@@ -250,6 +278,7 @@
         saveJobsThread_.join();
       }
 
+      jobsEngine_.GetRegistry().ResetObserver();
       SaveJobsEngine();
 
       // Do not change the order below!
--- a/OrthancServer/ServerContext.h	Mon Jun 11 15:57:25 2018 +0200
+++ b/OrthancServer/ServerContext.h	Mon Jun 11 16:29:33 2018 +0200
@@ -60,7 +60,7 @@
    * filesystem (including compression), as well as the index of the
    * DICOM store. It implements the required locking mechanisms.
    **/
-  class ServerContext
+  class ServerContext : private JobsRegistry::IObserver
   {
   private:
     class DicomCacheProvider : public ICachePageProvider
@@ -118,6 +118,12 @@
 
     void SaveJobsEngine();
 
+    virtual void SignalJobSubmitted(const std::string& jobId);
+
+    virtual void SignalJobSuccess(const std::string& jobId);
+
+    virtual void SignalJobFailure(const std::string& jobId);
+
     ServerIndex index_;
     IStorageArea& area_;
 
@@ -139,6 +145,7 @@
     boost::recursive_mutex listenersMutex_;
 
     bool done_;
+    bool haveJobsChanged_;
     SharedMessageQueue  pendingChanges_;
     boost::thread  changeThread_;
     boost::thread  saveJobsThread_;