diff OrthancServer/Sources/ServerContext.cpp @ 5410:16cbfefa15e9

Solved a deadlock related to the Job Engine events and plugins
author Alain Mazy <am@osimis.io>
date Tue, 07 Nov 2023 12:52:37 +0100
parents 566e8d32bd3a
children d37dff2c0028
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerContext.cpp	Tue Nov 07 08:38:48 2023 +0100
+++ b/OrthancServer/Sources/ServerContext.cpp	Tue Nov 07 12:52:37 2023 +0100
@@ -164,7 +164,7 @@
             }
             catch (...)
             {
-              throw OrthancException(ErrorCode_InternalError);
+              throw OrthancException(ErrorCode_InternalError, "Error while signaling a change");
             }
           }
           catch (OrthancException& e)
@@ -179,6 +179,48 @@
   }
 
 
+  void ServerContext::JobEventsThread(ServerContext* that,
+                                      unsigned int sleepDelay)
+  {
+    while (!that->done_)
+    {
+      std::unique_ptr<IDynamicObject> obj(that->pendingJobEvents_.Dequeue(sleepDelay));
+        
+      if (obj.get() != NULL)
+      {
+        const JobEvent& event = dynamic_cast<const JobEvent&>(*obj.get());
+
+        boost::shared_lock<boost::shared_mutex> lock(that->listenersMutex_);
+        for (ServerListeners::iterator it = that->listeners_.begin(); 
+             it != that->listeners_.end(); ++it)
+        {
+          try
+          {
+            try
+            {
+              it->GetListener().SignalJobEvent(event);
+            }
+            catch (std::bad_alloc&)
+            {
+              LOG(ERROR) << "Not enough memory while signaling a job event";
+            }
+            catch (...)
+            {
+              throw OrthancException(ErrorCode_InternalError, "Error while signaling a job event");
+            }
+          }
+          catch (OrthancException& e)
+          {
+            LOG(ERROR) << "Error in the " << it->GetDescription() 
+                       << " callback while signaling a job event: " << e.What()
+                       << " (code " << e.GetErrorCode() << ")";
+          }
+        }
+      }
+    }
+  }
+
+
   void ServerContext::SaveJobsThread(ServerContext* that,
                                      unsigned int sleepDelay)
   {
@@ -206,42 +248,21 @@
   void ServerContext::SignalJobSubmitted(const std::string& jobId)
   {
     haveJobsChanged_ = true;
-    mainLua_.SignalJobSubmitted(jobId);
-
-#if ORTHANC_ENABLE_PLUGINS == 1
-    if (HasPlugins())
-    {
-      GetPlugins().SignalJobSubmitted(jobId);
-    }
-#endif
+    pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Submitted, jobId));
   }
   
 
   void ServerContext::SignalJobSuccess(const std::string& jobId)
   {
     haveJobsChanged_ = true;
-    mainLua_.SignalJobSuccess(jobId);
-
-#if ORTHANC_ENABLE_PLUGINS == 1
-    if (HasPlugins())
-    {
-      GetPlugins().SignalJobSuccess(jobId);
-    }
-#endif
+    pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Success, jobId));
   }
 
   
   void ServerContext::SignalJobFailure(const std::string& jobId)
   {
     haveJobsChanged_ = true;
-    mainLua_.SignalJobFailure(jobId);
-
-#if ORTHANC_ENABLE_PLUGINS == 1
-    if (HasPlugins())
-    {
-      GetPlugins().SignalJobFailure(jobId);
-    }
-#endif
+    pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Failure, jobId));
   }
 
 
@@ -449,6 +470,7 @@
 
       listeners_.push_back(ServerListener(luaListener_, "Lua"));
       changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100));
+      jobEventsThread_ = boost::thread(JobEventsThread, this, (unitTesting ? 20 : 100));
       
 #if HAVE_MALLOC_TRIM == 1
       LOG(INFO) << "Starting memory trimming thread at 30 seconds interval";
@@ -494,6 +516,11 @@
         changeThread_.join();
       }
 
+      if (jobEventsThread_.joinable())
+      {
+        jobEventsThread_.join();
+      }
+
       if (saveJobsThread_.joinable())
       {
         saveJobsThread_.join();