Mercurial > hg > orthanc-stone
diff Framework/Loaders/OracleScheduler.cpp @ 1228:c471a0aa137b broker
adding the next generation of loaders
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 09 Dec 2019 13:58:37 +0100 |
parents | |
children | 0ca50d275b9a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/Loaders/OracleScheduler.cpp Mon Dec 09 13:58:37 2019 +0100 @@ -0,0 +1,557 @@ +/** + * Stone of Orthanc + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2019 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "OracleScheduler.h" + +#include "../Oracle/ParseDicomFromFileCommand.h" + +namespace OrthancStone +{ + class OracleScheduler::ReceiverPayload : public Orthanc::IDynamicObject + { + private: + Priority priority_; + boost::weak_ptr<IObserver> receiver_; + std::auto_ptr<IOracleCommand> command_; + + public: + ReceiverPayload(Priority priority, + boost::weak_ptr<IObserver> receiver, + IOracleCommand* command) : + priority_(priority), + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + } + + Priority GetActivePriority() const + { + return priority_; + } + + boost::weak_ptr<IObserver> GetOriginalReceiver() const + { + return receiver_; + } + + const IOracleCommand& GetOriginalCommand() const + { + assert(command_.get() != NULL); + return *command_; + } + }; + + + class OracleScheduler::ScheduledCommand : public boost::noncopyable + { + private: + boost::weak_ptr<IObserver> receiver_; + std::auto_ptr<IOracleCommand> command_; + + public: + ScheduledCommand(boost::shared_ptr<IObserver> receiver, + IOracleCommand* command) : + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + } + + boost::weak_ptr<IObserver> GetReceiver() + { + return receiver_; + } + + bool IsSameReceiver(boost::shared_ptr<OrthancStone::IObserver> receiver) const + { + boost::shared_ptr<IObserver> lock(receiver_.lock()); + + return (lock && + lock.get() == receiver.get()); + } + + IOracleCommand* WrapCommand(Priority priority) + { + if (command_.get() == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + std::auto_ptr<IOracleCommand> wrapped(command_->Clone()); + dynamic_cast<OracleCommandBase&>(*wrapped).AcquirePayload(new ReceiverPayload(priority, receiver_, command_.release())); + return wrapped.release(); + } + } + }; + + + + void OracleScheduler::ClearQueue(Queue& queue) + { + for (Queue::iterator it = queue.begin(); it != queue.end(); ++it) + { + assert(it->second != NULL); + delete it->second; + + totalProcessed_ ++; + } + + queue.clear(); + } + + + void OracleScheduler::RemoveReceiverFromQueue(Queue& queue, + boost::shared_ptr<IObserver> receiver) + { + if (!receiver) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + + Queue tmp; + + for (Queue::iterator it = queue.begin(); it != queue.end(); ++it) + { + assert(it->second != NULL); + + if (!(it->second->IsSameReceiver(receiver))) + { + // This promise is still active + tmp.insert(std::make_pair(it->first, it->second)); + } + else + { + delete it->second; + + totalProcessed_ ++; + } + } + + queue = tmp; + } + + + void OracleScheduler::CheckInvariants() const + { +#ifndef NDEBUG + /*char buf[1024]; + sprintf(buf, "active: %d %d %d ; pending: %lu %lu %lu", + activeHighPriorityCommands_, activeStandardPriorityCommands_, activeLowPriorityCommands_, + highPriorityQueue_.size(), standardPriorityQueue_.size(), lowPriorityQueue_.size()); + LOG(INFO) << buf;*/ + + assert(activeHighPriorityCommands_ <= maxHighPriorityCommands_); + assert(activeStandardPriorityCommands_ <= maxStandardPriorityCommands_); + assert(activeLowPriorityCommands_ <= maxLowPriorityCommands_); + assert(totalProcessed_ <= totalScheduled_); + + for (Queue::const_iterator it = standardPriorityQueue_.begin(); it != standardPriorityQueue_.end(); ++it) + { + assert(it->first > PRIORITY_HIGH && + it->first < PRIORITY_LOW); + } + + for (Queue::const_iterator it = highPriorityQueue_.begin(); it != highPriorityQueue_.end(); ++it) + { + assert(it->first <= PRIORITY_HIGH); + } + + for (Queue::const_iterator it = lowPriorityQueue_.begin(); it != lowPriorityQueue_.end(); ++it) + { + assert(it->first >= PRIORITY_LOW); + } +#endif + } + + + void OracleScheduler::SpawnFromQueue(Queue& queue, + Priority priority) + { + CheckInvariants(); + + Queue::iterator item = queue.begin(); + assert(item != queue.end()); + + std::auto_ptr<ScheduledCommand> command(dynamic_cast<ScheduledCommand*>(item->second)); + queue.erase(item); + + if (command.get() != NULL) + { + /** + * Only schedule the command for execution in the oracle, if its + * receiver has not been destroyed yet. + **/ + boost::shared_ptr<IObserver> observer(command->GetReceiver().lock()); + if (observer) + { + if (oracle_.Schedule(GetSharedObserver(), command->WrapCommand(priority))) + { + /** + * Executing this code if "Schedule()" returned "false" + * above, will result in a memory leak within + * "OracleScheduler", as the scheduler believes that some + * command is still active (i.e. pending to be executed by + * the oracle), hereby stalling the scheduler during its + * destruction, and not freeing the + * "shared_ptr<OracleScheduler>" of the Stone context (check + * out "sjo-playground/WebViewer/Backend/Leak") + **/ + + switch (priority) + { + case Priority_High: + activeHighPriorityCommands_ ++; + break; + + case Priority_Standard: + activeStandardPriorityCommands_ ++; + break; + + case Priority_Low: + activeLowPriorityCommands_ ++; + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + } + else + { + totalProcessed_ ++; + } + } + } + else + { + LOG(ERROR) << "NULL command, should never happen"; + } + + CheckInvariants(); + } + + + void OracleScheduler::SpawnCommands() + { + // Send as many commands as possible to the oracle + while (!highPriorityQueue_.empty()) + { + if (activeHighPriorityCommands_ < maxHighPriorityCommands_) + { + // First fill the high-priority lane + SpawnFromQueue(highPriorityQueue_, Priority_High); + } + else if (activeStandardPriorityCommands_ < maxStandardPriorityCommands_) + { + // There remain too many high-priority commands for the + // high-priority lane, schedule them to the standard-priority lanes + SpawnFromQueue(highPriorityQueue_, Priority_Standard); + } + else if (activeLowPriorityCommands_ < maxLowPriorityCommands_) + { + SpawnFromQueue(highPriorityQueue_, Priority_Low); + } + else + { + return; // No slot available + } + } + + while (!standardPriorityQueue_.empty()) + { + if (activeStandardPriorityCommands_ < maxStandardPriorityCommands_) + { + SpawnFromQueue(standardPriorityQueue_, Priority_Standard); + } + else if (activeLowPriorityCommands_ < maxLowPriorityCommands_) + { + SpawnFromQueue(standardPriorityQueue_, Priority_Low); + } + else + { + return; + } + } + + while (!lowPriorityQueue_.empty()) + { + if (activeLowPriorityCommands_ < maxLowPriorityCommands_) + { + SpawnFromQueue(lowPriorityQueue_, Priority_Low); + } + else + { + return; + } + } + } + + + void OracleScheduler::RemoveActiveCommand(const ReceiverPayload& payload) + { + CheckInvariants(); + + totalProcessed_ ++; + + switch (payload.GetActivePriority()) + { + case Priority_High: + assert(activeHighPriorityCommands_ > 0); + activeHighPriorityCommands_ --; + break; + + case Priority_Standard: + assert(activeStandardPriorityCommands_ > 0); + activeStandardPriorityCommands_ --; + break; + + case Priority_Low: + assert(activeLowPriorityCommands_ > 0); + activeLowPriorityCommands_ --; + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + + SpawnCommands(); + + CheckInvariants(); + } + + + void OracleScheduler::Handle(const GetOrthancImageCommand::SuccessMessage& message) + { + assert(message.GetOrigin().HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(message.GetOrigin().GetPayload()); + + RemoveActiveCommand(payload); + + GetOrthancImageCommand::SuccessMessage bis( + dynamic_cast<const GetOrthancImageCommand&>(payload.GetOriginalCommand()), + message.GetImage(), message.GetMimeType()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } + + + void OracleScheduler::Handle(const GetOrthancWebViewerJpegCommand::SuccessMessage& message) + { + assert(message.GetOrigin().HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(message.GetOrigin().GetPayload()); + + RemoveActiveCommand(payload); + + GetOrthancWebViewerJpegCommand::SuccessMessage bis( + dynamic_cast<const GetOrthancWebViewerJpegCommand&>(payload.GetOriginalCommand()), + message.GetImage()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } + + + void OracleScheduler::Handle(const HttpCommand::SuccessMessage& message) + { + assert(message.GetOrigin().HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(message.GetOrigin().GetPayload()); + + RemoveActiveCommand(payload); + + HttpCommand::SuccessMessage bis( + dynamic_cast<const HttpCommand&>(payload.GetOriginalCommand()), + message.GetAnswerHeaders(), message.GetAnswer()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } + + + void OracleScheduler::Handle(const OrthancRestApiCommand::SuccessMessage& message) + { + assert(message.GetOrigin().HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(message.GetOrigin().GetPayload()); + + RemoveActiveCommand(payload); + + OrthancRestApiCommand::SuccessMessage bis( + dynamic_cast<const OrthancRestApiCommand&>(payload.GetOriginalCommand()), + message.GetAnswerHeaders(), message.GetAnswer()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } + + +#if ORTHANC_ENABLE_DCMTK == 1 + void OracleScheduler::Handle(const ParseDicomSuccessMessage& message) + { + assert(message.GetOrigin().HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(message.GetOrigin().GetPayload()); + + RemoveActiveCommand(payload); + + ParseDicomSuccessMessage bis( + dynamic_cast<const OracleCommandBase&>(payload.GetOriginalCommand()), + message.GetDicom(), message.GetFileSize(), message.HasPixelData()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } +#endif + + + void OracleScheduler::Handle(const ReadFileCommand::SuccessMessage& message) + { + assert(message.GetOrigin().HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(message.GetOrigin().GetPayload()); + + RemoveActiveCommand(payload); + + ReadFileCommand::SuccessMessage bis( + dynamic_cast<const ReadFileCommand&>(payload.GetOriginalCommand()), + message.GetContent()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } + + + void OracleScheduler::Handle(const OracleCommandExceptionMessage& message) + { + const OracleCommandBase& command = dynamic_cast<const OracleCommandBase&>(message.GetOrigin()); + + assert(command.HasPayload()); + const ReceiverPayload& payload = dynamic_cast<const ReceiverPayload&>(command.GetPayload()); + + RemoveActiveCommand(payload); + + OracleCommandExceptionMessage bis(payload.GetOriginalCommand(), message.GetException()); + emitter_.EmitMessage(payload.GetOriginalReceiver(), bis); + } + + + OracleScheduler::OracleScheduler(IOracle& oracle, + IMessageEmitter& emitter, + unsigned int maxHighPriority, + unsigned int maxStandardPriority, + unsigned int maxLowPriority) : + oracle_(oracle), + emitter_(emitter), + maxHighPriorityCommands_(maxHighPriority), + maxStandardPriorityCommands_(maxStandardPriority), + maxLowPriorityCommands_(maxLowPriority), + activeHighPriorityCommands_(0), + activeStandardPriorityCommands_(0), + activeLowPriorityCommands_(0), + totalScheduled_(0), + totalProcessed_(0) + { + assert(PRIORITY_HIGH < 0 && + PRIORITY_LOW > 0); + + if (maxLowPriority <= 0) + { + // There must be at least 1 lane available to deal with low-priority commands + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + } + + + boost::shared_ptr<OracleScheduler> OracleScheduler::Create(IOracle& oracle, + IObservable& oracleObservable, + IMessageEmitter& emitter, + unsigned int maxHighPriority, + unsigned int maxStandardPriority, + unsigned int maxLowPriority) + { + boost::shared_ptr<OracleScheduler> scheduler + (new OracleScheduler(oracle, emitter, maxHighPriority, maxStandardPriority, maxLowPriority)); + scheduler->Register<GetOrthancImageCommand::SuccessMessage>(oracleObservable, &OracleScheduler::Handle); + scheduler->Register<GetOrthancWebViewerJpegCommand::SuccessMessage>(oracleObservable, &OracleScheduler::Handle); + scheduler->Register<HttpCommand::SuccessMessage>(oracleObservable, &OracleScheduler::Handle); + scheduler->Register<OrthancRestApiCommand::SuccessMessage>(oracleObservable, &OracleScheduler::Handle); + scheduler->Register<ReadFileCommand::SuccessMessage>(oracleObservable, &OracleScheduler::Handle); + scheduler->Register<OracleCommandExceptionMessage>(oracleObservable, &OracleScheduler::Handle); + +#if ORTHANC_ENABLE_DCMTK == 1 + scheduler->Register<ParseDicomSuccessMessage>(oracleObservable, &OracleScheduler::Handle); +#endif + + return scheduler; + } + + + OracleScheduler::~OracleScheduler() + { + CancelAllRequests(); + } + + + void OracleScheduler::CancelRequests(boost::shared_ptr<IObserver> receiver) + { + RemoveReceiverFromQueue(standardPriorityQueue_, receiver); + RemoveReceiverFromQueue(highPriorityQueue_, receiver); + RemoveReceiverFromQueue(lowPriorityQueue_, receiver); + } + + + void OracleScheduler::CancelAllRequests() + { + ClearQueue(standardPriorityQueue_); + ClearQueue(highPriorityQueue_); + ClearQueue(lowPriorityQueue_); + } + + + void OracleScheduler::Schedule(boost::shared_ptr<IObserver> receiver, + int priority, + IOracleCommand* command /* Takes ownership */) + { + std::auto_ptr<ScheduledCommand> pending(new ScheduledCommand(receiver, dynamic_cast<IOracleCommand*>(command))); + + /** + * Safeguard to remember that a new "Handle()" method and a call + * to "scheduler->Register()" must be implemented for each + * possible oracle command. + **/ + assert(command->GetType() == IOracleCommand::Type_GetOrthancImage || + command->GetType() == IOracleCommand::Type_GetOrthancWebViewerJpeg || + command->GetType() == IOracleCommand::Type_Http || + command->GetType() == IOracleCommand::Type_OrthancRestApi || + command->GetType() == IOracleCommand::Type_ParseDicomFromFile || + command->GetType() == IOracleCommand::Type_ParseDicomFromWado || + command->GetType() == IOracleCommand::Type_ReadFile); + + if (priority <= PRIORITY_HIGH) + { + highPriorityQueue_.insert(std::make_pair(priority, pending.release())); + } + else if (priority >= PRIORITY_LOW) + { + lowPriorityQueue_.insert(std::make_pair(priority, pending.release())); + } + else + { + standardPriorityQueue_.insert(std::make_pair(priority, pending.release())); + } + + totalScheduled_ ++; + + SpawnCommands(); + } +}