Mercurial > hg > orthanc-stone
view Framework/Oracle/ThreadedOracle.cpp @ 795:bc20e4c417ec
refactoring OrthancMultiframeVolumeLoader using LoaderStateMachine
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 28 May 2019 13:02:56 +0200 |
parents | f6438fdc447e |
children | a68cd7ae8838 |
line wrap: on
line source
/** * Stone of Orthanc * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics * Department, University Hospital of Liege, Belgium * Copyright (C) 2017-2019 Osimis S.A., Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Affero General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. **/ #include "ThreadedOracle.h" #include "GetOrthancImageCommand.h" #include "GetOrthancWebViewerJpegCommand.h" #include "OrthancRestApiCommand.h" #include "SleepOracleCommand.h" #include "OracleCommandExceptionMessage.h" #include <Core/Compression/GzipCompressor.h> #include <Core/HttpClient.h> #include <Core/OrthancException.h> #include <Core/Toolbox.h> namespace OrthancStone { class ThreadedOracle::Item : public Orthanc::IDynamicObject { private: const IObserver& receiver_; std::auto_ptr<IOracleCommand> command_; public: Item(const IObserver& receiver, IOracleCommand* command) : receiver_(receiver), command_(command) { if (command == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); } } const IObserver& GetReceiver() const { return receiver_; } IOracleCommand& GetCommand() { assert(command_.get() != NULL); return *command_; } }; class ThreadedOracle::SleepingCommands : public boost::noncopyable { private: class Item { private: const IObserver& receiver_; std::auto_ptr<SleepOracleCommand> command_; boost::posix_time::ptime expiration_; public: Item(const IObserver& receiver, SleepOracleCommand* command) : receiver_(receiver), command_(command) { if (command == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); } expiration_ = (boost::posix_time::microsec_clock::local_time() + boost::posix_time::milliseconds(command_->GetDelay())); } const boost::posix_time::ptime& GetExpirationTime() const { return expiration_; } void Awake(IMessageEmitter& emitter) { assert(command_.get() != NULL); SleepOracleCommand::TimeoutMessage message(*command_); emitter.EmitMessage(receiver_, message); } }; typedef std::list<Item*> Content; boost::mutex mutex_; Content content_; public: ~SleepingCommands() { for (Content::iterator it = content_.begin(); it != content_.end(); ++it) { if (*it != NULL) { delete *it; } } } void Add(const IObserver& receiver, SleepOracleCommand* command) // Takes ownership { boost::mutex::scoped_lock lock(mutex_); content_.push_back(new Item(receiver, command)); } void AwakeExpired(IMessageEmitter& emitter) { boost::mutex::scoped_lock lock(mutex_); const boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time(); Content stillSleeping; for (Content::iterator it = content_.begin(); it != content_.end(); ++it) { if (*it != NULL && (*it)->GetExpirationTime() <= now) { (*it)->Awake(emitter); delete *it; *it = NULL; } else { stillSleeping.push_back(*it); } } // Compact the still-sleeping commands content_ = stillSleeping; } }; static void CopyHttpHeaders(Orthanc::HttpClient& client, const Orthanc::HttpClient::HttpHeaders& headers) { for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin(); it != headers.end(); it++ ) { client.AddHeader(it->first, it->second); } } static void DecodeAnswer(std::string& answer, const Orthanc::HttpClient::HttpHeaders& headers) { Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None; for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin(); it != headers.end(); ++it) { std::string s; Orthanc::Toolbox::ToLowerCase(s, it->first); if (s == "content-encoding") { if (it->second == "gzip") { contentEncoding = Orthanc::HttpCompression_Gzip; } else { throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, "Unsupported HTTP Content-Encoding: " + it->second); } break; } } if (contentEncoding == Orthanc::HttpCompression_Gzip) { std::string compressed; answer.swap(compressed); Orthanc::GzipCompressor compressor; compressor.Uncompress(answer, compressed.c_str(), compressed.size()); LOG(INFO) << "Uncompressing gzip Encoding: from " << compressed.size() << " to " << answer.size() << " bytes"; } } static void Execute(IMessageEmitter& emitter, const Orthanc::WebServiceParameters& orthanc, const IObserver& receiver, const OrthancRestApiCommand& command) { Orthanc::HttpClient client(orthanc, command.GetUri()); client.SetMethod(command.GetMethod()); client.SetTimeout(command.GetTimeout()); CopyHttpHeaders(client, command.GetHttpHeaders()); if (command.GetMethod() == Orthanc::HttpMethod_Post || command.GetMethod() == Orthanc::HttpMethod_Put) { client.SetBody(command.GetBody()); } std::string answer; Orthanc::HttpClient::HttpHeaders answerHeaders; client.ApplyAndThrowException(answer, answerHeaders); DecodeAnswer(answer, answerHeaders); OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer); emitter.EmitMessage(receiver, message); } static void Execute(IMessageEmitter& emitter, const Orthanc::WebServiceParameters& orthanc, const IObserver& receiver, const GetOrthancImageCommand& command) { Orthanc::HttpClient client(orthanc, command.GetUri()); client.SetTimeout(command.GetTimeout()); CopyHttpHeaders(client, command.GetHttpHeaders()); std::string answer; Orthanc::HttpClient::HttpHeaders answerHeaders; client.ApplyAndThrowException(answer, answerHeaders); DecodeAnswer(answer, answerHeaders); command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders); } static void Execute(IMessageEmitter& emitter, const Orthanc::WebServiceParameters& orthanc, const IObserver& receiver, const GetOrthancWebViewerJpegCommand& command) { Orthanc::HttpClient client(orthanc, command.GetUri()); client.SetTimeout(command.GetTimeout()); CopyHttpHeaders(client, command.GetHttpHeaders()); std::string answer; Orthanc::HttpClient::HttpHeaders answerHeaders; client.ApplyAndThrowException(answer, answerHeaders); DecodeAnswer(answer, answerHeaders); command.ProcessHttpAnswer(emitter, receiver, answer); } void ThreadedOracle::Step() { std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100)); if (object.get() != NULL) { Item& item = dynamic_cast<Item&>(*object); try { switch (item.GetCommand().GetType()) { case IOracleCommand::Type_Sleep: { SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand()); std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); if (command.HasPayload()) { copy->SetPayload(command.ReleasePayload()); } sleepingCommands_->Add(item.GetReceiver(), copy.release()); break; } case IOracleCommand::Type_OrthancRestApi: Execute(emitter_, orthanc_, item.GetReceiver(), dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); break; case IOracleCommand::Type_GetOrthancImage: Execute(emitter_, orthanc_, item.GetReceiver(), dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand())); break; case IOracleCommand::Type_GetOrthancWebViewerJpeg: Execute(emitter_, orthanc_, item.GetReceiver(), dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand())); break; default: throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); } } catch (Orthanc::OrthancException& e) { LOG(ERROR) << "Exception within the oracle: " << e.What(); emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e)); } catch (...) { LOG(ERROR) << "Threaded exception within the oracle"; emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage (item.GetCommand(), Orthanc::ErrorCode_InternalError)); } } } void ThreadedOracle::Worker(ThreadedOracle* that) { assert(that != NULL); for (;;) { { boost::mutex::scoped_lock lock(that->mutex_); if (that->state_ != State_Running) { return; } } that->Step(); } } void ThreadedOracle::SleepingWorker(ThreadedOracle* that) { assert(that != NULL); for (;;) { { boost::mutex::scoped_lock lock(that->mutex_); if (that->state_ != State_Running) { return; } } that->sleepingCommands_->AwakeExpired(that->emitter_); boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); } } void ThreadedOracle::StopInternal() { { boost::mutex::scoped_lock lock(mutex_); if (state_ == State_Setup || state_ == State_Stopped) { return; } else { state_ = State_Stopped; } } if (sleepingWorker_.joinable()) { sleepingWorker_.join(); } for (size_t i = 0; i < workers_.size(); i++) { if (workers_[i] != NULL) { if (workers_[i]->joinable()) { workers_[i]->join(); } delete workers_[i]; } } } ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) : emitter_(emitter), state_(State_Setup), workers_(4), sleepingCommands_(new SleepingCommands), sleepingTimeResolution_(50) // By default, time resolution of 50ms { } ThreadedOracle::~ThreadedOracle() { if (state_ == State_Running) { LOG(ERROR) << "The threaded oracle is still running, explicit call to " << "Stop() is mandatory to avoid crashes"; } try { StopInternal(); } catch (Orthanc::OrthancException& e) { LOG(ERROR) << "Exception while stopping the threaded oracle: " << e.What(); } catch (...) { LOG(ERROR) << "Native exception while stopping the threaded oracle"; } } void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc) { boost::mutex::scoped_lock lock(mutex_); if (state_ != State_Setup) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } else { orthanc_ = orthanc; } } void ThreadedOracle::SetThreadsCount(unsigned int count) { boost::mutex::scoped_lock lock(mutex_); if (count <= 0) { throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } else if (state_ != State_Setup) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } else { workers_.resize(count); } } void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds) { boost::mutex::scoped_lock lock(mutex_); if (milliseconds <= 0) { throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } else if (state_ != State_Setup) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } else { sleepingTimeResolution_ = milliseconds; } } void ThreadedOracle::Start() { boost::mutex::scoped_lock lock(mutex_); if (state_ != State_Setup) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } else { state_ = State_Running; for (unsigned int i = 0; i < workers_.size(); i++) { workers_[i] = new boost::thread(Worker, this); } sleepingWorker_ = boost::thread(SleepingWorker, this); } } void ThreadedOracle::Schedule(const IObserver& receiver, IOracleCommand* command) { queue_.Enqueue(new Item(receiver, command)); } }