comparison Resources/Orthanc/Core/MultiThreading/SharedMessageQueue.cpp @ 200:03afbee0cc7b

integration of Orthanc core into Stone
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 23 Mar 2018 11:04:03 +0100
parents
children
comparison
equal deleted inserted replaced
199:dabe9982fca3 200:03afbee0cc7b
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation, either version 3 of the
10 * License, or (at your option) any later version.
11 *
12 * In addition, as a special exception, the copyright holders of this
13 * program give permission to link the code of its release with the
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it
15 * that use the same license as the "OpenSSL" library), and distribute
16 * the linked executables. You must obey the GNU General Public License
17 * in all respects for all of the code used other than "OpenSSL". If you
18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here.
23 *
24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 **/
32
33
34 #include "../PrecompiledHeaders.h"
35 #include "SharedMessageQueue.h"
36
37
38
/**
 * FIFO (queue): messages are enqueued at the back and dequeued from
 * the front. When the queue is full, room is made at the front.
 *
 *            back                             front
 *            +--+--+--+--+--+--+--+--+--+--+--+
 * Enqueue -> |  |  |  |  |  |  |  |  |  |  |  |
 *            |  |  |  |  |  |  |  |  |  |  |  | -> Dequeue
 *            +--+--+--+--+--+--+--+--+--+--+--+
 *                                            ^
 *                                            |
 *                                      Make room here
 *
 *
 * LIFO (stack): messages are both enqueued and dequeued at the
 * front. When the stack is full, room is made at the back.
 *
 *            back                             front
 *            +--+--+--+--+--+--+--+--+--+--+--+
 *            |  |  |  |  |  |  |  |  |  |  |  | <- Enqueue
 *            |  |  |  |  |  |  |  |  |  |  |  | -> Dequeue
 *            +--+--+--+--+--+--+--+--+--+--+--+
 *              ^
 *              |
 *        Make room here
 **/
63
64
65 namespace Orthanc
66 {
67 SharedMessageQueue::SharedMessageQueue(unsigned int maxSize) :
68 isFifo_(true),
69 maxSize_(maxSize)
70 {
71 }
72
73
74 SharedMessageQueue::~SharedMessageQueue()
75 {
76 for (Queue::iterator it = queue_.begin(); it != queue_.end(); ++it)
77 {
78 delete *it;
79 }
80 }
81
82
83 void SharedMessageQueue::Enqueue(IDynamicObject* message)
84 {
85 boost::mutex::scoped_lock lock(mutex_);
86
87 if (maxSize_ != 0 && queue_.size() > maxSize_)
88 {
89 if (isFifo_)
90 {
91 // Too many elements in the queue: Make room
92 delete queue_.front();
93 queue_.pop_front();
94 }
95 else
96 {
97 // Too many elements in the stack: Make room
98 delete queue_.back();
99 queue_.pop_back();
100 }
101 }
102
103 if (isFifo_)
104 {
105 // Queue policy (FIFO)
106 queue_.push_back(message);
107 }
108 else
109 {
110 // Stack policy (LIFO)
111 queue_.push_front(message);
112 }
113
114 elementAvailable_.notify_one();
115 }
116
117
118 IDynamicObject* SharedMessageQueue::Dequeue(int32_t millisecondsTimeout)
119 {
120 boost::mutex::scoped_lock lock(mutex_);
121
122 // Wait for a message to arrive in the FIFO queue
123 while (queue_.empty())
124 {
125 if (millisecondsTimeout == 0)
126 {
127 elementAvailable_.wait(lock);
128 }
129 else
130 {
131 bool success = elementAvailable_.timed_wait
132 (lock, boost::posix_time::milliseconds(millisecondsTimeout));
133 if (!success)
134 {
135 return NULL;
136 }
137 }
138 }
139
140 std::auto_ptr<IDynamicObject> message(queue_.front());
141 queue_.pop_front();
142
143 if (queue_.empty())
144 {
145 emptied_.notify_all();
146 }
147
148 return message.release();
149 }
150
151
152
153 bool SharedMessageQueue::WaitEmpty(int32_t millisecondsTimeout)
154 {
155 boost::mutex::scoped_lock lock(mutex_);
156
157 // Wait for the queue to become empty
158 while (!queue_.empty())
159 {
160 if (millisecondsTimeout == 0)
161 {
162 emptied_.wait(lock);
163 }
164 else
165 {
166 if (!emptied_.timed_wait
167 (lock, boost::posix_time::milliseconds(millisecondsTimeout)))
168 {
169 return false;
170 }
171 }
172 }
173
174 return true;
175 }
176
177
178 void SharedMessageQueue::SetFifoPolicy()
179 {
180 boost::mutex::scoped_lock lock(mutex_);
181 isFifo_ = true;
182 }
183
184 void SharedMessageQueue::SetLifoPolicy()
185 {
186 boost::mutex::scoped_lock lock(mutex_);
187 isFifo_ = false;
188 }
189
190 void SharedMessageQueue::Clear()
191 {
192 boost::mutex::scoped_lock lock(mutex_);
193
194 if (queue_.empty())
195 {
196 return;
197 }
198 else
199 {
200 while (!queue_.empty())
201 {
202 std::auto_ptr<IDynamicObject> message(queue_.front());
203 queue_.pop_front();
204 }
205
206 emptied_.notify_all();
207 }
208 }
209 }