Mercurial > hg > orthanc-transfers
comparison Framework/HttpQueries/HttpQueriesQueue.cpp @ 0:95226b754d9e
initial release
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 17 Sep 2018 11:34:55 +0200 |
parents | |
children | 9bcd6eadcff5 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:95226b754d9e |
---|---|
1 /** | |
2 * Transfers accelerator plugin for Orthanc | |
3 * Copyright (C) 2018 Osimis, Belgium | |
4 * | |
5 * This program is free software: you can redistribute it and/or | |
6 * modify it under the terms of the GNU Affero General Public License | |
7 * as published by the Free Software Foundation, either version 3 of | |
8 * the License, or (at your option) any later version. | |
9 * | |
10 * This program is distributed in the hope that it will be useful, but | |
11 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 * Affero General Public License for more details. | |
14 * | |
15 * You should have received a copy of the GNU Affero General Public License | |
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 **/ | |
18 | |
19 | |
20 #include "HttpQueriesQueue.h" | |
21 | |
22 #include <Core/Logging.h> | |
23 #include <Core/OrthancException.h> | |
24 | |
25 namespace OrthancPlugins | |
26 { | |
27 HttpQueriesQueue::Status HttpQueriesQueue::GetStatusInternal() const | |
28 { | |
29 if (successQueries_ == queries_.size()) | |
30 { | |
31 return Status_Success; | |
32 } | |
33 else if (isFailure_) | |
34 { | |
35 return Status_Failure; | |
36 } | |
37 else | |
38 { | |
39 return Status_Running; | |
40 } | |
41 } | |
42 | |
43 | |
44 HttpQueriesQueue::HttpQueriesQueue(OrthancPluginContext* context) : | |
45 context_(context), | |
46 peers_(context), | |
47 maxRetries_(0) | |
48 { | |
49 Reset(); | |
50 } | |
51 | |
52 | |
53 HttpQueriesQueue::~HttpQueriesQueue() | |
54 { | |
55 for (size_t i = 0; i < queries_.size(); i++) | |
56 { | |
57 assert(queries_[i] != NULL); | |
58 delete queries_[i]; | |
59 } | |
60 } | |
61 | |
62 | |
63 unsigned int HttpQueriesQueue::GetMaxRetries() | |
64 { | |
65 boost::mutex::scoped_lock lock(mutex_); | |
66 return maxRetries_; | |
67 } | |
68 | |
69 | |
70 void HttpQueriesQueue::SetMaxRetries(unsigned int maxRetries) | |
71 { | |
72 boost::mutex::scoped_lock lock(mutex_); | |
73 maxRetries_ = maxRetries; | |
74 } | |
75 | |
76 | |
77 void HttpQueriesQueue::Reserve(size_t size) | |
78 { | |
79 boost::mutex::scoped_lock lock(mutex_); | |
80 queries_.reserve(size); | |
81 } | |
82 | |
83 | |
84 void HttpQueriesQueue::Reset() | |
85 { | |
86 boost::mutex::scoped_lock lock(mutex_); | |
87 position_ = 0; | |
88 downloadedSize_ = 0; | |
89 uploadedSize_ = 0; | |
90 successQueries_ = 0; | |
91 isFailure_ = false; | |
92 } | |
93 | |
94 | |
95 void HttpQueriesQueue::Enqueue(IHttpQuery* query) | |
96 { | |
97 if (query == NULL) | |
98 { | |
99 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); | |
100 } | |
101 else | |
102 { | |
103 boost::mutex::scoped_lock lock(mutex_); | |
104 queries_.push_back(query); | |
105 } | |
106 } | |
107 | |
108 | |
109 bool HttpQueriesQueue::ExecuteOneQuery(uint64_t& networkTraffic) | |
110 { | |
111 networkTraffic = 0; | |
112 | |
113 unsigned int maxRetries; | |
114 IHttpQuery* query = NULL; | |
115 | |
116 { | |
117 boost::mutex::scoped_lock lock(mutex_); | |
118 | |
119 maxRetries = maxRetries_; | |
120 | |
121 if (position_ == queries_.size() || | |
122 isFailure_) | |
123 { | |
124 return false; | |
125 } | |
126 else | |
127 { | |
128 query = queries_[position_]; | |
129 position_ ++; | |
130 } | |
131 } | |
132 | |
133 std::string body; | |
134 | |
135 if (query->GetMethod() == Orthanc::HttpMethod_Post || | |
136 query->GetMethod() == Orthanc::HttpMethod_Put) | |
137 { | |
138 query->ReadBody(body); | |
139 } | |
140 | |
141 unsigned int retry = 0; | |
142 | |
143 for (;;) | |
144 { | |
145 MemoryBuffer answer(context_); | |
146 | |
147 bool success; | |
148 | |
149 try | |
150 { | |
151 switch (query->GetMethod()) | |
152 { | |
153 case Orthanc::HttpMethod_Get: | |
154 success = peers_.DoGet(answer, query->GetPeer(), query->GetUri()); | |
155 break; | |
156 | |
157 case Orthanc::HttpMethod_Post: | |
158 success = peers_.DoPost(answer, query->GetPeer(), query->GetUri(), body); | |
159 break; | |
160 | |
161 case Orthanc::HttpMethod_Put: | |
162 success = peers_.DoPut(query->GetPeer(), query->GetUri(), body); | |
163 break; | |
164 | |
165 case Orthanc::HttpMethod_Delete: | |
166 success = peers_.DoDelete(query->GetPeer(), query->GetUri()); | |
167 break; | |
168 | |
169 default: | |
170 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); | |
171 } | |
172 } | |
173 catch (Orthanc::OrthancException& e) | |
174 { | |
175 LOG(ERROR) << "Unhandled exception during an HTTP query to peer \"" | |
176 << query->GetPeer() << "\": " << e.What(); | |
177 success = false; | |
178 } | |
179 | |
180 if (success) | |
181 { | |
182 size_t downloaded = 0; | |
183 size_t uploaded = 0; | |
184 | |
185 if (query->GetMethod() == Orthanc::HttpMethod_Get || | |
186 query->GetMethod() == Orthanc::HttpMethod_Post) | |
187 { | |
188 query->HandleAnswer(answer.GetData(), answer.GetSize()); | |
189 downloaded = answer.GetSize(); | |
190 } | |
191 | |
192 if (query->GetMethod() == Orthanc::HttpMethod_Put || | |
193 query->GetMethod() == Orthanc::HttpMethod_Post) | |
194 { | |
195 uploaded = body.size(); | |
196 } | |
197 | |
198 networkTraffic = downloaded + uploaded; | |
199 | |
200 { | |
201 boost::mutex::scoped_lock lock(mutex_); | |
202 downloadedSize_ += downloaded; | |
203 uploadedSize_ += uploaded; | |
204 successQueries_ ++; | |
205 | |
206 if (successQueries_ == queries_.size()) | |
207 { | |
208 completed_.notify_all(); | |
209 } | |
210 | |
211 return true; | |
212 } | |
213 } | |
214 else | |
215 { | |
216 // Error: Let's retry | |
217 retry ++; | |
218 | |
219 if (retry < maxRetries) | |
220 { | |
221 // Wait 1 second before retrying | |
222 boost::this_thread::sleep(boost::posix_time::seconds(1)); | |
223 } | |
224 else | |
225 { | |
226 LOG(ERROR) << "Reached the maximum number of retries for a HTTP query"; | |
227 | |
228 { | |
229 boost::mutex::scoped_lock lock(mutex_); | |
230 isFailure_ = true; | |
231 completed_.notify_all(); | |
232 } | |
233 | |
234 return false; | |
235 } | |
236 } | |
237 } | |
238 } | |
239 | |
240 | |
241 HttpQueriesQueue::Status HttpQueriesQueue::WaitComplete(unsigned int timeoutMS) | |
242 { | |
243 boost::mutex::scoped_lock lock(mutex_); | |
244 | |
245 Status status = GetStatusInternal(); | |
246 | |
247 if (status == Status_Running) | |
248 { | |
249 completed_.timed_wait(lock, boost::posix_time::milliseconds(timeoutMS)); | |
250 return GetStatusInternal(); | |
251 } | |
252 else | |
253 { | |
254 return status; | |
255 } | |
256 } | |
257 | |
258 | |
259 void HttpQueriesQueue::WaitComplete() | |
260 { | |
261 boost::mutex::scoped_lock lock(mutex_); | |
262 | |
263 while (GetStatusInternal() == Status_Running) | |
264 { | |
265 completed_.timed_wait(lock, boost::posix_time::milliseconds(200)); | |
266 } | |
267 } | |
268 | |
269 | |
270 void HttpQueriesQueue::GetStatistics(size_t& scheduledQueriesCount, | |
271 size_t& successQueriesCount, | |
272 uint64_t& downloadedSize, | |
273 uint64_t& uploadedSize) | |
274 { | |
275 boost::mutex::scoped_lock lock(mutex_); | |
276 scheduledQueriesCount = queries_.size(); | |
277 successQueriesCount = successQueries_; | |
278 downloadedSize = downloadedSize_; | |
279 uploadedSize = uploadedSize_; | |
280 } | |
281 } |