changeset 4024:1d2b31fc782f more-changes

new 'changes': JobSubmitted, JobSuccess, JobFailure
author Alain Mazy <alain@mazy.be>
date Tue, 09 Jun 2020 12:20:42 +0200
parents cbdf62468d77
children 52bc28f8ac8c 994d4da97dea
files Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h NEWS OrthancServer/ServerContext.cpp Plugins/Engine/OrthancPlugins.h Plugins/Engine/PluginsEnumerations.cpp Plugins/Engine/PluginsEnumerations.h Plugins/Include/orthanc/OrthancCPlugin.h
diffstat 8 files changed, 124 insertions(+), 110 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/JobsRegistry.cpp	Tue Jun 09 11:54:58 2020 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Tue Jun 09 12:20:42 2020 +0200
@@ -20,7 +20,7 @@
  * you do not wish to do so, delete this exception statement from your
  * version. If you delete this exception statement from all source files
  * in the program, then also delete it here.
- * 
+ *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
@@ -50,10 +50,10 @@
   static const char* CREATION_TIME = "CreationTime";
   static const char* LAST_CHANGE_TIME = "LastChangeTime";
   static const char* RUNTIME = "Runtime";
-  
+
 
   class JobsRegistry::JobHandler : public boost::noncopyable
-  {   
+  {
   private:
     std::string                       id_;
     JobState                          state_;
@@ -80,7 +80,7 @@
       lastStateChangeTime_ = now;
     }
 
-    void SetStateInternal(JobState state) 
+    void SetStateInternal(JobState state)
     {
       state_ = state;
       pauseScheduled_ = false;
@@ -139,7 +139,7 @@
       return state_;
     }
 
-    void SetState(JobState state) 
+    void SetState(JobState state)
     {
       if (state == JobState_Retry)
       {
@@ -157,7 +157,7 @@
       if (state_ == JobState_Running)
       {
         SetStateInternal(JobState_Retry);
-        retryTime_ = (boost::posix_time::microsec_clock::universal_time() + 
+        retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
                       boost::posix_time::milliseconds(timeout));
       }
       else
@@ -275,7 +275,7 @@
           ok = false;
         }
       }
-      else 
+      else
       {
         ok = job_->Serialize(target[JOB]);
       }
@@ -327,14 +327,14 @@
                                                      JobHandler*& b) const
   {
     return a->GetPriority() < b->GetPriority();
-  }                       
+  }
 
 
 #if defined(NDEBUG)
   void JobsRegistry::CheckInvariants() const
   {
   }
-  
+
 #else
   bool JobsRegistry::IsPendingJob(const JobHandler& job) const
   {
@@ -409,16 +409,16 @@
         case JobState_Pending:
           assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
           break;
-            
+
         case JobState_Success:
         case JobState_Failure:
           assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
           break;
-            
+
         case JobState_Retry:
           assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
           break;
-            
+
         case JobState_Running:
         case JobState_Paused:
           assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
@@ -482,7 +482,7 @@
       default:
         throw OrthancException(ErrorCode_InternalError);
     }
-    
+
     LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId();
 
     CheckInvariants();
@@ -495,20 +495,15 @@
       job.SetLastErrorCode(ErrorCode_CanceledJob);
     }
 
+    if (observer_ != NULL)
     {
-      boost::shared_lock<boost::shared_mutex> lock(observersMutex_);
-
-      for (Observers::iterator it = observers_.begin(); it != observers_.end(); ++it)
+      if (reason == CompletedReason_Success)
       {
-        if (reason == CompletedReason_Success)
-        {
-          (*it)->SignalJobSuccess(job.GetId());
-        }
-        else
-        {
-          (*it)->SignalJobFailure(job.GetId());
-        }
-
+        observer_->SignalJobSuccess(job.GetId());
+      }
+      else
+      {
+        observer_->SignalJobFailure(job.GetId());
       }
     }
 
@@ -565,7 +560,7 @@
     }
   }
 
