comparison UnitTestsSources/TestMessageBroker2_promise_and_connect_ok.cpp @ 299:3897f9f28cfa am-callable-and-promise

backup work in progress: updated messaging framework with ICallable
author am@osimis.io
date Fri, 14 Sep 2018 16:44:01 +0200
parents
children b70e9be013e4
comparison
equal deleted inserted replaced
298:f58bfb7bbcc9 299:3897f9f28cfa
1 /**
2 * Stone of Orthanc
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #include "gtest/gtest.h"
23
24 #include <boost/noncopyable.hpp>
25 #include <boost/function.hpp>
26 #include <boost/bind.hpp>
27
28 #include <string>
29 #include <map>
30 #include <set>
31
32 int testCounter = 0;
33 namespace {
34
35 enum MessageType
36 {
37 MessageType_Test1,
38 MessageType_Test2,
39
40 MessageType_CustomMessage,
41 MessageType_LastGenericStoneMessage
42 };
43
44 struct IMessage : public boost::noncopyable
45 {
46 MessageType messageType_;
47 public:
48 IMessage(const MessageType& messageType)
49 : messageType_(messageType)
50 {}
51 virtual ~IMessage() {}
52
53 virtual int GetType() const {return messageType_;}
54 };
55
56
57 struct ICustomMessage : public IMessage
58 {
59 int customMessageType_;
60 public:
61 ICustomMessage(int customMessageType)
62 : IMessage(MessageType_CustomMessage),
63 customMessageType_(customMessageType)
64 {}
65 virtual ~ICustomMessage() {}
66
67 virtual int GetType() const {return customMessageType_;}
68 };
69
70
71 class IObserver;
72 class IObservable;
73 class IPromiseTarget;
74 class IPromiseSource;
75 class Promise;
76
77 /*
78 * This is a central message broker. It keeps track of all observers and knows
79 * when an observer is deleted.
80 * This way, it can prevent an observable to send a message to a delete observer.
81 * It does the same book-keeping for the IPromiseTarget and IPromiseSource
82 */
83 class MessageBroker : public boost::noncopyable
84 {
85
86 std::set<IObserver*> activeObservers_; // the list of observers that are currently alive (that have not been deleted)
87 std::set<IPromiseTarget*> activePromiseTargets_;
88 std::set<IPromiseSource*> activePromiseSources_;
89
90 public:
91
92 void Register(IObserver& observer)
93 {
94 activeObservers_.insert(&observer);
95 }
96
97 void Unregister(IObserver& observer)
98 {
99 activeObservers_.erase(&observer);
100 }
101
102 void Register(IPromiseTarget& target)
103 {
104 activePromiseTargets_.insert(&target);
105 }
106
107 void Unregister(IPromiseTarget& target)
108 {
109 activePromiseTargets_.erase(&target);
110 }
111
112 void Register(IPromiseSource& source)
113 {
114 activePromiseSources_.insert(&source);
115 }
116
117 void Unregister(IPromiseSource& source)
118 {
119 activePromiseSources_.erase(&source);
120 }
121
122 void EmitMessage(IObservable& from, std::set<IObserver*> observers, const IMessage& message);
123
124 bool IsActive(IPromiseTarget* target)
125 {
126 return activePromiseTargets_.find(target) != activePromiseTargets_.end();
127 }
128
129 bool IsActive(IPromiseSource* source)
130 {
131 return activePromiseSources_.find(source) != activePromiseSources_.end();
132 }
133
134 bool IsActive(IObserver* observer)
135 {
136 return activeObservers_.find(observer) != activeObservers_.end();
137 }
138 };
139
140 struct IPromiseArgs
141 {
142 public:
143 virtual ~IPromiseArgs() {}
144 };
145
146 class EmptyPromiseArguments : public IPromiseArgs
147 {
148
149 };
150
151 class Promise : public boost::noncopyable
152 {
153 protected:
154 MessageBroker& broker_;
155
156 IPromiseTarget* successTarget_;
157 boost::function<void (const IPromiseArgs& message)> successCallable_;
158
159 IPromiseTarget* failureTarget_;
160 boost::function<void (const IPromiseArgs& message)> failureCallable_;
161
162 public:
163 Promise(MessageBroker& broker)
164 : broker_(broker),
165 successTarget_(NULL),
166 failureTarget_(NULL)
167 {
168 }
169
170 void Success(const IPromiseArgs& message)
171 {
172 // check the target is still alive in the broker
173 if (broker_.IsActive(successTarget_))
174 {
175 successCallable_(message);
176 }
177 }
178
179 void Failure(const IPromiseArgs& message)
180 {
181 // check the target is still alive in the broker
182 if (broker_.IsActive(failureTarget_))
183 {
184 failureCallable_(message);
185 }
186 }
187
188 Promise& Then(IPromiseTarget* target, boost::function<void (const IPromiseArgs& message)> f)
189 {
190 if (successTarget_ != NULL)
191 {
192 // TODO: throw throw new "Promise may only have a single success target"
193 }
194 successTarget_ = target;
195 successCallable_ = f;
196 return *this;
197 }
198
199 Promise& Else(IPromiseTarget* target, boost::function<void (const IPromiseArgs& message)> f)
200 {
201 if (failureTarget_ != NULL)
202 {
203 // TODO: throw throw new "Promise may only have a single failure target"
204 }
205 failureTarget_ = target;
206 failureCallable_ = f;
207 return *this;
208 }
209
210 };
211
212 class IObserver : public boost::noncopyable
213 {
214 protected:
215 MessageBroker& broker_;
216
217 public:
218 IObserver(MessageBroker& broker)
219 : broker_(broker)
220 {
221 broker_.Register(*this);
222 }
223
224 virtual ~IObserver()
225 {
226 broker_.Unregister(*this);
227 }
228
229 };
230
231 class IPromiseTarget : public boost::noncopyable
232 {
233 protected:
234 MessageBroker& broker_;
235
236 public:
237 IPromiseTarget(MessageBroker& broker)
238 : broker_(broker)
239 {
240 broker_.Register(*this);
241 }
242
243 virtual ~IPromiseTarget()
244 {
245 broker_.Unregister(*this);
246 }
247 };
248
249 class IPromiseSource : public boost::noncopyable
250 {
251 protected:
252 MessageBroker& broker_;
253
254 public:
255 IPromiseSource(MessageBroker& broker)
256 : broker_(broker)
257 {
258 broker_.Register(*this);
259 }
260
261 virtual ~IPromiseSource()
262 {
263 broker_.Unregister(*this);
264 }
265 };
266
267
268 struct CallableObserver
269 {
270 IObserver* observer;
271 boost::function<void (IObservable& from, const IMessage& message)> f;
272 };
273
274 class IObservable : public boost::noncopyable
275 {
276 protected:
277 MessageBroker& broker_;
278
279 std::set<IObserver*> observers_;
280
281 std::map<int, std::set<CallableObserver*> > callables_;
282 public:
283
284 IObservable(MessageBroker& broker)
285 : broker_(broker)
286 {
287 }
288 virtual ~IObservable()
289 {
290 }
291
292 void EmitMessage(const IMessage& message)
293 {
294 //broker_.EmitMessage(*this, observers_, message);
295 int messageType = message.GetType();
296 if (callables_.find(messageType) != callables_.end())
297 {
298 for (std::set<CallableObserver*>::iterator observer = callables_[messageType].begin(); observer != callables_[messageType].end(); observer++)
299 {
300 CallableObserver* callable = *observer;
301 if (broker_.IsActive(callable->observer))
302 {
303 callable->f(*this, message);
304 }
305 }
306 }
307
308 }
309
310 void RegisterObserver(IObserver& observer)
311 {
312 observers_.insert(&observer);
313 }
314
315 void UnregisterObserver(IObserver& observer)
316 {
317 observers_.erase(&observer);
318 }
319
320 //template<typename TObserver> void Connect(MessageType messageType, IObserver& observer, void (TObserver::*ptrToMemberHandler)(IObservable& from, const IMessage& message))
321 void Connect(int messageType, IObserver& observer, boost::function<void (IObservable& from, const IMessage& message)> f)
322 {
323 callables_[messageType] = std::set<CallableObserver*>();
324 CallableObserver* callable = new CallableObserver();
325 callable->observer = &observer;
326 callable->f = f;
327 callables_[messageType].insert(callable);
328 }
329 };
330
331
332 enum CustomMessageType
333 {
334 CustomMessageType_First = MessageType_LastGenericStoneMessage + 1,
335
336 CustomMessageType_Completed
337 };
338
339 class MyObservable : public IObservable
340 {
341 public:
342 struct MyCustomMessage: public ICustomMessage
343 {
344 int payload_;
345 MyCustomMessage(int payload)
346 : ICustomMessage(CustomMessageType_Completed),
347 payload_(payload)
348 {}
349 };
350
351 MyObservable(MessageBroker& broker)
352 : IObservable(broker)
353 {}
354
355 };
356
357 class MyObserver : public IObserver
358 {
359 public:
360 MyObserver(MessageBroker& broker)
361 : IObserver(broker)
362 {}
363 void HandleCompletedMessage(IObservable& from, const IMessage& message)
364 {
365 const MyObservable::MyCustomMessage& msg = dynamic_cast<const MyObservable::MyCustomMessage&>(message);
366 testCounter += msg.payload_;
367 }
368
369 };
370
371
372 class MyPromiseSource : public IPromiseSource
373 {
374 Promise* currentPromise_;
375 public:
376 struct MyPromiseArgs : public IPromiseArgs
377 {
378 int increment;
379 };
380
381 MyPromiseSource(MessageBroker& broker)
382 : IPromiseSource(broker),
383 currentPromise_(NULL)
384 {}
385
386 Promise& StartSomethingAsync()
387 {
388 currentPromise_ = new Promise(broker_);
389 return *currentPromise_;
390 }
391
392 void CompleteSomethingAsyncWithSuccess()
393 {
394 currentPromise_->Success(EmptyPromiseArguments());
395 delete currentPromise_;
396 }
397
398 void CompleteSomethingAsyncWithFailure()
399 {
400 currentPromise_->Failure(EmptyPromiseArguments());
401 delete currentPromise_;
402 }
403 };
404
405
406 class MyPromiseTarget : public IPromiseTarget
407 {
408 public:
409 MyPromiseTarget(MessageBroker& broker)
410 : IPromiseTarget(broker)
411 {}
412
413 void IncrementCounter(const IPromiseArgs& args)
414 {
415 testCounter++;
416 }
417
418 void DecrementCounter(const IPromiseArgs& args)
419 {
420 testCounter--;
421 }
422 };
423 }
424
425 #define CONNECT_MESSAGES(observablePtr, messageType, observerPtr, observerFnPtr) (observablePtr)->Connect(messageType, *(observerPtr), boost::bind(observerFnPtr, observerPtr, _1, _2))
426 #define PTHEN(targetPtr, targetFnPtr) Then(targetPtr, boost::bind(targetFnPtr, targetPtr, _1))
427 #define PELSE(targetPtr, targetFnPtr) Else(targetPtr, boost::bind(targetFnPtr, targetPtr, _1))
428
429
430 TEST(MessageBroker2, TestPermanentConnectionSimpleUseCase)
431 {
432 MessageBroker broker;
433 MyObservable observable(broker);
434 MyObserver observer(broker);
435
436 // create a permanent connection between an observable and an observer
437 CONNECT_MESSAGES(&observable, CustomMessageType_Completed, &observer, &MyObserver::HandleCompletedMessage);
438
439 testCounter = 0;
440 observable.EmitMessage(MyObservable::MyCustomMessage(12));
441 ASSERT_EQ(12, testCounter);
442
443 // the connection is permanent; if we emit the same message again, the observer will be notified again
444 testCounter = 0;
445 observable.EmitMessage(MyObservable::MyCustomMessage(20));
446 ASSERT_EQ(20, testCounter);
447 }
448
449 TEST(MessageBroker2, TestPermanentConnectionDeleteObserver)
450 {
451 MessageBroker broker;
452 MyObservable observable(broker);
453 MyObserver* observer = new MyObserver(broker);
454
455 // create a permanent connection between an observable and an observer
456 CONNECT_MESSAGES(&observable, CustomMessageType_Completed, observer, &MyObserver::HandleCompletedMessage);
457
458 testCounter = 0;
459 observable.EmitMessage(MyObservable::MyCustomMessage(12));
460 ASSERT_EQ(12, testCounter);
461
462 // delete the observer and check that the callback is not called anymore
463 delete observer;
464
465 // the connection is permanent; if we emit the same message again, the observer will be notified again
466 testCounter = 0;
467 observable.EmitMessage(MyObservable::MyCustomMessage(20));
468 ASSERT_EQ(0, testCounter);
469 }
470
471
472 TEST(MessageBroker2, TestPromiseSuccessFailure)
473 {
474 MessageBroker broker;
475 MyPromiseSource source(broker);
476 MyPromiseTarget target(broker);
477
478 // test a successful promise
479 source.StartSomethingAsync()
480 .PTHEN(&target, &MyPromiseTarget::IncrementCounter)
481 .PELSE(&target, &MyPromiseTarget::DecrementCounter);
482
483 testCounter = 0;
484 source.CompleteSomethingAsyncWithSuccess();
485 ASSERT_EQ(1, testCounter);
486
487 // test a failing promise
488 source.StartSomethingAsync()
489 .PTHEN(&target, &MyPromiseTarget::IncrementCounter)
490 .PELSE(&target, &MyPromiseTarget::DecrementCounter);
491
492 testCounter = 0;
493 source.CompleteSomethingAsyncWithFailure();
494 ASSERT_EQ(-1, testCounter);
495 }
496
497 //TEST(MessageBroker2, TestPromiseDeleteTarget)
498 //{
499 // MessageBroker broker;
500 // MyPromiseSource source(broker);
501 // MyPromiseTarget target(broker);
502
503 // // test a successful promise
504 // source.StartSomethingAsync()
505 // .PTHEN(&target, &MyPromiseTarget::IncrementCounter)
506 // .PELSE(&target, &MyPromiseTarget::DecrementCounter);
507
508 // testCounter = 0;
509 // source.CompleteSomethingAsyncWithSuccess();
510 // ASSERT_EQ(1, testCounter);
511
512 // // test a failing promise
513 // source.StartSomethingAsync()
514 // .PTHEN(&target, &MyPromiseTarget::IncrementCounter)
515 // .PELSE(&target, &MyPromiseTarget::DecrementCounter);
516
517 // testCounter = 0;
518 // source.CompleteSomethingAsyncWithFailure();
519 // ASSERT_EQ(-1, testCounter);
520 //}