Mercurial > hg > orthanc-stone
diff Samples/Sdl/Loader.cpp @ 748:ab236bb5dbc7
ThreadedOracle
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 22 May 2019 14:46:26 +0200 |
parents | d716bfb3e07c |
children | f7c236894c1a |
line wrap: on
line diff
--- a/Samples/Sdl/Loader.cpp Wed May 22 12:48:57 2019 +0200 +++ b/Samples/Sdl/Loader.cpp Wed May 22 14:46:26 2019 +0200 @@ -20,6 +20,7 @@ #include "../../Framework/Toolbox/DicomInstanceParameters.h" +#include "../../Framework/Oracle/ThreadedOracle.h" #include "../../Framework/Oracle/GetOrthancWebViewerJpegCommand.h" #include "../../Framework/Oracle/GetOrthancImageCommand.h" #include "../../Framework/Oracle/OrthancRestApiCommand.h" @@ -739,505 +740,6 @@ } } }; - - - - - - class ThreadedOracle : public IOracle - { - private: - typedef std::map<std::string, std::string> HttpHeaders; - - class Item : public Orthanc::IDynamicObject - { - private: - const IObserver& receiver_; - std::auto_ptr<IOracleCommand> command_; - - public: - Item(const IObserver& receiver, - IOracleCommand* command) : - receiver_(receiver), - command_(command) - { - if (command == NULL) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); - } - } - - const IObserver& GetReceiver() const - { - return receiver_; - } - - IOracleCommand& GetCommand() - { - assert(command_.get() != NULL); - return *command_; - } - }; - - - enum State - { - State_Setup, - State_Running, - State_Stopped - }; - - - class SleepingCommands : public boost::noncopyable - { - private: - class Item - { - private: - const IObserver& receiver_; - std::auto_ptr<SleepOracleCommand> command_; - boost::posix_time::ptime expiration_; - - public: - Item(const IObserver& receiver, - SleepOracleCommand* command) : - receiver_(receiver), - command_(command) - { - if (command == NULL) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); - } - - expiration_ = (boost::posix_time::second_clock::local_time() + - boost::posix_time::milliseconds(command_->GetDelay())); - } - - const boost::posix_time::ptime& GetExpirationTime() const - { - return expiration_; - } - - void Awake(IMessageEmitter& emitter) - { - assert(command_.get() != NULL); - - SleepOracleCommand::TimeoutMessage message(*command_); - emitter.EmitMessage(receiver_, message); - } - }; - - typedef std::list<Item*> Content; - - boost::mutex mutex_; - Content content_; - - public: - ~SleepingCommands() - { - for (Content::iterator it = content_.begin(); it != content_.end(); ++it) - { - if (*it != NULL) - { - delete *it; - } - } - } - - void Add(const IObserver& receiver, - SleepOracleCommand* command) // Takes ownership - { - boost::mutex::scoped_lock lock(mutex_); - - content_.push_back(new Item(receiver, command)); - } - - void AwakeExpired(IMessageEmitter& emitter) - { - boost::mutex::scoped_lock lock(mutex_); - - const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); - - Content stillSleeping; - - for (Content::iterator it = content_.begin(); it != content_.end(); ++it) - { - if (*it != NULL && - (*it)->GetExpirationTime() <= now) - { - (*it)->Awake(emitter); - delete *it; - *it = NULL; - } - else - { - stillSleeping.push_back(*it); - } - } - - // Compact the still-sleeping commands - content_ = stillSleeping; - } - }; - - - IMessageEmitter& emitter_; - Orthanc::WebServiceParameters orthanc_; - Orthanc::SharedMessageQueue queue_; - State state_; - boost::mutex mutex_; - std::vector<boost::thread*> workers_; - SleepingCommands sleepingCommands_; - boost::thread sleepingWorker_; - unsigned int sleepingTimeResolution_; - - void CopyHttpHeaders(Orthanc::HttpClient& client, - const HttpHeaders& headers) - { - for (HttpHeaders::const_iterator it = headers.begin(); it != headers.end(); it++ ) - { - client.AddHeader(it->first, it->second); - } - } - - - void DecodeAnswer(std::string& answer, - const HttpHeaders& headers) - { - Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None; - - for (HttpHeaders::const_iterator it = headers.begin(); - it != headers.end(); ++it) - { - std::string s; - Orthanc::Toolbox::ToLowerCase(s, it->first); - - if (s == "content-encoding") - { - if (it->second == "gzip") - { - contentEncoding = Orthanc::HttpCompression_Gzip; - } - else - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, - "Unsupported HTTP Content-Encoding: " + it->second); - } - - break; - } - } - - if (contentEncoding == Orthanc::HttpCompression_Gzip) - { - std::string compressed; - answer.swap(compressed); - - Orthanc::GzipCompressor compressor; - compressor.Uncompress(answer, compressed.c_str(), compressed.size()); - } - } - - - void Execute(const IObserver& receiver, - SleepOracleCommand& command) - { - std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); - - if (command.HasPayload()) - { - copy->SetPayload(command.ReleasePayload()); - } - - sleepingCommands_.Add(receiver, copy.release()); - } - - - void Execute(const IObserver& receiver, - const OrthancRestApiCommand& command) - { - Orthanc::HttpClient client(orthanc_, command.GetUri()); - client.SetMethod(command.GetMethod()); - client.SetTimeout(command.GetTimeout()); - - CopyHttpHeaders(client, command.GetHttpHeaders()); - - if (command.GetMethod() == Orthanc::HttpMethod_Post || - command.GetMethod() == Orthanc::HttpMethod_Put) - { - client.SetBody(command.GetBody()); - } - - std::string answer; - HttpHeaders answerHeaders; - client.ApplyAndThrowException(answer, answerHeaders); - - DecodeAnswer(answer, answerHeaders); - - OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer); - emitter_.EmitMessage(receiver, message); - } - - - void Execute(const IObserver& receiver, - const GetOrthancImageCommand& command) - { - Orthanc::HttpClient client(orthanc_, command.GetUri()); - client.SetTimeout(command.GetTimeout()); - - CopyHttpHeaders(client, command.GetHttpHeaders()); - - std::string answer; - HttpHeaders answerHeaders; - client.ApplyAndThrowException(answer, answerHeaders); - - DecodeAnswer(answer, answerHeaders); - - command.ProcessHttpAnswer(emitter_, receiver, answer, answerHeaders); - } - - - void Execute(const IObserver& receiver, - const GetOrthancWebViewerJpegCommand& command) - { - Orthanc::HttpClient client(orthanc_, command.GetUri()); - client.SetTimeout(command.GetTimeout()); - - CopyHttpHeaders(client, command.GetHttpHeaders()); - - std::string answer; - HttpHeaders answerHeaders; - client.ApplyAndThrowException(answer, answerHeaders); - - DecodeAnswer(answer, answerHeaders); - - command.ProcessHttpAnswer(emitter_, receiver, answer); - } - - - void Step() - { - std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100)); - - if (object.get() != NULL) - { - Item& item = dynamic_cast<Item&>(*object); - - try - { - switch (item.GetCommand().GetType()) - { - case IOracleCommand::Type_Sleep: - Execute(item.GetReceiver(), - dynamic_cast<SleepOracleCommand&>(item.GetCommand())); - break; - - case IOracleCommand::Type_OrthancRestApi: - Execute(item.GetReceiver(), - dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); - break; - - case IOracleCommand::Type_GetOrthancImage: - Execute(item.GetReceiver(), - dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand())); - break; - - case IOracleCommand::Type_GetOrthancWebViewerJpeg: - Execute(item.GetReceiver(), - dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand())); - break; - - default: - throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); - } - } - catch (Orthanc::OrthancException& e) - { - LOG(ERROR) << "Exception within the oracle: " << e.What(); - emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e)); - } - catch (...) - { - LOG(ERROR) << "Native exception within the oracle"; - emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage - (item.GetCommand(), Orthanc::ErrorCode_InternalError)); - } - } - } - - - static void Worker(ThreadedOracle* that) - { - assert(that != NULL); - - for (;;) - { - { - boost::mutex::scoped_lock lock(that->mutex_); - if (that->state_ != State_Running) - { - return; - } - } - - that->Step(); - } - } - - - static void SleepingWorker(ThreadedOracle* that) - { - assert(that != NULL); - - for (;;) - { - { - boost::mutex::scoped_lock lock(that->mutex_); - if (that->state_ != State_Running) - { - return; - } - } - - that->sleepingCommands_.AwakeExpired(that->emitter_); - - boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); - } - } - - - void StopInternal() - { - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ == State_Setup || - state_ == State_Stopped) - { - return; - } - else - { - state_ = State_Stopped; - } - } - - if (sleepingWorker_.joinable()) - { - sleepingWorker_.join(); - } - - for (size_t i = 0; i < workers_.size(); i++) - { - if (workers_[i] != NULL) - { - if (workers_[i]->joinable()) - { - workers_[i]->join(); - } - - delete workers_[i]; - } - } - } - - - public: - ThreadedOracle(IMessageEmitter& emitter) : - emitter_(emitter), - state_(State_Setup), - workers_(4), - sleepingTimeResolution_(50) // By default, time resolution of 50ms - { - } - - virtual ~ThreadedOracle() - { - StopInternal(); - } - - void SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc) - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - orthanc_ = orthanc; - } - } - - void SetWorkersCount(unsigned int count) - { - boost::mutex::scoped_lock lock(mutex_); - - if (count <= 0) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); - } - else if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - workers_.resize(count); - } - } - - void SetSleepingTimeResolution(unsigned int milliseconds) - { - boost::mutex::scoped_lock lock(mutex_); - - if (milliseconds <= 0) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); - } - else if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - sleepingTimeResolution_ = milliseconds; - } - } - - void Start() - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ != State_Setup) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); - } - else - { - state_ = State_Running; - - for (unsigned int i = 0; i < workers_.size(); i++) - { - workers_[i] = new boost::thread(Worker, this); - } - - sleepingWorker_ = boost::thread(SleepingWorker, this); - } - } - - void Stop() - { - StopInternal(); - } - - virtual void Schedule(const IObserver& receiver, - IOracleCommand* command) - { - queue_.Enqueue(new Item(receiver, command)); - } - }; class NativeApplicationContext : public IMessageEmitter