-  
+
   JobsRegistry::~JobsRegistry()
   {
     for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
@@ -673,13 +668,13 @@
     {
       throw OrthancException(ErrorCode_NullPointer);
     }
-    
+
     std::unique_ptr<JobHandler>  protection(handler);
 
     {
       boost::mutex::scoped_lock lock(mutex_);
       CheckInvariants();
-      
+
       id = handler->GetId();
       int priority = handler->GetPriority();
 
@@ -694,18 +689,18 @@
           pendingJobs_.push(handler);
           pendingJobAvailable_.notify_one();
           break;
- 
+
         case JobState_Success:
           SetCompletedJob(*handler, true);
           break;
-        
+
         case JobState_Failure:
           SetCompletedJob(*handler, false);
           break;
 
         case JobState_Paused:
           break;
-        
+
         default:
         {
           std::string details = ("A job should not be loaded from state: " +
@@ -716,13 +711,9 @@
 
       LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
 
+      if (observer_ != NULL)
       {
-        boost::shared_lock<boost::shared_mutex> lock(observersMutex_);
-
-        for (Observers::iterator it = observers_.begin(); it != observers_.end(); ++it)
-        {
-          (*it)->SignalJobSubmitted(id);
-        }
+        observer_->SignalJobSubmitted(id);
       }
 
       // WARNING: The following call might make "handler" invalid if
@@ -807,7 +798,7 @@
             const JobStatus& status = it->second->GetLastStatus();
             successContent = status.GetPublicContent();
           }
-          
+
           return;
         }
         else
@@ -884,7 +875,7 @@
   void JobsRegistry::RemoveRetryJob(JobHandler* handler)
   {
     RetryJobs::iterator item = retryJobs_.find(handler);
-    assert(item != retryJobs_.end());            
+    assert(item != retryJobs_.end());
     retryJobs_.erase(item);
   }
 
@@ -971,7 +962,7 @@
           SetCompletedJob(*found->second, false);
           found->second->SetLastErrorCode(ErrorCode_CanceledJob);
           break;
-        
+
         case JobState_Success:
         case JobState_Failure:
           // Nothing to be done
@@ -1019,7 +1010,7 @@
       pendingJobs_.push(found->second);
       pendingJobAvailable_.notify_one();
       CheckInvariants();
-      return true;      
+      return true;
     }
   }
 
@@ -1046,9 +1037,9 @@
     else
     {
       found->second->GetJob().Reset();
-      
+
       bool ok = false;
-      for (CompletedJobs::iterator it = completedJobs_.begin(); 
+      for (CompletedJobs::iterator it = completedJobs_.begin();
            it != completedJobs_.end(); ++it)
       {
         if (*it == found->second)
@@ -1109,24 +1100,20 @@
   }
 
 
-  void JobsRegistry::AddObserver(JobsRegistry::IObserver& observer)
+  void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer)
   {
-    boost::unique_lock<boost::shared_mutex> lock(observersMutex_);
-    observers_.insert(&observer);
+    boost::mutex::scoped_lock lock(mutex_);
+    observer_ = &observer;
   }
 
-  
-  void JobsRegistry::ResetObserver(JobsRegistry::IObserver& observer)
+
+  void JobsRegistry::ResetObserver()
   {
-    boost::unique_lock<boost::shared_mutex> lock(observersMutex_);
-    Observers::iterator it = observers_.find(&observer);
-    if (it != observers_.end())
-    {
-      observers_.erase(it);
-    }
+    boost::mutex::scoped_lock lock(mutex_);
+    observer_ = NULL;
   }
 
-  
+
   JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
                                        unsigned int timeout) :
     registry_(registry),
@@ -1169,7 +1156,7 @@
     }
   }
 
-      
+
   JobsRegistry::RunningJob::~RunningJob()
   {
     if (IsValid())
@@ -1189,26 +1176,26 @@
 
         case JobState_Paused:
           registry_.MarkRunningAsPaused(*handler_);
-          break;            
+          break;
 
         case JobState_Retry:
           registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
           break;
-            
+
         default:
           assert(0);
       }
     }
   }
 
-      
+
   bool JobsRegistry::RunningJob::IsValid() const
   {
     return (handler_ != NULL &&
             job_ != NULL);
   }
 
-      
+
   const std::string& JobsRegistry::RunningJob::GetId() const
   {
     if (!IsValid())
@@ -1221,7 +1208,7 @@
     }
   }
 
-      
+
   int JobsRegistry::RunningJob::GetPriority() const
   {
     if (!IsValid())
@@ -1233,7 +1220,7 @@
       return priority_;
     }
   }
-      
+
 
   IJob& JobsRegistry::RunningJob::GetJob()
   {
@@ -1247,7 +1234,7 @@
     }
   }
 
