Mercurial > hg > orthanc-stone
changeset 748:ab236bb5dbc7
ThreadedOracle
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 22 May 2019 14:46:26 +0200 |
parents | d716bfb3e07c |
children | f3a7092ed10e 92c400a09f1b |
files | Applications/Generic/NativeStoneApplicationRunner.cpp Applications/Generic/NativeStoneApplicationRunner.h Framework/Oracle/ThreadedOracle.cpp Framework/Oracle/ThreadedOracle.h Resources/CMake/OrthancStoneConfiguration.cmake Samples/Sdl/Loader.cpp |
diffstat | 6 files changed, 612 insertions(+), 506 deletions(-) [+] |
line wrap: on
line diff
--- a/Applications/Generic/NativeStoneApplicationRunner.cpp Wed May 22 12:48:57 2019 +0200 +++ b/Applications/Generic/NativeStoneApplicationRunner.cpp Wed May 22 14:46:26 2019 +0200 @@ -19,8 +19,8 @@ **/ -#if ORTHANC_ENABLE_NATIVE != 1 -#error this file shall be included only with the ORTHANC_ENABLE_NATIVE set to 1 +#if ORTHANC_ENABLE_THREADS != 1 +#error this file shall be included only with the ORTHANC_ENABLE_THREADS set to 1 #endif #include "NativeStoneApplicationRunner.h"
--- a/Applications/Generic/NativeStoneApplicationRunner.h Wed May 22 12:48:57 2019 +0200 +++ b/Applications/Generic/NativeStoneApplicationRunner.h Wed May 22 14:46:26 2019 +0200 @@ -23,8 +23,8 @@ #include "../IStoneApplication.h" -#if ORTHANC_ENABLE_NATIVE != 1 -#error this file shall be included only with the ORTHANC_ENABLE_NATIVE set to 1 +#if ORTHANC_ENABLE_THREADS != 1 +#error this file shall be included only with the ORTHANC_ENABLE_THREADS set to 1 #endif namespace OrthancStone
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/Oracle/ThreadedOracle.cpp Wed May 22 14:46:26 2019 +0200 @@ -0,0 +1,507 @@ +/** + * Stone of Orthanc + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2019 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "ThreadedOracle.h" + +#include "GetOrthancImageCommand.h" +#include "GetOrthancWebViewerJpegCommand.h" +#include "OrthancRestApiCommand.h" +#include "SleepOracleCommand.h" +#include "OracleCommandExceptionMessage.h" + +#include <Core/Compression/GzipCompressor.h> +#include <Core/HttpClient.h> +#include <Core/OrthancException.h> +#include <Core/Toolbox.h> + + +namespace OrthancStone +{ + class ThreadedOracle::Item : public Orthanc::IDynamicObject + { + private: + const IObserver& receiver_; + std::auto_ptr<IOracleCommand> command_; + + public: + Item(const IObserver& receiver, + IOracleCommand* command) : + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + } + + const IObserver& GetReceiver() const + { + return receiver_; + } + + IOracleCommand& GetCommand() + { + assert(command_.get() != NULL); + return *command_; + } + }; + + + class ThreadedOracle::SleepingCommands : public boost::noncopyable + { + private: + class Item + { + private: + const IObserver& receiver_; + std::auto_ptr<SleepOracleCommand> command_; + boost::posix_time::ptime expiration_; + + public: + Item(const IObserver& receiver, + SleepOracleCommand* command) : + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + + expiration_ = (boost::posix_time::second_clock::local_time() + + boost::posix_time::milliseconds(command_->GetDelay())); + } + + const boost::posix_time::ptime& GetExpirationTime() const + { + return expiration_; + } + + void Awake(IMessageEmitter& emitter) + { + assert(command_.get() != NULL); + + SleepOracleCommand::TimeoutMessage message(*command_); + emitter.EmitMessage(receiver_, message); + } + }; + + typedef std::list<Item*> Content; + + boost::mutex mutex_; + Content content_; + + public: + ~SleepingCommands() + { + for (Content::iterator it = content_.begin(); it != content_.end(); ++it) + { + if (*it != NULL) + { + delete *it; + } + } + } + + void Add(const IObserver& receiver, + SleepOracleCommand* command) // Takes ownership + { + boost::mutex::scoped_lock lock(mutex_); + + content_.push_back(new Item(receiver, command)); + } + + void AwakeExpired(IMessageEmitter& emitter) + { + boost::mutex::scoped_lock lock(mutex_); + + const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + + Content stillSleeping; + + for (Content::iterator it = content_.begin(); it != content_.end(); ++it) + { + if (*it != NULL && + (*it)->GetExpirationTime() <= now) + { + (*it)->Awake(emitter); + delete *it; + *it = NULL; + } + else + { + stillSleeping.push_back(*it); + } + } + + // Compact the still-sleeping commands + content_ = stillSleeping; + } + }; + + + static void CopyHttpHeaders(Orthanc::HttpClient& client, + const Orthanc::HttpClient::HttpHeaders& headers) + { + for (Orthanc::HttpClient::HttpHeaders::const_iterator + it = headers.begin(); it != headers.end(); it++ ) + { + client.AddHeader(it->first, it->second); + } + } + + + static void DecodeAnswer(std::string& answer, + const Orthanc::HttpClient::HttpHeaders& headers) + { + Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None; + + for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin(); + it != headers.end(); ++it) + { + std::string s; + Orthanc::Toolbox::ToLowerCase(s, it->first); + + if (s == "content-encoding") + { + if (it->second == "gzip") + { + contentEncoding = Orthanc::HttpCompression_Gzip; + } + else + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, + "Unsupported HTTP Content-Encoding: " + it->second); + } + + break; + } + } + + if (contentEncoding == Orthanc::HttpCompression_Gzip) + { + std::string compressed; + answer.swap(compressed); + + Orthanc::GzipCompressor compressor; + compressor.Uncompress(answer, compressed.c_str(), compressed.size()); + } + } + + + static void Execute(IMessageEmitter& emitter, + const Orthanc::WebServiceParameters& orthanc, + const IObserver& receiver, + const OrthancRestApiCommand& command) + { + Orthanc::HttpClient client(orthanc, command.GetUri()); + client.SetMethod(command.GetMethod()); + client.SetTimeout(command.GetTimeout()); + + CopyHttpHeaders(client, command.GetHttpHeaders()); + + if (command.GetMethod() == Orthanc::HttpMethod_Post || + command.GetMethod() == Orthanc::HttpMethod_Put) + { + client.SetBody(command.GetBody()); + } + + std::string answer; + Orthanc::HttpClient::HttpHeaders answerHeaders; + client.ApplyAndThrowException(answer, answerHeaders); + + DecodeAnswer(answer, answerHeaders); + + OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer); + emitter.EmitMessage(receiver, message); + } + + + static void Execute(IMessageEmitter& emitter, + const Orthanc::WebServiceParameters& orthanc, + const IObserver& receiver, + const GetOrthancImageCommand& command) + { + Orthanc::HttpClient client(orthanc, command.GetUri()); + client.SetTimeout(command.GetTimeout()); + + CopyHttpHeaders(client, command.GetHttpHeaders()); + + std::string answer; + Orthanc::HttpClient::HttpHeaders answerHeaders; + client.ApplyAndThrowException(answer, answerHeaders); + + DecodeAnswer(answer, answerHeaders); + + command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders); + } + + + static void Execute(IMessageEmitter& emitter, + const Orthanc::WebServiceParameters& orthanc, + const IObserver& receiver, + const GetOrthancWebViewerJpegCommand& command) + { + Orthanc::HttpClient client(orthanc, command.GetUri()); + client.SetTimeout(command.GetTimeout()); + + CopyHttpHeaders(client, command.GetHttpHeaders()); + + std::string answer; + Orthanc::HttpClient::HttpHeaders answerHeaders; + client.ApplyAndThrowException(answer, answerHeaders); + + DecodeAnswer(answer, answerHeaders); + + command.ProcessHttpAnswer(emitter, receiver, answer); + } + + + void ThreadedOracle::Step() + { + std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100)); + + if (object.get() != NULL) + { + Item& item = dynamic_cast<Item&>(*object); + + try + { + switch (item.GetCommand().GetType()) + { + case IOracleCommand::Type_Sleep: + { + SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand()); + + std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); + + if (command.HasPayload()) + { + copy->SetPayload(command.ReleasePayload()); + } + + sleepingCommands_->Add(item.GetReceiver(), copy.release()); + + break; + } + + case IOracleCommand::Type_OrthancRestApi: + Execute(emitter_, orthanc_, item.GetReceiver(), + dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); + break; + + case IOracleCommand::Type_GetOrthancImage: + Execute(emitter_, orthanc_, item.GetReceiver(), + dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand())); + break; + + case IOracleCommand::Type_GetOrthancWebViewerJpeg: + Execute(emitter_, orthanc_, item.GetReceiver(), + dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand())); + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); + } + } + catch (Orthanc::OrthancException& e) + { + LOG(ERROR) << "Exception within the oracle: " << e.What(); + emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e)); + } + catch (...) + { + LOG(ERROR) << "Threaded exception within the oracle"; + emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage + (item.GetCommand(), Orthanc::ErrorCode_InternalError)); + } + } + } + + + void ThreadedOracle::Worker(ThreadedOracle* that) + { + assert(that != NULL); + + for (;;) + { + { + boost::mutex::scoped_lock lock(that->mutex_); + if (that->state_ != State_Running) + { + return; + } + } + + that->Step(); + } + } + + + void ThreadedOracle::SleepingWorker(ThreadedOracle* that) + { + assert(that != NULL); + + for (;;) + { + { + boost::mutex::scoped_lock lock(that->mutex_); + if (that->state_ != State_Running) + { + return; + } + } + + that->sleepingCommands_->AwakeExpired(that->emitter_); + + boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); + } + } + + + void ThreadedOracle::StopInternal() + { + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ == State_Setup || + state_ == State_Stopped) + { + return; + } + else + { + state_ = State_Stopped; + } + } + + if (sleepingWorker_.joinable()) + { + sleepingWorker_.join(); + } + + for (size_t i = 0; i < workers_.size(); i++) + { + if (workers_[i] != NULL) + { + if (workers_[i]->joinable()) + { + workers_[i]->join(); + } + + delete workers_[i]; + } + } + } + + + ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) : + emitter_(emitter), + state_(State_Setup), + workers_(4), + sleepingCommands_(new SleepingCommands), + sleepingTimeResolution_(50) // By default, time resolution of 50ms + { + } + + + void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc) + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + orthanc_ = orthanc; + } + } + + + void ThreadedOracle::SetWorkersCount(unsigned int count) + { + boost::mutex::scoped_lock lock(mutex_); + + if (count <= 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + workers_.resize(count); + } + } + + + void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds) + { + boost::mutex::scoped_lock lock(mutex_); + + if (milliseconds <= 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + sleepingTimeResolution_ = milliseconds; + } + } + + + void ThreadedOracle::Start() + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + state_ = State_Running; + + for (unsigned int i = 0; i < workers_.size(); i++) + { + workers_[i] = new boost::thread(Worker, this); + } + + sleepingWorker_ = boost::thread(SleepingWorker, this); + } + } + + + void ThreadedOracle::Schedule(const IObserver& receiver, + IOracleCommand* command) + { + queue_.Enqueue(new Item(receiver, command)); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/Oracle/ThreadedOracle.h Wed May 22 14:46:26 2019 +0200 @@ -0,0 +1,96 @@ +/** + * Stone of Orthanc + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2019 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#if !defined(ORTHANC_ENABLE_THREADS) +# error The macro ORTHANC_ENABLE_THREADS must be defined +#endif + +#if ORTHANC_ENABLE_THREADS != 1 +# error This file can only compiled for native targets +#endif + +#include "../Messages/IMessageEmitter.h" +#include "IOracle.h" + +#include <Core/WebServiceParameters.h> +#include <Core/MultiThreading/SharedMessageQueue.h> + + +namespace OrthancStone +{ + class ThreadedOracle : public IOracle + { + private: + enum State + { + State_Setup, + State_Running, + State_Stopped + }; + + class Item; + class SleepingCommands; + + IMessageEmitter& emitter_; + Orthanc::WebServiceParameters orthanc_; + Orthanc::SharedMessageQueue queue_; + State state_; + boost::mutex mutex_; + std::vector<boost::thread*> workers_; + boost::shared_ptr<SleepingCommands> sleepingCommands_; + boost::thread sleepingWorker_; + unsigned int sleepingTimeResolution_; + + void Step(); + + static void Worker(ThreadedOracle* that); + + static void SleepingWorker(ThreadedOracle* that); + + void StopInternal(); + + public: + ThreadedOracle(IMessageEmitter& emitter); + + virtual ~ThreadedOracle() + { + StopInternal(); + } + + void SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc); + + void SetWorkersCount(unsigned int count); + + void SetSleepingTimeResolution(unsigned int milliseconds); + + void Start(); + + void Stop() + { + StopInternal(); + } + + virtual void Schedule(const IObserver& receiver, + IOracleCommand* command); + }; +}
--- a/Resources/CMake/OrthancStoneConfiguration.cmake Wed May 22 12:48:57 2019 +0200 +++ b/Resources/CMake/OrthancStoneConfiguration.cmake Wed May 22 14:46:26 2019 +0200 @@ -109,7 +109,7 @@ message("SDL is enabled") include(${CMAKE_CURRENT_LIST_DIR}/SdlConfiguration.cmake) add_definitions( - -DORTHANC_ENABLE_NATIVE=1 + -DORTHANC_ENABLE_THREADS=1 -DORTHANC_ENABLE_QT=0 -DORTHANC_ENABLE_SDL=1 ) @@ -117,7 +117,7 @@ message("QT is enabled") include(${CMAKE_CURRENT_LIST_DIR}/QtConfiguration.cmake) add_definitions( - -DORTHANC_ENABLE_NATIVE=1 + -DORTHANC_ENABLE_THREADS=1 -DORTHANC_ENABLE_QT=1 -DORTHANC_ENABLE_SDL=0 ) @@ -127,7 +127,7 @@ add_definitions( -DORTHANC_ENABLE_SDL=0 -DORTHANC_ENABLE_QT=0 - -DORTHANC_ENABLE_NATIVE=0 + -DORTHANC_ENABLE_THREADS=0 ) endif() @@ -433,6 +433,7 @@ ${ORTHANC_STONE_ROOT}/Framework/Oracle/GetOrthancWebViewerJpegCommand.cpp ${ORTHANC_STONE_ROOT}/Framework/Oracle/OracleCommandWithPayload.cpp ${ORTHANC_STONE_ROOT}/Framework/Oracle/OrthancRestApiCommand.cpp + ${ORTHANC_STONE_ROOT}/Framework/Oracle/ThreadedOracle.cpp ${ORTHANC_STONE_ROOT}/Framework/Radiography/RadiographyAlphaLayer.cpp ${ORTHANC_STONE_ROOT}/Framework/Radiography/RadiographyDicomLayer.cpp ${ORTHANC_STONE_ROOT}/Framework/Radiography/RadiographyLayer.cpp
--- a/Samples/Sdl/Loader.cpp Wed May 22 12:48:57 2019 +0200 +++ b/Samples/Sdl/Loader.cpp Wed May 22 14:46:26 2019 +0200 @@ -20,6 +20,7 @@ #include "../../Framework/Toolbox/DicomInstanceParameters.h" +#include "../../Framework/Oracle/ThreadedOracle.h" #include "../../Framework/Oracle/GetOrthancWebViewerJpegCommand.h" #include "../../Framework/Oracle/GetOrthancImageCommand.h" #include "../../Framework/Oracle/OrthancRestApiCommand.h" @@ -739,505 +740,6 @@ } } }; - - - - - - class ThreadedOracle : public IOracle - { - private: - typedef std::map<std::string, std::string> HttpHeaders; - - class Item : public Orthanc::IDynamicObject - { - private: - const IObserver& receiver_; - std::auto_ptr<IOracleCommand> command_; - - public: - Item(const IObserver& receiver, - IOracleCommand* command) : - receiver_(receiver), - command_(command) - { - if (command == NULL) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); - } - } - - const IObserver& GetReceiver() const - { - return receiver_; - } - - IOracleCommand& GetCommand() - { - assert(command_.get() != NULL); - return *command_; - } - }; - - - enum State - { - State_Setup, - State_Running, - State_Stopped - }; - - - class SleepingCommands : public boost::noncopyable - { - private: - class Item - { - private: - const IObserver& receiver_; - std::auto_ptr<SleepOracleCommand> command_; - boost::posix_time::ptime expiration_; - - public: - Item(const IObserver& receiver, - SleepOracleCommand* command) : - receiver_(receiver), - command_(command) - { - if (command == NULL) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); - } - - expiration_ = (boost::posix_time::second_clock::local_time() + - boost::posix_time::milliseconds(command_->GetDelay())); - } - - const boost::posix_time::ptime& GetExpirationTime() const - { - return expiration_; - } - - void Awake(IMessageEmitter& emitter) - { - assert(command_.get() != NULL); - - SleepOracleCommand::TimeoutMessage message(*command_); - emitter.EmitMessage(receiver_, message); - } - }; - - typedef std::list<Item*> Content; - - boost::mutex mutex_; - Content content_; - - public: - ~SleepingCommands() - { - for (Content::iterator it = content_.begin(); it != content_.end(); ++it) - { - if (*it != NULL) - { - delete *it; - } - } - } - - void Add(const IObserver& receiver, - SleepOracleCommand* command) // Takes ownership - { - boost::mutex::scoped_lock lock(mutex_); - - content_.push_back(new Item(receiver, command)); - } - - void AwakeExpired(IMessageEmitter& emitter) - { - boost::mutex::scoped_lock lock(mutex_); - - const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); - - Content stillSleeping; - - for (Content::iterator it = content_.begin(); it != content_.end(); ++it) - { - if (*it != NULL && - (*it)->GetExpirationTime() <= now) - { - (*it)->Awake(emitter); - delete *it; - *it = NULL; - } - else - { - stillSleeping.push_back(*it); - } - } - - // Compact the still-sleeping commands - content_ = stillSleeping; - } - }; - - - IMessageEmitter& emitter_; - Orthanc::WebServiceParameters orthanc_; - Orthanc::SharedMessageQueue queue_; - State state_; - boost::mutex mutex_; - std::vector<boost::thread*> workers_; - SleepingCommands sleepingCommands_; - boost::thread sleepingWorker_; - unsigned int sleepingTimeResolution_; - - void CopyHttpHeaders(Orthanc::HttpClient& client, - const HttpHeaders& headers) - { - for (HttpHeaders::const_iterator it = headers.begin(); it != headers.end(); it++ ) - { - client.AddHeader(it->first, it->second); - } - } - - - void DecodeAnswer(std::string& answer, - const HttpHeaders& headers) - { - Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None; - - for (HttpHeaders::const_iterator it = headers.begin(); - it != headers.end(); ++it) - { - std::string s; - Orthanc::Toolbox::ToLowerCase(s, it->first); - - if (s == "content-encoding") - { - if (it->second == "gzip") - { - contentEncoding = Orthanc::HttpCompression_Gzip; - } - else - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, - "Unsupported HTTP Content-Encoding: " + it->second); - } - - break; - } - } - - if (contentEncoding == Orthanc::HttpCompression_Gzip) - { - std::string compressed; - answer.swap(compressed); - - Orthanc::GzipCompressor compressor; - compressor.Uncompress(answer, compressed.c_str(), compressed.size()); - } - } - - - void Execute(const IObserver& receiver, - SleepOracleCommand& command) - { - std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); - - if (command.HasPayload()) - { - copy->SetPayload(command.ReleasePayload()); - } - - sleepingCommands_.Add(receiver, copy.release()); - } - - - void Execute(const IObserver& receiver, - const OrthancRestApiCommand& command) - { - Orthanc::HttpClient client(orthanc_, command.GetUri()); - client.SetMethod(command.GetMethod()); - client.SetTimeout(command.GetTimeout()); - - CopyHttpHeaders(client, command.GetHttpHeaders()); - - if (command.GetMethod() == Orthanc::HttpMethod_Post || - command.GetMethod() == Orthanc::HttpMethod_Put) - { - client.SetBody(command.GetBody()); - } - - std::string answer; - HttpHeaders answerHeaders; - client.ApplyAndThrowException(answer, answerHeaders); - - DecodeAnswer(answer, answerHeaders); - - OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer); - emitter_.EmitMessage(receiver, message); - } - - - void Execute(const IObserver& receiver, - const GetOrthancImageCommand& command) - { - Orthanc::HttpClient client(orthanc_, command.GetUri()); - client.SetTimeout(command.GetTimeout()); - - CopyHttpHeaders(client, command.GetHttpHeaders()); - - std::string answer; - HttpHeaders answerHeaders; - client.ApplyAndThrowException(answer, answerHeaders); - - DecodeAnswer(answer, answerHeaders); - - command.ProcessHttpAnswer(emitter_, receiver, answer, answerHeaders); - } - - - void Execute(const IObserver& receiver, - const GetOrthancWebViewerJpegCommand& command) - { - Orthanc::HttpClient client(orthanc_, command.GetUri()); - client.SetTimeout(command.GetTimeout()); - - CopyHttpHeaders(client, command.GetHttpHeaders()); - - std::string answer; - HttpHeaders answerHeaders; - client.ApplyAndThrowException(answer, answerHeaders); - - DecodeAnswer(answer, answerHeaders); - - command.ProcessHttpAnswer(emitter_, receiver, answer); - } - - - void Step() - { - std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100)); - - if (object.get() != NULL) - { - Item& item = dynamic_cast<Item&>(*object); - - try - { - switch (item.GetCommand().GetType()) - { - case IOracleCommand::Type_Sleep: - Execute(item.GetReceiver(), - dynamic_cast<SleepOracleCommand&>(item.GetCommand())); - break; - - case IOracleCommand::Type_OrthancRestApi: - Execute(item.GetReceiver(), - dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); - break; - - case IOracleCommand::Type_GetOrthancImage: - Execute(item.GetReceiver(), - dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand())); - break; - - case IOracleCommand::Type_GetOrthancWebViewerJpeg: - Execute(item.GetReceiver(), - dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand())); - break; - - default: - throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); - } - } - catch (Orthanc::OrthancException& e) - { - LOG(ERROR) << "Exception within the oracle: " << e.What(); - emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e)); - } - catch (...) - { - LOG(ERROR) << "Native exception within the oracle"; - emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage - (item.GetCommand(), Orthanc::ErrorCode_InternalError)); - } - } - } - - - static void Worker(ThreadedOracle* that) - { - assert(that != NULL); - - for (;;) - { - { - boost::mutex::scoped_lock lock(that->mutex_); - if (that->state_ != State_Running) - { - return; - } - } - - that->Step(); - } - } - - - static void SleepingWorker(ThreadedOracle* that) - { - assert(that != NULL); - - for (;;) - { - { - boost::mutex::scoped_lock lock(that->mutex_); - if (that->state_ != State_Running) - { - return; - } - } - - that->sleepingCommands_.AwakeExpired(that->emitter_); - - boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); - } - } - - - void StopInternal() - { - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ == State_Setup || - state_ == State_Stopped) - { - return; - } - else - { - state_ = State_Stopped; - } - } - - if (sleepingWorker_.joinable()) - { - sleepingWorker_.join(); - } - - for (size_t i = 0; i < workers_.size(); i++) - { - if (workers_[i] != NULL) - { - if (workers_[i]->joinable()) - { - workers_[i]->join(); - } - - delete workers_[i]; - } - } - } - - - public: - ThreadedOracle(IMessageEmitter& emitter) : - emitter_(emitter), - state_(State_Setup), - workers_(4), - sleepingTimeResolution_(50) // By default, time resolution of 50ms - { - } - - virtual ~ThreadedOracle() - { - StopInternal(); - } - - void SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc) - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - orthanc_ = orthanc; - } - } - - void SetWorkersCount(unsigned int count) - { - boost::mutex::scoped_lock lock(mutex_); - - if (count <= 0) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); - } - else if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - workers_.resize(count); - } - } - - void SetSleepingTimeResolution(unsigned int milliseconds) - { - boost::mutex::scoped_lock lock(mutex_); - - if (milliseconds <= 0) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); - } - else if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - sleepingTimeResolution_ = milliseconds; - } - } - - void Start() - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - state_ = State_Running; - - for (unsigned int i = 0; i < workers_.size(); i++) - { - workers_[i] = new boost::thread(Worker, this); - } - - sleepingWorker_ = boost::thread(SleepingWorker, this); - } - } - - void Stop() - { - StopInternal(); - } - - virtual void Schedule(const IObserver& receiver, - IOracleCommand* command) - { - queue_.Enqueue(new Item(receiver, command)); - } - }; class NativeApplicationContext : public IMessageEmitter