changeset 1199:a843ee8bb903 db-changes

separated thread for change callbacks in plugins
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 23 Oct 2014 14:29:45 +0200
parents 1169528a9a5f
children 1e1390665639
files OrthancServer/main.cpp Plugins/Engine/OrthancPlugins.cpp Plugins/Engine/OrthancPlugins.h Plugins/Samples/Basic/Plugin.c Resources/Configuration.json
diffstat 5 files changed, 168 insertions(+), 97 deletions(-) [+]
line wrap: on
line diff
--- a/OrthancServer/main.cpp	Thu Oct 23 13:52:01 2014 +0200
+++ b/OrthancServer/main.cpp	Thu Oct 23 14:29:45 2014 +0200
@@ -585,6 +585,10 @@
     // We're done
     LOG(WARNING) << "Orthanc is stopping";
 
+#if ENABLE_PLUGINS == 1
+    orthancPlugins.Stop();
+#endif
+
     dicomServer.Stop();
     httpServer.Stop();
   }
--- a/Plugins/Engine/OrthancPlugins.cpp	Thu Oct 23 13:52:01 2014 +0200
+++ b/Plugins/Engine/OrthancPlugins.cpp	Thu Oct 23 14:29:45 2014 +0200
@@ -39,6 +39,7 @@
 #include "../../Core/ImageFormats/PngWriter.h"
 #include "../../OrthancServer/ServerToolbox.h"
 #include "../../OrthancServer/OrthancInitialization.h"
+#include "../../Core/MultiThreading/SharedMessageQueue.h"
 
 #include <boost/thread.hpp>
 #include <boost/regex.hpp> 
