comparison OrthancFramework/Sources/MultiThreading/SharedMessageQueue.cpp @ 4044:d25f4c0fa160 framework

splitting code into OrthancFramework and OrthancServer
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 10 Jun 2020 20:30:34 +0200
parents Core/MultiThreading/SharedMessageQueue.cpp@2a170a8f1faf
children bf7b9edf6b81
comparison
equal deleted inserted replaced
4043:6c6239aec462 4044:d25f4c0fa160
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation, either version 3 of the
10 * License, or (at your option) any later version.
11 *
12 * In addition, as a special exception, the copyright holders of this
13 * program give permission to link the code of its release with the
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it
15 * that use the same license as the "OpenSSL" library), and distribute
16 * the linked executables. You must obey the GNU General Public License
17 * in all respects for all of the code used other than "OpenSSL". If you
18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here.
23 *
24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 **/
32
33
34 #include "../PrecompiledHeaders.h"
35 #include "SharedMessageQueue.h"
36
37
38 #include "../Compatibility.h"
39
40
/**
 * FIFO (queue):
 *
 *            back                         front
 *            +--+--+--+--+--+--+--+--+--+--+--+
 * Enqueue -> |  |  |  |  |  |  |  |  |  |  |  |
 *            |  |  |  |  |  |  |  |  |  |  |  | -> Dequeue
 *            +--+--+--+--+--+--+--+--+--+--+--+
 *                                            ^
 *                                            |
 *                                      Make room here
 *
 *
 * LIFO (stack):
 *
 *            back                         front
 *            +--+--+--+--+--+--+--+--+--+--+--+
 *            |  |  |  |  |  |  |  |  |  |  |  | <- Enqueue
 *            |  |  |  |  |  |  |  |  |  |  |  | -> Dequeue
 *            +--+--+--+--+--+--+--+--+--+--+--+
 *              ^
 *              |
 *        Make room here
 **/
65
66
67 namespace Orthanc
68 {
69 SharedMessageQueue::SharedMessageQueue(unsigned int maxSize) :
70 isFifo_(true),
71 maxSize_(maxSize)
72 {
73 }
74
75
76 SharedMessageQueue::~SharedMessageQueue()
77 {
78 for (Queue::iterator it = queue_.begin(); it != queue_.end(); ++it)
79 {
80 delete *it;
81 }
82 }
83
84
85 void SharedMessageQueue::Enqueue(IDynamicObject* message)
86 {
87 boost::mutex::scoped_lock lock(mutex_);
88
89 if (maxSize_ != 0 && queue_.size() > maxSize_)
90 {
91 if (isFifo_)
92 {
93 // Too many elements in the queue: Make room
94 delete queue_.front();
95 queue_.pop_front();
96 }
97 else
98 {
99 // Too many elements in the stack: Make room
100 delete queue_.back();
101 queue_.pop_back();
102 }
103 }
104
105 if (isFifo_)
106 {
107 // Queue policy (FIFO)
108 queue_.push_back(message);
109 }
110 else
111 {
112 // Stack policy (LIFO)
113 queue_.push_front(message);
114 }
115
116 elementAvailable_.notify_one();
117 }
118
119
120 IDynamicObject* SharedMessageQueue::Dequeue(int32_t millisecondsTimeout)
121 {
122 boost::mutex::scoped_lock lock(mutex_);
123
124 // Wait for a message to arrive in the FIFO queue
125 while (queue_.empty())
126 {
127 if (millisecondsTimeout == 0)
128 {
129 elementAvailable_.wait(lock);
130 }
131 else
132 {
133 bool success = elementAvailable_.timed_wait
134 (lock, boost::posix_time::milliseconds(millisecondsTimeout));
135 if (!success)
136 {
137 return NULL;
138 }
139 }
140 }
141
142 std::unique_ptr<IDynamicObject> message(queue_.front());
143 queue_.pop_front();
144
145 if (queue_.empty())
146 {
147 emptied_.notify_all();
148 }
149
150 return message.release();
151 }
152
153
154
155 bool SharedMessageQueue::WaitEmpty(int32_t millisecondsTimeout)
156 {
157 boost::mutex::scoped_lock lock(mutex_);
158
159 // Wait for the queue to become empty
160 while (!queue_.empty())
161 {
162 if (millisecondsTimeout == 0)
163 {
164 emptied_.wait(lock);
165 }
166 else
167 {
168 if (!emptied_.timed_wait
169 (lock, boost::posix_time::milliseconds(millisecondsTimeout)))
170 {
171 return false;
172 }
173 }
174 }
175
176 return true;
177 }
178
179
180 void SharedMessageQueue::SetFifoPolicy()
181 {
182 boost::mutex::scoped_lock lock(mutex_);
183 isFifo_ = true;
184 }
185
186 void SharedMessageQueue::SetLifoPolicy()
187 {
188 boost::mutex::scoped_lock lock(mutex_);
189 isFifo_ = false;
190 }
191
192 void SharedMessageQueue::Clear()
193 {
194 boost::mutex::scoped_lock lock(mutex_);
195
196 if (queue_.empty())
197 {
198 return;
199 }
200 else
201 {
202 while (!queue_.empty())
203 {
204 std::unique_ptr<IDynamicObject> message(queue_.front());
205 queue_.pop_front();
206 }
207
208 emptied_.notify_all();
209 }
210 }
211 }