changeset 5410:16cbfefa15e9

Solved a deadlock related to the Job Engine events and plugins
author Alain Mazy <am@osimis.io>
date Tue, 07 Nov 2023 12:52:37 +0100
parents 68231ca4363a
children ca9cf4d46883
files NEWS OrthancServer/Plugins/Engine/OrthancPlugins.cpp OrthancServer/Plugins/Engine/OrthancPlugins.h OrthancServer/Sources/IServerListener.h OrthancServer/Sources/JobEvent.h OrthancServer/Sources/LuaScripting.cpp OrthancServer/Sources/LuaScripting.h OrthancServer/Sources/ServerContext.cpp OrthancServer/Sources/ServerContext.h
diffstat 9 files changed, 188 insertions(+), 81 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Tue Nov 07 08:38:48 2023 +0100
+++ b/NEWS	Tue Nov 07 12:52:37 2023 +0100
@@ -19,6 +19,13 @@
     files through WADO-RS e.g in StoneViewer when working on large bandwidth networks.
   - When "HttpCompressionEnabled" is true, content < 2KB are never compressed.
 
+Bug Fixes
+---------
+
+* Solved a deadlock related to the Job Engine events and plugins.  Job events are now pushed
+  into a queue to be handled asynchronously by plugins.
+
+
 REST API
 --------
 
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.cpp	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPlugins.cpp	Tue Nov 07 12:52:37 2023 +0100
@@ -2730,6 +2730,25 @@
   }
 
 
+  void OrthancPlugins::SignalJobEvent(const JobEvent& event)
+  {
+    // job events are actually considered as changes inside plugins -> translate
+    switch (event.GetEventType())
+    {
+      case JobEventType_Submitted:
+        SignalChangeInternal(OrthancPluginChangeType_JobSubmitted, OrthancPluginResourceType_None, event.GetJobId().c_str());
+        break;
+      case JobEventType_Success:
+        SignalChangeInternal(OrthancPluginChangeType_JobSuccess, OrthancPluginResourceType_None, event.GetJobId().c_str());
+        break;
+      case JobEventType_Failure:
+        SignalChangeInternal(OrthancPluginChangeType_JobFailure, OrthancPluginResourceType_None, event.GetJobId().c_str());
+        break;
+      default:
+        throw OrthancException(ErrorCode_InternalError);
+    }
+  }
+
 
   void OrthancPlugins::RegisterRestCallback(const void* parameters,
                                             bool mutualExclusion)
@@ -2776,6 +2795,8 @@
 
   void OrthancPlugins::RegisterOnChangeCallback(const void* parameters)
   {
+    boost::recursive_mutex::scoped_lock lock(pimpl_->changeCallbackMutex_);
+
     const _OrthancPluginOnChangeCallback& p = 
       *reinterpret_cast<const _OrthancPluginOnChangeCallback*>(parameters);
 
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.h	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPlugins.h	Tue Nov 07 12:52:37 2023 +0100
@@ -269,7 +269,9 @@
                                const void* parameters) ORTHANC_OVERRIDE;
 
     virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE;
-    
+
+    virtual void SignalJobEvent(const JobEvent& event) ORTHANC_OVERRIDE;
+
     virtual void SignalStoredInstance(const std::string& instanceId,
                                       const DicomInstanceToStore& instance,
                                       const Json::Value& simplifiedTags) ORTHANC_OVERRIDE;
@@ -319,21 +321,6 @@
       SignalChangeInternal(OrthancPluginChangeType_OrthancStopped, OrthancPluginResourceType_None, NULL);
     }
 
