Mercurial > hg > orthanc-stone
changeset 744:a4bfb420b869
SleepOracleCommand
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 22 May 2019 11:28:42 +0200 |
parents | bcd3ea868bcd |
children | c44c1d2d3598 |
files | Samples/Sdl/Loader.cpp |
diffstat | 1 files changed, 237 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/Samples/Sdl/Loader.cpp Wed May 22 09:55:01 2019 +0200 +++ b/Samples/Sdl/Loader.cpp Wed May 22 11:28:42 2019 +0200 @@ -69,6 +69,7 @@ public: enum Type { + Type_Sleep, Type_OrthancRestApi, Type_GetOrthancImage, Type_GetOrthancWebViewerJpeg @@ -153,6 +154,18 @@ throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } } + + Orthanc::IDynamicObject* ReleasePayload() + { + if (HasPayload()) + { + return payload_.release(); + } + else + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + } }; @@ -190,6 +203,32 @@ return exception_; } }; + + + class SleepOracleCommand : public OracleCommandWithPayload + { + private: + unsigned int milliseconds_; + + public: + ORTHANC_STONE_DEFINE_ORIGIN_MESSAGE(__FILE__, __LINE__, TimeoutMessage, SleepOracleCommand); + + SleepOracleCommand(unsigned int milliseconds) : + milliseconds_(milliseconds) + { + } + + virtual Type GetType() const + { + return Type_Sleep; + } + + unsigned int GetDelay() const + { + return milliseconds_; + } + }; + typedef std::map<std::string, std::string> HttpHeaders; @@ -1937,7 +1976,7 @@ - class NativeOracle : public IOracle + class ThreadedOracle : public IOracle { private: class Item : public Orthanc::IDynamicObject @@ -1963,7 +2002,7 @@ return receiver_; } - const IOracleCommand& GetCommand() const + IOracleCommand& GetCommand() { assert(command_.get() != NULL); return *command_; @@ -1979,13 +2018,108 @@ }; + class SleepingCommands : public boost::noncopyable + { + private: + class Item + { + private: + const OrthancStone::IObserver& receiver_; + std::auto_ptr<SleepOracleCommand> command_; + boost::posix_time::ptime expiration_; + + public: + Item(const OrthancStone::IObserver& receiver, + SleepOracleCommand* command) : + receiver_(receiver), + command_(command) + { + if (command == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); + } + + expiration_ = (boost::posix_time::second_clock::local_time() + + boost::posix_time::milliseconds(command_->GetDelay())); + } + + const boost::posix_time::ptime& GetExpirationTime() const + { + return expiration_; + } + + void Awake(IMessageEmitter& emitter) + { + assert(command_.get() != NULL); + + SleepOracleCommand::TimeoutMessage message(*command_); + emitter.EmitMessage(receiver_, message); + } + }; + + typedef std::list<Item*> Content; + + boost::mutex mutex_; + Content content_; + + public: + ~SleepingCommands() + { + for (Content::iterator it = content_.begin(); it != content_.end(); ++it) + { + if (*it != NULL) + { + delete *it; + } + } + } + + void Add(const OrthancStone::IObserver& receiver, + SleepOracleCommand* command) // Takes ownership + { + boost::mutex::scoped_lock lock(mutex_); + + content_.push_back(new Item(receiver, command)); + } + + void AwakeExpired(IMessageEmitter& emitter) + { + boost::mutex::scoped_lock lock(mutex_); + + const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + + Content stillSleeping; + + for (Content::iterator it = content_.begin(); it != content_.end(); ++it) + { + if (*it != NULL && + (*it)->GetExpirationTime() <= now) + { + (*it)->Awake(emitter); + delete *it; + *it = NULL; + } + else + { + stillSleeping.push_back(*it); + } + } + + // Compact the still-sleeping commands + content_ = stillSleeping; + } + }; + + IMessageEmitter& emitter_; Orthanc::WebServiceParameters orthanc_; Orthanc::SharedMessageQueue queue_; State state_; boost::mutex mutex_; std::vector<boost::thread*> workers_; - + SleepingCommands sleepingCommands_; + boost::thread sleepingWorker_; + unsigned int sleepingTimeResolution_; void CopyHttpHeaders(Orthanc::HttpClient& client, const HttpHeaders& headers) @@ -2036,6 +2170,20 @@ void Execute(const OrthancStone::IObserver& receiver, + SleepOracleCommand& command) + { + std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); + + if (command.HasPayload()) + { + copy->SetPayload(command.ReleasePayload()); + } + + sleepingCommands_.Add(receiver, copy.release()); + } + + + void Execute(const OrthancStone::IObserver& receiver, const OrthancRestApiCommand& command) { Orthanc::HttpClient client(orthanc_, command.GetUri()); @@ -2103,12 +2251,17 @@ if (object.get() != NULL) { - const Item& item = dynamic_cast<Item&>(*object); + Item& item = dynamic_cast<Item&>(*object); try { switch (item.GetCommand().GetType()) { + case IOracleCommand::Type_Sleep: + Execute(item.GetReceiver(), + dynamic_cast<SleepOracleCommand&>(item.GetCommand())); + break; + case IOracleCommand::Type_OrthancRestApi: Execute(item.GetReceiver(), dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); @@ -2143,7 +2296,7 @@ } - static void Worker(NativeOracle* that) + static void Worker(ThreadedOracle* that) { assert(that != NULL); @@ -2162,6 +2315,27 @@ } + static void SleepingWorker(ThreadedOracle* that) + { + assert(that != NULL); + + for (;;) + { + { + boost::mutex::scoped_lock lock(that->mutex_); + if (that->state_ != State_Running) + { + return; + } + } + + that->sleepingCommands_.AwakeExpired(that->emitter_); + + boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); + } + } + + void StopInternal() { { @@ -2178,6 +2352,11 @@ } } + if (sleepingWorker_.joinable()) + { + sleepingWorker_.join(); + } + for (size_t i = 0; i < workers_.size(); i++) { if (workers_[i] != NULL) @@ -2194,14 +2373,15 @@ public: - NativeOracle(IMessageEmitter& emitter) : + ThreadedOracle(IMessageEmitter& emitter) : emitter_(emitter), state_(State_Setup), - workers_(4) + workers_(4), + sleepingTimeResolution_(50) // By default, time resolution of 50ms { } - virtual ~NativeOracle() + virtual ~ThreadedOracle() { StopInternal(); } @@ -2238,6 +2418,24 @@ } } + void SetSleepingTimeResolution(unsigned int milliseconds) + { + boost::mutex::scoped_lock lock(mutex_); + + if (milliseconds <= 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + sleepingTimeResolution_ = milliseconds; + } + } + void Start() { boost::mutex::scoped_lock lock(mutex_); @@ -2254,6 +2452,8 @@ { workers_[i] = new boost::thread(Worker, this); } + + sleepingWorker_ = boost::thread(SleepingWorker, this); } } @@ -2345,6 +2545,11 @@ class Toto : public OrthancStone::IObserver { private: + void Handle(const Refactoring::SleepOracleCommand::TimeoutMessage& message) + { + printf("TIMEOUT! %d\n", dynamic_cast<const Orthanc::SingleValueObject<unsigned int>& >(message.GetOrigin().GetPayload()).GetValue()); + } + void Handle(const Refactoring::OrthancRestApiCommand::SuccessMessage& message) { Json::Value v; @@ -2385,6 +2590,10 @@ { oracle.RegisterObserverCallback (new OrthancStone::Callable + <Toto, Refactoring::SleepOracleCommand::TimeoutMessage>(*this, &Toto::Handle)); + + oracle.RegisterObserverCallback + (new OrthancStone::Callable <Toto, Refactoring::OrthancRestApiCommand::SuccessMessage>(*this, &Toto::Handle)); oracle.RegisterObserverCallback @@ -2415,7 +2624,7 @@ loader2.reset(new Refactoring::VolumeSeriesOrthancLoader(oracle, lock.GetOracleObservable())); } - if (1) + if (0) { Json::Value v = Json::objectValue; v["Level"] = "Series"; @@ -2429,7 +2638,7 @@ oracle.Schedule(*toto, command.release()); } - if (1) + if (0) { std::auto_ptr<Refactoring::GetOrthancImageCommand> command(new Refactoring::GetOrthancImageCommand); command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Jpeg))); @@ -2437,7 +2646,7 @@ oracle.Schedule(*toto, command.release()); } - if (1) + if (0) { std::auto_ptr<Refactoring::GetOrthancImageCommand> command(new Refactoring::GetOrthancImageCommand); command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Png))); @@ -2445,7 +2654,7 @@ oracle.Schedule(*toto, command.release()); } - if (1) + if (0) { std::auto_ptr<Refactoring::GetOrthancImageCommand> command(new Refactoring::GetOrthancImageCommand); command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Png))); @@ -2453,7 +2662,7 @@ oracle.Schedule(*toto, command.release()); } - if (1) + if (0) { std::auto_ptr<Refactoring::GetOrthancImageCommand> command(new Refactoring::GetOrthancImageCommand); command->SetHttpHeader("Accept-Encoding", "gzip"); @@ -2462,7 +2671,7 @@ oracle.Schedule(*toto, command.release()); } - if (1) + if (0) { std::auto_ptr<Refactoring::GetOrthancImageCommand> command(new Refactoring::GetOrthancImageCommand); command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Pam))); @@ -2470,7 +2679,7 @@ oracle.Schedule(*toto, command.release()); } - if (1) + if (0) { std::auto_ptr<Refactoring::GetOrthancWebViewerJpegCommand> command(new Refactoring::GetOrthancWebViewerJpegCommand); command->SetHttpHeader("Accept-Encoding", "gzip"); @@ -2480,12 +2689,23 @@ } + if (0) + { + for (unsigned int i = 0; i < 10; i++) + { + std::auto_ptr<Refactoring::SleepOracleCommand> command(new Refactoring::SleepOracleCommand(i * 1000)); + command->SetPayload(new Orthanc::SingleValueObject<unsigned int>(42 * i)); + oracle.Schedule(*toto, command.release()); + } + } + // 2017-11-17-Anonymized //loader1->LoadSeries("cb3ea4d1-d08f3856-ad7b6314-74d88d77-60b05618"); // CT //loader2->LoadInstance("41029085-71718346-811efac4-420e2c15-d39f99b6"); // RT-DOSE // Delphine - loader1->LoadSeries("5990e39c-51e5f201-fe87a54c-31a55943-e59ef80e"); // CT + //loader1->LoadSeries("5990e39c-51e5f201-fe87a54c-31a55943-e59ef80e"); // CT + loader1->LoadSeries("67f1b334-02c16752-45026e40-a5b60b6b-030ecab5"); // Lung 1/10mm LOG(WARNING) << "...Waiting for Ctrl-C..."; Orthanc::SystemToolbox::ServerBarrier(); @@ -2508,7 +2728,7 @@ { Refactoring::NativeApplicationContext context; - Refactoring::NativeOracle oracle(context); + Refactoring::ThreadedOracle oracle(context); { Orthanc::WebServiceParameters p;