changeset 1098:17660df24c36 broker

simplification of IOracleRunner
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 25 Oct 2019 13:01:24 +0200
parents 4383382db01d
children 66e21ef2d657
files Framework/Messages/IMessage.h Framework/Messages/IMessageEmitter.h Framework/Messages/IObservable.cpp Framework/Messages/IObservable.h Framework/Messages/LockingEmitter.cpp Framework/Messages/LockingEmitter.h Framework/Oracle/CustomOracleCommand.h Framework/Oracle/GenericOracleRunner.cpp Framework/Oracle/GenericOracleRunner.h Framework/Oracle/GetOrthancImageCommand.cpp Framework/Oracle/GetOrthancImageCommand.h Framework/Oracle/GetOrthancWebViewerJpegCommand.cpp Framework/Oracle/GetOrthancWebViewerJpegCommand.h Framework/Oracle/IOracle.h Framework/Oracle/IOracleRunner.h Framework/Oracle/ThreadedOracle.cpp Framework/Oracle/ThreadedOracle.h
diffstat 17 files changed, 90 insertions(+), 96 deletions(-) [+]
line wrap: on
line diff
--- a/Framework/Messages/IMessage.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Messages/IMessage.h	Fri Oct 25 13:01:24 2019 +0200
@@ -21,6 +21,7 @@
 
 #pragma once
 
+#include <boost/lexical_cast.hpp>
 #include <boost/noncopyable.hpp>
 
 #include <string.h>
@@ -53,6 +54,11 @@
     {
     }
 
+    std::string AsString() const
+    {
+      return std::string(file_) + ":" + boost::lexical_cast<std::string>(line_);
+    }
+
     bool operator< (const MessageIdentifier& other) const
     {
       if (file_ == NULL)
--- a/Framework/Messages/IMessageEmitter.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Messages/IMessageEmitter.h	Fri Oct 25 13:01:24 2019 +0200
@@ -41,7 +41,7 @@
     {
     }
 
-    virtual void EmitMessage(boost::weak_ptr<IObserver>& observer,
+    virtual void EmitMessage(boost::weak_ptr<IObserver> observer,
                              const IMessage& message) = 0;
   };
 }
--- a/Framework/Messages/IObservable.cpp	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Messages/IObservable.cpp	Fri Oct 25 13:01:24 2019 +0200
@@ -94,7 +94,7 @@
   }
 
   
