changeset 2604:76ef12fa136c jobs

fix race conditions if creating Lua jobs
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 18 May 2018 17:37:14 +0200
parents 988936118354
children 1e11b0229e04
files OrthancServer/ServerJobs/LuaJobManager.cpp OrthancServer/ServerJobs/LuaJobManager.h UnitTestsSources/MultiThreadingTests.cpp
diffstat 3 files changed, 147 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/OrthancServer/ServerJobs/LuaJobManager.cpp	Fri May 18 17:02:25 2018 +0200
+++ b/OrthancServer/ServerJobs/LuaJobManager.cpp	Fri May 18 17:37:14 2018 +0200
@@ -34,6 +34,13 @@
 #include "../PrecompiledHeadersServer.h"
 #include "LuaJobManager.h"
 
+#include "DeleteResourceOperation.h"
+#include "StoreScuOperation.h"
+#include "../../Core/JobsEngine/Operations/LogJobOperation.h"
+
+#include "DicomInstanceOperationValue.h"
+#include "../../Core/JobsEngine/Operations/NullOperationValue.h"
+#include "../../Core/JobsEngine/Operations/StringOperationValue.h"
 
 namespace Orthanc
 {
@@ -59,8 +66,7 @@
   }
 
 
-  LuaJobManager::LuaJobManager(JobsEngine&  engine) :
-    engine_(engine),
+  LuaJobManager::LuaJobManager() :
     currentJob_(NULL),
     maxOperations_(1000),
     priority_(0),
@@ -102,30 +108,108 @@
   }
 
 
-  LuaJobManager::Lock* LuaJobManager::Modify()
+  LuaJobManager::Lock::Lock(LuaJobManager& that,
+                            JobsEngine& engine) :
+    that_(that),
+    lock_(that.mutex_),
+    engine_(engine)
   {
-    boost::mutex::scoped_lock lock(mutex_);
-
-    if (currentJob_ != NULL)
+    if (that_.currentJob_ == NULL)
+    {
+      isNewJob_ = true;
+    }
+    else
     {
-      std::auto_ptr<Lock> result(new Lock(*currentJob_));
+      jobLock_.reset(new SequenceOfOperationsJob::Lock(*that_.currentJob_));
 
-      if (!result->IsDone() &&
-          result->GetOperationsCount() < maxOperations_)
+      if (jobLock_->IsDone() ||
+          jobLock_->GetOperationsCount() >= that_.maxOperations_)
       {
-        return result.release();
+        jobLock_.reset(NULL);
+        isNewJob_ = true;
+      }
+      else
+      {
+        isNewJob_ = false;
       }
     }
 
-    // Need to create a new job, as the previous one is either
-    // finished, or is getting too long
-    currentJob_ = new SequenceOfOperationsJob;
+    if (isNewJob_)
+    {
+      // Need to create a new job, as the previous one is either
+      // finished, or is getting too long
+      that_.currentJob_ = new SequenceOfOperationsJob;
+      jobLock_.reset(new SequenceOfOperationsJob::Lock(*that_.currentJob_));
+      jobLock_->SetTrailingOperationTimeout(that_.trailingTimeout_);
+    }
+
+    assert(jobLock_.get() != NULL);
+  }
+
+
+  LuaJobManager::Lock::~Lock()
+  {
+    assert(jobLock_.get() != NULL);
+    jobLock_.reset(NULL);
+
+    if (isNewJob_)
+    {
+      engine_.GetRegistry().Submit(that_.currentId_, that_.currentJob_, that_.priority_);
+    }
+  }
+
+
+  size_t LuaJobManager::Lock::AddDeleteResourceOperation(ServerContext& context)
+  {
+    assert(jobLock_.get() != NULL);
+    return jobLock_->AddOperation(new DeleteResourceOperation(context));
+  }
+
+
+  size_t LuaJobManager::Lock::AddLogOperation()
+  {
+    assert(jobLock_.get() != NULL);
+    return jobLock_->AddOperation(new LogJobOperation);
+  }
+
 
-    engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
+  size_t LuaJobManager::Lock::AddStoreScuOperation(const std::string& localAet,
+                                                   const RemoteModalityParameters& modality,
+                                                   IDicomConnectionManager& manager)
+  {
+    assert(jobLock_.get() != NULL);
+    return jobLock_->AddOperation(new StoreScuOperation(localAet, modality, that_.connectionManager_));    
+  }
+
+
+  void LuaJobManager::Lock::AddNullInput(size_t operation)
+  {
+    assert(jobLock_.get() != NULL);
+    jobLock_->AddInput(operation, NullOperationValue());
+  }
+
 
