changeset 1436:0a3e3be59094

uncoupling of SignalChange for Lua scripts
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 30 Jun 2015 17:19:26 +0200
parents 6406f5493d92
children 02f5a3f5c0a0
files OrthancServer/LuaScripting.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerIndexChange.h OrthancServer/main.cpp Plugins/Engine/OrthancPlugins.cpp Plugins/Engine/OrthancPlugins.h
diffstat 7 files changed, 84 insertions(+), 82 deletions(-) [+]
line wrap: on
line diff
--- a/OrthancServer/LuaScripting.cpp	Tue Jun 30 16:46:23 2015 +0200
+++ b/OrthancServer/LuaScripting.cpp	Tue Jun 30 17:19:26 2015 +0200
@@ -280,7 +280,7 @@
 
 
     Json::Value tags;
-    //if (context_.GetIndex().LookupResource(tags, change.GetPublicId(), change.GetResourceType()))
+    if (context_.GetIndex().LookupResource(tags, change.GetPublicId(), change.GetResourceType()))
     {
       boost::mutex::scoped_lock lock(mutex_);
 
@@ -290,7 +290,7 @@
 
         LuaFunctionCall call(lua_, name);
         call.PushString(change.GetPublicId());
-        call.PushJson(tags);
+        call.PushJson(tags["MainDicomTags"]);
         call.Execute();
 
         SubmitJob(std::string("Lua script: ") + name);
--- a/OrthancServer/ServerContext.cpp	Tue Jun 30 16:46:23 2015 +0200
+++ b/OrthancServer/ServerContext.cpp	Tue Jun 30 17:19:26 2015 +0200
@@ -67,6 +67,34 @@
 
 namespace Orthanc
 {
+  void ServerContext::ChangeThread(ServerContext* that)
+  {
+    while (!that->done_)
+    {
+      std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(500));
+        
+      if (obj.get() != NULL)
+      {
+        const ServerIndexChange& change = dynamic_cast<const ServerIndexChange&>(*obj.get());
+
+        for (ServerListeners::iterator it = that->listeners_.begin(); 
+             it != that->listeners_.end(); ++it)
+        {
+          try
+          {
+            it->GetListener().SignalChange(change);
+          }
+          catch (OrthancException& e)
+          {
+            LOG(ERROR) << "Error in the " << it->GetDescription() 
+                       << " callback while signaling a change: " << e.What();
+          }
+        }
+      }
+    }
+  }
+
+
   ServerContext::ServerContext(IDatabaseWrapper& database) :
     index_(*this, database),
     compressionEnabled_(false),
@@ -75,6 +103,7 @@
     scheduler_(Configuration::GetGlobalIntegerParameter("LimitJobs", 10)),
     lua_(*this),
     plugins_(NULL),
+    done_(false),
     queryRetrieveArchive_(Configuration::GetGlobalIntegerParameter("QueryRetrieveSize", 10)),
     defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC"))
   {
@@ -82,8 +111,23 @@
     scu_.SetMillisecondsBeforeClose(s * 1000);  // Milliseconds are expected here
 
     listeners_.push_back(ServerListener(lua_, "Lua"));
+
+    changeThread_ = boost::thread(ChangeThread, this);
   }
 
+
+  
+  ServerContext::~ServerContext()
+  {
+    done_ = true;
+
+    if (changeThread_.joinable())
+    {
+      changeThread_.join();
+    }
+  }
+
+
   void ServerContext::SetCompressionEnabled(bool enabled)
   {
     if (enabled)
@@ -360,18 +404,7 @@
 
   void ServerContext::SignalChange(const ServerIndexChange& change)
   {
-    for (ServerListeners::iterator it = listeners_.begin(); it != listeners_.end(); ++it)
-    {
-      try
-      {
-        it->GetListener().SignalChange(change);
-      }
-      catch (OrthancException& e)
-      {
-        LOG(ERROR) << "Error in the " << it->GetDescription() 
-                   << " callback while signaling a change: " << e.What();
-      }
-    }
+    pendingChanges_.Enqueue(change.Clone());
   }
 
 
--- a/OrthancServer/ServerContext.h	Tue Jun 30 16:46:23 2015 +0200
+++ b/OrthancServer/ServerContext.h	Tue Jun 30 17:19:26 2015 +0200
@@ -32,6 +32,7 @@
 
 #pragma once
 
+#include "../Core/MultiThreading/SharedMessageQueue.h"
 #include "../Core/Cache/MemoryCache.h"
 #include "../Core/Cache/SharedArchive.h"
 #include "../Core/FileStorage/CompressedFileStorageAccessor.h"
@@ -48,6 +49,8 @@
 #include "ServerIndex.h"
 
 #include <boost/filesystem.hpp>
+#include <boost/thread.hpp>
+
 
 namespace Orthanc
 {
@@ -100,6 +103,9 @@
     typedef std::list<ServerListener>  ServerListeners;
 
 
+    static void ChangeThread(ServerContext* that);
+
+
     ServerIndex index_;
     CompressedFileStorageAccessor accessor_;
     bool compressionEnabled_;
@@ -114,6 +120,10 @@
     OrthancPlugins* plugins_;
     ServerListeners listeners_;
 
+    bool done_;
+    SharedMessageQueue  pendingChanges_;
+    boost::thread  changeThread_;
+        
     SharedArchive  queryRetrieveArchive_;
     std::string defaultLocalAet_;
 
@@ -139,6 +149,8 @@
 
     ServerContext(IDatabaseWrapper& database);
 
+    ~ServerContext();
+
     void SetStorageArea(IStorageArea& storage)
     {
       accessor_.SetStorageArea(storage);
--- a/OrthancServer/ServerIndexChange.h	Tue Jun 30 16:46:23 2015 +0200
+++ b/OrthancServer/ServerIndexChange.h	Tue Jun 30 17:19:26 2015 +0200
@@ -40,7 +40,7 @@
 
 namespace Orthanc
 {
-  struct ServerIndexChange
+  struct ServerIndexChange : public IDynamicObject
   {
   private:
     int64_t      seq_;
@@ -74,6 +74,20 @@
     {
     }
 
+    ServerIndexChange(const ServerIndexChange& other) 
+    : seq_(other.seq_),
+      changeType_(other.changeType_),
+      resourceType_(other.resourceType_),
+      publicId_(other.publicId_),
+      date_(other.date_)
+    {
+    }
+
+    ServerIndexChange* Clone() const
+    {
+      return new ServerIndexChange(*this);
+    }
+
     int64_t  GetSeq() const
     {
       return seq_;
--- a/OrthancServer/main.cpp	Tue Jun 30 16:46:23 2015 +0200
+++ b/OrthancServer/main.cpp	Tue Jun 30 17:19:26 2015 +0200
@@ -545,7 +545,6 @@
 
 #if ENABLE_PLUGINS == 1
     context->ResetPlugins();
-    plugins.Stop();
     plugins.ResetOrthancRestApi();
     LOG(WARNING) << "    Plugins have stopped";
 #endif
--- a/Plugins/Engine/OrthancPlugins.cpp	Tue Jun 30 16:46:23 2015 +0200
+++ b/Plugins/Engine/OrthancPlugins.cpp	Tue Jun 30 17:19:26 2015 +0200
@@ -35,7 +35,6 @@
 #include "../../Core/ChunkedBuffer.h"
 #include "../../Core/HttpServer/HttpOutput.h"
 #include "../../Core/ImageFormats/PngWriter.h"
-#include "../../Core/MultiThreading/SharedMessageQueue.h"
 #include "../../Core/OrthancException.h"
 #include "../../Core/Toolbox.h"
 #include "../../OrthancServer/OrthancInitialization.h"
@@ -43,7 +42,6 @@
 #include "../../OrthancServer/ServerContext.h"
 #include "../../OrthancServer/ServerToolbox.h"
 
-#include <boost/thread.hpp>
 #include <boost/regex.hpp> 
 #include <glog/logging.h>
 
@@ -141,33 +139,6 @@
         }
       }
     };
-
-
-    class PendingChange : public IDynamicObject
-    {
-    private:
-      OrthancPluginChangeType changeType_;
-      OrthancPluginResourceType resourceType_;
-      std::string publicId_;
-
-    public:
-      PendingChange(const ServerIndexChange& change)
-      {
-        changeType_ = Convert(change.GetChangeType());
-        resourceType_ = Convert(change.GetResourceType());
-        publicId_ = change.GetPublicId();
-      }
-
-      void Submit(std::list<OrthancPluginOnChangeCallback>& callbacks)
-      {
-        for (std::list<OrthancPluginOnChangeCallback>::const_iterator 
-               callback = callbacks.begin(); 
-             callback != callbacks.end(); ++callback)
-        {
-          (*callback) (changeType_, resourceType_, publicId_.c_str());
-        }
-      }
-    };
   }
 
 
@@ -191,9 +162,6 @@
     bool hasStorageArea_;
     _OrthancPluginRegisterStorageArea storageArea_;
     boost::recursive_mutex callbackMutex_;
-    SharedMessageQueue  pendingChanges_;
-    boost::thread  changeThread_;
-    bool done_;
     Properties properties_;
     int argc_;
     char** argv_;
@@ -203,28 +171,11 @@
       context_(NULL), 
       restApi_(NULL),
       hasStorageArea_(false),
-      done_(false),
       argc_(1),
       argv_(NULL)
     {
       memset(&storageArea_, 0, sizeof(storageArea_));
     }
-
-
-    static void ChangeThread(PImpl* that)
-    {
-      while (!that->done_)
-      {
-        std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(500));
-        
-        if (obj.get() != NULL)
-        {
-          boost::recursive_mutex::scoped_lock lock(that->callbackMutex_);
-          PendingChange& change = *dynamic_cast<PendingChange*>(obj.get());
-          change.Submit(that->onChangeCallbacks_);
-        }
-      }
-    }
   };
 
 
@@ -254,7 +205,6 @@
   {
     pimpl_.reset(new PImpl());
     pimpl_->manager_.RegisterServiceProvider(*this);
-    pimpl_->changeThread_ = boost::thread(PImpl::ChangeThread, pimpl_.get());
   }
 
   
@@ -267,8 +217,6 @@
   
   OrthancPlugins::~OrthancPlugins()
   {
-    Stop();
-
     for (PImpl::RestCallbacks::iterator it = pimpl_->restCallbacks_.begin(); 
          it != pimpl_->restCallbacks_.end(); ++it)
     {
@@ -278,17 +226,6 @@
   }
 
 
-  void OrthancPlugins::Stop()
-  {
-    if (!pimpl_->done_)
-    {
-      pimpl_->done_ = true;
-      pimpl_->changeThread_.join();
-    }
-  }
-
-
-
   static void ArgumentsToPlugin(std::vector<const char*>& keys,
                                 std::vector<const char*>& values,
                                 const HttpHandler::Arguments& arguments)
@@ -474,7 +411,16 @@
   {
     try
     {
-      pimpl_->pendingChanges_.Enqueue(new PendingChange(change));
+      boost::recursive_mutex::scoped_lock lock(pimpl_->callbackMutex_);
+
+      for (std::list<OrthancPluginOnChangeCallback>::const_iterator 
+             callback = pimpl_->onChangeCallbacks_.begin(); 
+           callback != pimpl_->onChangeCallbacks_.end(); ++callback)
+      {
+        (*callback) (Convert(change.GetChangeType()),
+                     Convert(change.GetResourceType()),
+                     change.GetPublicId().c_str());
+      }
     }
     catch (OrthancException&)
     {
--- a/Plugins/Engine/OrthancPlugins.h	Tue Jun 30 16:46:23 2015 +0200
+++ b/Plugins/Engine/OrthancPlugins.h	Tue Jun 30 17:19:26 2015 +0200
@@ -135,8 +135,6 @@
 
     IDatabaseWrapper& GetDatabase();
 
-    void Stop();
-
     const char* GetProperty(const char* plugin,
                             _OrthancPluginProperty property) const;