-  void IObservable::EmitMessage(boost::weak_ptr<IObserver>& observer,
+  void IObservable::EmitMessage(boost::weak_ptr<IObserver> observer,
                                 const IMessage& message)
   {
     LOG(TRACE) << "IObservable::EmitMessage observer = "
--- a/Framework/Messages/IObservable.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Messages/IObservable.h	Fri Oct 25 13:01:24 2019 +0200
@@ -48,7 +48,7 @@
 
     void BroadcastMessage(const IMessage& message);
 
-    void EmitMessage(boost::weak_ptr<IObserver>& observer,
+    void EmitMessage(boost::weak_ptr<IObserver> observer,
                      const IMessage& message);
   };
 }
--- a/Framework/Messages/LockingEmitter.cpp	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Messages/LockingEmitter.cpp	Fri Oct 25 13:01:24 2019 +0200
@@ -26,7 +26,7 @@
 {
   namespace Deprecated
   {
-    void LockingEmitter::EmitMessage(boost::weak_ptr<IObserver>& observer,
+    void LockingEmitter::EmitMessage(boost::weak_ptr<IObserver> observer,
                                      const IMessage& message)
     {
       try
--- a/Framework/Messages/LockingEmitter.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Messages/LockingEmitter.h	Fri Oct 25 13:01:24 2019 +0200
@@ -50,7 +50,7 @@
       IObservable          oracleObservable_;
 
     public:
-      virtual void EmitMessage(boost::weak_ptr<IObserver>& observer,
+      virtual void EmitMessage(boost::weak_ptr<IObserver> observer,
                                const IMessage& message) ORTHANC_OVERRIDE;
 
 
--- a/Framework/Oracle/CustomOracleCommand.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/CustomOracleCommand.h	Fri Oct 25 13:01:24 2019 +0200
@@ -23,8 +23,6 @@
 
 #include "IOracleRunner.h"
 
-#include "../Messages/IMessageEmitter.h"
-
 namespace OrthancStone
 {
   class CustomOracleCommand : public IOracleCommand
@@ -35,8 +33,6 @@
       return Type_Custom;
     }
 
-    virtual void Execute(IMessageEmitter& emitter,
-                         boost::weak_ptr<IObserver>& receiver,
-                         IOracleRunner& runner) = 0;
+    virtual IMessage* Execute(IOracleRunner& runner) = 0;
   };
 }
--- a/Framework/Oracle/GenericOracleRunner.cpp	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/GenericOracleRunner.cpp	Fri Oct 25 13:01:24 2019 +0200
@@ -87,9 +87,7 @@
   }
 
 
-  static void Execute(IMessageEmitter& emitter,
-                      boost::weak_ptr<IObserver>& receiver,
-                      const HttpCommand& command)
+  static IMessage* Execute(const HttpCommand& command)
   {
     Orthanc::HttpClient client;
     client.SetUrl(command.GetUrl());
@@ -115,15 +113,12 @@
 
     DecodeAnswer(answer, answerHeaders);
 
-    HttpCommand::SuccessMessage message(command, answerHeaders, answer);
-    emitter.EmitMessage(receiver, message);
+    return new HttpCommand::SuccessMessage(command, answerHeaders, answer);
   }
 
 
-  static void Execute(IMessageEmitter& emitter,
-                      const Orthanc::WebServiceParameters& orthanc,
-                      boost::weak_ptr<IObserver>& receiver,
-                      const OrthancRestApiCommand& command)
+  static IMessage* Execute(const Orthanc::WebServiceParameters& orthanc,
+                           const OrthancRestApiCommand& command)
   {
     Orthanc::HttpClient client(orthanc, command.GetUri());
     client.SetMethod(command.GetMethod());
@@ -143,15 +138,12 @@
 
     DecodeAnswer(answer, answerHeaders);
 
-    OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer);
-    emitter.EmitMessage(receiver, message);
+    return new OrthancRestApiCommand::SuccessMessage(command, answerHeaders, answer);
   }
 
 
-  static void Execute(IMessageEmitter& emitter,
-                      const Orthanc::WebServiceParameters& orthanc,
-                      boost::weak_ptr<IObserver>& receiver,
-                      const GetOrthancImageCommand& command)
+  static IMessage* Execute(const Orthanc::WebServiceParameters& orthanc,
+                           const GetOrthancImageCommand& command)
   {
     Orthanc::HttpClient client(orthanc, command.GetUri());
     client.SetTimeout(command.GetTimeout());
@@ -164,14 +156,12 @@
 
     DecodeAnswer(answer, answerHeaders);
 
-    command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders);
+    return command.ProcessHttpAnswer(answer, answerHeaders);
   }
 
 
-  static void Execute(IMessageEmitter& emitter,
-                      const Orthanc::WebServiceParameters& orthanc,
-                      boost::weak_ptr<IObserver>& receiver,
-                      const GetOrthancWebViewerJpegCommand& command)
+  static IMessage* Execute(const Orthanc::WebServiceParameters& orthanc,
+                           const GetOrthancWebViewerJpegCommand& command)
   {
     Orthanc::HttpClient client(orthanc, command.GetUri());
     client.SetTimeout(command.GetTimeout());
@@ -184,12 +174,11 @@
 
     DecodeAnswer(answer, answerHeaders);
 
-    command.ProcessHttpAnswer(emitter, receiver, answer);
+    return command.ProcessHttpAnswer(answer);
   }
 
 
-  void GenericOracleRunner::Run(boost::weak_ptr<IObserver>& receiver,
-                                IOracleCommand& command)
+  IMessage* GenericOracleRunner::Run(IOracleCommand& command)
   {
     try
     {
@@ -198,27 +187,21 @@
         case IOracleCommand::Type_Sleep:
           throw Orthanc::OrthancException(Orthanc::ErrorCode_BadParameterType,
                                           "Sleep command cannot be executed by the runner");
-          break;
 
         case IOracleCommand::Type_Http:
-          Execute(emitter_, receiver, dynamic_cast<const HttpCommand&>(command));
-          break;
+          return Execute(dynamic_cast<const HttpCommand&>(command));
 
         case IOracleCommand::Type_OrthancRestApi:
-          Execute(emitter_, orthanc_, receiver, dynamic_cast<const OrthancRestApiCommand&>(command));
-          break;
+          return Execute(orthanc_, dynamic_cast<const OrthancRestApiCommand&>(command));
 
         case IOracleCommand::Type_GetOrthancImage:
-          Execute(emitter_, orthanc_, receiver, dynamic_cast<const GetOrthancImageCommand&>(command));
-          break;
+          return Execute(orthanc_, dynamic_cast<const GetOrthancImageCommand&>(command));
 
         case IOracleCommand::Type_GetOrthancWebViewerJpeg:
-          Execute(emitter_, orthanc_, receiver, dynamic_cast<const GetOrthancWebViewerJpegCommand&>(command));
-          break;
+          return Execute(orthanc_, dynamic_cast<const GetOrthancWebViewerJpegCommand&>(command));
 
         case IOracleCommand::Type_Custom:
-          dynamic_cast<CustomOracleCommand&>(command).Execute(emitter_, receiver, *this);
-          break;
+          return dynamic_cast<CustomOracleCommand&>(command).Execute(*this);
 
         default:
           throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented);
@@ -227,13 +210,12 @@
     catch (Orthanc::OrthancException& e)
     {
       LOG(ERROR) << "Exception within the oracle: " << e.What();
-      emitter_.EmitMessage(receiver, OracleCommandExceptionMessage(command, e));
+      return new OracleCommandExceptionMessage(command, e);
     }
     catch (...)
     {
       LOG(ERROR) << "Threaded exception within the oracle";
-      emitter_.EmitMessage(receiver, OracleCommandExceptionMessage
-                           (command, Orthanc::ErrorCode_InternalError));
+      return new OracleCommandExceptionMessage(command, Orthanc::ErrorCode_InternalError);
     }
   }
 }