@@ -46,87 +47,6 @@
 
 namespace Orthanc
 {
-  namespace
-  {
-    // Anonymous namespace to avoid clashes between compilation modules
-    class StringHttpOutput : public IHttpOutputStream
-    {
-    private:
-      ChunkedBuffer buffer_;
-
-    public:
-      void GetOutput(std::string& output)
-      {
-        buffer_.Flatten(output);
-      }
-
-      virtual void OnHttpStatusReceived(HttpStatus status)
-      {
-        if (status != HttpStatus_200_Ok)
-        {
-          throw OrthancException(ErrorCode_BadRequest);
-        }
-      }
-
-      virtual void Send(bool isHeader, const void* buffer, size_t length)
-      {
-        if (!isHeader)
-        {
-          buffer_.AddChunk(reinterpret_cast<const char*>(buffer), length);
-        }
-      }
-    };
-  }
-
-
-
-  struct OrthancPlugins::PImpl
-  {
-    typedef std::pair<boost::regex*, OrthancPluginRestCallback> RestCallback;
-    typedef std::list<RestCallback>  RestCallbacks;
-    typedef std::list<OrthancPluginOnStoredInstanceCallback>  OnStoredCallbacks;
-    typedef std::list<OrthancPluginOnChangeCallback>  OnChangeCallbacks;
-
-    ServerContext& context_;
-    RestCallbacks restCallbacks_;
-    OrthancRestApi* restApi_;
-    OnStoredCallbacks  onStoredCallbacks_;
-    OnChangeCallbacks  onChangeCallbacks_;
-    bool hasStorageArea_;
-    _OrthancPluginRegisterStorageArea storageArea_;
-    boost::mutex callbackMutex_;
-
-    PImpl(ServerContext& context) : 
-      context_(context), 
-      restApi_(NULL),
-      hasStorageArea_(false)
-    {
-      memset(&storageArea_, 0, sizeof(storageArea_));
-    }
-  };
-
-
-  static char* CopyString(const std::string& str)
-  {
-    char *result = reinterpret_cast<char*>(malloc(str.size() + 1));
-    if (result == NULL)
-    {
-      throw OrthancException(ErrorCode_NotEnoughMemory);
-    }
-
-    if (str.size() == 0)
-    {
-      result[0] = '\0';
-    }
-    else
-    {
-      memcpy(result, &str[0], str.size() + 1);
-    }
-
-    return result;
-  }
-
-
   static OrthancPluginResourceType Convert(ResourceType type)
   {
     switch (type)
@@ -189,14 +109,146 @@
   }
 
 
+  namespace
+  {
+    // Anonymous namespace to avoid clashes between compilation modules
+    class StringHttpOutput : public IHttpOutputStream
+    {
+    private:
+      ChunkedBuffer buffer_;
+
+    public:
+      void GetOutput(std::string& output)
+      {
+        buffer_.Flatten(output);
+      }
+
+      virtual void OnHttpStatusReceived(HttpStatus status)
+      {
+        if (status != HttpStatus_200_Ok)
+        {
+          throw OrthancException(ErrorCode_BadRequest);
+        }
+      }
+
+      virtual void Send(bool isHeader, const void* buffer, size_t length)
+      {
+        if (!isHeader)
+        {
+          buffer_.AddChunk(reinterpret_cast<const char*>(buffer), length);
+        }
+      }
+    };
+
+
+    class PendingChange : public IDynamicObject
+    {
+    private:
+      OrthancPluginChangeType changeType_;
+      OrthancPluginResourceType resourceType_;
+      std::string publicId_;
+
+    public:
+      PendingChange(const ServerIndexChange& change)
+      {
+        changeType_ = Convert(change.GetChangeType());
+        resourceType_ = Convert(change.GetResourceType());
+        publicId_ = change.GetPublicId();
+      }
+
+      void Submit(std::list<OrthancPluginOnChangeCallback>& callbacks)
+      {
+        for (std::list<OrthancPluginOnChangeCallback>::const_iterator 
+               callback = callbacks.begin(); 
+             callback != callbacks.end(); ++callback)
+        {
+          (*callback) (changeType_, resourceType_, publicId_.c_str());
+        }
+      }
+    };
+  }
+
+
+
+  struct OrthancPlugins::PImpl
+  {
+    typedef std::pair<boost::regex*, OrthancPluginRestCallback> RestCallback;
+    typedef std::list<RestCallback>  RestCallbacks;
+    typedef std::list<OrthancPluginOnStoredInstanceCallback>  OnStoredCallbacks;
+    typedef std::list<OrthancPluginOnChangeCallback>  OnChangeCallbacks;
+
+    ServerContext& context_;
+    RestCallbacks restCallbacks_;
+    OrthancRestApi* restApi_;
+    OnStoredCallbacks  onStoredCallbacks_;
+    OnChangeCallbacks  onChangeCallbacks_;
+    bool hasStorageArea_;
+    _OrthancPluginRegisterStorageArea storageArea_;
+    boost::mutex callbackMutex_;
+    SharedMessageQueue  pendingChanges_;
+    boost::thread  changeThread_;
+    bool done_;
+
+    PImpl(ServerContext& context) : 
+      context_(context), 
+      restApi_(NULL),
+      hasStorageArea_(false),
+      done_(false)
+    {
+      memset(&storageArea_, 0, sizeof(storageArea_));
+    }
+
+
+    static void ChangeThread(PImpl* that)
+    {
+      while (!that->done_)
+      {
+        std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(500));
+        
+        if (obj.get() != NULL)
+        {
+          boost::mutex::scoped_lock lock(that->callbackMutex_);
+          PendingChange& change = *dynamic_cast<PendingChange*>(obj.get());
+          change.Submit(that->onChangeCallbacks_);
+        }
+      }
+    }
+  };
+
+
+  
+  static char* CopyString(const std::string& str)
+  {
+    char *result = reinterpret_cast<char*>(malloc(str.size() + 1));
+    if (result == NULL)
+    {
+      throw OrthancException(ErrorCode_NotEnoughMemory);
+    }
+
+    if (str.size() == 0)
+    {
+      result[0] = '\0';
+    }
+    else
+    {
+      memcpy(result, &str[0], str.size() + 1);
+    }
+
+    return result;
+  }
+
+
   OrthancPlugins::OrthancPlugins(ServerContext& context)
   {
     pimpl_.reset(new PImpl(context));
+    pimpl_->changeThread_ = boost::thread(PImpl::ChangeThread, pimpl_.get());
   }
 
   
   OrthancPlugins::~OrthancPlugins()
   {
+    Stop();
+
     for (PImpl::RestCallbacks::iterator it = pimpl_->restCallbacks_.begin(); 
          it != pimpl_->restCallbacks_.end(); ++it)
     {
@@ -206,6 +258,17 @@
   }
 
 
+  void OrthancPlugins::Stop()
+  {
+    if (!pimpl_->done_)
+    {
+      pimpl_->done_ = true;
+      pimpl_->changeThread_.join();
+    }
+  }
+
+
+
   static void ArgumentsToPlugin(std::vector<const char*>& keys,
                                 std::vector<const char*>& values,
                                 const HttpHandler::Arguments& arguments)
@@ -367,28 +430,15 @@
 
   void OrthancPlugins::SignalChange(const ServerIndexChange& change)
   {
-    OrthancPluginChangeType c;
-    OrthancPluginResourceType r;
-
     try
     {
-      c = Convert(change.GetChangeType());
-      r = Convert(change.GetResourceType());
+      pimpl_->pendingChanges_.Enqueue(new PendingChange(change));
     }
     catch (OrthancException&)
     {
       // This change type or resource type is not supported by the plugin SDK
       return;
     }
-
-    boost::mutex::scoped_lock lock(pimpl_->callbackMutex_);
-
-    for (PImpl::OnChangeCallbacks::const_iterator
-           callback = pimpl_->onChangeCallbacks_.begin(); 
-         callback != pimpl_->onChangeCallbacks_.end(); ++callback)
-    {
-      (*callback) (c, r, change.GetPublicId().c_str());
-    }
   }
 
 
@@ -683,7 +733,7 @@
 
 
   void OrthancPlugins::LookupResource(_OrthancPluginService service,
-                                          const void* parameters)
+                                      const void* parameters)
   {
     const _OrthancPluginRetrieveDynamicString& p = 
       *reinterpret_cast<const _OrthancPluginRetrieveDynamicString*>(parameters);
--- a/Plugins/Engine/OrthancPlugins.h	Thu Oct 23 13:52:01 2014 +0200
+++ b/Plugins/Engine/OrthancPlugins.h	Thu Oct 23 14:29:45 2014 +0200
@@ -107,5 +107,7 @@
     bool HasStorageArea() const;
 
     IStorageArea* GetStorageArea();
+
+    void Stop();
   };
 }
