Mercurial > hg > orthanc
changeset 1199:a843ee8bb903 db-changes
separated thread for change callbacks in plugins
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 23 Oct 2014 14:29:45 +0200 |
parents | 1169528a9a5f |
children | 1e1390665639 |
files | OrthancServer/main.cpp Plugins/Engine/OrthancPlugins.cpp Plugins/Engine/OrthancPlugins.h Plugins/Samples/Basic/Plugin.c Resources/Configuration.json |
diffstat | 5 files changed, 168 insertions(+), 97 deletions(-) [+] |
line wrap: on
line diff
--- a/OrthancServer/main.cpp Thu Oct 23 13:52:01 2014 +0200 +++ b/OrthancServer/main.cpp Thu Oct 23 14:29:45 2014 +0200 @@ -585,6 +585,10 @@ // We're done LOG(WARNING) << "Orthanc is stopping"; +#if ENABLE_PLUGINS == 1 + orthancPlugins.Stop(); +#endif + dicomServer.Stop(); httpServer.Stop(); }
--- a/Plugins/Engine/OrthancPlugins.cpp Thu Oct 23 13:52:01 2014 +0200 +++ b/Plugins/Engine/OrthancPlugins.cpp Thu Oct 23 14:29:45 2014 +0200 @@ -39,6 +39,7 @@ #include "../../Core/ImageFormats/PngWriter.h" #include "../../OrthancServer/ServerToolbox.h" #include "../../OrthancServer/OrthancInitialization.h" +#include "../../Core/MultiThreading/SharedMessageQueue.h" #include <boost/thread.hpp> #include <boost/regex.hpp> @@ -46,87 +47,6 @@ namespace Orthanc { - namespace - { - // Anonymous namespace to avoid clashes between compilation modules - class StringHttpOutput : public IHttpOutputStream - { - private: - ChunkedBuffer buffer_; - - public: - void GetOutput(std::string& output) - { - buffer_.Flatten(output); - } - - virtual void OnHttpStatusReceived(HttpStatus status) - { - if (status != HttpStatus_200_Ok) - { - throw OrthancException(ErrorCode_BadRequest); - } - } - - virtual void Send(bool isHeader, const void* buffer, size_t length) - { - if (!isHeader) - { - buffer_.AddChunk(reinterpret_cast<const char*>(buffer), length); - } - } - }; - } - - - - struct OrthancPlugins::PImpl - { - typedef std::pair<boost::regex*, OrthancPluginRestCallback> RestCallback; - typedef std::list<RestCallback> RestCallbacks; - typedef std::list<OrthancPluginOnStoredInstanceCallback> OnStoredCallbacks; - typedef std::list<OrthancPluginOnChangeCallback> OnChangeCallbacks; - - ServerContext& context_; - RestCallbacks restCallbacks_; - OrthancRestApi* restApi_; - OnStoredCallbacks onStoredCallbacks_; - OnChangeCallbacks onChangeCallbacks_; - bool hasStorageArea_; - _OrthancPluginRegisterStorageArea storageArea_; - boost::mutex callbackMutex_; - - PImpl(ServerContext& context) : - context_(context), - restApi_(NULL), - hasStorageArea_(false) - { - memset(&storageArea_, 0, sizeof(storageArea_)); - } - }; - - - static char* CopyString(const std::string& str) - { - char *result = reinterpret_cast<char*>(malloc(str.size() + 1)); - if (result == NULL) - { - throw OrthancException(ErrorCode_NotEnoughMemory); - } - - if (str.size() == 0) - { - result[0] = '\0'; - } - else - { - memcpy(result, &str[0], str.size() + 1); - } - - return result; - } - - static OrthancPluginResourceType Convert(ResourceType type) { switch (type) @@ -189,14 +109,146 @@ } + namespace + { + // Anonymous namespace to avoid clashes between compilation modules + class StringHttpOutput : public IHttpOutputStream + { + private: + ChunkedBuffer buffer_; + + public: + void GetOutput(std::string& output) + { + buffer_.Flatten(output); + } + + virtual void OnHttpStatusReceived(HttpStatus status) + { + if (status != HttpStatus_200_Ok) + { + throw OrthancException(ErrorCode_BadRequest); + } + } + + virtual void Send(bool isHeader, const void* buffer, size_t length) + { + if (!isHeader) + { + buffer_.AddChunk(reinterpret_cast<const char*>(buffer), length); + } + } + }; + + + class PendingChange : public IDynamicObject + { + private: + OrthancPluginChangeType changeType_; + OrthancPluginResourceType resourceType_; + std::string publicId_; + + public: + PendingChange(const ServerIndexChange& change) + { + changeType_ = Convert(change.GetChangeType()); + resourceType_ = Convert(change.GetResourceType()); + publicId_ = change.GetPublicId(); + } + + void Submit(std::list<OrthancPluginOnChangeCallback>& callbacks) + { + for (std::list<OrthancPluginOnChangeCallback>::const_iterator + callback = callbacks.begin(); + callback != callbacks.end(); ++callback) + { + (*callback) (changeType_, resourceType_, publicId_.c_str()); + } + } + }; + } + + + + struct OrthancPlugins::PImpl + { + typedef std::pair<boost::regex*, OrthancPluginRestCallback> RestCallback; + typedef std::list<RestCallback> RestCallbacks; + typedef std::list<OrthancPluginOnStoredInstanceCallback> OnStoredCallbacks; + typedef std::list<OrthancPluginOnChangeCallback> OnChangeCallbacks; + + ServerContext& context_; + RestCallbacks restCallbacks_; + OrthancRestApi* restApi_; + OnStoredCallbacks onStoredCallbacks_; + OnChangeCallbacks onChangeCallbacks_; + bool hasStorageArea_; + _OrthancPluginRegisterStorageArea storageArea_; + boost::mutex callbackMutex_; + SharedMessageQueue pendingChanges_; + boost::thread changeThread_; + bool done_; + + PImpl(ServerContext& context) : + context_(context), + restApi_(NULL), + hasStorageArea_(false), + done_(false) + { + memset(&storageArea_, 0, sizeof(storageArea_)); + } + + + static void ChangeThread(PImpl* that) + { + while (!that->done_) + { + std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(500)); + + if (obj.get() != NULL) + { + boost::mutex::scoped_lock lock(that->callbackMutex_); + PendingChange& change = *dynamic_cast<PendingChange*>(obj.get()); + change.Submit(that->onChangeCallbacks_); + } + } + } + }; + + + + static char* CopyString(const std::string& str) + { + char *result = reinterpret_cast<char*>(malloc(str.size() + 1)); + if (result == NULL) + { + throw OrthancException(ErrorCode_NotEnoughMemory); + } + + if (str.size() == 0) + { + result[0] = '\0'; + } + else + { + memcpy(result, &str[0], str.size() + 1); + } + + return result; + } + + OrthancPlugins::OrthancPlugins(ServerContext& context) { pimpl_.reset(new PImpl(context)); + pimpl_->changeThread_ = boost::thread(PImpl::ChangeThread, pimpl_.get()); } OrthancPlugins::~OrthancPlugins() { + Stop(); + for (PImpl::RestCallbacks::iterator it = pimpl_->restCallbacks_.begin(); it != pimpl_->restCallbacks_.end(); ++it) { @@ -206,6 +258,17 @@ } + void OrthancPlugins::Stop() + { + if (!pimpl_->done_) + { + pimpl_->done_ = true; + pimpl_->changeThread_.join(); + } + } + + + static void ArgumentsToPlugin(std::vector<const char*>& keys, std::vector<const char*>& values, const HttpHandler::Arguments& arguments) @@ -367,28 +430,15 @@ void OrthancPlugins::SignalChange(const ServerIndexChange& change) { - OrthancPluginChangeType c; - OrthancPluginResourceType r; - try { - c = Convert(change.GetChangeType()); - r = Convert(change.GetResourceType()); + pimpl_->pendingChanges_.Enqueue(new PendingChange(change)); } catch (OrthancException&) { // This change type or resource type is not supported by the plugin SDK return; } - - boost::mutex::scoped_lock lock(pimpl_->callbackMutex_); - - for (PImpl::OnChangeCallbacks::const_iterator - callback = pimpl_->onChangeCallbacks_.begin(); - callback != pimpl_->onChangeCallbacks_.end(); ++callback) - { - (*callback) (c, r, change.GetPublicId().c_str()); - } } @@ -683,7 +733,7 @@ void OrthancPlugins::LookupResource(_OrthancPluginService service, - const void* parameters) + const void* parameters) { const _OrthancPluginRetrieveDynamicString& p = *reinterpret_cast<const _OrthancPluginRetrieveDynamicString*>(parameters);
--- a/Plugins/Engine/OrthancPlugins.h Thu Oct 23 13:52:01 2014 +0200 +++ b/Plugins/Engine/OrthancPlugins.h Thu Oct 23 14:29:45 2014 +0200 @@ -107,5 +107,7 @@ bool HasStorageArea() const; IStorageArea* GetStorageArea(); + + void Stop(); }; }
--- a/Plugins/Samples/Basic/Plugin.c Thu Oct 23 13:52:01 2014 +0200 +++ b/Plugins/Samples/Basic/Plugin.c Thu Oct 23 14:29:45 2014 +0200 @@ -255,8 +255,23 @@ const char* resourceId) { char info[1024]; + OrthancPluginMemoryBuffer tmp; + sprintf(info, "Change %d on resource %s of type %d", changeType, resourceId, resourceType); OrthancPluginLogWarning(context, info); + + if (changeType == OrthancPluginChangeType_NewInstance) + { + sprintf(info, "/instances/%s/metadata/ReceptionDate", resourceId); + if (OrthancPluginRestApiGet(context, &tmp, info) == 0) + { + sprintf(info, " Instance %s comes from the anonymization of instance %s", + resourceId, (const char*) tmp.data); + OrthancPluginLogWarning(context, info); + OrthancPluginFreeMemoryBuffer(context, &tmp); + } + } + return 0; }
--- a/Resources/Configuration.json Thu Oct 23 13:52:01 2014 +0200 +++ b/Resources/Configuration.json Thu Oct 23 14:29:45 2014 +0200 @@ -34,7 +34,7 @@ ], // List of paths to the plugins that are to be loaded into this - // instance of Orthanc (e.g. "/libPluginTest.so" for Linux, or + // instance of Orthanc (e.g. "./libPluginTest.so" for Linux, or // "./PluginTest.dll" for Windows). "Plugins" : [ ],