comparison UnitTests/MessageWithDestination.cpp @ 140:4d863c7b2f44

message queues
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 10 Oct 2012 18:20:04 +0200
parents 3ad78369fcc4
children 24681d35bad9
comparison
equal deleted inserted replaced
139:3ad78369fcc4 140:4d863c7b2f44
#include "../Core/IDynamicObject.h"

#include "../Core/OrthancException.h"

#include <stdint.h>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <gtest/gtest.h>
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
5 11
6 namespace Orthanc 12 namespace Orthanc
7 { 13 {
14 class SharedMessageQueue
15 {
16 private:
17 typedef std::list<IDynamicObject*> Queue;
18
19 unsigned int maxSize_;
20 Queue queue_;
21 boost::mutex mutex_;
22 boost::condition_variable elementAvailable_;
23
24 public:
25 SharedMessageQueue(unsigned int maxSize = 0)
26 {
27 maxSize_ = maxSize;
28 }
29
30 ~SharedMessageQueue()
31 {
32 for (Queue::iterator it = queue_.begin(); it != queue_.end(); it++)
33 {
34 delete *it;
35 }
36 }
37
38 void Enqueue(IDynamicObject* message)
39 {
40 boost::mutex::scoped_lock lock(mutex_);
41
42 if (maxSize_ != 0 && queue_.size() > maxSize_)
43 {
44 // Too many elements in the queue: First remove the oldest
45 delete queue_.front();
46 queue_.pop_front();
47 }
48
49 queue_.push_back(message);
50 elementAvailable_.notify_one();
51 }
52
53 IDynamicObject* Dequeue(int32_t timeout)
54 {
55 boost::mutex::scoped_lock lock(mutex_);
56
57 // Wait for a message to arrive in the FIFO queue
58 while (queue_.empty())
59 {
60 if (timeout == 0)
61 {
62 elementAvailable_.wait(lock);
63 }
64 else
65 {
66 bool success = elementAvailable_.timed_wait
67 (lock, boost::posix_time::milliseconds(timeout));
68 if (!success)
69 {
70 throw OrthancException(ErrorCode_Timeout);
71 }
72 }
73 }
74
75 std::auto_ptr<IDynamicObject> message(queue_.front());
76 queue_.pop_front();
77
78 return message.release();
79 }
80
81 IDynamicObject* Dequeue()
82 {
83 return Dequeue(0);
84 }
85 };
86
87
8 /** 88 /**
9 * This class represents a message that is to be sent to some destination. 89 * This class represents a message that is to be sent to some destination.
10 **/ 90 **/
11 class MessageWithDestination : public boost::noncopyable 91 class MessageToDispatch : public boost::noncopyable
12 { 92 {
13 private: 93 private:
14 IDynamicObject* message_; 94 IDynamicObject* message_;
15 std::string destination_; 95 std::string destination_;
16 96
18 /** 98 /**
19 * Create a new message with a destination. 99 * Create a new message with a destination.
20 * \param message The content of the message (takes the ownership) 100 * \param message The content of the message (takes the ownership)
21 * \param destination The destination of the message 101 * \param destination The destination of the message
22 **/ 102 **/
23 MessageWithDestination(IDynamicObject* message, 103 MessageToDispatch(IDynamicObject* message,
24 const char* destination) 104 const char* destination)
25 { 105 {
26 message_ = message; 106 message_ = message;
27 destination_ = destination; 107 destination_ = destination;
28 } 108 }
29 109
30 ~MessageWithDestination() 110 ~MessageToDispatch()
31 { 111 {
32 if (message_) 112 if (message_)
33 { 113 {
34 delete message_; 114 delete message_;
115 }
116 }
117 };
118
119
120 class IDestinationContext : public IDynamicObject
121 {
122 public:
123 virtual void Handle(const IDynamicObject& message) = 0;
124 };
125
126
127 class IDestinationContextFactory : public IDynamicObject
128 {
129 public:
130 virtual IDestinationContext* Construct(const char* destination) = 0;
131 };
132
133
134 class MessageDispatcher
135 {
136 private:
137 typedef std::map<std::string, IDestinationContext*> ActiveContexts;
138
139 std::auto_ptr<IDestinationContextFactory> factory_;
140 ActiveContexts activeContexts_;
141 SharedMessageQueue queue_;
142
143 public:
144 MessageDispatcher(IDestinationContextFactory* factory) // takes the ownership
145 {
146 factory_.reset(factory);
147 }
148
149 ~MessageDispatcher()
150 {
151 for (ActiveContexts::iterator it = activeContexts_.begin();
152 it != activeContexts_.end(); it++)
153 {
154 delete it->second;
35 } 155 }
36 } 156 }
37 }; 157 };
38 } 158 }
39 159
41 161
42 #include "../Core/DicomFormat/DicomString.h" 162 #include "../Core/DicomFormat/DicomString.h"
43 163
44 using namespace Orthanc; 164 using namespace Orthanc;
45 165
46 TEST(MessageWithDestination, A) 166 TEST(MessageToDispatch, A)
47 { 167 {
48 MessageWithDestination a(new DicomString("coucou"), "pukkaj"); 168 MessageToDispatch a(new DicomString("coucou"), "pukkaj");
49 } 169 }
50 170