--- a/Plugins/Samples/Basic/Plugin.c	Thu Oct 23 13:52:01 2014 +0200
+++ b/Plugins/Samples/Basic/Plugin.c	Thu Oct 23 14:29:45 2014 +0200
@@ -255,8 +255,23 @@
                                              const char* resourceId)
 {
   char info[1024];
+  OrthancPluginMemoryBuffer tmp;
+
   sprintf(info, "Change %d on resource %s of type %d", changeType, resourceId, resourceType);
   OrthancPluginLogWarning(context, info);
+
+  if (changeType == OrthancPluginChangeType_NewInstance)
+  {
+    sprintf(info, "/instances/%s/metadata/ReceptionDate", resourceId);
+    if (OrthancPluginRestApiGet(context, &tmp, info) == 0)
+    {
+      sprintf(info, "  Instance %s comes from the anonymization of instance %s", 
+              resourceId, (const char*) tmp.data);
+      OrthancPluginLogWarning(context, info);
+      OrthancPluginFreeMemoryBuffer(context, &tmp);
+    }
+  }
+
   return 0;
 }
 
--- a/Resources/Configuration.json	Thu Oct 23 13:52:01 2014 +0200
+++ b/Resources/Configuration.json	Thu Oct 23 14:29:45 2014 +0200
@@ -34,7 +34,7 @@
   ],
 
   // List of paths to the plugins that are to be loaded into this
-  // instance of Orthanc (e.g. "/libPluginTest.so" for Linux, or
+  // instance of Orthanc (e.g. "./libPluginTest.so" for Linux, or
   // "./PluginTest.dll" for Windows).
   "Plugins" : [
   ],