--- a/Framework/Oracle/GenericOracleRunner.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/GenericOracleRunner.h	Fri Oct 25 13:01:24 2019 +0200
@@ -21,7 +21,6 @@
 
 #pragma once
 
-#include "../Messages/IMessageEmitter.h"
 #include "IOracleRunner.h"
 
 #include <Core/Enumerations.h>  // For ORTHANC_OVERRIDE
@@ -32,18 +31,14 @@
   class GenericOracleRunner : public IOracleRunner
   {
   private:
-    IMessageEmitter&                      emitter_;
     const Orthanc::WebServiceParameters&  orthanc_;
 
   public:
-    GenericOracleRunner(IMessageEmitter&  emitter,
-                        const Orthanc::WebServiceParameters& orthanc) :
-      emitter_(emitter),
+    GenericOracleRunner(const Orthanc::WebServiceParameters& orthanc) :
       orthanc_(orthanc)
     {
     }
 
-    virtual void Run(boost::weak_ptr<IObserver>& receiver,
-                     IOracleCommand& command) ORTHANC_OVERRIDE;
+    virtual IMessage* Run(IOracleCommand& command) ORTHANC_OVERRIDE;
   };
 }
--- a/Framework/Oracle/GetOrthancImageCommand.cpp	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/GetOrthancImageCommand.cpp	Fri Oct 25 13:01:24 2019 +0200
@@ -82,10 +82,8 @@
     }
   }
 
-  void GetOrthancImageCommand::ProcessHttpAnswer(IMessageEmitter& emitter,
-                                                 boost::weak_ptr<IObserver>& receiver,
-                                                 const std::string& answer,
-                                                 const HttpHeaders& answerHeaders) const
+  IMessage* GetOrthancImageCommand::ProcessHttpAnswer(const std::string& answer,
+                                                      const HttpHeaders& answerHeaders) const
   {
     Orthanc::MimeType contentType = Orthanc::MimeType_Binary;
 
@@ -147,7 +145,6 @@
       }
     }
 
