Mercurial > hg > orthanc
changeset 1436:0a3e3be59094
uncoupling of SignalChange for Lua scripts
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 30 Jun 2015 17:19:26 +0200 |
parents | 6406f5493d92 |
children | 02f5a3f5c0a0 |
files | OrthancServer/LuaScripting.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerIndexChange.h OrthancServer/main.cpp Plugins/Engine/OrthancPlugins.cpp Plugins/Engine/OrthancPlugins.h |
diffstat | 7 files changed, 84 insertions(+), 82 deletions(-) [+] |
line wrap: on
line diff
--- a/OrthancServer/LuaScripting.cpp Tue Jun 30 16:46:23 2015 +0200 +++ b/OrthancServer/LuaScripting.cpp Tue Jun 30 17:19:26 2015 +0200 @@ -280,7 +280,7 @@ Json::Value tags; - //if (context_.GetIndex().LookupResource(tags, change.GetPublicId(), change.GetResourceType())) + if (context_.GetIndex().LookupResource(tags, change.GetPublicId(), change.GetResourceType())) { boost::mutex::scoped_lock lock(mutex_); @@ -290,7 +290,7 @@ LuaFunctionCall call(lua_, name); call.PushString(change.GetPublicId()); - call.PushJson(tags); + call.PushJson(tags["MainDicomTags"]); call.Execute(); SubmitJob(std::string("Lua script: ") + name);
--- a/OrthancServer/ServerContext.cpp Tue Jun 30 16:46:23 2015 +0200 +++ b/OrthancServer/ServerContext.cpp Tue Jun 30 17:19:26 2015 +0200 @@ -67,6 +67,34 @@ namespace Orthanc { + void ServerContext::ChangeThread(ServerContext* that) + { + while (!that->done_) + { + std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(500)); + + if (obj.get() != NULL) + { + const ServerIndexChange& change = dynamic_cast<const ServerIndexChange&>(*obj.get()); + + for (ServerListeners::iterator it = that->listeners_.begin(); + it != that->listeners_.end(); ++it) + { + try + { + it->GetListener().SignalChange(change); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Error in the " << it->GetDescription() + << " callback while signaling a change: " << e.What(); + } + } + } + } + } + + ServerContext::ServerContext(IDatabaseWrapper& database) : index_(*this, database), compressionEnabled_(false), @@ -75,6 +103,7 @@ scheduler_(Configuration::GetGlobalIntegerParameter("LimitJobs", 10)), lua_(*this), plugins_(NULL), + done_(false), queryRetrieveArchive_(Configuration::GetGlobalIntegerParameter("QueryRetrieveSize", 10)), defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC")) { @@ -82,8 +111,23 @@ scu_.SetMillisecondsBeforeClose(s * 1000); // Milliseconds are expected here listeners_.push_back(ServerListener(lua_, "Lua")); + + changeThread_ = boost::thread(ChangeThread, this); } + + + ServerContext::~ServerContext() + { + done_ = true; + + if (changeThread_.joinable()) + { + changeThread_.join(); + } + } + + void ServerContext::SetCompressionEnabled(bool enabled) { if (enabled) @@ -360,18 +404,7 @@ void ServerContext::SignalChange(const ServerIndexChange& change) { - for (ServerListeners::iterator it = listeners_.begin(); it != listeners_.end(); ++it) - { - try - { - it->GetListener().SignalChange(change); - } - catch (OrthancException& e) - { - LOG(ERROR) << "Error in the " << it->GetDescription() - << " callback while signaling a change: " << e.What(); - } - } + pendingChanges_.Enqueue(change.Clone()); }
--- a/OrthancServer/ServerContext.h Tue Jun 30 16:46:23 2015 +0200 +++ b/OrthancServer/ServerContext.h Tue Jun 30 17:19:26 2015 +0200 @@ -32,6 +32,7 @@ #pragma once +#include "../Core/MultiThreading/SharedMessageQueue.h" #include "../Core/Cache/MemoryCache.h" #include "../Core/Cache/SharedArchive.h" #include "../Core/FileStorage/CompressedFileStorageAccessor.h" @@ -48,6 +49,8 @@ #include "ServerIndex.h" #include <boost/filesystem.hpp> +#include <boost/thread.hpp> + namespace Orthanc { @@ -100,6 +103,9 @@ typedef std::list<ServerListener> ServerListeners; + static void ChangeThread(ServerContext* that); + + ServerIndex index_; CompressedFileStorageAccessor accessor_; bool compressionEnabled_; @@ -114,6 +120,10 @@ OrthancPlugins* plugins_; ServerListeners listeners_; + bool done_; + SharedMessageQueue pendingChanges_; + boost::thread changeThread_; + SharedArchive queryRetrieveArchive_; std::string defaultLocalAet_; @@ -139,6 +149,8 @@ ServerContext(IDatabaseWrapper& database); + ~ServerContext(); + void SetStorageArea(IStorageArea& storage) { accessor_.SetStorageArea(storage);
--- a/OrthancServer/ServerIndexChange.h Tue Jun 30 16:46:23 2015 +0200 +++ b/OrthancServer/ServerIndexChange.h Tue Jun 30 17:19:26 2015 +0200 @@ -40,7 +40,7 @@ namespace Orthanc { - struct ServerIndexChange + struct ServerIndexChange : public IDynamicObject { private: int64_t seq_; @@ -74,6 +74,20 @@ { } + ServerIndexChange(const ServerIndexChange& other) + : seq_(other.seq_), + changeType_(other.changeType_), + resourceType_(other.resourceType_), + publicId_(other.publicId_), + date_(other.date_) + { + } + + ServerIndexChange* Clone() const + { + return new ServerIndexChange(*this); + } + int64_t GetSeq() const { return seq_;
--- a/OrthancServer/main.cpp Tue Jun 30 16:46:23 2015 +0200 +++ b/OrthancServer/main.cpp Tue Jun 30 17:19:26 2015 +0200 @@ -545,7 +545,6 @@ #if ENABLE_PLUGINS == 1 context->ResetPlugins(); - plugins.Stop(); plugins.ResetOrthancRestApi(); LOG(WARNING) << " Plugins have stopped"; #endif
--- a/Plugins/Engine/OrthancPlugins.cpp Tue Jun 30 16:46:23 2015 +0200 +++ b/Plugins/Engine/OrthancPlugins.cpp Tue Jun 30 17:19:26 2015 +0200 @@ -35,7 +35,6 @@ #include "../../Core/ChunkedBuffer.h" #include "../../Core/HttpServer/HttpOutput.h" #include "../../Core/ImageFormats/PngWriter.h" -#include "../../Core/MultiThreading/SharedMessageQueue.h" #include "../../Core/OrthancException.h" #include "../../Core/Toolbox.h" #include "../../OrthancServer/OrthancInitialization.h" @@ -43,7 +42,6 @@ #include "../../OrthancServer/ServerContext.h" #include "../../OrthancServer/ServerToolbox.h" -#include <boost/thread.hpp> #include <boost/regex.hpp> #include <glog/logging.h> @@ -141,33 +139,6 @@ } } }; - - - class PendingChange : public IDynamicObject - { - private: - OrthancPluginChangeType changeType_; - OrthancPluginResourceType resourceType_; - std::string publicId_; - - public: - PendingChange(const ServerIndexChange& change) - { - changeType_ = Convert(change.GetChangeType()); - resourceType_ = Convert(change.GetResourceType()); - publicId_ = change.GetPublicId(); - } - - void Submit(std::list<OrthancPluginOnChangeCallback>& callbacks) - { - for (std::list<OrthancPluginOnChangeCallback>::const_iterator - callback = callbacks.begin(); - callback != callbacks.end(); ++callback) - { - (*callback) (changeType_, resourceType_, publicId_.c_str()); - } - } - }; } @@ -191,9 +162,6 @@ bool hasStorageArea_; _OrthancPluginRegisterStorageArea storageArea_; boost::recursive_mutex callbackMutex_; - SharedMessageQueue pendingChanges_; - boost::thread changeThread_; - bool done_; Properties properties_; int argc_; char** argv_; @@ -203,28 +171,11 @@ context_(NULL), restApi_(NULL), hasStorageArea_(false), - done_(false), argc_(1), argv_(NULL) { memset(&storageArea_, 0, sizeof(storageArea_)); } - - - static void ChangeThread(PImpl* that) - { - while (!that->done_) - { - std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(500)); - - if (obj.get() != NULL) - { - boost::recursive_mutex::scoped_lock lock(that->callbackMutex_); - PendingChange& change = *dynamic_cast<PendingChange*>(obj.get()); - change.Submit(that->onChangeCallbacks_); - } - } - } }; @@ -254,7 +205,6 @@ { pimpl_.reset(new PImpl()); pimpl_->manager_.RegisterServiceProvider(*this); - pimpl_->changeThread_ = boost::thread(PImpl::ChangeThread, pimpl_.get()); } @@ -267,8 +217,6 @@ OrthancPlugins::~OrthancPlugins() { - Stop(); - for (PImpl::RestCallbacks::iterator it = pimpl_->restCallbacks_.begin(); it != pimpl_->restCallbacks_.end(); ++it) { @@ -278,17 +226,6 @@ } - void OrthancPlugins::Stop() - { - if (!pimpl_->done_) - { - pimpl_->done_ = true; - pimpl_->changeThread_.join(); - } - } - - - static void ArgumentsToPlugin(std::vector<const char*>& keys, std::vector<const char*>& values, const HttpHandler::Arguments& arguments) @@ -474,7 +411,16 @@ { try { - pimpl_->pendingChanges_.Enqueue(new PendingChange(change)); + boost::recursive_mutex::scoped_lock lock(pimpl_->callbackMutex_); + + for (std::list<OrthancPluginOnChangeCallback>::const_iterator + callback = pimpl_->onChangeCallbacks_.begin(); + callback != pimpl_->onChangeCallbacks_.end(); ++callback) + { + (*callback) (Convert(change.GetChangeType()), + Convert(change.GetResourceType()), + change.GetPublicId().c_str()); + } } catch (OrthancException&) {