comparison UnitTestsSources/TestMessageBroker2.cpp @ 299:3897f9f28cfa am-callable-and-promise

backup work in progress: updated messaging framework with ICallable
author am@osimis.io
date Fri, 14 Sep 2018 16:44:01 +0200
parents
children b4abaeb783b1
comparison
equal deleted inserted replaced
298:f58bfb7bbcc9 299:3897f9f28cfa
1 /**
2 * Stone of Orthanc
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #include "gtest/gtest.h"
23
24 #include "Framework/Messages/MessageBroker.h"
25
26 #include <boost/noncopyable.hpp>
27 #include <boost/function.hpp>
28 #include <boost/bind.hpp>
29
30 #include <string>
31 #include <map>
32 #include <set>
33
34 int testCounter = 0;
35 namespace {
36
37 class IObserver;
38 class IObservable;
39 class Promise;
40
41 enum MessageType
42 {
43 MessageType_Test1,
44 MessageType_Test2,
45
46 MessageType_CustomMessage,
47 MessageType_LastGenericStoneMessage
48 };
49
50 struct IMessage : public boost::noncopyable
51 {
52 MessageType messageType_;
53 public:
54 IMessage(const MessageType& messageType)
55 : messageType_(messageType)
56 {}
57 virtual ~IMessage() {}
58
59 virtual int GetType() const {return messageType_;}
60 };
61
62
63 struct ICustomMessage : public IMessage
64 {
65 int customMessageType_;
66 public:
67 ICustomMessage(int customMessageType)
68 : IMessage(MessageType_CustomMessage),
69 customMessageType_(customMessageType)
70 {}
71 virtual ~ICustomMessage() {}
72
73 virtual int GetType() const {return customMessageType_;}
74 };
75
76
77 // This is referencing an object and member function that can be notified
78 // by an IObservable. The object must derive from IO
79 // The member functions must be of type "void Function(const IMessage& message)" or reference a derived class of IMessage
80 class ICallable : public boost::noncopyable
81 {
82 public:
83 virtual ~ICallable()
84 {
85 }
86
87 virtual void Apply(const IMessage& message) = 0;
88
89 virtual MessageType GetMessageType() const = 0;
90 virtual IObserver* GetObserver() const = 0;
91 };
92
93 template <typename TObserver,
94 typename TMessage>
95 class Callable : public ICallable
96 {
97 private:
98 typedef void (TObserver::* MemberFunction) (const TMessage&);
99
100 TObserver& observer_;
101 MemberFunction function_;
102
103 public:
104 Callable(TObserver& observer,
105 MemberFunction function) :
106 observer_(observer),
107 function_(function)
108 {
109 }
110
111 void ApplyInternal(const TMessage& message)
112 {
113 (observer_.*function_) (message);
114 }
115
116 virtual void Apply(const IMessage& message)
117 {
118 ApplyInternal(dynamic_cast<const TMessage&>(message));
119 }
120
121 virtual MessageType GetMessageType() const
122 {
123 return static_cast<MessageType>(TMessage::Type);
124 }
125
126 virtual IObserver* GetObserver() const
127 {
128 return &observer_;
129 }
130 };
131
132
133
134
135 /*
136 * This is a central message broker. It keeps track of all observers and knows
137 * when an observer is deleted.
138 * This way, it can prevent an observable to send a message to a delete observer.
139 */
140 class MessageBroker : public boost::noncopyable
141 {
142
143 std::set<IObserver*> activeObservers_; // the list of observers that are currently alive (that have not been deleted)
144
145 public:
146
147 void Register(IObserver& observer)
148 {
149 activeObservers_.insert(&observer);
150 }
151
152 void Unregister(IObserver& observer)
153 {
154 activeObservers_.erase(&observer);
155 }
156
157 bool IsActive(IObserver* observer)
158 {
159 return activeObservers_.find(observer) != activeObservers_.end();
160 }
161 };
162
163
164 class Promise : public boost::noncopyable
165 {
166 protected:
167 MessageBroker& broker_;
168
169 ICallable* successCallable_;
170 ICallable* failureCallable_;
171
172 public:
173 Promise(MessageBroker& broker)
174 : broker_(broker),
175 successCallable_(NULL),
176 failureCallable_(NULL)
177 {
178 }
179
180 void Success(const IMessage& message)
181 {
182 // check the target is still alive in the broker
183 if (broker_.IsActive(successCallable_->GetObserver()))
184 {
185 successCallable_->Apply(message);
186 }
187 }
188
189 void Failure(const IMessage& message)
190 {
191 // check the target is still alive in the broker
192 if (broker_.IsActive(failureCallable_->GetObserver()))
193 {
194 failureCallable_->Apply(message);
195 }
196 }
197
198 Promise& Then(ICallable* successCallable)
199 {
200 if (successCallable_ != NULL)
201 {
202 // TODO: throw throw new "Promise may only have a single success target"
203 }
204 successCallable_ = successCallable;
205 return *this;
206 }
207
208 Promise& Else(ICallable* failureCallable)
209 {
210 if (failureCallable_ != NULL)
211 {
212 // TODO: throw throw new "Promise may only have a single failure target"
213 }
214 failureCallable_ = failureCallable;
215 return *this;
216 }
217
218 };
219
220 class IObserver : public boost::noncopyable
221 {
222 protected:
223 MessageBroker& broker_;
224
225 public:
226 IObserver(MessageBroker& broker)
227 : broker_(broker)
228 {
229 broker_.Register(*this);
230 }
231
232 virtual ~IObserver()
233 {
234 broker_.Unregister(*this);
235 }
236
237 };
238
239
240 class IObservable : public boost::noncopyable
241 {
242 protected:
243 MessageBroker& broker_;
244
245 typedef std::map<int, std::set<ICallable*> > Callables;
246 Callables callables_;
247 public:
248
249 IObservable(MessageBroker& broker)
250 : broker_(broker)
251 {
252 }
253
254 virtual ~IObservable()
255 {
256 for (Callables::const_iterator it = callables_.begin();
257 it != callables_.end(); ++it)
258 {
259 for (std::set<ICallable*>::const_iterator
260 it2 = it->second.begin(); it2 != it->second.end(); ++it2)
261 {
262 delete *it2;
263 }
264 }
265 }
266
267 void Register(ICallable* callable)
268 {
269 MessageType messageType = callable->GetMessageType();
270
271 callables_[messageType].insert(callable);
272 }
273
274 void EmitMessage(const IMessage& message)
275 {
276 Callables::const_iterator found = callables_.find(message.GetType());
277
278 if (found != callables_.end())
279 {
280 for (std::set<ICallable*>::const_iterator
281 it = found->second.begin(); it != found->second.end(); ++it)
282 {
283 if (broker_.IsActive((*it)->GetObserver()))
284 {
285 (*it)->Apply(message);
286 }
287 }
288 }
289 }
290
291 };
292
293
294 enum CustomMessageType
295 {
296 CustomMessageType_First = MessageType_LastGenericStoneMessage + 1,
297
298 CustomMessageType_Completed,
299 CustomMessageType_Increment
300 };
301
302 class MyObservable : public IObservable
303 {
304 public:
305 struct MyCustomMessage: public ICustomMessage
306 {
307 int payload_;
308 enum
309 {
310 Type = CustomMessageType_Completed
311 };
312
313 MyCustomMessage(int payload)
314 : ICustomMessage(Type),
315 payload_(payload)
316 {}
317 };
318
319 MyObservable(MessageBroker& broker)
320 : IObservable(broker)
321 {}
322
323 };
324
325 class MyObserver : public IObserver
326 {
327 public:
328 MyObserver(MessageBroker& broker)
329 : IObserver(broker)
330 {}
331
332 void HandleCompletedMessage(const MyObservable::MyCustomMessage& message)
333 {
334 testCounter += message.payload_;
335 }
336
337 };
338
339
340 class MyPromiseSource : public IObservable
341 {
342 Promise* currentPromise_;
343 public:
344 struct MyPromiseMessage: public ICustomMessage
345 {
346 int increment;
347 enum
348 {
349 Type = CustomMessageType_Increment
350 };
351
352 MyPromiseMessage(int increment)
353 : ICustomMessage(Type),
354 increment(increment)
355 {}
356 };
357
358 MyPromiseSource(MessageBroker& broker)
359 : IObservable(broker),
360 currentPromise_(NULL)
361 {}
362
363 Promise& StartSomethingAsync()
364 {
365 currentPromise_ = new Promise(broker_);
366 return *currentPromise_;
367 }
368
369 void CompleteSomethingAsyncWithSuccess(int payload)
370 {
371 currentPromise_->Success(MyPromiseMessage(payload));
372 delete currentPromise_;
373 }
374
375 void CompleteSomethingAsyncWithFailure(int payload)
376 {
377 currentPromise_->Failure(MyPromiseMessage(payload));
378 delete currentPromise_;
379 }
380 };
381
382
383 class MyPromiseTarget : public IObserver
384 {
385 public:
386 MyPromiseTarget(MessageBroker& broker)
387 : IObserver(broker)
388 {}
389
390 void IncrementCounter(const MyPromiseSource::MyPromiseMessage& args)
391 {
392 testCounter += args.increment;
393 }
394
395 void DecrementCounter(const MyPromiseSource::MyPromiseMessage& args)
396 {
397 testCounter -= args.increment;
398 }
399 };
400 }
401
402
403 TEST(MessageBroker2, TestPermanentConnectionSimpleUseCase)
404 {
405 MessageBroker broker;
406 MyObservable observable(broker);
407 MyObserver observer(broker);
408
409 // create a permanent connection between an observable and an observer
410 observable.Register(new Callable<MyObserver, MyObservable::MyCustomMessage>(observer, &MyObserver::HandleCompletedMessage));
411
412 testCounter = 0;
413 observable.EmitMessage(MyObservable::MyCustomMessage(12));
414 ASSERT_EQ(12, testCounter);
415
416 // the connection is permanent; if we emit the same message again, the observer will be notified again
417 testCounter = 0;
418 observable.EmitMessage(MyObservable::MyCustomMessage(20));
419 ASSERT_EQ(20, testCounter);
420 }
421
422 TEST(MessageBroker2, TestPermanentConnectionDeleteObserver)
423 {
424 MessageBroker broker;
425 MyObservable observable(broker);
426 MyObserver* observer = new MyObserver(broker);
427
428 // create a permanent connection between an observable and an observer
429 observable.Register(new Callable<MyObserver, MyObservable::MyCustomMessage>(*observer, &MyObserver::HandleCompletedMessage));
430
431 testCounter = 0;
432 observable.EmitMessage(MyObservable::MyCustomMessage(12));
433 ASSERT_EQ(12, testCounter);
434
435 // delete the observer and check that the callback is not called anymore
436 delete observer;
437
438 // the connection is permanent; if we emit the same message again, the observer will be notified again
439 testCounter = 0;
440 observable.EmitMessage(MyObservable::MyCustomMessage(20));
441 ASSERT_EQ(0, testCounter);
442 }
443
444
445 TEST(MessageBroker2, TestPromiseSuccessFailure)
446 {
447 MessageBroker broker;
448 MyPromiseSource source(broker);
449 MyPromiseTarget target(broker);
450
451 // test a successful promise
452 source.StartSomethingAsync()
453 .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::IncrementCounter))
454 .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::DecrementCounter));
455
456 testCounter = 0;
457 source.CompleteSomethingAsyncWithSuccess(10);
458 ASSERT_EQ(10, testCounter);
459
460 // test a failing promise
461 source.StartSomethingAsync()
462 .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::IncrementCounter))
463 .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(target, &MyPromiseTarget::DecrementCounter));
464
465 testCounter = 0;
466 source.CompleteSomethingAsyncWithFailure(15);
467 ASSERT_EQ(-15, testCounter);
468 }
469
470 TEST(MessageBroker2, TestPromiseDeleteTarget)
471 {
472 MessageBroker broker;
473 MyPromiseSource source(broker);
474 MyPromiseTarget* target = new MyPromiseTarget(broker);
475
476 // create the promise
477 source.StartSomethingAsync()
478 .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::IncrementCounter))
479 .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::DecrementCounter));
480
481 // delete the promise target
482 delete target;
483
484 // trigger the promise, make sure it does not throw and does not call the callback
485 testCounter = 0;
486 source.CompleteSomethingAsyncWithSuccess(10);
487 ASSERT_EQ(0, testCounter);
488
489 // test a failing promise
490 source.StartSomethingAsync()
491 .Then(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::IncrementCounter))
492 .Else(new Callable<MyPromiseTarget, MyPromiseSource::MyPromiseMessage>(*target, &MyPromiseTarget::DecrementCounter));
493
494 testCounter = 0;
495 source.CompleteSomethingAsyncWithFailure(15);
496 ASSERT_EQ(0, testCounter);
497 }
498
499
500
501 //#include <stdio.h>
502 //#include <boost/noncopyable.hpp>
503
504 //#include <string>
505 //#include <memory>
506 //#include <map>
507 //#include <set>
508
509 //enum MessageType
510 //{
511 // MessageType_SeriesDownloaded = 1
512 //};
513
514
515 //class IMessage : public boost::noncopyable
516 //{
517 //private:
518 // MessageType type_;
519
520 //public:
521 // IMessage(MessageType type) :
522 // type_(type)
523 // {
524 // }
525
526 // virtual ~IMessage()
527 // {
528 // }
529
530 // MessageType GetMessageType() const
531 // {
532 // return type_;
533 // }
534 //};
535
536
537 //class IObserver : public boost::noncopyable
538 //{
539 //public:
540 // virtual ~IObserver()
541 // {
542 // }
543 //};
544
545
546 //class SeriesDownloadedMessage : public IMessage
547 //{
548 //private:
549 // std::string value_;
550
551 //public:
552 // enum
553 // {
554 // Type = MessageType_SeriesDownloaded
555 // };
556
557 // SeriesDownloadedMessage(const std::string& value) :
558 // IMessage(static_cast<MessageType>(Type)),
559 // value_(value)
560 // {
561 // }
562
563 // const std::string& GetValue() const
564 // {
565 // return value_;
566 // }
567 //};
568
569
570 //class MyObserver : public IObserver
571 //{
572 //public:
573 // void OnSeriesDownloaded(const SeriesDownloadedMessage& message)
574 // {
575 // printf("received: [%s]\n", message.GetValue().c_str());
576 // }
577 //};
578
579
580
581 //class ICallable : public boost::noncopyable // ne peut referencer que les classes de base
582 //{
583 //public:
584 // virtual ~ICallable()
585 // {
586 // }
587
588 // virtual void Apply(const IMessage& message) = 0;
589
590 // virtual MessageType GetMessageType() const = 0;
591 //};
592
593
594
595 //template <typename Observer,
596 // typename Message>
597 //class Callable : public ICallable
598 //{
599 //private:
600 // typedef void (Observer::* MemberFunction) (const Message&);
601
602 // Observer& observer_;
603 // MemberFunction function_;
604
605 //public:
606 // Callable(Observer& observer,
607 // MemberFunction function) :
608 // observer_(observer),
609 // function_(function)
610 // {
611 // }
612
613 // void ApplyInternal(const Message& message)
614 // {
615 // (observer_.*function_) (message);
616 // }
617
618 // virtual void Apply(const IMessage& message)
619 // {
620 // ApplyInternal(dynamic_cast<const Message&>(message));
621 // }
622
623 // virtual MessageType GetMessageType() const
624 // {
625 // return static_cast<MessageType>(Message::Type);
626 // }
627 //};
628
629
630
631 //class IObservable : public boost::noncopyable
632 //{
633 //private:
634 // typedef std::map<MessageType, std::set<ICallable*> > Callables;
635
636 // Callables callables_;
637
638 //public:
639 // virtual ~IObservable()
640 // {
641 // for (Callables::const_iterator it = callables_.begin();
642 // it != callables_.end(); ++it)
643 // {
644 // for (std::set<ICallable*>::const_iterator
645 // it2 = it->second.begin(); it2 != it->second.end(); ++it2)
646 // {
647 // delete *it2;
648 // }
649 // }
650 // }
651
652 // void Register(ICallable* callable)
653 // {
654 // MessageType type = callable->GetMessageType();
655
656 // callables_[type].insert(callable);
657 // }
658
659 // void Emit(const IMessage& message) const
660 // {
661 // Callables::const_iterator found = callables_.find(message.GetMessageType());
662
663 // if (found != callables_.end())
664 // {
665 // for (std::set<ICallable*>::const_iterator
666 // it = found->second.begin(); it != found->second.end(); ++it)
667 // {
668 // (*it)->Apply(message);
669 // }
670 // }
671 // }
672 //};
673
674
675
676
677 //int main()
678 //{
679 // MyObserver observer;
680
681 // SeriesDownloadedMessage message("coucou");
682
683 // IObservable observable;
684 // observable.Register(new Callable<MyObserver, SeriesDownloadedMessage>(observer, &MyObserver::OnSeriesDownloaded));
685 // observable.Register(new Callable<MyObserver, SeriesDownloadedMessage>(observer, &MyObserver::OnSeriesDownloaded));
686
687 // SeriesDownloadedMessage message2("hello");
688 // observable.Emit(message2);
689
690 // printf("%d\n", SeriesDownloadedMessage::Type);
691 //}