-    std::auto_ptr<Lock> result(new Lock(*currentJob_));
-    result->SetTrailingOperationTimeout(trailingTimeout_);
+  void LuaJobManager::Lock::AddStringInput(size_t operation,
+                                           const std::string& content)
+  {
+    assert(jobLock_.get() != NULL);
+    jobLock_->AddInput(operation, StringOperationValue(content));
+  }
+
 
-    return result.release();
+  void LuaJobManager::Lock::AddDicomInstanceInput(size_t operation,
+                                                  ServerContext& context,
+                                                  const std::string& instanceId)
+  {
+    assert(jobLock_.get() != NULL);
+    jobLock_->AddInput(operation, DicomInstanceOperationValue(context, instanceId));
+  }
+
+
+  void LuaJobManager::Lock::Connect(size_t operation1,
+                                    size_t operation2)
+  {
+    assert(jobLock_.get() != NULL);
+    jobLock_->Connect(operation1, operation2);
   }
 }
--- a/OrthancServer/ServerJobs/LuaJobManager.h	Fri May 18 17:02:25 2018 +0200
+++ b/OrthancServer/ServerJobs/LuaJobManager.h	Fri May 18 17:37:14 2018 +0200
@@ -37,16 +37,14 @@
 #include "../../Core/JobsEngine/JobsEngine.h"
 #include "../../Core/JobsEngine/Operations/SequenceOfOperationsJob.h"
 
+#include "../ServerContext.h"
+
 namespace Orthanc
 {
   class LuaJobManager : private SequenceOfOperationsJob::IObserver
   {
-  public:
-    typedef SequenceOfOperationsJob::Lock  Lock;
-
   private:
     boost::mutex                   mutex_;
-    JobsEngine&                    engine_;
     TimeoutDicomConnectionManager  connectionManager_;
     std::string                    currentId_;
     SequenceOfOperationsJob*       currentJob_;
@@ -61,7 +59,7 @@
     virtual void SignalDone(const SequenceOfOperationsJob& job);
 
   public:
-    LuaJobManager(JobsEngine&  engine);
+    LuaJobManager();
 
     ~LuaJobManager();
 
@@ -71,6 +69,40 @@
 
     void SetTrailingOperationTimeout(unsigned int timeout);
 
-    Lock* Modify();
+    class Lock : public boost::noncopyable
+    {
+    private:
+      LuaJobManager&                                that_;
+      boost::mutex::scoped_lock                     lock_;
+      JobsEngine&                                   engine_;
+      std::auto_ptr<SequenceOfOperationsJob::Lock>  jobLock_;
+      bool                                          isNewJob_;
+
+    public:
+      Lock(LuaJobManager& that,
+           JobsEngine& engine);
+
+      ~Lock();
+
+      size_t AddLogOperation();
+
+      size_t AddDeleteResourceOperation(ServerContext& context);
+
+      size_t AddStoreScuOperation(const std::string& localAet,
+                                  const RemoteModalityParameters& modality,
+                                  IDicomConnectionManager& manager);
+
+      void AddNullInput(size_t operation); 
+
+      void AddStringInput(size_t operation,
+                          const std::string& content);
+
+      void AddDicomInstanceInput(size_t operation,
+                                 ServerContext& context,
+                                 const std::string& instanceId);
+
+      void Connect(size_t operation1,
+                   size_t operation2);
+    };
   };
 }
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri May 18 17:02:25 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri May 18 17:37:14 2018 +0200
@@ -788,16 +788,20 @@
   engine.SetWorkersCount(2);
   engine.Start();
 
-  LuaJobManager lua(engine);
+  LuaJobManager lua;
   lua.SetMaxOperationsPerJob(5);
   lua.SetTrailingOperationTimeout(200);
 
   for (size_t i = 0; i < 30; i++)
   {
     boost::this_thread::sleep(boost::posix_time::milliseconds(150));
-    std::auto_ptr<LuaJobManager::Lock> lock(lua.Modify());
-    size_t a = lock->AddOperation(new LogJobOperation);
-    lock->AddInput(a, StringOperationValue(boost::lexical_cast<std::string>(i)));
+
+    LuaJobManager::Lock lock(lua, engine);
+    size_t a = lock.AddLogOperation();
+    size_t b = lock.AddLogOperation();
+    lock.AddStringInput(a, boost::lexical_cast<std::string>(i));
+    lock.AddNullInput(a);
+    lock.Connect(a, b);
   }
 
   boost::this_thread::sleep(boost::posix_time::milliseconds(2000));