-      
+
   bool JobsRegistry::RunningJob::IsPauseScheduled()
   {
     if (!IsValid())
@@ -1259,12 +1246,12 @@
       boost::mutex::scoped_lock lock(registry_.mutex_);
       registry_.CheckInvariants();
       assert(handler_->GetState() == JobState_Running);
-        
+
       return handler_->IsPauseScheduled();
     }
   }
 
-      
+
   bool JobsRegistry::RunningJob::IsCancelScheduled()
   {
     if (!IsValid())
@@ -1276,12 +1263,12 @@
       boost::mutex::scoped_lock lock(registry_.mutex_);
       registry_.CheckInvariants();
       assert(handler_->GetState() == JobState_Running);
-        
+
       return handler_->IsCancelScheduled();
     }
   }
 
-      
+
   void JobsRegistry::RunningJob::MarkSuccess()
   {
     if (!IsValid())
@@ -1294,7 +1281,7 @@
     }
   }
 
-      
+
   void JobsRegistry::RunningJob::MarkFailure()
   {
     if (!IsValid())
@@ -1307,7 +1294,7 @@
     }
   }
 
-      
+
   void JobsRegistry::RunningJob::MarkCanceled()
   {
     if (!IsValid())
@@ -1321,7 +1308,7 @@
     }
   }
 
-      
+
   void JobsRegistry::RunningJob::MarkPause()
   {
     if (!IsValid())
@@ -1334,7 +1321,7 @@
     }
   }
 
-      
+
   void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout)
   {
     if (!IsValid())
@@ -1347,7 +1334,7 @@
       targetRetryTimeout_ = timeout;
     }
   }
-      
+
 
   void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code,
                                               const std::string& details)
@@ -1359,11 +1346,11 @@
     else
     {
       JobStatus status(code, details, *job_);
-          
+
       boost::mutex::scoped_lock lock(registry_.mutex_);
       registry_.CheckInvariants();
       assert(handler_->GetState() == JobState_Running);
-        
+
       handler_->SetLastStatus(status);
     }
   }
@@ -1378,8 +1365,8 @@
     target = Json::objectValue;
     target[TYPE] = JOBS_REGISTRY;
     target[JOBS] = Json::objectValue;
-    
-    for (JobsIndex::const_iterator it = jobsIndex_.begin(); 
+
+    for (JobsIndex::const_iterator it = jobsIndex_.begin();
          it != jobsIndex_.end(); ++it)
     {
       Json::Value v;
@@ -1394,7 +1381,8 @@
   JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
                              const Json::Value& s,
                              size_t maxCompletedJobs) :
-    maxCompletedJobs_(maxCompletedJobs)
+    maxCompletedJobs_(maxCompletedJobs),
+    observer_(NULL)
   {
     if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
         !s.isMember(JOBS) ||
@@ -1452,7 +1440,7 @@
     running = 0;
     success = 0;
     failed = 0;
-    
+
     for (JobsIndex::const_iterator it = jobsIndex_.begin();
          it != jobsIndex_.end(); ++it)
     {
@@ -1469,7 +1457,7 @@
         case JobState_Running:
           running ++;
           break;
-          
+
         case JobState_Success:
           success ++;
           break;
@@ -1481,6 +1469,6 @@
         default:
           throw OrthancException(ErrorCode_InternalError);
       }
-    }    
+    }
   }
 }
--- a/Core/JobsEngine/JobsRegistry.h	Tue Jun 09 11:54:58 2020 +0200
+++ b/Core/JobsEngine/JobsRegistry.h	Tue Jun 09 12:20:42 2020 +0200
@@ -20,7 +20,7 @@
  * you do not wish to do so, delete this exception statement from your
  * version. If you delete this exception statement from all source files
  * in the program, then also delete it here.
- * 
+ *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
@@ -69,7 +69,7 @@
 
       virtual void SignalJobFailure(const std::string& jobId) = 0;
     };
-    
+
   private:
     enum CompletedReason
     {
@@ -77,7 +77,7 @@
       CompletedReason_Failure,
       CompletedReason_Canceled
     };
-    
+
     class JobHandler;
 
     struct PriorityComparator
@@ -86,11 +86,10 @@
                        JobHandler*& b) const;
     };
 
-    typedef std::set<IObserver*>                            Observers;
     typedef std::map<std::string, JobHandler*>              JobsIndex;
     typedef std::list<JobHandler*>                          CompletedJobs;
     typedef std::set<JobHandler*>                           RetryJobs;
