comparison Framework/PullMode/PullJob.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 5e6de82bb10f
comparison
equal deleted inserted replaced
-1:000000000000 0:95226b754d9e
1 /**
2 * Transfers accelerator plugin for Orthanc
3 * Copyright (C) 2018 Osimis, Belgium
4 *
5 * This program is free software: you can redistribute it and/or
6 * modify it under the terms of the GNU Affero General Public License
7 * as published by the Free Software Foundation, either version 3 of
8 * the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 **/
18
19
20 #include "PullJob.h"
21
22 #include "BucketPullQuery.h"
23 #include "../HttpQueries/HttpQueriesRunner.h"
24 #include "../TransferScheduler.h"
25
26 #include <Core/Logging.h>
27
28 #include <json/writer.h>
29
30
31 namespace OrthancPlugins
32 {
33 class PullJob::CommitState : public IState
34 {
35 private:
36 const PullJob& job_;
37 std::auto_ptr<DownloadArea> area_;
38
39 public:
40 CommitState(const PullJob& job,
41 DownloadArea* area /* takes ownership */) :
42 job_(job),
43 area_(area)
44 {
45 }
46
47 virtual StateUpdate* Step()
48 {
49 area_->Commit(job_.context_);
50 return StateUpdate::Success();
51 }
52
53 virtual void Stop(OrthancPluginJobStopReason reason)
54 {
55 }
56 };
57
58
59 class PullJob::PullBucketsState : public IState
60 {
61 private:
62 const PullJob& job_;
63 JobInfo& info_;
64 HttpQueriesQueue queue_;
65 std::auto_ptr<DownloadArea> area_;
66 std::auto_ptr<HttpQueriesRunner> runner_;
67
68 void UpdateInfo()
69 {
70 size_t scheduledQueriesCount, completedQueriesCount;
71 uint64_t uploadedSize, downloadedSize;
72 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize);
73
74 info_.SetContent("DownloadedSizeMB", ConvertToMegabytes(downloadedSize));
75 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount));
76
77 if (runner_.get() != NULL)
78 {
79 float speed;
80 runner_->GetSpeed(speed);
81 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed));
82 }
83
84 // The "2" below corresponds to the "LookupInstancesState"
85 // and "CommitState" steps (which prevents division by zero)
86 info_.SetProgress(static_cast<float>(1 /* LookupInstancesState */ + completedQueriesCount) /
87 static_cast<float>(2 + scheduledQueriesCount));
88 }
89
90 public:
91 PullBucketsState(const PullJob& job,
92 JobInfo& info,
93 const TransferScheduler& scheduler) :
94 job_(job),
95 info_(info),
96 queue_(job.context_),
97 area_(new DownloadArea(scheduler))
98 {
99 const std::string baseUrl = job.peers_.GetPeerUrl(job.query_.GetPeer());
100
101 std::vector<TransferBucket> buckets;
102 scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_,
103 baseUrl, job.query_.GetCompression());
104 area_.reset(new DownloadArea(scheduler));
105
106 queue_.Reserve(buckets.size());
107
108 for (size_t i = 0; i < buckets.size(); i++)
109 {
110 queue_.Enqueue(new BucketPullQuery(*area_, buckets[i], job.query_.GetPeer(), job.query_.GetCompression()));
111 }
112
113 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
114 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
115 UpdateInfo();
116 }
117
118 virtual StateUpdate* Step()
119 {
120 if (runner_.get() == NULL)
121 {
122 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
123 }
124
125 HttpQueriesQueue::Status status = queue_.WaitComplete(200);
126
127 UpdateInfo();
128
129 switch (status)
130 {
131 case HttpQueriesQueue::Status_Running:
132 return StateUpdate::Continue();
133
134 case HttpQueriesQueue::Status_Success:
135 return StateUpdate::Next(new CommitState(job_, area_.release()));
136
137 case HttpQueriesQueue::Status_Failure:
138 return StateUpdate::Failure();
139
140 default:
141 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
142 }
143 }
144
145 virtual void Stop(OrthancPluginJobStopReason reason)
146 {
147 // Cancel the running download threads
148 runner_.reset();
149 }
150 };
151
152
153 class PullJob::LookupInstancesState : public IState
154 {
155 private:
156 const PullJob& job_;
157 JobInfo& info_;
158
159 public:
160 LookupInstancesState(const PullJob& job,
161 JobInfo& info) :
162 job_(job),
163 info_(info)
164 {
165 info_.SetContent("Peer", job_.query_.GetPeer());
166 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
167 }
168
169 virtual StateUpdate* Step()
170 {
171 Json::FastWriter writer;
172 const std::string lookup = writer.write(job_.query_.GetResources());
173
174 Json::Value answer;
175 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup))
176 {
177 LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \""
178 << job_.query_.GetPeer()
179 << "\" (check that it has the transfers accelerator plugin installed)";
180 return StateUpdate::Failure();
181 }
182
183 if (answer.type() != Json::objectValue ||
184 !answer.isMember(KEY_INSTANCES) ||
185 !answer.isMember(KEY_ORIGINATOR_UUID) ||
186 answer[KEY_INSTANCES].type() != Json::arrayValue ||
187 answer[KEY_ORIGINATOR_UUID].type() != Json::stringValue)
188 {
189 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
190 return StateUpdate::Failure();
191 }
192
193 if (job_.query_.HasOriginator() &&
194 job_.query_.GetOriginator() != answer[KEY_ORIGINATOR_UUID].asString())
195 {
196 LOG(ERROR) << "Invalid originator, check out the \""
197 << KEY_PLUGIN_CONFIGURATION << "." << KEY_BIDIRECTIONAL_PEERS
198 << "\" configuration option";
199 return StateUpdate::Failure();
200 }
201
202 TransferScheduler scheduler;
203
204 for (Json::Value::ArrayIndex i = 0; i < answer[KEY_INSTANCES].size(); i++)
205 {
206 DicomInstanceInfo instance(answer[KEY_INSTANCES][i]);
207 scheduler.AddInstance(instance);
208 }
209
210 if (scheduler.GetInstancesCount() == 0)
211 {
212 // We're already done: No instance to be retrieved
213 return StateUpdate::Success();
214 }
215 else
216 {
217 return StateUpdate::Next(new PullBucketsState(job_, info_, scheduler));
218 }
219 }
220
221 virtual void Stop(OrthancPluginJobStopReason reason)
222 {
223 }
224 };
225
226
227 StatefulOrthancJob::StateUpdate* PullJob::CreateInitialState(JobInfo& info)
228 {
229 return StateUpdate::Next(new LookupInstancesState(*this, info));
230 }
231
232
233 PullJob::PullJob(OrthancPluginContext* context,
234 const TransferQuery& query,
235 size_t threadsCount,
236 size_t targetBucketSize) :
237 StatefulOrthancJob(JOB_TYPE_PULL),
238 context_(context),
239 query_(query),
240 threadsCount_(threadsCount),
241 targetBucketSize_(targetBucketSize),
242 peers_(context)
243 {
244 if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
245 {
246 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
247 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
248 }
249
250 Json::Value serialized;
251 query.Serialize(serialized);
252 UpdateSerialized(serialized);
253 }
254 }