Mercurial > hg > orthanc-transfers
annotate Framework/PushMode/PushJob.cpp @ 8:4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 05 Dec 2018 09:16:51 +0100 |
parents | 95226b754d9e |
children | 7e207ade2f1a |
rev | line source |
---|---|
0 | 1 /** |
2 * Transfers accelerator plugin for Orthanc | |
3 * Copyright (C) 2018 Osimis, Belgium | |
4 * | |
5 * This program is free software: you can redistribute it and/or | |
6 * modify it under the terms of the GNU Affero General Public License | |
7 * as published by the Free Software Foundation, either version 3 of | |
8 * the License, or (at your option) any later version. | |
9 * | |
10 * This program is distributed in the hope that it will be useful, but | |
11 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 * Affero General Public License for more details. | |
14 * | |
15 * You should have received a copy of the GNU Affero General Public License | |
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 **/ | |
18 | |
19 | |
20 #include "PushJob.h" | |
21 | |
22 #include "BucketPushQuery.h" | |
23 #include "../HttpQueries/HttpQueriesRunner.h" | |
24 #include "../TransferScheduler.h" | |
25 | |
26 #include <Core/Logging.h> | |
27 | |
28 #include <json/writer.h> | |
29 | |
30 | |
31 namespace OrthancPlugins | |
32 { | |
33 class PushJob::FinalState : public IState | |
34 { | |
35 private: | |
36 const PushJob& job_; | |
37 JobInfo& info_; | |
38 std::string transactionUri_; | |
39 bool isCommit_; | |
40 | |
41 public: | |
42 FinalState(const PushJob& job, | |
43 JobInfo& info, | |
44 const std::string& transactionUri, | |
45 bool isCommit) : | |
46 job_(job), | |
47 info_(info), | |
48 transactionUri_(transactionUri), | |
49 isCommit_(isCommit) | |
50 { | |
51 } | |
52 | |
53 virtual StateUpdate* Step() | |
54 { | |
55 std::string uri = transactionUri_; | |
56 | |
57 if (isCommit_) | |
58 { | |
59 uri += "/commit"; | |
60 } | |
61 else | |
62 { | |
63 uri += "/discard"; | |
64 } | |
65 | |
66 Json::Value answer; | |
67 if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, "")) | |
68 { | |
69 if (isCommit_) | |
70 { | |
71 LOG(ERROR) << "Cannot commit push transaction on remote peer: " | |
72 << job_.query_.GetPeer(); | |
73 } | |
74 | |
75 return StateUpdate::Failure(); | |
76 } | |
77 else if (isCommit_) | |
78 { | |
79 return StateUpdate::Success(); | |
80 } | |
81 else | |
82 { | |
83 return StateUpdate::Failure(); | |
84 } | |
85 } | |
86 | |
87 virtual void Stop(OrthancPluginJobStopReason reason) | |
88 { | |
89 } | |
90 }; | |
91 | |
92 | |
93 class PushJob::PushBucketsState : public IState | |
94 { | |
95 private: | |
96 const PushJob& job_; | |
97 JobInfo& info_; | |
98 std::string transactionUri_; | |
99 HttpQueriesQueue queue_; | |
100 std::auto_ptr<HttpQueriesRunner> runner_; | |
101 | |
102 void UpdateInfo() | |
103 { | |
104 size_t scheduledQueriesCount, completedQueriesCount; | |
105 uint64_t uploadedSize, downloadedSize; | |
106 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); | |
107 | |
108 info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize)); | |
109 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); | |
110 | |
111 if (runner_.get() != NULL) | |
112 { | |
113 float speed; | |
114 runner_->GetSpeed(speed); | |
115 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); | |
116 } | |
117 | |
118 // The "2" below corresponds to the "CreateTransactionState" | |
119 // and "FinalState" steps (which prevents division by zero) | |
120 info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) / | |
121 static_cast<float>(2 + scheduledQueriesCount)); | |
122 } | |
123 | |
124 public: | |
125 PushBucketsState(const PushJob& job, | |
126 JobInfo& info, | |
127 const std::string& transactionUri, | |
128 const std::vector<TransferBucket>& buckets) : | |
129 job_(job), | |
130 info_(info), | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
131 transactionUri_(transactionUri) |
0 | 132 { |
133 queue_.Reserve(buckets.size()); | |
134 | |
135 for (size_t i = 0; i < buckets.size(); i++) | |
136 { | |
137 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(), | |
138 transactionUri_, i, job.query_.GetCompression())); | |
139 } | |
140 | |
141 UpdateInfo(); | |
142 } | |
143 | |
144 virtual StateUpdate* Step() | |
145 { | |
146 if (runner_.get() == NULL) | |
147 { | |
148 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); | |
149 } | |
150 | |
151 HttpQueriesQueue::Status status = queue_.WaitComplete(200); | |
152 | |
153 UpdateInfo(); | |
154 | |
155 switch (status) | |
156 { | |
157 case HttpQueriesQueue::Status_Running: | |
158 return StateUpdate::Continue(); | |
159 | |
160 case HttpQueriesQueue::Status_Success: | |
161 // Commit transaction on remote peer | |
162 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true)); | |
163 | |
164 case HttpQueriesQueue::Status_Failure: | |
165 // Discard transaction on remote peer | |
166 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false)); | |
167 | |
168 default: | |
169 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
170 } | |
171 } | |
172 | |
173 virtual void Stop(OrthancPluginJobStopReason reason) | |
174 { | |
175 // Cancel the running download threads | |
176 runner_.reset(); | |
177 } | |
178 }; | |
179 | |
180 | |
181 class PushJob::CreateTransactionState : public IState | |
182 { | |
183 private: | |
184 const PushJob& job_; | |
185 JobInfo& info_; | |
186 std::string createTransaction_; | |
187 std::vector<TransferBucket> buckets_; | |
188 | |
189 public: | |
190 CreateTransactionState(const PushJob& job, | |
191 JobInfo& info) : | |
192 job_(job), | |
193 info_(info) | |
194 { | |
195 TransferScheduler scheduler; | |
196 scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources()); | |
197 | |
198 Json::Value push; | |
199 scheduler.FormatPushTransaction(push, buckets_, | |
200 job.targetBucketSize_, 2 * job.targetBucketSize_, | |
201 job_.query_.GetCompression()); | |
202 | |
203 Json::FastWriter writer; | |
204 createTransaction_ = writer.write(push); | |
205 | |
206 info_.SetContent("Peer", job_.query_.GetPeer()); | |
207 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); | |
208 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); | |
209 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); | |
210 } | |
211 | |
212 virtual StateUpdate* Step() | |
213 { | |
214 Json::Value answer; | |
215 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_)) | |
216 { | |
217 LOG(ERROR) << "Cannot create a push transaction to peer \"" | |
218 << job_.query_.GetPeer() | |
219 << "\" (check that it has the transfers accelerator plugin installed)"; | |
220 return StateUpdate::Failure(); | |
221 } | |
222 | |
223 if (answer.type() != Json::objectValue || | |
224 !answer.isMember(KEY_PATH) || | |
225 answer[KEY_PATH].type() != Json::stringValue) | |
226 { | |
227 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); | |
228 return StateUpdate::Failure(); | |
229 } | |
230 | |
231 std::string transactionUri = answer[KEY_PATH].asString(); | |
232 | |
233 return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_)); | |
234 } | |
235 | |
236 virtual void Stop(OrthancPluginJobStopReason reason) | |
237 { | |
238 } | |
239 }; | |
240 | |
241 | |
242 StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info) | |
243 { | |
244 return StateUpdate::Next(new CreateTransactionState(*this, info)); | |
245 } | |
246 | |
247 | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
248 PushJob::PushJob(const TransferQuery& query, |
0 | 249 OrthancInstancesCache& cache, |
250 size_t threadsCount, | |
251 size_t targetBucketSize) : | |
252 StatefulOrthancJob(JOB_TYPE_PUSH), | |
253 cache_(cache), | |
254 query_(query), | |
255 threadsCount_(threadsCount), | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
256 targetBucketSize_(targetBucketSize) |
0 | 257 { |
258 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) | |
259 { | |
260 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); | |
261 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); | |
262 } | |
263 | |
264 Json::Value serialized; | |
265 query.Serialize(serialized); | |
266 UpdateSerialized(serialized); | |
267 } | |
268 } |