comparison Framework/PushMode/PushJob.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 4c3437217518
comparison
equal deleted inserted replaced
-1:000000000000 0:95226b754d9e
1 /**
2 * Transfers accelerator plugin for Orthanc
3 * Copyright (C) 2018 Osimis, Belgium
4 *
5 * This program is free software: you can redistribute it and/or
6 * modify it under the terms of the GNU Affero General Public License
7 * as published by the Free Software Foundation, either version 3 of
8 * the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 **/
18
19
20 #include "PushJob.h"
21
22 #include "BucketPushQuery.h"
23 #include "../HttpQueries/HttpQueriesRunner.h"
24 #include "../TransferScheduler.h"
25
26 #include <Core/Logging.h>
27
28 #include <json/writer.h>
29
30
31 namespace OrthancPlugins
32 {
33 class PushJob::FinalState : public IState
34 {
35 private:
36 const PushJob& job_;
37 JobInfo& info_;
38 std::string transactionUri_;
39 bool isCommit_;
40
41 public:
42 FinalState(const PushJob& job,
43 JobInfo& info,
44 const std::string& transactionUri,
45 bool isCommit) :
46 job_(job),
47 info_(info),
48 transactionUri_(transactionUri),
49 isCommit_(isCommit)
50 {
51 }
52
53 virtual StateUpdate* Step()
54 {
55 std::string uri = transactionUri_;
56
57 if (isCommit_)
58 {
59 uri += "/commit";
60 }
61 else
62 {
63 uri += "/discard";
64 }
65
66 Json::Value answer;
67 if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, ""))
68 {
69 if (isCommit_)
70 {
71 LOG(ERROR) << "Cannot commit push transaction on remote peer: "
72 << job_.query_.GetPeer();
73 }
74
75 return StateUpdate::Failure();
76 }
77 else if (isCommit_)
78 {
79 return StateUpdate::Success();
80 }
81 else
82 {
83 return StateUpdate::Failure();
84 }
85 }
86
87 virtual void Stop(OrthancPluginJobStopReason reason)
88 {
89 }
90 };
91
92
93 class PushJob::PushBucketsState : public IState
94 {
95 private:
96 const PushJob& job_;
97 JobInfo& info_;
98 std::string transactionUri_;
99 HttpQueriesQueue queue_;
100 std::auto_ptr<HttpQueriesRunner> runner_;
101
102 void UpdateInfo()
103 {
104 size_t scheduledQueriesCount, completedQueriesCount;
105 uint64_t uploadedSize, downloadedSize;
106 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize);
107
108 info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize));
109 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount));
110
111 if (runner_.get() != NULL)
112 {
113 float speed;
114 runner_->GetSpeed(speed);
115 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed));
116 }
117
118 // The "2" below corresponds to the "CreateTransactionState"
119 // and "FinalState" steps (which prevents division by zero)
120 info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) /
121 static_cast<float>(2 + scheduledQueriesCount));
122 }
123
124 public:
125 PushBucketsState(const PushJob& job,
126 JobInfo& info,
127 const std::string& transactionUri,
128 const std::vector<TransferBucket>& buckets) :
129 job_(job),
130 info_(info),
131 transactionUri_(transactionUri),
132 queue_(job.context_)
133 {
134 queue_.Reserve(buckets.size());
135
136 for (size_t i = 0; i < buckets.size(); i++)
137 {
138 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
139 transactionUri_, i, job.query_.GetCompression()));
140 }
141
142 UpdateInfo();
143 }
144
145 virtual StateUpdate* Step()
146 {
147 if (runner_.get() == NULL)
148 {
149 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
150 }
151
152 HttpQueriesQueue::Status status = queue_.WaitComplete(200);
153
154 UpdateInfo();
155
156 switch (status)
157 {
158 case HttpQueriesQueue::Status_Running:
159 return StateUpdate::Continue();
160
161 case HttpQueriesQueue::Status_Success:
162 // Commit transaction on remote peer
163 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true));
164
165 case HttpQueriesQueue::Status_Failure:
166 // Discard transaction on remote peer
167 return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false));
168
169 default:
170 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
171 }
172 }
173
174 virtual void Stop(OrthancPluginJobStopReason reason)
175 {
176 // Cancel the running download threads
177 runner_.reset();
178 }
179 };
180
181
182 class PushJob::CreateTransactionState : public IState
183 {
184 private:
185 const PushJob& job_;
186 JobInfo& info_;
187 std::string createTransaction_;
188 std::vector<TransferBucket> buckets_;
189
190 public:
191 CreateTransactionState(const PushJob& job,
192 JobInfo& info) :
193 job_(job),
194 info_(info)
195 {
196 TransferScheduler scheduler;
197 scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources());
198
199 Json::Value push;
200 scheduler.FormatPushTransaction(push, buckets_,
201 job.targetBucketSize_, 2 * job.targetBucketSize_,
202 job_.query_.GetCompression());
203
204 Json::FastWriter writer;
205 createTransaction_ = writer.write(push);
206
207 info_.SetContent("Peer", job_.query_.GetPeer());
208 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
209 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
210 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
211 }
212
213 virtual StateUpdate* Step()
214 {
215 Json::Value answer;
216 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_))
217 {
218 LOG(ERROR) << "Cannot create a push transaction to peer \""
219 << job_.query_.GetPeer()
220 << "\" (check that it has the transfers accelerator plugin installed)";
221 return StateUpdate::Failure();
222 }
223
224 if (answer.type() != Json::objectValue ||
225 !answer.isMember(KEY_PATH) ||
226 answer[KEY_PATH].type() != Json::stringValue)
227 {
228 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
229 return StateUpdate::Failure();
230 }
231
232 std::string transactionUri = answer[KEY_PATH].asString();
233
234 return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_));
235 }
236
237 virtual void Stop(OrthancPluginJobStopReason reason)
238 {
239 }
240 };
241
242
243 StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info)
244 {
245 return StateUpdate::Next(new CreateTransactionState(*this, info));
246 }
247
248
249 PushJob::PushJob(OrthancPluginContext* context,
250 const TransferQuery& query,
251 OrthancInstancesCache& cache,
252 size_t threadsCount,
253 size_t targetBucketSize) :
254 StatefulOrthancJob(JOB_TYPE_PUSH),
255 context_(context),
256 cache_(cache),
257 query_(query),
258 threadsCount_(threadsCount),
259 targetBucketSize_(targetBucketSize),
260 peers_(context)
261 {
262 if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
263 {
264 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
265 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
266 }
267
268 Json::Value serialized;
269 query.Serialize(serialized);
270 UpdateSerialized(serialized);
271 }
272 }