diff UnitTestsSources/TestMessageBroker2.cpp @ 312:a91ad36b684c am-2

Merged am-callable-and-promise into am-2
author Alain Mazy <am@osimis.io>
date Tue, 02 Oct 2018 10:15:36 +0200
parents 4a79193ffb58
children dbfe2e9e5020
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/UnitTestsSources/TestMessageBroker2.cpp	Tue Oct 02 10:15:36 2018 +0200
@@ -0,0 +1,564 @@
+/**
+ * Stone of Orthanc
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2018 Osimis S.A., Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "gtest/gtest.h"
+
+#include "Framework/Messages/MessageBroker.h"
+#include "Framework/Messages/Promise.h"
+#include "Framework/Messages/IObservable.h"
+#include "Framework/Messages/IObserver.h"
+#include "Framework/Messages/MessageForwarder.h"
+
+
+int testCounter = 0;
+namespace {
+
+//  class IObserver;
+//  class IObservable;
+//  class Promise;
+
+//  enum MessageType
+//  {
+//    MessageType_Test1,
+//    MessageType_Test2,
+
+//    MessageType_CustomMessage,
+//    MessageType_LastGenericStoneMessage
+//  };
+
+//  struct IMessage  : public boost::noncopyable
+//  {
+//    MessageType messageType_;
+//  public:
+//    IMessage(const MessageType& messageType)
+//      : messageType_(messageType)
+//    {}
+//    virtual ~IMessage() {}
+
+//    virtual int GetType() const {return messageType_;}
+//  };
+
+
+//  struct ICustomMessage  : public IMessage
+//  {
+//    int customMessageType_;
+//  public:
+//    ICustomMessage(int customMessageType)
+//      : IMessage(MessageType_CustomMessage),
+//        customMessageType_(customMessageType)
+//    {}
+//    virtual ~ICustomMessage() {}
+
+//    virtual int GetType() const {return customMessageType_;}
+//  };
+
+
+//  // This is referencing an object and member function that can be notified
+//  // by an IObservable.  The object must derive from IO
+//  // The member functions must be of type "void Function(const IMessage& message)" or reference a derived class of IMessage
+//  class ICallable : public boost::noncopyable
+//  {
+//  public:
+//    virtual ~ICallable()
+//    {
+//    }
+
+//    virtual void Apply(const IMessage& message) = 0;
+
+//    virtual MessageType GetMessageType() const = 0;
+//    virtual IObserver* GetObserver() const = 0;
+//  };
+
+//  template <typename TObserver,
+//            typename TMessage>
+//  class Callable : public ICallable
+//  {
+//  private:
+//    typedef void (TObserver::* MemberFunction) (const TMessage&);
+
+//    TObserver&      observer_;
+//    MemberFunction  function_;
+
+//  public:
+//    Callable(TObserver& observer,
+//             MemberFunction function) :
+//      observer_(observer),
+//      function_(function)
+//    {
+//    }
+
+//    void ApplyInternal(const TMessage& message)
+//    {
+//      (observer_.*function_) (message);
+//    }
+
+//    virtual void Apply(const IMessage& message)
+//    {
+//      ApplyInternal(dynamic_cast<const TMessage&>(message));
+//    }
+
+//    virtual MessageType GetMessageType() const
+//    {
+//      return static_cast<MessageType>(TMessage::Type);
+//    }
+
+//    virtual IObserver* GetObserver() const
+//    {
+//      return &observer_;
+//    }
+//  };
+
+
+
+
+//  /*
+//   * This is a central message broker.  It keeps track of all observers and knows
+//   * when an observer is deleted.
+//   * This way, it can prevent an observable to send a message to a delete observer.
+//   */
+//  class MessageBroker : public boost::noncopyable
+//  {
+
+//    std::set<IObserver*> activeObservers_;  // the list of observers that are currently alive (that have not been deleted)
+
+//  public:
+
+//    void Register(IObserver& observer)
+//    {
+//      activeObservers_.insert(&observer);
+//    }
+
+//    void Unregister(IObserver& observer)
+//    {
+//      activeObservers_.erase(&observer);
+//    }
+
+//    bool IsActive(IObserver* observer)
+//    {
+//      return activeObservers_.find(observer) != activeObservers_.end();
+//    }
+//  };
+
+
+//  class Promise : public boost::noncopyable
+//  {
+//  protected:
+//    MessageBroker&                    broker_;
+
+//    ICallable* successCallable_;
+//    ICallable* failureCallable_;
+
+//  public:
+//    Promise(MessageBroker& broker)
+//      : broker_(broker),
+//        successCallable_(NULL),
+//        failureCallable_(NULL)
+//    {
+//    }
+
+//    void Success(const IMessage& message)
+//    {
+//      // check the target is still alive in the broker
+//      if (broker_.IsActive(successCallable_->GetObserver()))
+//      {
+//        successCallable_->Apply(message);
+//      }
+//    }
+
+//    void Failure(const IMessage& message)
+//    {
+//      // check the target is still alive in the broker
+//      if (broker_.IsActive(failureCallable_->GetObserver()))
+//      {
+//        failureCallable_->Apply(message);
+//      }
+//    }
+
+//    Promise& Then(ICallable* successCallable)
+//    {
+//      if (successCallable_ != NULL)
+//      {
+//        // TODO: throw throw new "Promise may only have a single success target"
+//      }
+//      successCallable_ = successCallable;
+//      return *this;
+//    }
+
+//    Promise& Else(ICallable* failureCallable)
+//    {
+//      if (failureCallable_ != NULL)
+//      {
+//        // TODO: throw throw new "Promise may only have a single failure target"
+//      }
+//      failureCallable_ = failureCallable;
+//      return *this;
+//    }
+
+//  };
+
+//  class IObserver : public boost::noncopyable
+//  {
+//  protected:
+//    MessageBroker&                    broker_;
+
+//  public:
+//    IObserver(MessageBroker& broker)
+//      : broker_(broker)
+//    {
+//      broker_.Register(*this);
+//    }
+
+//    virtual ~IObserver()
+//    {
+//      broker_.Unregister(*this);
+//    }
+
+//  };
+
+
+//  class IObservable : public boost::noncopyable
+//  {
+//  protected:
+//    MessageBroker&                     broker_;
+
+//    typedef std::map<int, std::set<ICallable*> >   Callables;
+//    Callables  callables_;
+//  public:
+
+//    IObservable(MessageBroker& broker)
+//      : broker_(broker)
+//    {
+//    }
+
+//    virtual ~IObservable()
+//    {
+//      for (Callables::const_iterator it = callables_.begin();
+//           it != callables_.end(); ++it)
+//      {
+//        for (std::set<ICallable*>::const_iterator
+//               it2 = it->second.begin(); it2 != it->second.end(); ++it2)
+//        {
+//          delete *it2;
+//        }
+//      }
+//    }
+
+//    void Register(ICallable* callable)
+//    {
+//      MessageType messageType = callable->GetMessageType();
+
+//      callables_[messageType].insert(callable);
+//    }
+
+//    void EmitMessage(const IMessage& message)
+//    {
+//      Callables::const_iterator found = callables_.find(message.GetType());
+
+//      if (found != callables_.end())
+//      {
+//        for (std::set<ICallable*>::const_iterator
+//               it = found->second.begin(); it != found->second.end(); ++it)
+//        {
+//          if (broker_.IsActive((*it)->GetObserver()))
+//          {
+//            (*it)->Apply(message);
+//          }
+//        }
+//      }
+//    }
+
+//  };
+  using namespace OrthancStone;
+
+
+  enum CustomMessageType
+  {
+    CustomMessageType_First = MessageType_CustomMessage + 1,
+
+    CustomMessageType_Completed,
+    CustomMessageType_Increment
+  };
+
+
+  class MyObservable : public IObservable
+  {
+  public:
+    struct MyCustomMessage: public BaseMessage<CustomMessageType_Completed>
+    {
+      int payload_;
+
+      MyCustomMessage(int payload)
+        : BaseMessage(),
+          payload_(payload)
+      {}
+    };
+
+    MyObservable(MessageBroker& broker)
+      : IObservable(broker)
+    {}
+
+  };
+
+  class MyObserver : public IObserver
+  {
+  public:
+    MyObserver(MessageBroker& broker)
+      : IObserver(broker)
+    {}
+
+    void HandleCompletedMessage(const MyObservable::MyCustomMessage& message)
+    {
+      testCounter += message.payload_;
+    }
+
+  };
+
+
+  class MyIntermediate : public IObserver, public IObservable
+  {
+    IObservable& observedObject_;
+  public:
+    MyIntermediate(MessageBroker& broker, IObservable& observedObject)
+      : IObserver(broker),
+        IObservable(broker),
+        observedObject_(observedObject)
+    {
+      observedObject_.RegisterObserverCallback(new MessageForwarder<MyObservable::MyCustomMessage>(broker, *this));
+    }
+  };
+
+
+  class MyPromiseSource : public IObservable
+  {
+    Promise* currentPromise_;
+  public:
+    struct MyPromiseMessage: public BaseMessage<MessageType_Test1>
+    {
+      int increment;
+
+      MyPromiseMessage(int increment)
+        : BaseMessage(),
+          increment(increment)
+      {}
+    };
+
+    MyPromiseSource(MessageBroker& broker)
+      : IObservable(broker),
+        currentPromise_(NULL)
+    {}
+
+    Promise& StartSomethingAsync()
+    {
+      currentPromise_ = new Promise(broker_);
+      return *currentPromise_;
+    }
+
+    void CompleteSomethingAsyncWithSuccess(int payload)
+    {
+      currentPromise_->Success(MyPromiseMessage(payload));
+      delete currentPromise_;
+    }
+
+    void CompleteSomethingAsyncWithFailure(int payload)
+    {
+      currentPromise_->Failure(MyPromiseMessage(payload));
+      delete currentPromise_;
+    }
+  };
+
+
+  class MyPromiseTarget : public IObserver
+  {
+  public:
+    MyPromiseTarget(MessageBroker& broker)
+      : IObserver(broker)
+    {}
+
+    void IncrementCounter(const MyPromiseSource::MyPromiseMessage& args)
+    {
+      testCounter += args.increment;
+    }
+
+    void DecrementCounter(const MyPromiseSource::MyPromiseMessage& args)
+    {
+      testCounter -= args.increment;
+    }
+  };
+}
+
+
+TEST(MessageBroker2, TestPermanentConnectionSimpleUseCase)
+{
+  MessageBroker broker;
+  MyObservable  observable(broker);
+  MyObserver    observer(broker);
+
+  // create a permanent connection between an observable and an observer
+  observable.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
+
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(12));
+  ASSERT_EQ(12, testCounter);
+
+  // the connection is permanent; if we emit the same message again, the observer will be notified again
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(20));
+  ASSERT_EQ(20, testCounter);
+}
+
+TEST(MessageBroker2, TestPermanentConnectionDeleteObserver)
+{
+  MessageBroker broker;
+  MyObservable  observable(broker);
+  MyObserver*   observer = new MyObserver(broker);
+
+  // create a permanent connection between an observable and an observer
+  observable.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(*observer, &MyObserver::HandleCompletedMessage));
+
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(12));
+  ASSERT_EQ(12, testCounter);
+
+  // delete the observer and check that the callback is not called anymore
+  delete observer;
+
+  // the connection is permanent; if we emit the same message again, the observer will be notified again
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(20));
+  ASSERT_EQ(0, testCounter);
+}
+
+TEST(MessageBroker2, TestMessageForwarderSimpleUseCase)
+{
+  MessageBroker broker;
+  MyObservable  observable(broker);
+  MyIntermediate intermediate(broker, observable);
+  MyObserver    observer(broker);
+
+  // let the observer observers the intermediate that is actually forwarding the messages from the observable
+  intermediate.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
+
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(12));
+  ASSERT_EQ(12, testCounter);
+
+  // the connection is permanent; if we emit the same message again, the observer will be notified again
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(20));
+  ASSERT_EQ(20, testCounter);
+}
+
+
+TEST(MessageBroker2, TestMessageForwarderDeleteIntermediate)
+{
+  MessageBroker broker;
+  MyObservable  observable(broker);
+  MyIntermediate* intermediate = new MyIntermediate(broker, observable);
+  MyObserver    observer(broker);
+
+  // let the observer observers the intermediate that is actually forwarding the messages from the observable
+  intermediate->RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
+
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(12));
+  ASSERT_EQ(12, testCounter);
+
+  delete intermediate;
+
+  observable.EmitMessage(MyObservable::MyCustomMessage(20));
+  ASSERT_EQ(12, testCounter);
+}
+
+TEST(MessageBroker2, TestCustomMessage)
+{
+  MessageBroker broker;
+  MyObservable  observable(broker);
+  MyIntermediate intermediate(broker, observable);
+  MyObserver    observer(broker);
+
+  // let the observer observers the intermediate that is actually forwarding the messages from the observable
+  intermediate.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
+
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(12));
+  ASSERT_EQ(12, testCounter);
+
+  // the connection is permanent; if we emit the same message again, the observer will be notified again
+  testCounter = 0;
+  observable.EmitMessage(MyObservable::MyCustomMessage(20));
+  ASSERT_EQ(20, testCounter);
+}
+
+
+TEST(MessageBroker2, TestPromiseSuccessFailure)
+{
+  MessageBroker broker;
+  MyPromiseSource  source(broker);
+  MyPromiseTarget target(broker);
+
+  // test a successful promise
+  source.StartSomethingAsync()
+      .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::IncrementCounter))
+      .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::DecrementCounter));
+
+  testCounter = 0;
+  source.CompleteSomethingAsyncWithSuccess(10);
+  ASSERT_EQ(10, testCounter);
+
+  // test a failing promise
+  source.StartSomethingAsync()
+      .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::IncrementCounter))
+      .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::DecrementCounter));
+
+  testCounter = 0;
+  source.CompleteSomethingAsyncWithFailure(15);
+  ASSERT_EQ(-15, testCounter);
+}
+
+TEST(MessageBroker2, TestPromiseDeleteTarget)
+{
+  MessageBroker broker;
+  MyPromiseSource source(broker);
+  MyPromiseTarget* target = new MyPromiseTarget(broker);
+
+  // create the promise
+  source.StartSomethingAsync()
+      .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::IncrementCounter))
+      .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::DecrementCounter));
+
+  // delete the promise target
+  delete target;
+
+  // trigger the promise, make sure it does not throw and does not call the callback
+  testCounter = 0;
+  source.CompleteSomethingAsyncWithSuccess(10);
+  ASSERT_EQ(0, testCounter);
+
+  // test a failing promise
+  source.StartSomethingAsync()
+      .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::IncrementCounter))
+      .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::DecrementCounter));
+
+  testCounter = 0;
+  source.CompleteSomethingAsyncWithFailure(15);
+  ASSERT_EQ(0, testCounter);
+}
+