changeset 744:a4bfb420b869

SleepOracleCommand
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 22 May 2019 11:28:42 +0200
parents bcd3ea868bcd
children c44c1d2d3598
files Samples/Sdl/Loader.cpp
diffstat 1 files changed, 237 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/Samples/Sdl/Loader.cpp	Wed May 22 09:55:01 2019 +0200
+++ b/Samples/Sdl/Loader.cpp	Wed May 22 11:28:42 2019 +0200
@@ -69,6 +69,7 @@
   public:
     enum Type
     {
+      Type_Sleep,
       Type_OrthancRestApi,
       Type_GetOrthancImage,
       Type_GetOrthancWebViewerJpeg
@@ -153,6 +154,18 @@
         throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
       }
     }
+
+    Orthanc::IDynamicObject* ReleasePayload()
+    {
+      if (HasPayload())
+      {
+        return payload_.release();
+      }
+      else
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
+      }
+    }
   };
 
 
@@ -190,6 +203,32 @@
       return exception_;
     }
   };
+
+
+  class SleepOracleCommand : public OracleCommandWithPayload
+  {
+  private:
+    unsigned int  milliseconds_;
+
+  public:
+    ORTHANC_STONE_DEFINE_ORIGIN_MESSAGE(__FILE__, __LINE__, TimeoutMessage, SleepOracleCommand);
+
+    SleepOracleCommand(unsigned int milliseconds) : 
+    milliseconds_(milliseconds)
+    {
+    }
+
+    virtual Type GetType() const
+    {
+      return Type_Sleep;
+    }
+
+    unsigned int GetDelay() const
+    {
+      return milliseconds_;
+    }
+  };
+
   
 
   typedef std::map<std::string, std::string>  HttpHeaders;
@@ -1937,7 +1976,7 @@
 
 
 
-  class NativeOracle : public IOracle
+  class ThreadedOracle : public IOracle
   {
   private:
     class Item : public Orthanc::IDynamicObject
@@ -1963,7 +2002,7 @@
         return receiver_;
       }
 
-      const IOracleCommand& GetCommand() const
+      IOracleCommand& GetCommand()
       {
         assert(command_.get() != NULL);
         return *command_;
@@ -1979,13 +2018,108 @@
     };
 
 
+    class SleepingCommands : public boost::noncopyable
+    {
+    private:
+      class Item
+      {
+      private:
+        const OrthancStone::IObserver&     receiver_;
+        std::auto_ptr<SleepOracleCommand>  command_;
+        boost::posix_time::ptime           expiration_;
+
+      public:
+        Item(const OrthancStone::IObserver& receiver,
+             SleepOracleCommand* command) :
+          receiver_(receiver),
+          command_(command)
+        {
+          if (command == NULL)
+          {
+            throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
+          }
+
+          expiration_ = (boost::posix_time::second_clock::local_time() + 
+                         boost::posix_time::milliseconds(command_->GetDelay()));
+        }
+
+        const boost::posix_time::ptime& GetExpirationTime() const
+        {
+          return expiration_;
+        }
+
+        void Awake(IMessageEmitter& emitter)
+        {
+          assert(command_.get() != NULL);
+
+          SleepOracleCommand::TimeoutMessage message(*command_);
+          emitter.EmitMessage(receiver_, message);
+        }
+      };
+
+      typedef std::list<Item*>  Content;
+
+      boost::mutex  mutex_;
+      Content       content_;
+
+    public:
+      ~SleepingCommands()
+      {
+        for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
+        {
+          if (*it != NULL)
+          {
+            delete *it;
+          }
+        }
+      }
+
+      void Add(const OrthancStone::IObserver& receiver,
+               SleepOracleCommand* command)   // Takes ownership
+      {
+        boost::mutex::scoped_lock lock(mutex_);
+
+        content_.push_back(new Item(receiver, command));
+      }
+
+      void AwakeExpired(IMessageEmitter& emitter)
+      {
+        boost::mutex::scoped_lock lock(mutex_);
+
+        const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
+
+        Content  stillSleeping;
+        
+        for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
+        {
+          if (*it != NULL &&
+              (*it)->GetExpirationTime() <= now)
+          {
+            (*it)->Awake(emitter);
+            delete *it;
+            *it = NULL;
+          }
+          else
+          {
+            stillSleeping.push_back(*it);
+          }
+        }
+
+        // Compact the still-sleeping commands
+        content_ = stillSleeping;
+      }
+    };
+
+
     IMessageEmitter&               emitter_;
     Orthanc::WebServiceParameters  orthanc_;
     Orthanc::SharedMessageQueue    queue_;
     State                          state_;
     boost::mutex                   mutex_;
     std::vector<boost::thread*>    workers_;
