comparison Framework/HttpQueries/HttpQueriesQueue.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 9bcd6eadcff5
comparison
equal deleted inserted replaced
-1:000000000000 0:95226b754d9e
1 /**
2 * Transfers accelerator plugin for Orthanc
3 * Copyright (C) 2018 Osimis, Belgium
4 *
5 * This program is free software: you can redistribute it and/or
6 * modify it under the terms of the GNU Affero General Public License
7 * as published by the Free Software Foundation, either version 3 of
8 * the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 **/
18
19
20 #include "HttpQueriesQueue.h"
21
22 #include <Core/Logging.h>
23 #include <Core/OrthancException.h>
24
25 namespace OrthancPlugins
26 {
27 HttpQueriesQueue::Status HttpQueriesQueue::GetStatusInternal() const
28 {
29 if (successQueries_ == queries_.size())
30 {
31 return Status_Success;
32 }
33 else if (isFailure_)
34 {
35 return Status_Failure;
36 }
37 else
38 {
39 return Status_Running;
40 }
41 }
42
43
44 HttpQueriesQueue::HttpQueriesQueue(OrthancPluginContext* context) :
45 context_(context),
46 peers_(context),
47 maxRetries_(0)
48 {
49 Reset();
50 }
51
52
53 HttpQueriesQueue::~HttpQueriesQueue()
54 {
55 for (size_t i = 0; i < queries_.size(); i++)
56 {
57 assert(queries_[i] != NULL);
58 delete queries_[i];
59 }
60 }
61
62
63 unsigned int HttpQueriesQueue::GetMaxRetries()
64 {
65 boost::mutex::scoped_lock lock(mutex_);
66 return maxRetries_;
67 }
68
69
70 void HttpQueriesQueue::SetMaxRetries(unsigned int maxRetries)
71 {
72 boost::mutex::scoped_lock lock(mutex_);
73 maxRetries_ = maxRetries;
74 }
75
76
77 void HttpQueriesQueue::Reserve(size_t size)
78 {
79 boost::mutex::scoped_lock lock(mutex_);
80 queries_.reserve(size);
81 }
82
83
84 void HttpQueriesQueue::Reset()
85 {
86 boost::mutex::scoped_lock lock(mutex_);
87 position_ = 0;
88 downloadedSize_ = 0;
89 uploadedSize_ = 0;
90 successQueries_ = 0;
91 isFailure_ = false;
92 }
93
94
95 void HttpQueriesQueue::Enqueue(IHttpQuery* query)
96 {
97 if (query == NULL)
98 {
99 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
100 }
101 else
102 {
103 boost::mutex::scoped_lock lock(mutex_);
104 queries_.push_back(query);
105 }
106 }
107
108
109 bool HttpQueriesQueue::ExecuteOneQuery(uint64_t& networkTraffic)
110 {
111 networkTraffic = 0;
112
113 unsigned int maxRetries;
114 IHttpQuery* query = NULL;
115
116 {
117 boost::mutex::scoped_lock lock(mutex_);
118
119 maxRetries = maxRetries_;
120
121 if (position_ == queries_.size() ||
122 isFailure_)
123 {
124 return false;
125 }
126 else
127 {
128 query = queries_[position_];
129 position_ ++;
130 }
131 }
132
133 std::string body;
134
135 if (query->GetMethod() == Orthanc::HttpMethod_Post ||
136 query->GetMethod() == Orthanc::HttpMethod_Put)
137 {
138 query->ReadBody(body);
139 }
140
141 unsigned int retry = 0;
142
143 for (;;)
144 {
145 MemoryBuffer answer(context_);
146
147 bool success;
148
149 try
150 {
151 switch (query->GetMethod())
152 {
153 case Orthanc::HttpMethod_Get:
154 success = peers_.DoGet(answer, query->GetPeer(), query->GetUri());
155 break;
156
157 case Orthanc::HttpMethod_Post:
158 success = peers_.DoPost(answer, query->GetPeer(), query->GetUri(), body);
159 break;
160
161 case Orthanc::HttpMethod_Put:
162 success = peers_.DoPut(query->GetPeer(), query->GetUri(), body);
163 break;
164
165 case Orthanc::HttpMethod_Delete:
166 success = peers_.DoDelete(query->GetPeer(), query->GetUri());
167 break;
168
169 default:
170 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
171 }
172 }
173 catch (Orthanc::OrthancException& e)
174 {
175 LOG(ERROR) << "Unhandled exception during an HTTP query to peer \""
176 << query->GetPeer() << "\": " << e.What();
177 success = false;
178 }
179
180 if (success)
181 {
182 size_t downloaded = 0;
183 size_t uploaded = 0;
184
185 if (query->GetMethod() == Orthanc::HttpMethod_Get ||
186 query->GetMethod() == Orthanc::HttpMethod_Post)
187 {
188 query->HandleAnswer(answer.GetData(), answer.GetSize());
189 downloaded = answer.GetSize();
190 }
191
192 if (query->GetMethod() == Orthanc::HttpMethod_Put ||
193 query->GetMethod() == Orthanc::HttpMethod_Post)
194 {
195 uploaded = body.size();
196 }
197
198 networkTraffic = downloaded + uploaded;
199
200 {
201 boost::mutex::scoped_lock lock(mutex_);
202 downloadedSize_ += downloaded;
203 uploadedSize_ += uploaded;
204 successQueries_ ++;
205
206 if (successQueries_ == queries_.size())
207 {
208 completed_.notify_all();
209 }
210
211 return true;
212 }
213 }
214 else
215 {
216 // Error: Let's retry
217 retry ++;
218
219 if (retry < maxRetries)
220 {
221 // Wait 1 second before retrying
222 boost::this_thread::sleep(boost::posix_time::seconds(1));
223 }
224 else
225 {
226 LOG(ERROR) << "Reached the maximum number of retries for a HTTP query";
227
228 {
229 boost::mutex::scoped_lock lock(mutex_);
230 isFailure_ = true;
231 completed_.notify_all();
232 }
233
234 return false;
235 }
236 }
237 }
238 }
239
240
241 HttpQueriesQueue::Status HttpQueriesQueue::WaitComplete(unsigned int timeoutMS)
242 {
243 boost::mutex::scoped_lock lock(mutex_);
244
245 Status status = GetStatusInternal();
246
247 if (status == Status_Running)
248 {
249 completed_.timed_wait(lock, boost::posix_time::milliseconds(timeoutMS));
250 return GetStatusInternal();
251 }
252 else
253 {
254 return status;
255 }
256 }
257
258
259 void HttpQueriesQueue::WaitComplete()
260 {
261 boost::mutex::scoped_lock lock(mutex_);
262
263 while (GetStatusInternal() == Status_Running)
264 {
265 completed_.timed_wait(lock, boost::posix_time::milliseconds(200));
266 }
267 }
268
269
270 void HttpQueriesQueue::GetStatistics(size_t& scheduledQueriesCount,
271 size_t& successQueriesCount,
272 uint64_t& downloadedSize,
273 uint64_t& uploadedSize)
274 {
275 boost::mutex::scoped_lock lock(mutex_);
276 scheduledQueriesCount = queries_.size();
277 successQueriesCount = successQueries_;
278 downloadedSize = downloadedSize_;
279 uploadedSize = uploadedSize_;
280 }
281 }