-    typedef std::priority_queue<JobHandler*, 
+    typedef std::priority_queue<JobHandler*,
                                 std::vector<JobHandler*>,   // Could be a "std::deque"
                                 PriorityComparator>         PendingJobs;
 
@@ -104,15 +103,14 @@
     boost::condition_variable  someJobComplete_;
     size_t                     maxCompletedJobs_;
 
-    boost::shared_mutex        observersMutex_;
-    Observers                  observers_;
+    IObserver*                 observer_;
 
 
 #ifndef NDEBUG
     bool IsPendingJob(const JobHandler& job) const;
 
     bool IsCompletedJob(JobHandler& job) const;
-    
+
     bool IsRetryJob(JobHandler& job) const;
 #endif
 
@@ -122,28 +120,29 @@
 
     void SetCompletedJob(JobHandler& job,
                          bool success);
-    
+
     void MarkRunningAsCompleted(JobHandler& job,
                                 CompletedReason reason);
 
     void MarkRunningAsRetry(JobHandler& job,
                             unsigned int timeout);
-    
+
     void MarkRunningAsPaused(JobHandler& job);
-    
+
     bool GetStateInternal(JobState& state,
                           const std::string& id);
 
     void RemovePendingJob(const std::string& id);
-      
+
     void RemoveRetryJob(JobHandler* handler);
-      
+
     void SubmitInternal(std::string& id,
                         JobHandler* handler);
-    
+
   public:
     JobsRegistry(size_t maxCompletedJobs) :
-      maxCompletedJobs_(maxCompletedJobs)
+      maxCompletedJobs_(maxCompletedJobs),
+      observer_(NULL)
     {
     }
 
@@ -154,7 +153,7 @@
     ~JobsRegistry();
 
     void SetMaxCompletedJobs(size_t i);
-    
+
     size_t GetMaxCompletedJobs();
 
     void ListJobs(std::set<std::string>& target);
@@ -168,37 +167,37 @@
                       const std::string& key);
 
     void Serialize(Json::Value& target);
-    
+
     void Submit(std::string& id,
                 IJob* job,        // Takes ownership
                 int priority);
-    
+
     void Submit(IJob* job,        // Takes ownership
                 int priority);
 
     void SubmitAndWait(Json::Value& successContent,
                        IJob* job,        // Takes ownership
                        int priority);
-    
+
     bool SetPriority(const std::string& id,
                      int priority);
 
     bool Pause(const std::string& id);
-    
+
     bool Resume(const std::string& id);
 
     bool Resubmit(const std::string& id);
 
     bool Cancel(const std::string& id);
-    
+
     void ScheduleRetries();
-    
+
     bool GetState(JobState& state,
                   const std::string& id);
 
-    void AddObserver(IObserver& observer);
+    void SetObserver(IObserver& observer);
 
-    void ResetObserver(IObserver& observer);
+    void ResetObserver();
 
     void GetStatistics(unsigned int& pending,
                        unsigned int& running,
@@ -220,7 +219,7 @@
       JobState       targetState_;
       unsigned int   targetRetryTimeout_;
       bool           canceled_;
-      
+
     public:
       RunningJob(JobsRegistry& registry,
                  unsigned int timeout);
--- a/NEWS	Tue Jun 09 11:54:58 2020 +0200
+++ b/NEWS	Tue Jun 09 12:20:42 2020 +0200
@@ -3,6 +3,12 @@
 
 * Private tags returned by C-FIND SCP (cf. option "DefaultPrivateCreator")
 
+Plugins
+-------
+
+* New functions in the SDK:
+  - new "changes": JobSubmitted, JobSuccess, JobFailure
+
 
 Version 1.7.1 (2020-05-27)
 ==========================
--- a/OrthancServer/ServerContext.cpp	Tue Jun 09 11:54:58 2020 +0200
+++ b/OrthancServer/ServerContext.cpp	Tue Jun 09 12:20:42 2020 +0200
@@ -143,6 +143,7 @@
   {
     haveJobsChanged_ = true;
     mainLua_.SignalJobSubmitted(jobId);
+    plugins_->SignalJobSubmitted(jobId);
   }
   
 
@@ -150,6 +151,7 @@
   {
     haveJobsChanged_ = true;
     mainLua_.SignalJobSuccess(jobId);
+    plugins_->SignalJobSuccess(jobId);
   }
 
   
@@ -157,6 +159,7 @@
   {
     haveJobsChanged_ = true;
     mainLua_.SignalJobFailure(jobId);
+    plugins_->SignalJobFailure(jobId);
   }
 
 
@@ -190,7 +193,7 @@
       LOG(INFO) << "Not reloading the jobs from the last execution of Orthanc";
     }
 