-
+    SleepingCommands               sleepingCommands_;
+    boost::thread                  sleepingWorker_;
+    unsigned int                   sleepingTimeResolution_;
 
     void CopyHttpHeaders(Orthanc::HttpClient& client,
                          const HttpHeaders& headers)
@@ -2036,6 +2170,20 @@
 
 
     void Execute(const OrthancStone::IObserver& receiver,
+                 SleepOracleCommand& command)
+    {
+      std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay()));
+
+      if (command.HasPayload())
+      {
+        copy->SetPayload(command.ReleasePayload());
+      }
+
+      sleepingCommands_.Add(receiver, copy.release());
+    }
+
+
+    void Execute(const OrthancStone::IObserver& receiver,
                  const OrthancRestApiCommand& command)
     {
       Orthanc::HttpClient client(orthanc_, command.GetUri());
@@ -2103,12 +2251,17 @@
 
       if (object.get() != NULL)
       {
-        const Item& item = dynamic_cast<Item&>(*object);
+        Item& item = dynamic_cast<Item&>(*object);
 
         try
         {
           switch (item.GetCommand().GetType())
           {
+            case IOracleCommand::Type_Sleep:
+              Execute(item.GetReceiver(), 
+                      dynamic_cast<SleepOracleCommand&>(item.GetCommand()));
+              break;
+
             case IOracleCommand::Type_OrthancRestApi:
               Execute(item.GetReceiver(), 
                       dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand()));
@@ -2143,7 +2296,7 @@
     }
 
 
-    static void Worker(NativeOracle* that)
+    static void Worker(ThreadedOracle* that)
     {
       assert(that != NULL);
       
@@ -2162,6 +2315,27 @@
     }
 
 
+    static void SleepingWorker(ThreadedOracle* that)
+    {
+      assert(that != NULL);
+      
+      for (;;)
+      {
+        {
+          boost::mutex::scoped_lock lock(that->mutex_);
+          if (that->state_ != State_Running)
+          {
+            return;
+          }
+        }
+
+        that->sleepingCommands_.AwakeExpired(that->emitter_);
+
+        boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_));
+      }
+    }
+
+
     void StopInternal()
     {
       {
@@ -2178,6 +2352,11 @@
         }
       }
 
+      if (sleepingWorker_.joinable())
+      {
+        sleepingWorker_.join();
+      }
+
       for (size_t i = 0; i < workers_.size(); i++)
       {
         if (workers_[i] != NULL)
@@ -2194,14 +2373,15 @@
 
 
   public:
-    NativeOracle(IMessageEmitter& emitter) :
+    ThreadedOracle(IMessageEmitter& emitter) :
       emitter_(emitter),
       state_(State_Setup),
-      workers_(4)
+      workers_(4),
+      sleepingTimeResolution_(50)  // By default, time resolution of 50ms
     {
     }
 
-    virtual ~NativeOracle()
+    virtual ~ThreadedOracle()
     {
       StopInternal();
     }
@@ -2238,6 +2418,24 @@
       }
     }
 
+    void SetSleepingTimeResolution(unsigned int milliseconds)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (milliseconds <= 0)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
+      }
+      else if (state_ != State_Setup)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        sleepingTimeResolution_ = milliseconds;
+      }
+    }
+
     void Start()
     {
       boost::mutex::scoped_lock lock(mutex_);
@@ -2254,6 +2452,8 @@
         {
           workers_[i] = new boost::thread(Worker, this);
         }
+
+        sleepingWorker_ = boost::thread(SleepingWorker, this);
       }      
     }
 
