Mercurial > hg > orthanc-transfers
annotate Framework/PushMode/PushJob.cpp @ 77:1e396fb509ca default
updated copyright, as Orthanc Team now replaces Osimis
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 30 May 2024 22:44:10 +0200 |
parents | f4e828607f02 |
children |
rev | line source |
---|---|
0 | 1 /** |
2 * Transfers accelerator plugin for Orthanc | |
77
1e396fb509ca
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
3 * Copyright (C) 2018-2023 Osimis S.A., Belgium |
1e396fb509ca
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
4 * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium |
1e396fb509ca
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
5 * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium |
0 | 6 * |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU Affero General Public License | |
9 * as published by the Free Software Foundation, either version 3 of | |
10 * the License, or (at your option) any later version. | |
11 * | |
12 * This program is distributed in the hope that it will be useful, but | |
13 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
15 * Affero General Public License for more details. | |
16 * | |
17 * You should have received a copy of the GNU Affero General Public License | |
18 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 **/ | |
20 | |
21 | |
22 #include "PushJob.h" | |
23 | |
24 #include "BucketPushQuery.h" | |
25 #include "../HttpQueries/HttpQueriesRunner.h" | |
26 #include "../TransferScheduler.h" | |
27 | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
28 #include <Compatibility.h> // For std::unique_ptr |
20 | 29 #include <Logging.h> |
0 | 30 |
31 | |
32 namespace OrthancPlugins | |
33 { | |
34 class PushJob::FinalState : public IState | |
35 { | |
36 private: | |
37 const PushJob& job_; | |
38 JobInfo& info_; | |
39 std::string transactionUri_; | |
40 bool isCommit_; | |
41 | |
42 public: | |
43 FinalState(const PushJob& job, | |
44 JobInfo& info, | |
45 const std::string& transactionUri, | |
46 bool isCommit) : | |
47 job_(job), | |
48 info_(info), | |
49 transactionUri_(transactionUri), | |
50 isCommit_(isCommit) | |
51 { | |
52 } | |
53 | |
54 virtual StateUpdate* Step() | |
55 { | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
56 Json::Value answer; |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
57 bool success = false; |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
58 std::map<std::string, std::string> headers; |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
59 job_.query_.GetHttpHeaders(headers); |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
60 |
0 | 61 if (isCommit_) |
62 { | |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
63 success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_, headers); |
0 | 64 } |
65 else | |
66 { | |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
67 success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_, headers); |
0 | 68 } |
69 | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
70 if (!success) |
0 | 71 { |
72 if (isCommit_) | |
73 { | |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
74 LOG(ERROR) << "Cannot commit push transaction on remote peer: " // TODO: add job ID |
0 | 75 << job_.query_.GetPeer(); |
76 } | |
77 | |
78 return StateUpdate::Failure(); | |
79 } | |
80 else if (isCommit_) | |
81 { | |
82 return StateUpdate::Success(); | |
83 } | |
84 else | |
85 { | |
86 return StateUpdate::Failure(); | |
87 } | |
88 } | |
89 | |
90 virtual void Stop(OrthancPluginJobStopReason reason) | |
91 { | |
92 } | |
93 }; | |
94 | |
95 | |
96 class PushJob::PushBucketsState : public IState | |
97 { | |
98 private: | |
99 const PushJob& job_; | |
100 JobInfo& info_; | |
101 std::string transactionUri_; | |
102 HttpQueriesQueue queue_; | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
103 std::unique_ptr<HttpQueriesRunner> runner_; |
0 | 104 |
105 void UpdateInfo() | |
106 { | |
107 size_t scheduledQueriesCount, completedQueriesCount; | |
108 uint64_t uploadedSize, downloadedSize; | |
109 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); | |
110 | |
111 info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize)); | |
112 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); | |
113 | |
114 if (runner_.get() != NULL) | |
115 { | |
116 float speed; | |
117 runner_->GetSpeed(speed); | |
118 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); | |
119 } | |
120 | |
121 // The "2" below corresponds to the "CreateTransactionState" | |
122 // and "FinalState" steps (which prevents division by zero) | |
123 info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) / | |
124 static_cast<float>(2 + scheduledQueriesCount)); | |
125 } | |
126 | |
127 public: | |
128 PushBucketsState(const PushJob& job, | |
129 JobInfo& info, | |
130 const std::string& transactionUri, | |
131 const std::vector<TransferBucket>& buckets) : | |
132 job_(job), | |
133 info_(info), | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
134 transactionUri_(transactionUri) |
0 | 135 { |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
136 std::map<std::string, std::string> headers; |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
137 job_.query_.GetHttpHeaders(headers); |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
138 |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
139 queue_.SetMaxRetries(job.maxHttpRetries_); |
0 | 140 queue_.Reserve(buckets.size()); |
141 | |
142 for (size_t i = 0; i < buckets.size(); i++) | |
143 { | |
144 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(), | |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
145 transactionUri_, i, job.query_.GetCompression(), headers)); |
0 | 146 } |
147 | |
148 UpdateInfo(); | |
149 } | |
150 | |
151 virtual StateUpdate* Step() | |
152 { | |
153 if (runner_.get() == NULL) | |
154 { | |
155 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); | |
156 } | |
157 | |
158 HttpQueriesQueue::Status status = queue_.WaitComplete(200); | |
159 | |
160 UpdateInfo(); | |
161 | |
162 switch (status) | |
163 { | |
164 case HttpQueriesQueue::Status_Running: | |
165 return StateUpdate::Continue(); | |
166 | |
167 case HttpQueriesQueue::Status_Success: | |
168 // Commit transaction on remote peer | |
169 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true)); | |
170 | |
171 case HttpQueriesQueue::Status_Failure: | |
172 // Discard transaction on remote peer | |
173 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false)); | |
174 | |
175 default: | |
176 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
177 } | |
178 } | |
179 | |
180 virtual void Stop(OrthancPluginJobStopReason reason) | |
181 { | |
182 // Cancel the running download threads | |
183 runner_.reset(); | |
184 } | |
185 }; | |
186 | |
187 | |
188 class PushJob::CreateTransactionState : public IState | |
189 { | |
190 private: | |
191 const PushJob& job_; | |
192 JobInfo& info_; | |
193 std::string createTransaction_; | |
194 std::vector<TransferBucket> buckets_; | |
195 | |
196 public: | |
197 CreateTransactionState(const PushJob& job, | |
198 JobInfo& info) : | |
199 job_(job), | |
200 info_(info) | |
201 { | |
202 TransferScheduler scheduler; | |
203 scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources()); | |
204 | |
205 Json::Value push; | |
206 scheduler.FormatPushTransaction(push, buckets_, | |
207 job.targetBucketSize_, 2 * job.targetBucketSize_, | |
208 job_.query_.GetCompression()); | |
209 | |
31
cfeda58d0c8e
remove calls to deprecated classes of JsonCpp
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
25
diff
changeset
|
210 Orthanc::Toolbox::WriteFastJson(createTransaction_, push); |
0 | 211 |
37
9708addb5a87
added 'Resources' and 'Originator' in the jobs 'Content'
Alain Mazy <am@osimis.io>
parents:
33
diff
changeset
|
212 info_.SetContent("Resources", job_.query_.GetResources()); |
0 | 213 info_.SetContent("Peer", job_.query_.GetPeer()); |
214 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); | |
215 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); | |
216 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); | |
217 } | |
218 | |
219 virtual StateUpdate* Step() | |
220 { | |
221 Json::Value answer; | |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
222 std::map<std::string, std::string> headers; |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
223 job_.query_.GetHttpHeaders(headers); |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
224 |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
225 if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_, headers)) |
0 | 226 { |
227 LOG(ERROR) << "Cannot create a push transaction to peer \"" | |
228 << job_.query_.GetPeer() | |
229 << "\" (check that it has the transfers accelerator plugin installed)"; | |
230 return StateUpdate::Failure(); | |
231 } | |
232 | |
233 if (answer.type() != Json::objectValue || | |
234 !answer.isMember(KEY_PATH) || | |
235 answer[KEY_PATH].type() != Json::stringValue) | |
236 { | |
237 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); | |
238 return StateUpdate::Failure(); | |
239 } | |
240 | |
241 std::string transactionUri = answer[KEY_PATH].asString(); | |
242 | |
243 return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_)); | |
244 } | |
245 | |
246 virtual void Stop(OrthancPluginJobStopReason reason) | |
247 { | |
248 } | |
249 }; | |
250 | |
251 | |
252 StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info) | |
253 { | |
254 return StateUpdate::Next(new CreateTransactionState(*this, info)); | |
255 } | |
256 | |
257 | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
258 PushJob::PushJob(const TransferQuery& query, |
0 | 259 OrthancInstancesCache& cache, |
260 size_t threadsCount, | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
261 size_t targetBucketSize, |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
262 unsigned int maxHttpRetries) : |
0 | 263 StatefulOrthancJob(JOB_TYPE_PUSH), |
264 cache_(cache), | |
265 query_(query), | |
266 threadsCount_(threadsCount), | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
267 targetBucketSize_(targetBucketSize), |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
268 maxHttpRetries_(maxHttpRetries) |
0 | 269 { |
270 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) | |
271 { | |
272 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); | |
273 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); | |
274 } | |
275 | |
276 Json::Value serialized; | |
277 query.Serialize(serialized); | |
278 UpdateSerialized(serialized); | |
279 } | |
280 } |