0
|
1 /**
|
|
2 * Transfers accelerator plugin for Orthanc
|
|
3 * Copyright (C) 2018 Osimis, Belgium
|
|
4 *
|
|
5 * This program is free software: you can redistribute it and/or
|
|
6 * modify it under the terms of the GNU Affero General Public License
|
|
7 * as published by the Free Software Foundation, either version 3 of
|
|
8 * the License, or (at your option) any later version.
|
|
9 *
|
|
10 * This program is distributed in the hope that it will be useful, but
|
|
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
13 * Affero General Public License for more details.
|
|
14 *
|
|
15 * You should have received a copy of the GNU Affero General Public License
|
|
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
17 **/
|
|
18
|
|
19
|
|
20 #include "PushJob.h"
|
|
21
|
|
22 #include "BucketPushQuery.h"
|
|
23 #include "../HttpQueries/HttpQueriesRunner.h"
|
|
24 #include "../TransferScheduler.h"
|
|
25
|
|
26 #include <Core/Logging.h>
|
|
27
|
|
28 #include <json/writer.h>
|
|
29
|
|
30
|
|
31 namespace OrthancPlugins
|
|
32 {
|
|
33 class PushJob::FinalState : public IState
|
|
34 {
|
|
35 private:
|
|
36 const PushJob& job_;
|
|
37 JobInfo& info_;
|
|
38 std::string transactionUri_;
|
|
39 bool isCommit_;
|
|
40
|
|
41 public:
|
|
42 FinalState(const PushJob& job,
|
|
43 JobInfo& info,
|
|
44 const std::string& transactionUri,
|
|
45 bool isCommit) :
|
|
46 job_(job),
|
|
47 info_(info),
|
|
48 transactionUri_(transactionUri),
|
|
49 isCommit_(isCommit)
|
|
50 {
|
|
51 }
|
|
52
|
|
53 virtual StateUpdate* Step()
|
|
54 {
|
|
55 std::string uri = transactionUri_;
|
|
56
|
|
57 if (isCommit_)
|
|
58 {
|
|
59 uri += "/commit";
|
|
60 }
|
|
61 else
|
|
62 {
|
|
63 uri += "/discard";
|
|
64 }
|
|
65
|
|
66 Json::Value answer;
|
|
67 if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, ""))
|
|
68 {
|
|
69 if (isCommit_)
|
|
70 {
|
|
71 LOG(ERROR) << "Cannot commit push transaction on remote peer: "
|
|
72 << job_.query_.GetPeer();
|
|
73 }
|
|
74
|
|
75 return StateUpdate::Failure();
|
|
76 }
|
|
77 else if (isCommit_)
|
|
78 {
|
|
79 return StateUpdate::Success();
|
|
80 }
|
|
81 else
|
|
82 {
|
|
83 return StateUpdate::Failure();
|
|
84 }
|
|
85 }
|
|
86
|
|
87 virtual void Stop(OrthancPluginJobStopReason reason)
|
|
88 {
|
|
89 }
|
|
90 };
|
|
91
|
|
92
|
|
93 class PushJob::PushBucketsState : public IState
|
|
94 {
|
|
95 private:
|
|
96 const PushJob& job_;
|
|
97 JobInfo& info_;
|
|
98 std::string transactionUri_;
|
|
99 HttpQueriesQueue queue_;
|
|
100 std::auto_ptr<HttpQueriesRunner> runner_;
|
|
101
|
|
102 void UpdateInfo()
|
|
103 {
|
|
104 size_t scheduledQueriesCount, completedQueriesCount;
|
|
105 uint64_t uploadedSize, downloadedSize;
|
|
106 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize);
|
|
107
|
|
108 info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize));
|
|
109 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount));
|
|
110
|
|
111 if (runner_.get() != NULL)
|
|
112 {
|
|
113 float speed;
|
|
114 runner_->GetSpeed(speed);
|
|
115 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed));
|
|
116 }
|
|
117
|
|
118 // The "2" below corresponds to the "CreateTransactionState"
|
|
119 // and "FinalState" steps (which prevents division by zero)
|
|
120 info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) /
|
|
121 static_cast<float>(2 + scheduledQueriesCount));
|
|
122 }
|
|
123
|
|
124 public:
|
|
125 PushBucketsState(const PushJob& job,
|
|
126 JobInfo& info,
|
|
127 const std::string& transactionUri,
|
|
128 const std::vector<TransferBucket>& buckets) :
|
|
129 job_(job),
|
|
130 info_(info),
|
|
131 transactionUri_(transactionUri),
|
|
132 queue_(job.context_)
|
|
133 {
|
|
134 queue_.Reserve(buckets.size());
|
|
135
|
|
136 for (size_t i = 0; i < buckets.size(); i++)
|
|
137 {
|
|
138 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
|
|
139 transactionUri_, i, job.query_.GetCompression()));
|
|
140 }
|
|
141
|
|
142 UpdateInfo();
|
|
143 }
|
|
144
|
|
145 virtual StateUpdate* Step()
|
|
146 {
|
|
147 if (runner_.get() == NULL)
|
|
148 {
|
|
149 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
|
|
150 }
|
|
151
|
|
152 HttpQueriesQueue::Status status = queue_.WaitComplete(200);
|
|
153
|
|
154 UpdateInfo();
|
|
155
|
|
156 switch (status)
|
|
157 {
|
|
158 case HttpQueriesQueue::Status_Running:
|
|
159 return StateUpdate::Continue();
|
|
160
|
|
161 case HttpQueriesQueue::Status_Success:
|
|
162 // Commit transaction on remote peer
|
|
163 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true));
|
|
164
|
|
165 case HttpQueriesQueue::Status_Failure:
|
|
166 // Discard transaction on remote peer
|
|
167 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false));
|
|
168
|
|
169 default:
|
|
170 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
|
|
171 }
|
|
172 }
|
|
173
|
|
174 virtual void Stop(OrthancPluginJobStopReason reason)
|
|
175 {
|
|
176 // Cancel the running download threads
|
|
177 runner_.reset();
|
|
178 }
|
|
179 };
|
|
180
|
|
181
|
|
182 class PushJob::CreateTransactionState : public IState
|
|
183 {
|
|
184 private:
|
|
185 const PushJob& job_;
|
|
186 JobInfo& info_;
|
|
187 std::string createTransaction_;
|
|
188 std::vector<TransferBucket> buckets_;
|
|
189
|
|
190 public:
|
|
191 CreateTransactionState(const PushJob& job,
|
|
192 JobInfo& info) :
|
|
193 job_(job),
|
|
194 info_(info)
|
|
195 {
|
|
196 TransferScheduler scheduler;
|
|
197 scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources());
|
|
198
|
|
199 Json::Value push;
|
|
200 scheduler.FormatPushTransaction(push, buckets_,
|
|
201 job.targetBucketSize_, 2 * job.targetBucketSize_,
|
|
202 job_.query_.GetCompression());
|
|
203
|
|
204 Json::FastWriter writer;
|
|
205 createTransaction_ = writer.write(push);
|
|
206
|
|
207 info_.SetContent("Peer", job_.query_.GetPeer());
|
|
208 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
|
|
209 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
|
|
210 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
|
|
211 }
|
|
212
|
|
213 virtual StateUpdate* Step()
|
|
214 {
|
|
215 Json::Value answer;
|
|
216 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_))
|
|
217 {
|
|
218 LOG(ERROR) << "Cannot create a push transaction to peer \""
|
|
219 << job_.query_.GetPeer()
|
|
220 << "\" (check that it has the transfers accelerator plugin installed)";
|
|
221 return StateUpdate::Failure();
|
|
222 }
|
|
223
|
|
224 if (answer.type() != Json::objectValue ||
|
|
225 !answer.isMember(KEY_PATH) ||
|
|
226 answer[KEY_PATH].type() != Json::stringValue)
|
|
227 {
|
|
228 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
|
|
229 return StateUpdate::Failure();
|
|
230 }
|
|
231
|
|
232 std::string transactionUri = answer[KEY_PATH].asString();
|
|
233
|
|
234 return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_));
|
|
235 }
|
|
236
|
|
237 virtual void Stop(OrthancPluginJobStopReason reason)
|
|
238 {
|
|
239 }
|
|
240 };
|
|
241
|
|
242
|
|
243 StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info)
|
|
244 {
|
|
245 return StateUpdate::Next(new CreateTransactionState(*this, info));
|
|
246 }
|
|
247
|
|
248
|
|
249 PushJob::PushJob(OrthancPluginContext* context,
|
|
250 const TransferQuery& query,
|
|
251 OrthancInstancesCache& cache,
|
|
252 size_t threadsCount,
|
|
253 size_t targetBucketSize) :
|
|
254 StatefulOrthancJob(JOB_TYPE_PUSH),
|
|
255 context_(context),
|
|
256 cache_(cache),
|
|
257 query_(query),
|
|
258 threadsCount_(threadsCount),
|
|
259 targetBucketSize_(targetBucketSize),
|
|
260 peers_(context)
|
|
261 {
|
|
262 if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
|
|
263 {
|
|
264 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
|
|
265 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
|
|
266 }
|
|
267
|
|
268 Json::Value serialized;
|
|
269 query.Serialize(serialized);
|
|
270 UpdateSerialized(serialized);
|
|
271 }
|
|
272 }
|