-    void SignalJobSubmitted(const std::string& jobId)
-    {
-      SignalChangeInternal(OrthancPluginChangeType_JobSubmitted, OrthancPluginResourceType_None, jobId.c_str());
-    }
-
-    void SignalJobSuccess(const std::string& jobId)
-    {
-      SignalChangeInternal(OrthancPluginChangeType_JobSuccess, OrthancPluginResourceType_None, jobId.c_str());
-    }
-
-    void SignalJobFailure(const std::string& jobId)
-    {
-      SignalChangeInternal(OrthancPluginChangeType_JobFailure, OrthancPluginResourceType_None, jobId.c_str());
-    }
-
     void SignalUpdatedPeers()
     {
       SignalChangeInternal(OrthancPluginChangeType_UpdatedPeers, OrthancPluginResourceType_None, NULL);
--- a/OrthancServer/Sources/IServerListener.h	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Sources/IServerListener.h	Tue Nov 07 12:52:37 2023 +0100
@@ -24,6 +24,7 @@
 
 #include "DicomInstanceToStore.h"
 #include "ServerIndexChange.h"
+#include "JobEvent.h"
 
 #include <json/value.h>
 
@@ -42,6 +43,8 @@
     
     virtual void SignalChange(const ServerIndexChange& change) = 0;
 
+    virtual void SignalJobEvent(const JobEvent& event) = 0;
+
     virtual bool FilterIncomingInstance(const DicomInstanceToStore& instance,
                                         const Json::Value& simplified) = 0;
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Sources/JobEvent.h	Tue Nov 07 12:52:37 2023 +0100
@@ -0,0 +1,77 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2023 Osimis S.A., Belgium
+ * Copyright (C) 2021-2023 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "ServerEnumerations.h"
+#include "../../OrthancFramework/Sources/IDynamicObject.h"
+#include "../../OrthancFramework/Sources/SystemToolbox.h"
+
+#include <string>
+#include <json/value.h>
+
+namespace Orthanc
+{
+  enum JobEventType
+  {
+    JobEventType_Failure,
+    JobEventType_Submitted,
+    JobEventType_Success
+  };
+
+
+  struct JobEvent : public IDynamicObject
+  {
+  private:
+    JobEventType eventType_;
+    std::string  jobId_;
+
+  public:
+    JobEvent(JobEventType eventType,
+             const std::string& jobId) :
+      eventType_(eventType),
+      jobId_(jobId)
+    {
+    }
+
+    JobEvent(const JobEvent& other) 
+    : eventType_(other.eventType_),
+      jobId_(other.jobId_)
+    {
+    }
+
+    // JobEvent* Clone() const
+    // {
+    //   return new JobEvent(*this);
+    // }
+
+    JobEventType  GetEventType() const
+    {
+      return eventType_;
+    }
+
+    const std::string&  GetJobId() const
+    {
+      return jobId_;
+    }
+  };
+}
--- a/OrthancServer/Sources/LuaScripting.cpp	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Sources/LuaScripting.cpp	Tue Nov 07 12:52:37 2023 +0100
@@ -239,25 +239,14 @@
   };
 
 
-  class LuaScripting::JobEvent : public LuaScripting::IEvent
+  class LuaScripting::LuaJobEvent : public LuaScripting::IEvent
   {
-  public:
-    enum Type
-    {
-      Type_Failure,
-      Type_Submitted,
-      Type_Success
-    };
-    
   private:
-    Type         type_;
-    std::string  jobId_;
+    JobEvent event_;
 
   public:
-    JobEvent(Type type,
-             const std::string& jobId) :
-      type_(type),
-      jobId_(jobId)
+    LuaJobEvent(const JobEvent& event) :
+      event_(event)
     {
     }
 
@@ -265,17 +254,17 @@
     {
       std::string functionName;
       
-      switch (type_)
+      switch (event_.GetEventType())
       {
-        case Type_Failure:
+        case JobEventType_Failure:
           functionName = "OnJobFailure";
           break;
 
-        case Type_Submitted:
+        case JobEventType_Submitted:
           functionName = "OnJobSubmitted";
           break;
 
-        case Type_Success:
+        case JobEventType_Success:
           functionName = "OnJobSuccess";
           break;
 
@@ -289,7 +278,7 @@
         if (lock.GetLua().IsExistingFunction(functionName.c_str()))
         {
           LuaFunctionCall call(lock.GetLua(), functionName.c_str());
-          call.PushString(jobId_);
+          call.PushString(event_.GetJobId());
           call.Execute();
         }
       }
@@ -1056,20 +1045,9 @@
   }
 
   
-  void LuaScripting::SignalJobSubmitted(const std::string& jobId)
-  {
-    pendingEvents_.Enqueue(new JobEvent(JobEvent::Type_Submitted, jobId));
-  }
-  
-
-  void LuaScripting::SignalJobSuccess(const std::string& jobId)
+  void LuaScripting::SignalJobEvent(const JobEvent& event)
   {
-    pendingEvents_.Enqueue(new JobEvent(JobEvent::Type_Success, jobId));
-  }
-  
-
-  void LuaScripting::SignalJobFailure(const std::string& jobId)
-  {
-    pendingEvents_.Enqueue(new JobEvent(JobEvent::Type_Failure, jobId));
+    // Lua has its own event thread and queue to dissociate it completely from the main JobEventsThread
+    pendingEvents_.Enqueue(new LuaJobEvent(event));
   }
 }
--- a/OrthancServer/Sources/LuaScripting.h	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Sources/LuaScripting.h	Tue Nov 07 12:52:37 2023 +0100
@@ -24,6 +24,7 @@
 
 #include "DicomInstanceToStore.h"
 #include "ServerIndexChange.h"
+#include "JobEvent.h"
 #include "ServerJobs/LuaJobManager.h"
 
 #include "../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h"
@@ -47,7 +48,7 @@
     class IEvent;
     class OnStoredInstanceEvent;
     class StableResourceEvent;
-    class JobEvent;
+    class LuaJobEvent;
     class DeleteEvent;
     class UpdateEvent;
 
@@ -128,11 +129,7 @@
 
     void Execute(const std::string& command);
 
-    void SignalJobSubmitted(const std::string& jobId);
-
-    void SignalJobSuccess(const std::string& jobId);
-
-    void SignalJobFailure(const std::string& jobId);
+    void SignalJobEvent(const JobEvent& event);
 
     TimeoutDicomConnectionManager& GetDicomConnectionManager()
     {
--- a/OrthancServer/Sources/ServerContext.cpp	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Sources/ServerContext.cpp	Tue Nov 07 12:52:37 2023 +0100
@@ -164,7 +164,7 @@
             }
             catch (...)
             {
-              throw OrthancException(ErrorCode_InternalError);
+              throw OrthancException(ErrorCode_InternalError, "Error while signaling a change");
             }
           }
           catch (OrthancException& e)