-    SuccessMessage message(*this, image.release(), contentType);
-    emitter.EmitMessage(receiver, message);
+    return new SuccessMessage(*this, image.release(), contentType);
   }
 }
--- a/Framework/Oracle/GetOrthancImageCommand.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/GetOrthancImageCommand.h	Fri Oct 25 13:01:24 2019 +0200
@@ -21,7 +21,7 @@
 
 #pragma once
 
-#include "../Messages/IMessageEmitter.h"
+#include "../Messages/IMessage.h"
 #include "OracleCommandWithPayload.h"
 
 #include <Core/Images/ImageAccessor.h>
@@ -111,9 +111,7 @@
       return timeout_;
     }
 
-    void ProcessHttpAnswer(IMessageEmitter& emitter,
-                           boost::weak_ptr<IObserver>& receiver,
-                           const std::string& answer,
-                           const HttpHeaders& answerHeaders) const;
+    IMessage* ProcessHttpAnswer(const std::string& answer,
+                                const HttpHeaders& answerHeaders) const;
   };
 }
--- a/Framework/Oracle/GetOrthancWebViewerJpegCommand.cpp	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/GetOrthancWebViewerJpegCommand.cpp	Fri Oct 25 13:01:24 2019 +0200
@@ -76,9 +76,7 @@
   }
 
 
-  void GetOrthancWebViewerJpegCommand::ProcessHttpAnswer(IMessageEmitter& emitter,
-                                                         boost::weak_ptr<IObserver>& receiver,
-                                                         const std::string& answer) const
+  IMessage* GetOrthancWebViewerJpegCommand::ProcessHttpAnswer(const std::string& answer) const
   {
     // This code comes from older "OrthancSlicesLoader::ParseSliceImageJpeg()"
       
@@ -149,9 +147,7 @@
       }
       else
       {
-        SuccessMessage message(*this, reader.release());
-        emitter.EmitMessage(receiver, message);
-        return;
+        return new SuccessMessage(*this, reader.release());
       }
     }
     
@@ -168,9 +164,7 @@
       }
       else
       {
-        SuccessMessage message(*this, reader.release());
-        emitter.EmitMessage(receiver, message);
-        return;
+        return new SuccessMessage(*this, reader.release());
       }
     }
     
@@ -210,8 +204,7 @@
       float offset = static_cast<float>(stretchLow) / scaling;
       Orthanc::ImageProcessing::ShiftScale(*image, offset, scaling, true);
     }
-    
-    SuccessMessage message(*this, image.release());
-    emitter.EmitMessage(receiver, message);
+
+    return new SuccessMessage(*this, image.release());
   }
 }
--- a/Framework/Oracle/GetOrthancWebViewerJpegCommand.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/GetOrthancWebViewerJpegCommand.h	Fri Oct 25 13:01:24 2019 +0200
@@ -21,7 +21,7 @@
 
 #pragma once
 
-#include "../Messages/IMessageEmitter.h"
+#include "../Messages/IMessage.h"
 #include "OracleCommandWithPayload.h"
 
 #include <Core/Images/ImageAccessor.h>
@@ -128,8 +128,6 @@
 
     std::string GetUri() const;
 
-    void ProcessHttpAnswer(IMessageEmitter& emitter,
-                           boost::weak_ptr<IObserver>& receiver,
-                           const std::string& answer) const;
+    IMessage* ProcessHttpAnswer(const std::string& answer) const;
   };
 }
--- a/Framework/Oracle/IOracle.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/IOracle.h	Fri Oct 25 13:01:24 2019 +0200
@@ -35,7 +35,12 @@
     {
     }
 
-    virtual void Schedule(boost::shared_ptr<IObserver>& receiver,
+    /**
+     * Returns "true" iff the command has actually been queued. If
+     * "false" is returned, the command has been freed, and it won't
+     * be processed (this is the case if the oracle is stopped).
+     **/
+    virtual bool Schedule(boost::shared_ptr<IObserver> receiver,
                           IOracleCommand* command) = 0;  // Takes ownership
   };
 }