-    jobsEngine_.GetRegistry().AddObserver(*this);
+    jobsEngine_.GetRegistry().SetObserver(*this);
     jobsEngine_.Start();
     isJobsEngineUnserialized_ = true;
 
@@ -347,7 +350,7 @@
         saveJobsThread_.join();
       }
 
-      jobsEngine_.GetRegistry().ResetObserver(*this);
+      jobsEngine_.GetRegistry().ResetObserver();
 
       if (isJobsEngineUnserialized_)
       {
--- a/Plugins/Engine/OrthancPlugins.h	Tue Jun 09 11:54:58 2020 +0200
+++ b/Plugins/Engine/OrthancPlugins.h	Tue Jun 09 12:20:42 2020 +0200
@@ -311,6 +311,21 @@
       SignalChangeInternal(OrthancPluginChangeType_OrthancStopped, OrthancPluginResourceType_None, NULL);
     }
 
+    void SignalJobSubmitted(const std::string& jobId)
+    {
+      SignalChangeInternal(OrthancPluginChangeType_JobSubmitted, OrthancPluginResourceType_None, jobId.c_str());
+    }
+
+    void SignalJobSuccess(const std::string& jobId)
+    {
+      SignalChangeInternal(OrthancPluginChangeType_JobSuccess, OrthancPluginResourceType_None, jobId.c_str());
+    }
+
+    void SignalJobFailure(const std::string& jobId)
+    {
+      SignalChangeInternal(OrthancPluginChangeType_JobFailure, OrthancPluginResourceType_None, jobId.c_str());
+    }
+
     void SignalUpdatedPeers()
     {
       SignalChangeInternal(OrthancPluginChangeType_UpdatedPeers, OrthancPluginResourceType_None, NULL);
--- a/Plugins/Engine/PluginsEnumerations.cpp	Tue Jun 09 11:54:58 2020 +0200
+++ b/Plugins/Engine/PluginsEnumerations.cpp	Tue Jun 09 12:20:42 2020 +0200
@@ -92,7 +92,7 @@
 
   namespace Plugins
   {
-    OrthancPluginChangeType Convert55(ChangeType type)
+    OrthancPluginChangeType Convert(ChangeType type)
     {
       switch (type)
       {
--- a/Plugins/Engine/PluginsEnumerations.h	Tue Jun 09 11:54:58 2020 +0200
+++ b/Plugins/Engine/PluginsEnumerations.h	Tue Jun 09 12:20:42 2020 +0200
@@ -56,7 +56,7 @@
 
   namespace Plugins
   {
-    OrthancPluginChangeType Convert55(ChangeType type);
+    OrthancPluginChangeType Convert(ChangeType type);
 
     OrthancPluginPixelFormat Convert(PixelFormat format);
 
--- a/Plugins/Include/orthanc/OrthancCPlugin.h	Tue Jun 09 11:54:58 2020 +0200
+++ b/Plugins/Include/orthanc/OrthancCPlugin.h	Tue Jun 09 12:20:42 2020 +0200
@@ -716,7 +716,7 @@
 
 
   /**
-   * The supported types of changes that can happen to DICOM resources.
+   * The supported types of changes that can be signaled to the change callback.
    * @ingroup Callbacks
    **/
   typedef enum
@@ -737,6 +737,9 @@
     OrthancPluginChangeType_UpdatedMetadata = 13,   /*!< Some user-defined metadata has changed for this resource */
     OrthancPluginChangeType_UpdatedPeers = 14,      /*!< The list of Orthanc peers has changed */
     OrthancPluginChangeType_UpdatedModalities = 15, /*!< The list of DICOM modalities has changed */
+    OrthancPluginChangeType_JobSubmitted = 16,      /*!< New Job submitted */
+    OrthancPluginChangeType_JobSuccess = 17,        /*!< A Job has completed successfully */
+    OrthancPluginChangeType_JobFailure = 18,        /*!< A Job has failed */
 
     _OrthancPluginChangeType_INTERNAL = 0x7fffffff
   } OrthancPluginChangeType;