@@ -179,6 +179,48 @@
   }
 
 
+  void ServerContext::JobEventsThread(ServerContext* that,
+                                      unsigned int sleepDelay)
+  {
+    while (!that->done_)
+    {
+      std::unique_ptr<IDynamicObject> obj(that->pendingJobEvents_.Dequeue(sleepDelay));
+        
+      if (obj.get() != NULL)
+      {
+        const JobEvent& event = dynamic_cast<const JobEvent&>(*obj.get());
+
+        boost::shared_lock<boost::shared_mutex> lock(that->listenersMutex_);
+        for (ServerListeners::iterator it = that->listeners_.begin(); 
+             it != that->listeners_.end(); ++it)
+        {
+          try
+          {
+            try
+            {
+              it->GetListener().SignalJobEvent(event);
+            }
+            catch (std::bad_alloc&)
+            {
+              LOG(ERROR) << "Not enough memory while signaling a job event";
+            }
+            catch (...)
+            {
+              throw OrthancException(ErrorCode_InternalError, "Error while signaling a job event");
+            }
+          }
+          catch (OrthancException& e)
+          {
+            LOG(ERROR) << "Error in the " << it->GetDescription() 
+                       << " callback while signaling a job event: " << e.What()
+                       << " (code " << e.GetErrorCode() << ")";
+          }
+        }
+      }
+    }
+  }
+
+
   void ServerContext::SaveJobsThread(ServerContext* that,
                                      unsigned int sleepDelay)
   {
@@ -206,42 +248,21 @@
   void ServerContext::SignalJobSubmitted(const std::string& jobId)
   {
     haveJobsChanged_ = true;
-    mainLua_.SignalJobSubmitted(jobId);
-
-#if ORTHANC_ENABLE_PLUGINS == 1
-    if (HasPlugins())
-    {
-      GetPlugins().SignalJobSubmitted(jobId);
-    }
-#endif
+    pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Submitted, jobId));
   }
   
 
   void ServerContext::SignalJobSuccess(const std::string& jobId)
   {
     haveJobsChanged_ = true;
-    mainLua_.SignalJobSuccess(jobId);
-
-#if ORTHANC_ENABLE_PLUGINS == 1
-    if (HasPlugins())
-    {
-      GetPlugins().SignalJobSuccess(jobId);
-    }
-#endif
+    pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Success, jobId));
   }
 
   
   void ServerContext::SignalJobFailure(const std::string& jobId)
   {
     haveJobsChanged_ = true;
-    mainLua_.SignalJobFailure(jobId);
-
-#if ORTHANC_ENABLE_PLUGINS == 1
-    if (HasPlugins())
-    {
-      GetPlugins().SignalJobFailure(jobId);
-    }
-#endif
+    pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Failure, jobId));
   }
 
 
@@ -449,6 +470,7 @@
 
       listeners_.push_back(ServerListener(luaListener_, "Lua"));
       changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100));
+      jobEventsThread_ = boost::thread(JobEventsThread, this, (unitTesting ? 20 : 100));
       
 #if HAVE_MALLOC_TRIM == 1
       LOG(INFO) << "Starting memory trimming thread at 30 seconds interval";
@@ -494,6 +516,11 @@
         changeThread_.join();
       }
 
+      if (jobEventsThread_.joinable())
+      {
+        jobEventsThread_.join();
+      }
+
       if (saveJobsThread_.joinable())
       {
         saveJobsThread_.join();
--- a/OrthancServer/Sources/ServerContext.h	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Sources/ServerContext.h	Tue Nov 07 12:52:37 2023 +0100
@@ -139,6 +139,11 @@
         context_.mainLua_.SignalChange(change);
       }
 
+      virtual void SignalJobEvent(const JobEvent& event) ORTHANC_OVERRIDE
+      {
+        context_.mainLua_.SignalJobEvent(event);
+      }
+
       virtual bool FilterIncomingInstance(const DicomInstanceToStore& instance,
                                           const Json::Value& simplified) ORTHANC_OVERRIDE
       {
@@ -184,6 +189,9 @@
     static void ChangeThread(ServerContext* that,
                              unsigned int sleepDelay);
 
+    static void JobEventsThread(ServerContext* that,
+                                unsigned int sleepDelay);
+
     static void SaveJobsThread(ServerContext* that,
                                unsigned int sleepDelay);
 
@@ -233,7 +241,9 @@
     bool haveJobsChanged_;
     bool isJobsEngineUnserialized_;
     SharedMessageQueue  pendingChanges_;
+    SharedMessageQueue  pendingJobEvents_;
     boost::thread  changeThread_;
+    boost::thread  jobEventsThread_;
     boost::thread  saveJobsThread_;
     boost::thread  memoryTrimmingThread_;