comparison UnitTestsSources/TestMessageBroker2.cpp @ 300:b4abaeb783b1 am-callable-and-promise

messaging refactoring almost complete: works fine in native
author am@osimis.io
date Tue, 18 Sep 2018 15:23:21 +0200
parents 3897f9f28cfa
children 4a79193ffb58
comparison
equal deleted inserted replaced
299:3897f9f28cfa 300:b4abaeb783b1
20 20
21 21
22 #include "gtest/gtest.h" 22 #include "gtest/gtest.h"
23 23
24 #include "Framework/Messages/MessageBroker.h" 24 #include "Framework/Messages/MessageBroker.h"
25 25 #include "Framework/Messages/Promise.h"
26 #include <boost/noncopyable.hpp> 26 #include "Framework/Messages/IObservable.h"
27 #include <boost/function.hpp> 27 #include "Framework/Messages/IObserver.h"
28 #include <boost/bind.hpp> 28 #include "Framework/Messages/MessageForwarder.h"
29 29
30 #include <string>
31 #include <map>
32 #include <set>
33 30
34 int testCounter = 0; 31 int testCounter = 0;
35 namespace { 32 namespace {
36 33
37 class IObserver; 34 // class IObserver;
38 class IObservable; 35 // class IObservable;
39 class Promise; 36 // class Promise;
40 37
41 enum MessageType 38 // enum MessageType
42 { 39 // {
43 MessageType_Test1, 40 // MessageType_Test1,
44 MessageType_Test2, 41 // MessageType_Test2,
45 42
46 MessageType_CustomMessage, 43 // MessageType_CustomMessage,
47 MessageType_LastGenericStoneMessage 44 // MessageType_LastGenericStoneMessage
48 }; 45 // };
49 46
50 struct IMessage : public boost::noncopyable 47 // struct IMessage : public boost::noncopyable
51 { 48 // {
52 MessageType messageType_; 49 // MessageType messageType_;
53 public: 50 // public:
54 IMessage(const MessageType& messageType) 51 // IMessage(const MessageType& messageType)
55 : messageType_(messageType) 52 // : messageType_(messageType)
56 {} 53 // {}
57 virtual ~IMessage() {} 54 // virtual ~IMessage() {}
58 55
59 virtual int GetType() const {return messageType_;} 56 // virtual int GetType() const {return messageType_;}
60 }; 57 // };
61 58
62 59
63 struct ICustomMessage : public IMessage 60 // struct ICustomMessage : public IMessage
64 { 61 // {
65 int customMessageType_; 62 // int customMessageType_;
66 public: 63 // public:
67 ICustomMessage(int customMessageType) 64 // ICustomMessage(int customMessageType)
68 : IMessage(MessageType_CustomMessage), 65 // : IMessage(MessageType_CustomMessage),
69 customMessageType_(customMessageType) 66 // customMessageType_(customMessageType)
70 {} 67 // {}
71 virtual ~ICustomMessage() {} 68 // virtual ~ICustomMessage() {}
72 69
73 virtual int GetType() const {return customMessageType_;} 70 // virtual int GetType() const {return customMessageType_;}
74 }; 71 // };
75 72
76 73
77 // This is referencing an object and member function that can be notified 74 // // This is referencing an object and member function that can be notified
78 // by an IObservable. The object must derive from IO 75 // // by an IObservable. The object must derive from IO
79 // The member functions must be of type "void Function(const IMessage& message)" or reference a derived class of IMessage 76 // // The member functions must be of type "void Function(const IMessage& message)" or reference a derived class of IMessage
80 class ICallable : public boost::noncopyable 77 // class ICallable : public boost::noncopyable
81 { 78 // {
82 public: 79 // public:
83 virtual ~ICallable() 80 // virtual ~ICallable()
84 { 81 // {
85 } 82 // }
86 83
87 virtual void Apply(const IMessage& message) = 0; 84 // virtual void Apply(const IMessage& message) = 0;
88 85
89 virtual MessageType GetMessageType() const = 0; 86 // virtual MessageType GetMessageType() const = 0;
90 virtual IObserver* GetObserver() const = 0; 87 // virtual IObserver* GetObserver() const = 0;
91 }; 88 // };
92 89
93 template <typename TObserver, 90 // template <typename TObserver,
94 typename TMessage> 91 // typename TMessage>
95 class Callable : public ICallable 92 // class Callable : public ICallable
96 { 93 // {
97 private: 94 // private:
98 typedef void (TObserver::* MemberFunction) (const TMessage&); 95 // typedef void (TObserver::* MemberFunction) (const TMessage&);
99 96
100 TObserver& observer_; 97 // TObserver& observer_;
101 MemberFunction function_; 98 // MemberFunction function_;
102 99
103 public: 100 // public:
104 Callable(TObserver& observer, 101 // Callable(TObserver& observer,
105 MemberFunction function) : 102 // MemberFunction function) :
106 observer_(observer), 103 // observer_(observer),
107 function_(function) 104 // function_(function)
108 { 105 // {
109 } 106 // }
110 107
111 void ApplyInternal(const TMessage& message) 108 // void ApplyInternal(const TMessage& message)
112 { 109 // {
113 (observer_.*function_) (message); 110 // (observer_.*function_) (message);
114 } 111 // }
115 112
116 virtual void Apply(const IMessage& message) 113 // virtual void Apply(const IMessage& message)
117 { 114 // {
118 ApplyInternal(dynamic_cast<const TMessage&>(message)); 115 // ApplyInternal(dynamic_cast<const TMessage&>(message));
119 } 116 // }
120 117
121 virtual MessageType GetMessageType() const 118 // virtual MessageType GetMessageType() const
122 { 119 // {
123 return static_cast<MessageType>(TMessage::Type); 120 // return static_cast<MessageType>(TMessage::Type);
124 } 121 // }
125 122
126 virtual IObserver* GetObserver() const 123 // virtual IObserver* GetObserver() const
127 { 124 // {
128 return &observer_; 125 // return &observer_;
129 } 126 // }
130 }; 127 // };
131 128
132 129
133 130
134 131
135 /* 132 // /*
136 * This is a central message broker. It keeps track of all observers and knows 133 // * This is a central message broker. It keeps track of all observers and knows
137 * when an observer is deleted. 134 // * when an observer is deleted.
138 * This way, it can prevent an observable to send a message to a delete observer. 135 // * This way, it can prevent an observable to send a message to a delete observer.
139 */ 136 // */
140 class MessageBroker : public boost::noncopyable 137 // class MessageBroker : public boost::noncopyable
141 { 138 // {
142 139
143 std::set<IObserver*> activeObservers_; // the list of observers that are currently alive (that have not been deleted) 140 // std::set<IObserver*> activeObservers_; // the list of observers that are currently alive (that have not been deleted)
144 141
145 public: 142 // public:
146 143
147 void Register(IObserver& observer) 144 // void Register(IObserver& observer)
148 { 145 // {
149 activeObservers_.insert(&observer); 146 // activeObservers_.insert(&observer);
150 } 147 // }
151 148
152 void Unregister(IObserver& observer) 149 // void Unregister(IObserver& observer)
153 { 150 // {
154 activeObservers_.erase(&observer); 151 // activeObservers_.erase(&observer);
155 } 152 // }
156 153
157 bool IsActive(IObserver* observer) 154 // bool IsActive(IObserver* observer)
158 { 155 // {
159 return activeObservers_.find(observer) != activeObservers_.end(); 156 // return activeObservers_.find(observer) != activeObservers_.end();
160 } 157 // }
161 }; 158 // };
162 159
163 160
164 class Promise : public boost::noncopyable 161 // class Promise : public boost::noncopyable
165 { 162 // {
166 protected: 163 // protected:
167 MessageBroker& broker_; 164 // MessageBroker& broker_;
168 165
169 ICallable* successCallable_; 166 // ICallable* successCallable_;
170 ICallable* failureCallable_; 167 // ICallable* failureCallable_;
171 168
172 public: 169 // public:
173 Promise(MessageBroker& broker) 170 // Promise(MessageBroker& broker)
174 : broker_(broker), 171 // : broker_(broker),
175 successCallable_(NULL), 172 // successCallable_(NULL),
176 failureCallable_(NULL) 173 // failureCallable_(NULL)
177 { 174 // {
178 } 175 // }
179 176
180 void Success(const IMessage& message) 177 // void Success(const IMessage& message)
181 { 178 // {
182 // check the target is still alive in the broker 179 // // check the target is still alive in the broker
183 if (broker_.IsActive(successCallable_->GetObserver())) 180 // if (broker_.IsActive(successCallable_->GetObserver()))
184 { 181 // {
185 successCallable_->Apply(message); 182 // successCallable_->Apply(message);
186 } 183 // }
187 } 184 // }
188 185
189 void Failure(const IMessage& message) 186 // void Failure(const IMessage& message)
190 { 187 // {
191 // check the target is still alive in the broker 188 // // check the target is still alive in the broker
192 if (broker_.IsActive(failureCallable_->GetObserver())) 189 // if (broker_.IsActive(failureCallable_->GetObserver()))
193 { 190 // {
194 failureCallable_->Apply(message); 191 // failureCallable_->Apply(message);
195 } 192 // }
196 } 193 // }
197 194
198 Promise& Then(ICallable* successCallable) 195 // Promise& Then(ICallable* successCallable)
199 { 196 // {
200 if (successCallable_ != NULL) 197 // if (successCallable_ != NULL)
201 { 198 // {
202 // TODO: throw throw new "Promise may only have a single success target" 199 // // TODO: throw throw new "Promise may only have a single success target"
203 } 200 // }
204 successCallable_ = successCallable; 201 // successCallable_ = successCallable;
205 return *this; 202 // return *this;
206 } 203 // }
207 204
208 Promise& Else(ICallable* failureCallable) 205 // Promise& Else(ICallable* failureCallable)
209 { 206 // {
210 if (failureCallable_ != NULL) 207 // if (failureCallable_ != NULL)
211 { 208 // {
212 // TODO: throw throw new "Promise may only have a single failure target" 209 // // TODO: throw throw new "Promise may only have a single failure target"
213 } 210 // }
214 failureCallable_ = failureCallable; 211 // failureCallable_ = failureCallable;
215 return *this; 212 // return *this;
216 } 213 // }
217 214
218 }; 215 // };
219 216
220 class IObserver : public boost::noncopyable 217 // class IObserver : public boost::noncopyable
221 { 218 // {
222 protected: 219 // protected:
223 MessageBroker& broker_; 220 // MessageBroker& broker_;
224 221
225 public: 222 // public:
226 IObserver(MessageBroker& broker) 223 // IObserver(MessageBroker& broker)
227 : broker_(broker) 224 // : broker_(broker)
228 { 225 // {
229 broker_.Register(*this); 226 // broker_.Register(*this);
230 } 227 // }
231 228
232 virtual ~IObserver() 229 // virtual ~IObserver()
233 { 230 // {
234 broker_.Unregister(*this); 231 // broker_.Unregister(*this);
235 } 232 // }
236 233
237 }; 234 // };
238 235
239 236
240 class IObservable : public boost::noncopyable 237 // class IObservable : public boost::noncopyable
241 { 238 // {
242 protected: 239 // protected:
243 MessageBroker& broker_; 240 // MessageBroker& broker_;
244 241
245 typedef std::map<int, std::set<ICallable*> > Callables; 242 // typedef std::map<int, std::set<ICallable*> > Callables;
246 Callables callables_; 243 // Callables callables_;
247 public: 244 // public:
248 245
249 IObservable(MessageBroker& broker) 246 // IObservable(MessageBroker& broker)
250 : broker_(broker) 247 // : broker_(broker)
251 { 248 // {
252 } 249 // }
253 250
254 virtual ~IObservable() 251 // virtual ~IObservable()
255 { 252 // {
256 for (Callables::const_iterator it = callables_.begin(); 253 // for (Callables::const_iterator it = callables_.begin();
257 it != callables_.end(); ++it) 254 // it != callables_.end(); ++it)
258 { 255 // {
259 for (std::set<ICallable*>::const_iterator 256 // for (std::set<ICallable*>::const_iterator
260 it2 = it->second.begin(); it2 != it->second.end(); ++it2) 257 // it2 = it->second.begin(); it2 != it->second.end(); ++it2)
261 { 258 // {
262 delete *it2; 259 // delete *it2;
263 } 260 // }
264 } 261 // }
265 } 262 // }
266 263
267 void Register(ICallable* callable) 264 // void Register(ICallable* callable)
268 { 265 // {
269 MessageType messageType = callable->GetMessageType(); 266 // MessageType messageType = callable->GetMessageType();
270 267
271 callables_[messageType].insert(callable); 268 // callables_[messageType].insert(callable);
272 } 269 // }
273 270
274 void EmitMessage(const IMessage& message) 271 // void EmitMessage(const IMessage& message)
275 { 272 // {
276 Callables::const_iterator found = callables_.find(message.GetType()); 273 // Callables::const_iterator found = callables_.find(message.GetType());
277 274
278 if (found != callables_.end()) 275 // if (found != callables_.end())
279 { 276 // {
280 for (std::set<ICallable*>::const_iterator 277 // for (std::set<ICallable*>::const_iterator
281 it = found->second.begin(); it != found->second.end(); ++it) 278 // it = found->second.begin(); it != found->second.end(); ++it)
282 { 279 // {
283 if (broker_.IsActive((*it)->GetObserver())) 280 // if (broker_.IsActive((*it)->GetObserver()))
284 { 281 // {
285 (*it)->Apply(message); 282 // (*it)->Apply(message);
286 } 283 // }
287 } 284 // }
288 } 285 // }
289 } 286 // }
290 287
291 }; 288 // };
292 289
293 290
294 enum CustomMessageType 291 // enum CustomMessageType
295 { 292 // {
296 CustomMessageType_First = MessageType_LastGenericStoneMessage + 1, 293 // CustomMessageType_First = MessageType_LastGenericStoneMessage + 1,
297 294
298 CustomMessageType_Completed, 295 // CustomMessageType_Completed,
299 CustomMessageType_Increment 296 // CustomMessageType_Increment
300 }; 297 // };
298
299 using namespace OrthancStone;
301 300
302 class MyObservable : public IObservable 301 class MyObservable : public IObservable
303 { 302 {
304 public: 303 public:
305 struct MyCustomMessage: public ICustomMessage 304 struct MyCustomMessage: public BaseMessage<MessageType_Test1>
306 { 305 {
307 int payload_; 306 int payload_;
308 enum
309 {
310 Type = CustomMessageType_Completed
311 };
312 307
313 MyCustomMessage(int payload) 308 MyCustomMessage(int payload)
314 : ICustomMessage(Type), 309 : BaseMessage(),
315 payload_(payload) 310 payload_(payload)
316 {} 311 {}
317 }; 312 };
318 313
319 MyObservable(MessageBroker& broker) 314 MyObservable(MessageBroker& broker)
335 } 330 }
336 331
337 }; 332 };
338 333
339 334
335 class MyIntermediate : public IObserver, public IObservable
336 {
337 IObservable& observedObject_;
338 public:
339 MyIntermediate(MessageBroker& broker, IObservable& observedObject)
340 : IObserver(broker),
341 IObservable(broker),
342 observedObject_(observedObject)
343 {
344 observedObject_.RegisterObserverCallback(new MessageForwarder<MyObservable::MyCustomMessage>(broker, *this));
345 }
346 };
347
348
340 class MyPromiseSource : public IObservable 349 class MyPromiseSource : public IObservable
341 { 350 {
342 Promise* currentPromise_; 351 Promise* currentPromise_;
343 public: 352 public:
344 struct MyPromiseMessage: public ICustomMessage 353 struct MyPromiseMessage: public BaseMessage<MessageType_Test1>
345 { 354 {
346 int increment; 355 int increment;
347 enum
348 {
349 Type = CustomMessageType_Increment
350 };
351 356
352 MyPromiseMessage(int increment) 357 MyPromiseMessage(int increment)
353 : ICustomMessage(Type), 358 : BaseMessage(),
354 increment(increment) 359 increment(increment)
355 {} 360 {}
356 }; 361 };
357 362
358 MyPromiseSource(MessageBroker& broker) 363 MyPromiseSource(MessageBroker& broker)
405 MessageBroker broker; 410 MessageBroker broker;
406 MyObservable observable(broker); 411 MyObservable observable(broker);
407 MyObserver observer(broker); 412 MyObserver observer(broker);
408 413
409 // create a permanent connection between an observable and an observer 414 // create a permanent connection between an observable and an observer
410 observable.Register(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage)); 415 observable.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
411 416
412 testCounter = 0; 417 testCounter = 0;
413 observable.EmitMessage(MyObservable::MyCustomMessage(12)); 418 observable.EmitMessage(MyObservable::MyCustomMessage(12));
414 ASSERT_EQ(12, testCounter); 419 ASSERT_EQ(12, testCounter);
415 420
424 MessageBroker broker; 429 MessageBroker broker;
425 MyObservable observable(broker); 430 MyObservable observable(broker);
426 MyObserver* observer = new MyObserver(broker); 431 MyObserver* observer = new MyObserver(broker);
427 432
428 // create a permanent connection between an observable and an observer 433 // create a permanent connection between an observable and an observer
429 observable.Register(new Callable<MyObserver, MyObservable::MyCustomMessage>(*observer, &MyObserver::HandleCompletedMessage)); 434 observable.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(*observer, &MyObserver::HandleCompletedMessage));
430 435
431 testCounter = 0; 436 testCounter = 0;
432 observable.EmitMessage(MyObservable::MyCustomMessage(12)); 437 observable.EmitMessage(MyObservable::MyCustomMessage(12));
433 ASSERT_EQ(12, testCounter); 438 ASSERT_EQ(12, testCounter);
434 439
438 // the connection is permanent; if we emit the same message again, the observer will be notified again 443 // the connection is permanent; if we emit the same message again, the observer will be notified again
439 testCounter = 0; 444 testCounter = 0;
440 observable.EmitMessage(MyObservable::MyCustomMessage(20)); 445 observable.EmitMessage(MyObservable::MyCustomMessage(20));
441 ASSERT_EQ(0, testCounter); 446 ASSERT_EQ(0, testCounter);
442 } 447 }
448
449 TEST(MessageBroker2, TestMessageForwarderSimpleUseCase)
450 {
451 MessageBroker broker;
452 MyObservable observable(broker);
453 MyIntermediate intermediate(broker, observable);
454 MyObserver observer(broker);
455
456 // let the observer observers the intermediate that is actually forwarding the messages from the observable
457 intermediate.RegisterObserverCallback(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
458
459 testCounter = 0;
460 observable.EmitMessage(MyObservable::MyCustomMessage(12));
461 ASSERT_EQ(12, testCounter);
462
463 // the connection is permanent; if we emit the same message again, the observer will be notified again
464 testCounter = 0;
465 observable.EmitMessage(MyObservable::MyCustomMessage(20));
466 ASSERT_EQ(20, testCounter);
467 }
468
443 469
444 470
445 TEST(MessageBroker2, TestPromiseSuccessFailure) 471 TEST(MessageBroker2, TestPromiseSuccessFailure)
446 { 472 {
447 MessageBroker broker; 473 MessageBroker broker;
494 testCounter = 0; 520 testCounter = 0;
495 source.CompleteSomethingAsyncWithFailure(15); 521 source.CompleteSomethingAsyncWithFailure(15);
496 ASSERT_EQ(0, testCounter); 522 ASSERT_EQ(0, testCounter);
497 } 523 }
498 524
499
500
501 //#include <stdio.h>
502 //#include <boost/noncopyable.hpp>
503
504 //#include <string>
505 //#include <memory>
506 //#include <map>
507 //#include <set>
508
509 //enum MessageType
510 //{
511 // MessageType_SeriesDownloaded = 1
512 //};
513
514
515 //class IMessage : public boost::noncopyable
516 //{
517 //private:
518 // MessageType type_;
519
520 //public:
521 // IMessage(MessageType type) :
522 // type_(type)
523 // {
524 // }
525
526 // virtual ~IMessage()
527 // {
528 // }
529
530 // MessageType GetMessageType() const
531 // {
532 // return type_;
533 // }
534 //};
535
536
537 //class IObserver : public boost::noncopyable
538 //{
539 //public:
540 // virtual ~IObserver()
541 // {
542 // }
543 //};
544
545
546 //class SeriesDownloadedMessage : public IMessage
547 //{
548 //private:
549 // std::string value_;
550
551 //public:
552 // enum
553 // {
554 // Type = MessageType_SeriesDownloaded
555 // };
556
557 // SeriesDownloadedMessage(const std::string& value) :
558 // IMessage(static_cast<MessageType>(Type)),
559 // value_(value)
560 // {
561 // }
562
563 // const std::string& GetValue() const
564 // {
565 // return value_;
566 // }
567 //};
568
569
570 //class MyObserver : public IObserver
571 //{
572 //public:
573 // void OnSeriesDownloaded(const SeriesDownloadedMessage& message)
574 // {
575 // printf("received: [%s]\n", message.GetValue().c_str());
576 // }
577 //};
578
579
580
581 //class ICallable : public boost::noncopyable // ne peut referencer que les classes de base
582 //{
583 //public:
584 // virtual ~ICallable()
585 // {
586 // }
587
588 // virtual void Apply(const IMessage& message) = 0;
589
590 // virtual MessageType GetMessageType() const = 0;
591 //};
592
593
594
595 //template <typename Observer,
596 // typename Message>
597 //class Callable : public ICallable
598 //{
599 //private:
600 // typedef void (Observer::* MemberFunction) (const Message&);
601
602 // Observer& observer_;
603 // MemberFunction function_;
604
605 //public:
606 // Callable(Observer& observer,
607 // MemberFunction function) :
608 // observer_(observer),
609 // function_(function)
610 // {
611 // }
612
613 // void ApplyInternal(const Message& message)
614 // {
615 // (observer_.*function_) (message);
616 // }
617
618 // virtual void Apply(const IMessage& message)
619 // {
620 // ApplyInternal(dynamic_cast<const Message&>(message));
621 // }
622
623 // virtual MessageType GetMessageType() const
624 // {
625 // return static_cast<MessageType>(Message::Type);
626 // }
627 //};
628
629
630
631 //class IObservable : public boost::noncopyable
632 //{
633 //private:
634 // typedef std::map<MessageType, std::set<ICallable*> > Callables;
635
636 // Callables callables_;
637
638 //public:
639 // virtual ~IObservable()
640 // {
641 // for (Callables::const_iterator it = callables_.begin();
642 // it != callables_.end(); ++it)
643 // {
644 // for (std::set<ICallable*>::const_iterator
645 // it2 = it->second.begin(); it2 != it->second.end(); ++it2)
646 // {
647 // delete *it2;
648 // }
649 // }
650 // }
651
652 // void Register(ICallable* callable)
653 // {
654 // MessageType type = callable->GetMessageType();
655
656 // callables_[type].insert(callable);
657 // }
658
659 // void Emit(const IMessage& message) const
660 // {
661 // Callables::const_iterator found = callables_.find(message.GetMessageType());
662
663 // if (found != callables_.end())
664 // {
665 // for (std::set<ICallable*>::const_iterator
666 // it = found->second.begin(); it != found->second.end(); ++it)
667 // {
668 // (*it)->Apply(message);
669 // }
670 // }
671 // }
672 //};
673
674
675
676
677 //int main()
678 //{
679 // MyObserver observer;
680
681 // SeriesDownloadedMessage message("coucou");
682
683 // IObservable observable;
684 // observable.Register(new Callable<MyObserver, SeriesDownloadedMessage>(observer, &MyObserver::OnSeriesDownloaded));
685 // observable.Register(new Callable<MyObserver, SeriesDownloadedMessage>(observer, &MyObserver::OnSeriesDownloaded));
686
687 // SeriesDownloadedMessage message2("hello");
688 // observable.Emit(message2);
689
690 // printf("%d\n", SeriesDownloadedMessage::Type);
691 //}