Mercurial > hg > orthanc-stone
view UnitTestsSources/TestMessageBroker2_promise_and_connect_ok.cpp @ 302:4a79193ffb58 am-callable-and-promise
support for custom messages + no leaks in unit-tests
author | am@osimis.io |
---|---|
date | Tue, 18 Sep 2018 18:04:53 +0200 |
parents | 3897f9f28cfa |
children | b70e9be013e4 |
line wrap: on
line source
/** * Stone of Orthanc * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics * Department, University Hospital of Liege, Belgium * Copyright (C) 2017-2018 Osimis S.A., Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Affero General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. **/ #include "gtest/gtest.h" #include <boost/noncopyable.hpp> #include <boost/function.hpp> #include <boost/bind.hpp> #include <string> #include <map> #include <set> int testCounter = 0; namespace { enum MessageType { MessageType_Test1, MessageType_Test2, MessageType_CustomMessage, MessageType_LastGenericStoneMessage }; struct IMessage : public boost::noncopyable { MessageType messageType_; public: IMessage(const MessageType& messageType) : messageType_(messageType) {} virtual ~IMessage() {} virtual int GetType() const {return messageType_;} }; struct ICustomMessage : public IMessage { int customMessageType_; public: ICustomMessage(int customMessageType) : IMessage(MessageType_CustomMessage), customMessageType_(customMessageType) {} virtual ~ICustomMessage() {} virtual int GetType() const {return customMessageType_;} }; class IObserver; class IObservable; class IPromiseTarget; class IPromiseSource; class Promise; /* * This is a central message broker. It keeps track of all observers and knows * when an observer is deleted. * This way, it can prevent an observable to send a message to a delete observer. * It does the same book-keeping for the IPromiseTarget and IPromiseSource */ class MessageBroker : public boost::noncopyable { std::set<IObserver*> activeObservers_; // the list of observers that are currently alive (that have not been deleted) std::set<IPromiseTarget*> activePromiseTargets_; std::set<IPromiseSource*> activePromiseSources_; public: void Register(IObserver& observer) { activeObservers_.insert(&observer); } void Unregister(IObserver& observer) { activeObservers_.erase(&observer); } void Register(IPromiseTarget& target) { activePromiseTargets_.insert(&target); } void Unregister(IPromiseTarget& target) { activePromiseTargets_.erase(&target); } void Register(IPromiseSource& source) { activePromiseSources_.insert(&source); } void Unregister(IPromiseSource& source) { activePromiseSources_.erase(&source); } void EmitMessage(IObservable& from, std::set<IObserver*> observers, const IMessage& message); bool IsActive(IPromiseTarget* target) { return activePromiseTargets_.find(target) != activePromiseTargets_.end(); } bool IsActive(IPromiseSource* source) { return activePromiseSources_.find(source) != activePromiseSources_.end(); } bool IsActive(IObserver* observer) { return activeObservers_.find(observer) != activeObservers_.end(); } }; struct IPromiseArgs { public: virtual ~IPromiseArgs() {} }; class EmptyPromiseArguments : public IPromiseArgs { }; class Promise : public boost::noncopyable { protected: MessageBroker& broker_; IPromiseTarget* successTarget_; boost::function<void (const IPromiseArgs& message)> successCallable_; IPromiseTarget* failureTarget_; boost::function<void (const IPromiseArgs& message)> failureCallable_; public: Promise(MessageBroker& broker) : broker_(broker), successTarget_(NULL), failureTarget_(NULL) { } void Success(const IPromiseArgs& message) { // check the target is still alive in the broker if (broker_.IsActive(successTarget_)) { successCallable_(message); } } void Failure(const IPromiseArgs& message) { // check the target is still alive in the broker if (broker_.IsActive(failureTarget_)) { failureCallable_(message); } } Promise& Then(IPromiseTarget* target, boost::function<void (const IPromiseArgs& message)> f) { if (successTarget_ != NULL) { // TODO: throw throw new "Promise may only have a single success target" } successTarget_ = target; successCallable_ = f; return *this; } Promise& Else(IPromiseTarget* target, boost::function<void (const IPromiseArgs& message)> f) { if (failureTarget_ != NULL) { // TODO: throw throw new "Promise may only have a single failure target" } failureTarget_ = target; failureCallable_ = f; return *this; } }; class IObserver : public boost::noncopyable { protected: MessageBroker& broker_; public: IObserver(MessageBroker& broker) : broker_(broker) { broker_.Register(*this); } virtual ~IObserver() { broker_.Unregister(*this); } }; class IPromiseTarget : public boost::noncopyable { protected: MessageBroker& broker_; public: IPromiseTarget(MessageBroker& broker) : broker_(broker) { broker_.Register(*this); } virtual ~IPromiseTarget() { broker_.Unregister(*this); } }; class IPromiseSource : public boost::noncopyable { protected: MessageBroker& broker_; public: IPromiseSource(MessageBroker& broker) : broker_(broker) { broker_.Register(*this); } virtual ~IPromiseSource() { broker_.Unregister(*this); } }; struct CallableObserver { IObserver* observer; boost::function<void (IObservable& from, const IMessage& message)> f; }; class IObservable : public boost::noncopyable { protected: MessageBroker& broker_; std::set<IObserver*> observers_; std::map<int, std::set<CallableObserver*> > callables_; public: IObservable(MessageBroker& broker) : broker_(broker) { } virtual ~IObservable() { } void EmitMessage(const IMessage& message) { //broker_.EmitMessage(*this, observers_, message); int messageType = message.GetType(); if (callables_.find(messageType) != callables_.end()) { for (std::set<CallableObserver*>::iterator observer = callables_[messageType].begin(); observer != callables_[messageType].end(); observer++) { CallableObserver* callable = *observer; if (broker_.IsActive(callable->observer)) { callable->f(*this, message); } } } } void RegisterObserver(IObserver& observer) { observers_.insert(&observer); } void UnregisterObserver(IObserver& observer) { observers_.erase(&observer); } //template<typename TObserver> void Connect(MessageType messageType, IObserver& observer, void (TObserver::*ptrToMemberHandler)(IObservable& from, const IMessage& message)) void Connect(int messageType, IObserver& observer, boost::function<void (IObservable& from, const IMessage& message)> f) { callables_[messageType] = std::set<CallableObserver*>(); CallableObserver* callable = new CallableObserver(); callable->observer = &observer; callable->f = f; callables_[messageType].insert(callable); } }; enum CustomMessageType { CustomMessageType_First = MessageType_LastGenericStoneMessage + 1, CustomMessageType_Completed }; class MyObservable : public IObservable { public: struct MyCustomMessage: public ICustomMessage { int payload_; MyCustomMessage(int payload) : ICustomMessage(CustomMessageType_Completed), payload_(payload) {} }; MyObservable(MessageBroker& broker) : IObservable(broker) {} }; class MyObserver : public IObserver { public: MyObserver(MessageBroker& broker) : IObserver(broker) {} void HandleCompletedMessage(IObservable& from, const IMessage& message) { const MyObservable::MyCustomMessage& msg = dynamic_cast<const MyObservable::MyCustomMessage&>(message); testCounter += msg.payload_; } }; class MyPromiseSource : public IPromiseSource { Promise* currentPromise_; public: struct MyPromiseArgs : public IPromiseArgs { int increment; }; MyPromiseSource(MessageBroker& broker) : IPromiseSource(broker), currentPromise_(NULL) {} Promise& StartSomethingAsync() { currentPromise_ = new Promise(broker_); return *currentPromise_; } void CompleteSomethingAsyncWithSuccess() { currentPromise_->Success(EmptyPromiseArguments()); delete currentPromise_; } void CompleteSomethingAsyncWithFailure() { currentPromise_->Failure(EmptyPromiseArguments()); delete currentPromise_; } }; class MyPromiseTarget : public IPromiseTarget { public: MyPromiseTarget(MessageBroker& broker) : IPromiseTarget(broker) {} void IncrementCounter(const IPromiseArgs& args) { testCounter++; } void DecrementCounter(const IPromiseArgs& args) { testCounter--; } }; } #define CONNECT_MESSAGES(observablePtr, messageType, observerPtr, observerFnPtr) (observablePtr)->Connect(messageType, *(observerPtr), boost::bind(observerFnPtr, observerPtr, _1, _2)) #define PTHEN(targetPtr, targetFnPtr) Then(targetPtr, boost::bind(targetFnPtr, targetPtr, _1)) #define PELSE(targetPtr, targetFnPtr) Else(targetPtr, boost::bind(targetFnPtr, targetPtr, _1)) TEST(MessageBroker2, TestPermanentConnectionSimpleUseCase) { MessageBroker broker; MyObservable observable(broker); MyObserver observer(broker); // create a permanent connection between an observable and an observer CONNECT_MESSAGES(&observable, CustomMessageType_Completed, &observer, &MyObserver::HandleCompletedMessage); testCounter = 0; observable.EmitMessage(MyObservable::MyCustomMessage(12)); ASSERT_EQ(12, testCounter); // the connection is permanent; if we emit the same message again, the observer will be notified again testCounter = 0; observable.EmitMessage(MyObservable::MyCustomMessage(20)); ASSERT_EQ(20, testCounter); } TEST(MessageBroker2, TestPermanentConnectionDeleteObserver) { MessageBroker broker; MyObservable observable(broker); MyObserver* observer = new MyObserver(broker); // create a permanent connection between an observable and an observer CONNECT_MESSAGES(&observable, CustomMessageType_Completed, observer, &MyObserver::HandleCompletedMessage); testCounter = 0; observable.EmitMessage(MyObservable::MyCustomMessage(12)); ASSERT_EQ(12, testCounter); // delete the observer and check that the callback is not called anymore delete observer; // the connection is permanent; if we emit the same message again, the observer will be notified again testCounter = 0; observable.EmitMessage(MyObservable::MyCustomMessage(20)); ASSERT_EQ(0, testCounter); } TEST(MessageBroker2, TestPromiseSuccessFailure) { MessageBroker broker; MyPromiseSource source(broker); MyPromiseTarget target(broker); // test a successful promise source.StartSomethingAsync() .PTHEN(&target, &MyPromiseTarget::IncrementCounter) .PELSE(&target, &MyPromiseTarget::DecrementCounter); testCounter = 0; source.CompleteSomethingAsyncWithSuccess(); ASSERT_EQ(1, testCounter); // test a failing promise source.StartSomethingAsync() .PTHEN(&target, &MyPromiseTarget::IncrementCounter) .PELSE(&target, &MyPromiseTarget::DecrementCounter); testCounter = 0; source.CompleteSomethingAsyncWithFailure(); ASSERT_EQ(-1, testCounter); } //TEST(MessageBroker2, TestPromiseDeleteTarget) //{ // MessageBroker broker; // MyPromiseSource source(broker); // MyPromiseTarget target(broker); // // test a successful promise // source.StartSomethingAsync() // .PTHEN(&target, &MyPromiseTarget::IncrementCounter) // .PELSE(&target, &MyPromiseTarget::DecrementCounter); // testCounter = 0; // source.CompleteSomethingAsyncWithSuccess(); // ASSERT_EQ(1, testCounter); // // test a failing promise // source.StartSomethingAsync() // .PTHEN(&target, &MyPromiseTarget::IncrementCounter) // .PELSE(&target, &MyPromiseTarget::DecrementCounter); // testCounter = 0; // source.CompleteSomethingAsyncWithFailure(); // ASSERT_EQ(-1, testCounter); //}