Mercurial > hg > orthanc-transfers
annotate Framework/PushMode/PushJob.cpp @ 28:b7e32fe4973b
sync
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 07 Jul 2020 20:42:52 +0200 |
parents | dfc43678aecb |
children | cfeda58d0c8e |
rev | line source |
---|---|
0 | 1 /** |
2 * Transfers accelerator plugin for Orthanc | |
19
b06103a50c95
upgrade to year 2020
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
10
diff
changeset
|
3 * Copyright (C) 2018-2020 Osimis S.A., Belgium |
0 | 4 * |
5 * This program is free software: you can redistribute it and/or | |
6 * modify it under the terms of the GNU Affero General Public License | |
7 * as published by the Free Software Foundation, either version 3 of | |
8 * the License, or (at your option) any later version. | |
9 * | |
10 * This program is distributed in the hope that it will be useful, but | |
11 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 * Affero General Public License for more details. | |
14 * | |
15 * You should have received a copy of the GNU Affero General Public License | |
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 **/ | |
18 | |
19 | |
20 #include "PushJob.h" | |
21 | |
22 #include "BucketPushQuery.h" | |
23 #include "../HttpQueries/HttpQueriesRunner.h" | |
24 #include "../TransferScheduler.h" | |
25 | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
26 #include <Compatibility.h> // For std::unique_ptr |
20 | 27 #include <Logging.h> |
0 | 28 |
29 #include <json/writer.h> | |
30 | |
31 | |
32 namespace OrthancPlugins | |
33 { | |
34 class PushJob::FinalState : public IState | |
35 { | |
36 private: | |
37 const PushJob& job_; | |
38 JobInfo& info_; | |
39 std::string transactionUri_; | |
40 bool isCommit_; | |
41 | |
42 public: | |
43 FinalState(const PushJob& job, | |
44 JobInfo& info, | |
45 const std::string& transactionUri, | |
46 bool isCommit) : | |
47 job_(job), | |
48 info_(info), | |
49 transactionUri_(transactionUri), | |
50 isCommit_(isCommit) | |
51 { | |
52 } | |
53 | |
54 virtual StateUpdate* Step() | |
55 { | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
56 Json::Value answer; |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
57 bool success = false; |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
58 |
0 | 59 if (isCommit_) |
60 { | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
61 success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_); |
0 | 62 } |
63 else | |
64 { | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
65 success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_); |
0 | 66 } |
67 | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
68 if (!success) |
0 | 69 { |
70 if (isCommit_) | |
71 { | |
72 LOG(ERROR) << "Cannot commit push transaction on remote peer: " | |
73 << job_.query_.GetPeer(); | |
74 } | |
75 | |
76 return StateUpdate::Failure(); | |
77 } | |
78 else if (isCommit_) | |
79 { | |
80 return StateUpdate::Success(); | |
81 } | |
82 else | |
83 { | |
84 return StateUpdate::Failure(); | |
85 } | |
86 } | |
87 | |
88 virtual void Stop(OrthancPluginJobStopReason reason) | |
89 { | |
90 } | |
91 }; | |
92 | |
93 | |
94 class PushJob::PushBucketsState : public IState | |
95 { | |
96 private: | |
97 const PushJob& job_; | |
98 JobInfo& info_; | |
99 std::string transactionUri_; | |
100 HttpQueriesQueue queue_; | |
25
dfc43678aecb
replacing deprecated std::auto_ptr by std::unique_ptr
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
20
diff
changeset
|
101 std::unique_ptr<HttpQueriesRunner> runner_; |
0 | 102 |
103 void UpdateInfo() | |
104 { | |
105 size_t scheduledQueriesCount, completedQueriesCount; | |
106 uint64_t uploadedSize, downloadedSize; | |
107 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); | |
108 | |
109 info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize)); | |
110 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); | |
111 | |
112 if (runner_.get() != NULL) | |
113 { | |
114 float speed; | |
115 runner_->GetSpeed(speed); | |
116 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); | |
117 } | |
118 | |
119 // The "2" below corresponds to the "CreateTransactionState" | |
120 // and "FinalState" steps (which prevents division by zero) | |
121 info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) / | |
122 static_cast<float>(2 + scheduledQueriesCount)); | |
123 } | |
124 | |
125 public: | |
126 PushBucketsState(const PushJob& job, | |
127 JobInfo& info, | |
128 const std::string& transactionUri, | |
129 const std::vector<TransferBucket>& buckets) : | |
130 job_(job), | |
131 info_(info), | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
132 transactionUri_(transactionUri) |
0 | 133 { |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
134 queue_.SetMaxRetries(job.maxHttpRetries_); |
0 | 135 queue_.Reserve(buckets.size()); |
136 | |
137 for (size_t i = 0; i < buckets.size(); i++) | |
138 { | |
139 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(), | |
140 transactionUri_, i, job.query_.GetCompression())); | |
141 } | |
142 | |
143 UpdateInfo(); | |
144 } | |
145 | |
146 virtual StateUpdate* Step() | |
147 { | |
148 if (runner_.get() == NULL) | |
149 { | |
150 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); | |
151 } | |
152 | |
153 HttpQueriesQueue::Status status = queue_.WaitComplete(200); | |
154 | |
155 UpdateInfo(); | |
156 | |
157 switch (status) | |
158 { | |
159 case HttpQueriesQueue::Status_Running: | |
160 return StateUpdate::Continue(); | |
161 | |
162 case HttpQueriesQueue::Status_Success: | |
163 // Commit transaction on remote peer | |
164 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true)); | |
165 | |
166 case HttpQueriesQueue::Status_Failure: | |
167 // Discard transaction on remote peer | |
168 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false)); | |
169 | |
170 default: | |
171 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
172 } | |
173 } | |
174 | |
175 virtual void Stop(OrthancPluginJobStopReason reason) | |
176 { | |
177 // Cancel the running download threads | |
178 runner_.reset(); | |
179 } | |
180 }; | |
181 | |
182 | |
183 class PushJob::CreateTransactionState : public IState | |
184 { | |
185 private: | |
186 const PushJob& job_; | |
187 JobInfo& info_; | |
188 std::string createTransaction_; | |
189 std::vector<TransferBucket> buckets_; | |
190 | |
191 public: | |
192 CreateTransactionState(const PushJob& job, | |
193 JobInfo& info) : | |
194 job_(job), | |
195 info_(info) | |
196 { | |
197 TransferScheduler scheduler; | |
198 scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources()); | |
199 | |
200 Json::Value push; | |
201 scheduler.FormatPushTransaction(push, buckets_, | |
202 job.targetBucketSize_, 2 * job.targetBucketSize_, | |
203 job_.query_.GetCompression()); | |
204 | |
205 Json::FastWriter writer; | |
206 createTransaction_ = writer.write(push); | |
207 | |
208 info_.SetContent("Peer", job_.query_.GetPeer()); | |
209 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); | |
210 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); | |
211 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); | |
212 } | |
213 | |
214 virtual StateUpdate* Step() | |
215 { | |
216 Json::Value answer; | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
217 if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_)) |
0 | 218 { |
219 LOG(ERROR) << "Cannot create a push transaction to peer \"" | |
220 << job_.query_.GetPeer() | |
221 << "\" (check that it has the transfers accelerator plugin installed)"; | |
222 return StateUpdate::Failure(); | |
223 } | |
224 | |
225 if (answer.type() != Json::objectValue || | |
226 !answer.isMember(KEY_PATH) || | |
227 answer[KEY_PATH].type() != Json::stringValue) | |
228 { | |
229 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); | |
230 return StateUpdate::Failure(); | |
231 } | |
232 | |
233 std::string transactionUri = answer[KEY_PATH].asString(); | |
234 | |
235 return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_)); | |
236 } | |
237 | |
238 virtual void Stop(OrthancPluginJobStopReason reason) | |
239 { | |
240 } | |
241 }; | |
242 | |
243 | |
244 StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info) | |
245 { | |
246 return StateUpdate::Next(new CreateTransactionState(*this, info)); | |
247 } | |
248 | |
249 | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
250 PushJob::PushJob(const TransferQuery& query, |
0 | 251 OrthancInstancesCache& cache, |
252 size_t threadsCount, | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
253 size_t targetBucketSize, |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
254 unsigned int maxHttpRetries) : |
0 | 255 StatefulOrthancJob(JOB_TYPE_PUSH), |
256 cache_(cache), | |
257 query_(query), | |
258 threadsCount_(threadsCount), | |
10
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
259 targetBucketSize_(targetBucketSize), |
c9e28e31262e
new option: MaxHttpRetries
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
9
diff
changeset
|
260 maxHttpRetries_(maxHttpRetries) |
0 | 261 { |
262 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) | |
263 { | |
264 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); | |
265 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); | |
266 } | |
267 | |
268 Json::Value serialized; | |
269 query.Serialize(serialized); | |
270 UpdateSerialized(serialized); | |
271 } | |
272 } |