--- a/Framework/Oracle/IOracleRunner.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/IOracleRunner.h	Fri Oct 25 13:01:24 2019 +0200
@@ -22,8 +22,7 @@
 #pragma once
 
 #include "IOracleCommand.h"
-
-#include <boost/weak_ptr.hpp>
+#include "../Messages/IMessage.h"
 
 namespace OrthancStone
 {
@@ -34,7 +33,6 @@
     {
     }
 
-    virtual void Run(boost::weak_ptr<IObserver>& receiver,
-                     IOracleCommand& command) = 0;
+    virtual IMessage* Run(IOracleCommand& command) = 0;
   };
 }
--- a/Framework/Oracle/ThreadedOracle.cpp	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/ThreadedOracle.cpp	Fri Oct 25 13:01:24 2019 +0200
@@ -46,7 +46,7 @@
       }
     }
 
-    boost::weak_ptr<IObserver>& GetReceiver()
+    boost::weak_ptr<IObserver> GetReceiver()
     {
       return receiver_;
     }
@@ -70,7 +70,7 @@
       boost::posix_time::ptime           expiration_;
 
     public:
-      Item(boost::weak_ptr<IObserver>& receiver,
+      Item(boost::weak_ptr<IObserver> receiver,
            SleepOracleCommand* command) :
         receiver_(receiver),
         command_(command)
@@ -115,7 +115,7 @@
       }
     }
 
-    void Add(boost::weak_ptr<IObserver>& receiver,
+    void Add(boost::weak_ptr<IObserver> receiver,
              SleepOracleCommand* command)   // Takes ownership
     {
       boost::mutex::scoped_lock lock(mutex_);
@@ -175,8 +175,10 @@
       }
       else
       {
-        GenericOracleRunner runner(emitter_, orthanc_);
-        runner.Run(item.GetReceiver(), item.GetCommand());
+        GenericOracleRunner runner(orthanc_);
+        std::auto_ptr<IMessage> message(runner.Run(item.GetCommand()));
+        
+        emitter_.EmitMessage(item.GetReceiver(), *message);
       }
     }
   }
@@ -255,8 +257,6 @@
         delete workers_[i];
       }
     } 
-
-    queue_.Clear();
   }
 
 
@@ -372,9 +372,34 @@
   }
 
 
-  void ThreadedOracle::Schedule(boost::shared_ptr<IObserver>& receiver,
+  bool ThreadedOracle::Schedule(boost::shared_ptr<IObserver> receiver,
                                 IOracleCommand* command)
   {
-    queue_.Enqueue(new Item(receiver, command));
+    std::auto_ptr<Item> item(new Item(receiver, command));
+
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (state_ == State_Running)
+      {
+        //LOG(INFO) << "New oracle command queued";
+        queue_.Enqueue(item.release());
+        return true;
+      }
+      else
+      {
+        LOG(INFO) << "Command not enqueued, as the oracle is stopped";
+
+        /**
+         * Answering "true" below results in a memory leak within
+         * "OracleScheduler", as the scheduler believes that the
+         * command is still active (i.e. pending to be executed by the
+         * oracle), hereby stalling the scheduler during its
+         * destruction (check out
+         * "sjo-playground/WebViewer/Backend/Leak")
+         **/
+        return false;
+      }
+    }
   }
 }
--- a/Framework/Oracle/ThreadedOracle.h	Thu Oct 24 22:31:18 2019 +0200
+++ b/Framework/Oracle/ThreadedOracle.h	Fri Oct 25 13:01:24 2019 +0200
@@ -31,6 +31,7 @@
 
 #include "IOracle.h"
 #include "GenericOracleRunner.h"
+#include "../Messages/IMessageEmitter.h"
 
 #include <Core/MultiThreading/SharedMessageQueue.h>
 
@@ -86,7 +87,7 @@
       StopInternal();
     }
 
-    virtual void Schedule(boost::shared_ptr<IObserver>& receiver,
+    virtual bool Schedule(boost::shared_ptr<IObserver> receiver,
                           IOracleCommand* command) ORTHANC_OVERRIDE;
   };
 }