@@ -2345,6 +2545,11 @@
 class Toto : public OrthancStone::IObserver
 {
 private:
+  void Handle(const Refactoring::SleepOracleCommand::TimeoutMessage& message)
+  {
+    printf("TIMEOUT! %d\n", dynamic_cast<const Orthanc::SingleValueObject<unsigned int>& >(message.GetOrigin().GetPayload()).GetValue());
+  }
+
   void Handle(const Refactoring::OrthancRestApiCommand::SuccessMessage& message)
   {
     Json::Value v;
@@ -2385,6 +2590,10 @@
   {
     oracle.RegisterObserverCallback
       (new OrthancStone::Callable
+       <Toto, Refactoring::SleepOracleCommand::TimeoutMessage>(*this, &Toto::Handle));
+
+    oracle.RegisterObserverCallback
+      (new OrthancStone::Callable
        <Toto, Refactoring::OrthancRestApiCommand::SuccessMessage>(*this, &Toto::Handle));
 
     oracle.RegisterObserverCallback
@@ -2415,7 +2624,7 @@
     loader2.reset(new Refactoring::VolumeSeriesOrthancLoader(oracle, lock.GetOracleObservable()));
   }
 
-  if (1)
+  if (0)
   {
     Json::Value v = Json::objectValue;
     v["Level"] = "Series";
@@ -2429,7 +2638,7 @@
     oracle.Schedule(*toto, command.release());
   }
   
-  if (1)
+  if (0)
   {
     std::auto_ptr<Refactoring::GetOrthancImageCommand>  command(new Refactoring::GetOrthancImageCommand);
     command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Jpeg)));
@@ -2437,7 +2646,7 @@
     oracle.Schedule(*toto, command.release());
   }
   
-  if (1)
+  if (0)
   {
     std::auto_ptr<Refactoring::GetOrthancImageCommand>  command(new Refactoring::GetOrthancImageCommand);
     command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Png)));
@@ -2445,7 +2654,7 @@
     oracle.Schedule(*toto, command.release());
   }
   
-  if (1)
+  if (0)
   {
     std::auto_ptr<Refactoring::GetOrthancImageCommand>  command(new Refactoring::GetOrthancImageCommand);
     command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Png)));
@@ -2453,7 +2662,7 @@
     oracle.Schedule(*toto, command.release());
   }
   
-  if (1)
+  if (0)
   {
     std::auto_ptr<Refactoring::GetOrthancImageCommand>  command(new Refactoring::GetOrthancImageCommand);
     command->SetHttpHeader("Accept-Encoding", "gzip");
@@ -2462,7 +2671,7 @@
     oracle.Schedule(*toto, command.release());
   }
   
-  if (1)
+  if (0)
   {
     std::auto_ptr<Refactoring::GetOrthancImageCommand>  command(new Refactoring::GetOrthancImageCommand);
     command->SetHttpHeader("Accept", std::string(Orthanc::EnumerationToString(Orthanc::MimeType_Pam)));
@@ -2470,7 +2679,7 @@
     oracle.Schedule(*toto, command.release());
   }
 
-  if (1)
+  if (0)
   {
     std::auto_ptr<Refactoring::GetOrthancWebViewerJpegCommand>  command(new Refactoring::GetOrthancWebViewerJpegCommand);
     command->SetHttpHeader("Accept-Encoding", "gzip");
@@ -2480,12 +2689,23 @@
   }
 
 
+  if (0)
+  {
+    for (unsigned int i = 0; i < 10; i++)
+    {
+      std::auto_ptr<Refactoring::SleepOracleCommand> command(new Refactoring::SleepOracleCommand(i * 1000));
+      command->SetPayload(new Orthanc::SingleValueObject<unsigned int>(42 * i));
+      oracle.Schedule(*toto, command.release());
+    }
+  }
+
   // 2017-11-17-Anonymized
   //loader1->LoadSeries("cb3ea4d1-d08f3856-ad7b6314-74d88d77-60b05618");  // CT
   //loader2->LoadInstance("41029085-71718346-811efac4-420e2c15-d39f99b6");  // RT-DOSE
 
   // Delphine
-  loader1->LoadSeries("5990e39c-51e5f201-fe87a54c-31a55943-e59ef80e");  // CT
+  //loader1->LoadSeries("5990e39c-51e5f201-fe87a54c-31a55943-e59ef80e");  // CT
+  loader1->LoadSeries("67f1b334-02c16752-45026e40-a5b60b6b-030ecab5");  // Lung 1/10mm
 
   LOG(WARNING) << "...Waiting for Ctrl-C...";
   Orthanc::SystemToolbox::ServerBarrier();
@@ -2508,7 +2728,7 @@
   {
     Refactoring::NativeApplicationContext context;
 
-    Refactoring::NativeOracle oracle(context);
+    Refactoring::ThreadedOracle oracle(context);
 
     {
       Orthanc::WebServiceParameters p;