Mercurial > hg > orthanc-transfers
annotate Framework/HttpQueries/HttpQueriesQueue.cpp @ 49:aff1ade130c5 OrthancTransfers-1.0
closing branch OrthancTransfers-1.0
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 05 Dec 2022 14:25:02 +0100 |
parents | c9e28e31262e |
children | b06103a50c95 |
rev | line source |
---|---|
0 | 1 /** |
2 * Transfers accelerator plugin for Orthanc | |
9 | 3 * Copyright (C) 2018-2019 Osimis S.A., Belgium |
0 | 4 * |
5 * This program is free software: you can redistribute it and/or | |
6 * modify it under the terms of the GNU Affero General Public License | |
7 * as published by the Free Software Foundation, either version 3 of | |
8 * the License, or (at your option) any later version. | |
9 * | |
10 * This program is distributed in the hope that it will be useful, but | |
11 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 * Affero General Public License for more details. | |
14 * | |
15 * You should have received a copy of the GNU Affero General Public License | |
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 **/ | |
18 | |
19 | |
20 #include "HttpQueriesQueue.h" | |
21 | |
22 #include <Core/Logging.h> | |
23 #include <Core/OrthancException.h> | |
24 | |
25 namespace OrthancPlugins | |
26 { | |
27 HttpQueriesQueue::Status HttpQueriesQueue::GetStatusInternal() const | |
28 { | |
29 if (successQueries_ == queries_.size()) | |
30 { | |
31 return Status_Success; | |
32 } | |
33 else if (isFailure_) | |
34 { | |
35 return Status_Failure; | |
36 } | |
37 else | |
38 { | |
39 return Status_Running; | |
40 } | |
41 } | |
42 | |
43 | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
44 HttpQueriesQueue::HttpQueriesQueue() : |
0 | 45 maxRetries_(0) |
46 { | |
47 Reset(); | |
48 } | |
49 | |
50 | |
51 HttpQueriesQueue::~HttpQueriesQueue() | |
52 { | |
53 for (size_t i = 0; i < queries_.size(); i++) | |
54 { | |
55 assert(queries_[i] != NULL); | |
56 delete queries_[i]; | |
57 } | |
58 } | |
59 | |
60 | |
61 unsigned int HttpQueriesQueue::GetMaxRetries() | |
62 { | |
63 boost::mutex::scoped_lock lock(mutex_); | |
64 return maxRetries_; | |
65 } | |
66 | |
67 | |
68 void HttpQueriesQueue::SetMaxRetries(unsigned int maxRetries) | |
69 { | |
70 boost::mutex::scoped_lock lock(mutex_); | |
71 maxRetries_ = maxRetries; | |
72 } | |
73 | |
74 | |
75 void HttpQueriesQueue::Reserve(size_t size) | |
76 { | |
77 boost::mutex::scoped_lock lock(mutex_); | |
78 queries_.reserve(size); | |
79 } | |
80 | |
81 | |
82 void HttpQueriesQueue::Reset() | |
83 { | |
84 boost::mutex::scoped_lock lock(mutex_); | |
85 position_ = 0; | |
86 downloadedSize_ = 0; | |
87 uploadedSize_ = 0; | |
88 successQueries_ = 0; | |
89 isFailure_ = false; | |
90 } | |
91 | |
92 | |
93 void HttpQueriesQueue::Enqueue(IHttpQuery* query) | |
94 { | |
95 if (query == NULL) | |
96 { | |
97 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); | |
98 } | |
99 else | |
100 { | |
101 boost::mutex::scoped_lock lock(mutex_); | |
102 queries_.push_back(query); | |
103 } | |
104 } | |
105 | |
106 | |
3 | 107 bool HttpQueriesQueue::ExecuteOneQuery(size_t& networkTraffic) |
0 | 108 { |
109 networkTraffic = 0; | |
110 | |
111 unsigned int maxRetries; | |
112 IHttpQuery* query = NULL; | |
113 | |
114 { | |
115 boost::mutex::scoped_lock lock(mutex_); | |
116 | |
117 maxRetries = maxRetries_; | |
118 | |
119 if (position_ == queries_.size() || | |
120 isFailure_) | |
121 { | |
122 return false; | |
123 } | |
124 else | |
125 { | |
126 query = queries_[position_]; | |
127 position_ ++; | |
128 } | |
129 } | |
130 | |
131 std::string body; | |
132 | |
133 if (query->GetMethod() == Orthanc::HttpMethod_Post || | |
134 query->GetMethod() == Orthanc::HttpMethod_Put) | |
135 { | |
136 query->ReadBody(body); | |
137 } | |
138 | |
139 unsigned int retry = 0; | |
140 | |
141 for (;;) | |
142 { | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
143 MemoryBuffer answer; |
0 | 144 |
145 bool success; | |
146 | |
147 try | |
148 { | |
149 switch (query->GetMethod()) | |
150 { | |
151 case Orthanc::HttpMethod_Get: | |
152 success = peers_.DoGet(answer, query->GetPeer(), query->GetUri()); | |
153 break; | |
154 | |
155 case Orthanc::HttpMethod_Post: | |
156 success = peers_.DoPost(answer, query->GetPeer(), query->GetUri(), body); | |
157 break; | |
158 | |
159 case Orthanc::HttpMethod_Put: | |
160 success = peers_.DoPut(query->GetPeer(), query->GetUri(), body); | |
161 break; | |
162 | |
163 case Orthanc::HttpMethod_Delete: | |
164 success = peers_.DoDelete(query->GetPeer(), query->GetUri()); | |
165 break; | |
166 | |
167 default: | |
168 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); | |
169 } | |
170 } | |
171 catch (Orthanc::OrthancException& e) | |
172 { | |
173 LOG(ERROR) << "Unhandled exception during an HTTP query to peer \"" | |
174 << query->GetPeer() << "\": " << e.What(); | |
175 success = false; | |
176 } | |
177 | |
178 if (success) | |
179 { | |
180 size_t downloaded = 0; | |
181 size_t uploaded = 0; | |
182 | |
183 if (query->GetMethod() == Orthanc::HttpMethod_Get || | |
184 query->GetMethod() == Orthanc::HttpMethod_Post) | |
185 { | |
186 query->HandleAnswer(answer.GetData(), answer.GetSize()); | |
187 downloaded = answer.GetSize(); | |
188 } | |
189 | |
190 if (query->GetMethod() == Orthanc::HttpMethod_Put || | |
191 query->GetMethod() == Orthanc::HttpMethod_Post) | |
192 { | |
193 uploaded = body.size(); | |
194 } | |
195 | |
196 networkTraffic = downloaded + uploaded; | |
197 | |
198 { | |
199 boost::mutex::scoped_lock lock(mutex_); | |
200 downloadedSize_ += downloaded; | |
201 uploadedSize_ += uploaded; | |
202 successQueries_ ++; | |
203 | |
204 if (successQueries_ == queries_.size()) | |
205 { | |
206 completed_.notify_all(); | |
207 } | |
208 | |
209 return true; | |
210 } | |
211 } | |
212 else | |
213 { | |
214 // Error: Let's retry | |
215 retry ++; | |
216 | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
217 if (retry <= maxRetries) |
0 | 218 { |
219 // Wait 1 second before retrying | |
220 boost::this_thread::sleep(boost::posix_time::seconds(1)); | |
221 } | |
222 else | |
223 { | |
5
5e6de82bb10f
use of user properties instead of BidirectionalPeers option
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
3
diff
changeset
|
224 LOG(INFO) << "Reached the maximum number of retries for a HTTP query"; |
0 | 225 |
226 { | |
227 boost::mutex::scoped_lock lock(mutex_); | |
228 isFailure_ = true; | |
229 completed_.notify_all(); | |
230 } | |
231 | |
232 return false; | |
233 } | |
234 } | |
235 } | |
236 } | |
237 | |
238 | |
239 HttpQueriesQueue::Status HttpQueriesQueue::WaitComplete(unsigned int timeoutMS) | |
240 { | |
241 boost::mutex::scoped_lock lock(mutex_); | |
242 | |
243 Status status = GetStatusInternal(); | |
244 | |
245 if (status == Status_Running) | |
246 { | |
247 completed_.timed_wait(lock, boost::posix_time::milliseconds(timeoutMS)); | |
248 return GetStatusInternal(); | |
249 } | |
250 else | |
251 { | |
252 return status; | |
253 } | |
254 } | |
255 | |
256 | |
257 void HttpQueriesQueue::WaitComplete() | |
258 { | |
259 boost::mutex::scoped_lock lock(mutex_); | |
260 | |
261 while (GetStatusInternal() == Status_Running) | |
262 { | |
263 completed_.timed_wait(lock, boost::posix_time::milliseconds(200)); | |
264 } | |
265 } | |
266 | |
267 | |
268 void HttpQueriesQueue::GetStatistics(size_t& scheduledQueriesCount, | |
269 size_t& successQueriesCount, | |
270 uint64_t& downloadedSize, | |
271 uint64_t& uploadedSize) | |
272 { | |
273 boost::mutex::scoped_lock lock(mutex_); | |
274 scheduledQueriesCount = queries_.size(); | |
275 successQueriesCount = successQueries_; | |
276 downloadedSize = downloadedSize_; | |
277 uploadedSize = uploadedSize_; | |
278 } | |
279 } |