Mercurial > hg > orthanc-stone
diff Framework/Oracle/ThreadedOracle.cpp @ 748:ab236bb5dbc7
ThreadedOracle
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 22 May 2019 14:46:26 +0200 |
parents | |
children | 1181e1ad98ec |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/Oracle/ThreadedOracle.cpp Wed May 22 14:46:26 2019 +0200 @@ -0,0 +1,507 @@ +/** + * Stone of Orthanc + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2019 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "ThreadedOracle.h" + +#include "GetOrthancImageCommand.h" +#include "GetOrthancWebViewerJpegCommand.h" +#include "OrthancRestApiCommand.h" +#include "SleepOracleCommand.h" +#include "OracleCommandExceptionMessage.h" + +#include <Core/Compression/GzipCompressor.h> +#include <Core/HttpClient.h> +#include <Core/OrthancException.h> +#include <Core/Toolbox.h> + + +namespace OrthancStone +{ + class ThreadedOracle::Item : public Orthanc::IDynamicObject + { + private: + const IObserver& receiver_; + std::auto_ptr<IOracleCommand> command_; + + public: + Item(const IObserver& receiver, + IOracleCommand* command) : + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + } + + const IObserver& GetReceiver() const + { + return receiver_; + } + + IOracleCommand& GetCommand() + { + assert(command_.get() != NULL); + return *command_; + } + }; + + + class ThreadedOracle::SleepingCommands : public boost::noncopyable + { + private: + class Item + { + private: + const IObserver& receiver_; + std::auto_ptr<SleepOracleCommand> command_; + boost::posix_time::ptime expiration_; + + public: + Item(const IObserver& receiver, + SleepOracleCommand* command) : + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + + expiration_ = (boost::posix_time::second_clock::local_time() + + boost::posix_time::milliseconds(command_->GetDelay())); + } + + const boost::posix_time::ptime& GetExpirationTime() const + { + return expiration_; + } + + void Awake(IMessageEmitter& emitter) + { + assert(command_.get() != NULL); + + SleepOracleCommand::TimeoutMessage message(*command_); + emitter.EmitMessage(receiver_, message); + } + }; + + typedef std::list<Item*> Content; + + boost::mutex mutex_; + Content content_; + + public: + ~SleepingCommands() + { + for (Content::iterator it = content_.begin(); it != content_.end(); ++it) + { + if (*it != NULL) + { + delete *it; + } + } + } + + void Add(const IObserver& receiver, + SleepOracleCommand* command) // Takes ownership + { + boost::mutex::scoped_lock lock(mutex_); + + content_.push_back(new Item(receiver, command)); + } + + void AwakeExpired(IMessageEmitter& emitter) + { + boost::mutex::scoped_lock lock(mutex_); + + const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + + Content stillSleeping; + + for (Content::iterator it = content_.begin(); it != content_.end(); ++it) + { + if (*it != NULL && + (*it)->GetExpirationTime() <= now) + { + (*it)->Awake(emitter); + delete *it; + *it = NULL; + } + else + { + stillSleeping.push_back(*it); + } + } + + // Compact the still-sleeping commands + content_ = stillSleeping; + } + }; + + + static void CopyHttpHeaders(Orthanc::HttpClient& client, + const Orthanc::HttpClient::HttpHeaders& headers) + { + for (Orthanc::HttpClient::HttpHeaders::const_iterator + it = headers.begin(); it != headers.end(); it++ ) + { + client.AddHeader(it->first, it->second); + } + } + + + static void DecodeAnswer(std::string& answer, + const Orthanc::HttpClient::HttpHeaders& headers) + { + Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None; + + for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin(); + it != headers.end(); ++it) + { + std::string s; + Orthanc::Toolbox::ToLowerCase(s, it->first); + + if (s == "content-encoding") + { + if (it->second == "gzip") + { + contentEncoding = Orthanc::HttpCompression_Gzip; + } + else + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, + "Unsupported HTTP Content-Encoding: " + it->second); + } + + break; + } + } + + if (contentEncoding == Orthanc::HttpCompression_Gzip) + { + std::string compressed; + answer.swap(compressed); + + Orthanc::GzipCompressor compressor; + compressor.Uncompress(answer, compressed.c_str(), compressed.size()); + } + } + + + static void Execute(IMessageEmitter& emitter, + const Orthanc::WebServiceParameters& orthanc, + const IObserver& receiver, + const OrthancRestApiCommand& command) + { + Orthanc::HttpClient client(orthanc, command.GetUri()); + client.SetMethod(command.GetMethod()); + client.SetTimeout(command.GetTimeout()); + + CopyHttpHeaders(client, command.GetHttpHeaders()); + + if (command.GetMethod() == Orthanc::HttpMethod_Post || + command.GetMethod() == Orthanc::HttpMethod_Put) + { + client.SetBody(command.GetBody()); + } + + std::string answer; + Orthanc::HttpClient::HttpHeaders answerHeaders; + client.ApplyAndThrowException(answer, answerHeaders); + + DecodeAnswer(answer, answerHeaders); + + OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer); + emitter.EmitMessage(receiver, message); + } + + + static void Execute(IMessageEmitter& emitter, + const Orthanc::WebServiceParameters& orthanc, + const IObserver& receiver, + const GetOrthancImageCommand& command) + { + Orthanc::HttpClient client(orthanc, command.GetUri()); + client.SetTimeout(command.GetTimeout()); + + CopyHttpHeaders(client, command.GetHttpHeaders()); + + std::string answer; + Orthanc::HttpClient::HttpHeaders answerHeaders; + client.ApplyAndThrowException(answer, answerHeaders); + + DecodeAnswer(answer, answerHeaders); + + command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders); + } + + + static void Execute(IMessageEmitter& emitter, + const Orthanc::WebServiceParameters& orthanc, + const IObserver& receiver, + const GetOrthancWebViewerJpegCommand& command) + { + Orthanc::HttpClient client(orthanc, command.GetUri()); + client.SetTimeout(command.GetTimeout()); + + CopyHttpHeaders(client, command.GetHttpHeaders()); + + std::string answer; + Orthanc::HttpClient::HttpHeaders answerHeaders; + client.ApplyAndThrowException(answer, answerHeaders); + + DecodeAnswer(answer, answerHeaders); + + command.ProcessHttpAnswer(emitter, receiver, answer); + } + + + void ThreadedOracle::Step() + { + std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100)); + + if (object.get() != NULL) + { + Item& item = dynamic_cast<Item&>(*object); + + try + { + switch (item.GetCommand().GetType()) + { + case IOracleCommand::Type_Sleep: + { + SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand()); + + std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); + + if (command.HasPayload()) + { + copy->SetPayload(command.ReleasePayload()); + } + + sleepingCommands_->Add(item.GetReceiver(), copy.release()); + + break; + } + + case IOracleCommand::Type_OrthancRestApi: + Execute(emitter_, orthanc_, item.GetReceiver(), + dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); + break; + + case IOracleCommand::Type_GetOrthancImage: + Execute(emitter_, orthanc_, item.GetReceiver(), + dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand())); + break; + + case IOracleCommand::Type_GetOrthancWebViewerJpeg: + Execute(emitter_, orthanc_, item.GetReceiver(), + dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand())); + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); + } + } + catch (Orthanc::OrthancException& e) + { + LOG(ERROR) << "Exception within the oracle: " << e.What(); + emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e)); + } + catch (...) + { + LOG(ERROR) << "Threaded exception within the oracle"; + emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage + (item.GetCommand(), Orthanc::ErrorCode_InternalError)); + } + } + } + + + void ThreadedOracle::Worker(ThreadedOracle* that) + { + assert(that != NULL); + + for (;;) + { + { + boost::mutex::scoped_lock lock(that->mutex_); + if (that->state_ != State_Running) + { + return; + } + } + + that->Step(); + } + } + + + void ThreadedOracle::SleepingWorker(ThreadedOracle* that) + { + assert(that != NULL); + + for (;;) + { + { + boost::mutex::scoped_lock lock(that->mutex_); + if (that->state_ != State_Running) + { + return; + } + } + + that->sleepingCommands_->AwakeExpired(that->emitter_); + + boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); + } + } + + + void ThreadedOracle::StopInternal() + { + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ == State_Setup || + state_ == State_Stopped) + { + return; + } + else + { + state_ = State_Stopped; + } + } + + if (sleepingWorker_.joinable()) + { + sleepingWorker_.join(); + } + + for (size_t i = 0; i < workers_.size(); i++) + { + if (workers_[i] != NULL) + { + if (workers_[i]->joinable()) + { + workers_[i]->join(); + } + + delete workers_[i]; + } + } + } + + + ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) : + emitter_(emitter), + state_(State_Setup), + workers_(4), + sleepingCommands_(new SleepingCommands), + sleepingTimeResolution_(50) // By default, time resolution of 50ms + { + } + + + void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc) + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + orthanc_ = orthanc; + } + } + + + void ThreadedOracle::SetWorkersCount(unsigned int count) + { + boost::mutex::scoped_lock lock(mutex_); + + if (count <= 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + workers_.resize(count); + } + } + + + void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds) + { + boost::mutex::scoped_lock lock(mutex_); + + if (milliseconds <= 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + sleepingTimeResolution_ = milliseconds; + } + } + + + void ThreadedOracle::Start() + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + state_ = State_Running; + + for (unsigned int i = 0; i < workers_.size(); i++) + { + workers_[i] = new boost::thread(Worker, this); + } + + sleepingWorker_ = boost::thread(SleepingWorker, this); + } + } + + + void ThreadedOracle::Schedule(const IObserver& receiver, + IOracleCommand* command) + { + queue_.Enqueue(new Item(receiver, command)); + } +}