Mercurial > hg > orthanc-transfers
annotate Framework/PullMode/PullJob.cpp @ 77:1e396fb509ca default
updated copyright, as Orthanc Team now replaces Osimis
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 30 May 2024 22:44:10 +0200 |
parents | f4e828607f02 |
children |
rev | line source |
---|---|
0 | 1 /** |
2 * Transfers accelerator plugin for Orthanc | |
77
1e396fb509ca
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
3 * Copyright (C) 2018-2023 Osimis S.A., Belgium |
1e396fb509ca
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
4 * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium |
1e396fb509ca
updated copyright, as Orthanc Team now replaces Osimis
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
5 * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium |
0 | 6 * |
7 * This program is free software: you can redistribute it and/or | |
8 * modify it under the terms of the GNU Affero General Public License | |
9 * as published by the Free Software Foundation, either version 3 of | |
10 * the License, or (at your option) any later version. | |
11 * | |
12 * This program is distributed in the hope that it will be useful, but | |
13 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
15 * Affero General Public License for more details. | |
16 * | |
17 * You should have received a copy of the GNU Affero General Public License | |
18 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 **/ | |
20 | |
21 | |
22 #include "PullJob.h" | |
23 | |
24 #include "BucketPullQuery.h" | |
25 #include "../HttpQueries/HttpQueriesRunner.h" | |
26 #include "../TransferScheduler.h" | |
27 | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
28 #include <Compatibility.h> // For std::unique_ptr |
20 | 29 #include <Logging.h> |
0 | 30 |
31 | |
32 namespace OrthancPlugins | |
33 { | |
34 class PullJob::CommitState : public IState | |
35 { | |
36 private: | |
37 const PullJob& job_; | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
38 std::unique_ptr<DownloadArea> area_; |
0 | 39 |
40 public: | |
41 CommitState(const PullJob& job, | |
42 DownloadArea* area /* takes ownership */) : | |
43 job_(job), | |
44 area_(area) | |
45 { | |
46 } | |
47 | |
48 virtual StateUpdate* Step() | |
49 { | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
50 area_->Commit(); |
0 | 51 return StateUpdate::Success(); |
52 } | |
53 | |
54 virtual void Stop(OrthancPluginJobStopReason reason) | |
55 { | |
56 } | |
57 }; | |
58 | |
59 | |
60 class PullJob::PullBucketsState : public IState | |
61 { | |
62 private: | |
63 const PullJob& job_; | |
64 JobInfo& info_; | |
65 HttpQueriesQueue queue_; | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
66 std::unique_ptr<DownloadArea> area_; |
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
67 std::unique_ptr<HttpQueriesRunner> runner_; |
0 | 68 |
69 void UpdateInfo() | |
70 { | |
71 size_t scheduledQueriesCount, completedQueriesCount; | |
72 uint64_t uploadedSize, downloadedSize; | |
73 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); | |
74 | |
75 info_.SetContent("DownloadedSizeMB", ConvertToMegabytes(downloadedSize)); | |
76 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); | |
77 | |
78 if (runner_.get() != NULL) | |
79 { | |
80 float speed; | |
81 runner_->GetSpeed(speed); | |
82 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); | |
83 } | |
84 | |
85 // The "2" below corresponds to the "LookupInstancesState" | |
86 // and "CommitState" steps (which prevents division by zero) | |
87 info_.SetProgress(static_cast<float>(1 /* LookupInstancesState */ + completedQueriesCount) / | |
88 static_cast<float>(2 + scheduledQueriesCount)); | |
89 } | |
90 | |
91 public: | |
92 PullBucketsState(const PullJob& job, | |
93 JobInfo& info, | |
94 const TransferScheduler& scheduler) : | |
95 job_(job), | |
96 info_(info), | |
97 area_(new DownloadArea(scheduler)) | |
98 { | |
99 const std::string baseUrl = job.peers_.GetPeerUrl(job.query_.GetPeer()); | |
100 | |
101 std::vector<TransferBucket> buckets; | |
102 scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_, | |
103 baseUrl, job.query_.GetCompression()); | |
104 area_.reset(new DownloadArea(scheduler)); | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
105 |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
106 queue_.SetMaxRetries(job.maxHttpRetries_); |
0 | 107 queue_.Reserve(buckets.size()); |
108 | |
109 for (size_t i = 0; i < buckets.size(); i++) | |
110 { | |
111 queue_.Enqueue(new BucketPullQuery(*area_, buckets[i], job.query_.GetPeer(), job.query_.GetCompression())); | |
112 } | |
113 | |
114 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); | |
115 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); | |
116 UpdateInfo(); | |
117 } | |
118 | |
119 virtual StateUpdate* Step() | |
120 { | |
121 if (runner_.get() == NULL) | |
122 { | |
123 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); | |
124 } | |
125 | |
126 HttpQueriesQueue::Status status = queue_.WaitComplete(200); | |
127 | |
128 UpdateInfo(); | |
129 | |
130 switch (status) | |
131 { | |
132 case HttpQueriesQueue::Status_Running: | |
133 return StateUpdate::Continue(); | |
134 | |
135 case HttpQueriesQueue::Status_Success: | |
136 return StateUpdate::Next(new CommitState(job_, area_.release())); | |
137 | |
138 case HttpQueriesQueue::Status_Failure: | |
139 return StateUpdate::Failure(); | |
140 | |
141 default: | |
142 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
143 } | |
144 } | |
145 | |
146 virtual void Stop(OrthancPluginJobStopReason reason) | |
147 { | |
148 // Cancel the running download threads | |
149 runner_.reset(); | |
150 } | |
151 }; | |
152 | |
153 | |
154 class PullJob::LookupInstancesState : public IState | |
155 { | |
156 private: | |
157 const PullJob& job_; | |
158 JobInfo& info_; | |
159 | |
160 public: | |
161 LookupInstancesState(const PullJob& job, | |
162 JobInfo& info) : | |
163 job_(job), | |
164 info_(info) | |
165 { | |
37
9708addb5a87
added 'Resources' and 'Originator' in the jobs 'Content'
Alain Mazy <am@osimis.io>
parents:
33
diff
changeset
|
166 if (job_.query_.HasOriginator()) |
9708addb5a87
added 'Resources' and 'Originator' in the jobs 'Content'
Alain Mazy <am@osimis.io>
parents:
33
diff
changeset
|
167 { |
9708addb5a87
added 'Resources' and 'Originator' in the jobs 'Content'
Alain Mazy <am@osimis.io>
parents:
33
diff
changeset
|
168 info_.SetContent("Originator", job_.query_.GetOriginator()); |
9708addb5a87
added 'Resources' and 'Originator' in the jobs 'Content'
Alain Mazy <am@osimis.io>
parents:
33
diff
changeset
|
169 } |
9708addb5a87
added 'Resources' and 'Originator' in the jobs 'Content'
Alain Mazy <am@osimis.io>
parents:
33
diff
changeset
|
170 info_.SetContent("Resources", job_.query_.GetResources()); |
0 | 171 info_.SetContent("Peer", job_.query_.GetPeer()); |
172 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); | |
173 } | |
174 | |
175 virtual StateUpdate* Step() | |
176 { | |
31
cfeda58d0c8e
remove calls to deprecated classes of JsonCpp
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
25
diff
changeset
|
177 std::string lookup; |
cfeda58d0c8e
remove calls to deprecated classes of JsonCpp
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
25
diff
changeset
|
178 Orthanc::Toolbox::WriteFastJson(lookup, job_.query_.GetResources()); |
0 | 179 |
180 Json::Value answer; | |
44
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
181 std::map<std::string, std::string> headers; |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
182 job_.query_.GetHttpHeaders(headers); |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
183 |
f4e828607f02
Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
Alain Mazy <am@osimis.io>
parents:
37
diff
changeset
|
184 if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_LOOKUP, lookup, job_.maxHttpRetries_, headers)) |
0 | 185 { |
186 LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" | |
187 << job_.query_.GetPeer() | |
188 << "\" (check that it has the transfers accelerator plugin installed)"; | |
189 return StateUpdate::Failure(); | |
190 } | |
191 | |
192 if (answer.type() != Json::objectValue || | |
193 !answer.isMember(KEY_INSTANCES) || | |
194 !answer.isMember(KEY_ORIGINATOR_UUID) || | |
195 answer[KEY_INSTANCES].type() != Json::arrayValue || | |
196 answer[KEY_ORIGINATOR_UUID].type() != Json::stringValue) | |
197 { | |
198 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); | |
199 return StateUpdate::Failure(); | |
200 } | |
201 | |
202 if (job_.query_.HasOriginator() && | |
203 job_.query_.GetOriginator() != answer[KEY_ORIGINATOR_UUID].asString()) | |
204 { | |
5
5e6de82bb10f
use of user properties instead of BidirectionalPeers option
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
205 LOG(ERROR) << "Invalid originator, check out the \"" << KEY_REMOTE_SELF |
5e6de82bb10f
use of user properties instead of BidirectionalPeers option
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
206 << "\" configuration option of peer: " << job_.query_.GetPeer(); |
0 | 207 return StateUpdate::Failure(); |
208 } | |
209 | |
210 TransferScheduler scheduler; | |
211 | |
212 for (Json::Value::ArrayIndex i = 0; i < answer[KEY_INSTANCES].size(); i++) | |
213 { | |
214 DicomInstanceInfo instance(answer[KEY_INSTANCES][i]); | |
215 scheduler.AddInstance(instance); | |
216 } | |
217 | |
218 if (scheduler.GetInstancesCount() == 0) | |
219 { | |
220 // We're already done: No instance to be retrieved | |
221 return StateUpdate::Success(); | |
222 } | |
223 else | |
224 { | |
225 return StateUpdate::Next(new PullBucketsState(job_, info_, scheduler)); | |
226 } | |
227 } | |
228 | |
229 virtual void Stop(OrthancPluginJobStopReason reason) | |
230 { | |
231 } | |
232 }; | |
233 | |
234 | |
235 StatefulOrthancJob::StateUpdate* PullJob::CreateInitialState(JobInfo& info) | |
236 { | |
237 return StateUpdate::Next(new LookupInstancesState(*this, info)); | |
238 } | |
239 | |
240 | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
241 PullJob::PullJob(const TransferQuery& query, |
0 | 242 size_t threadsCount, |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
243 size_t targetBucketSize, |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
244 unsigned int maxHttpRetries) : |
0 | 245 StatefulOrthancJob(JOB_TYPE_PULL), |
246 query_(query), | |
247 threadsCount_(threadsCount), | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
248 targetBucketSize_(targetBucketSize), |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
249 maxHttpRetries_(maxHttpRetries) |
0 | 250 { |
251 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) | |
252 { | |
253 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); | |
254 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); | |
255 } | |
256 | |
257 Json::Value serialized; | |
258 query.Serialize(serialized); | |
259 UpdateSerialized(serialized); | |
260 } | |
261 } |