Mercurial > hg > orthanc-transfers
comparison Framework/PullMode/PullJob.cpp @ 0:95226b754d9e
initial release
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 17 Sep 2018 11:34:55 +0200 |
parents | |
children | 5e6de82bb10f |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:95226b754d9e |
---|---|
1 /** | |
2 * Transfers accelerator plugin for Orthanc | |
3 * Copyright (C) 2018 Osimis, Belgium | |
4 * | |
5 * This program is free software: you can redistribute it and/or | |
6 * modify it under the terms of the GNU Affero General Public License | |
7 * as published by the Free Software Foundation, either version 3 of | |
8 * the License, or (at your option) any later version. | |
9 * | |
10 * This program is distributed in the hope that it will be useful, but | |
11 * WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 * Affero General Public License for more details. | |
14 * | |
15 * You should have received a copy of the GNU Affero General Public License | |
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 **/ | |
18 | |
19 | |
20 #include "PullJob.h" | |
21 | |
22 #include "BucketPullQuery.h" | |
23 #include "../HttpQueries/HttpQueriesRunner.h" | |
24 #include "../TransferScheduler.h" | |
25 | |
26 #include <Core/Logging.h> | |
27 | |
28 #include <json/writer.h> | |
29 | |
30 | |
31 namespace OrthancPlugins | |
32 { | |
33 class PullJob::CommitState : public IState | |
34 { | |
35 private: | |
36 const PullJob& job_; | |
37 std::auto_ptr<DownloadArea> area_; | |
38 | |
39 public: | |
40 CommitState(const PullJob& job, | |
41 DownloadArea* area /* takes ownership */) : | |
42 job_(job), | |
43 area_(area) | |
44 { | |
45 } | |
46 | |
47 virtual StateUpdate* Step() | |
48 { | |
49 area_->Commit(job_.context_); | |
50 return StateUpdate::Success(); | |
51 } | |
52 | |
53 virtual void Stop(OrthancPluginJobStopReason reason) | |
54 { | |
55 } | |
56 }; | |
57 | |
58 | |
59 class PullJob::PullBucketsState : public IState | |
60 { | |
61 private: | |
62 const PullJob& job_; | |
63 JobInfo& info_; | |
64 HttpQueriesQueue queue_; | |
65 std::auto_ptr<DownloadArea> area_; | |
66 std::auto_ptr<HttpQueriesRunner> runner_; | |
67 | |
68 void UpdateInfo() | |
69 { | |
70 size_t scheduledQueriesCount, completedQueriesCount; | |
71 uint64_t uploadedSize, downloadedSize; | |
72 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); | |
73 | |
74 info_.SetContent("DownloadedSizeMB", ConvertToMegabytes(downloadedSize)); | |
75 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); | |
76 | |
77 if (runner_.get() != NULL) | |
78 { | |
79 float speed; | |
80 runner_->GetSpeed(speed); | |
81 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); | |
82 } | |
83 | |
84 // The "2" below corresponds to the "LookupInstancesState" | |
85 // and "CommitState" steps (which prevents division by zero) | |
86 info_.SetProgress(static_cast<float>(1 /* LookupInstancesState */ + completedQueriesCount) / | |
87 static_cast<float>(2 + scheduledQueriesCount)); | |
88 } | |
89 | |
90 public: | |
91 PullBucketsState(const PullJob& job, | |
92 JobInfo& info, | |
93 const TransferScheduler& scheduler) : | |
94 job_(job), | |
95 info_(info), | |
96 queue_(job.context_), | |
97 area_(new DownloadArea(scheduler)) | |
98 { | |
99 const std::string baseUrl = job.peers_.GetPeerUrl(job.query_.GetPeer()); | |
100 | |
101 std::vector<TransferBucket> buckets; | |
102 scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_, | |
103 baseUrl, job.query_.GetCompression()); | |
104 area_.reset(new DownloadArea(scheduler)); | |
105 | |
106 queue_.Reserve(buckets.size()); | |
107 | |
108 for (size_t i = 0; i < buckets.size(); i++) | |
109 { | |
110 queue_.Enqueue(new BucketPullQuery(*area_, buckets[i], job.query_.GetPeer(), job.query_.GetCompression())); | |
111 } | |
112 | |
113 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); | |
114 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); | |
115 UpdateInfo(); | |
116 } | |
117 | |
118 virtual StateUpdate* Step() | |
119 { | |
120 if (runner_.get() == NULL) | |
121 { | |
122 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); | |
123 } | |
124 | |
125 HttpQueriesQueue::Status status = queue_.WaitComplete(200); | |
126 | |
127 UpdateInfo(); | |
128 | |
129 switch (status) | |
130 { | |
131 case HttpQueriesQueue::Status_Running: | |
132 return StateUpdate::Continue(); | |
133 | |
134 case HttpQueriesQueue::Status_Success: | |
135 return StateUpdate::Next(new CommitState(job_, area_.release())); | |
136 | |
137 case HttpQueriesQueue::Status_Failure: | |
138 return StateUpdate::Failure(); | |
139 | |
140 default: | |
141 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
142 } | |
143 } | |
144 | |
145 virtual void Stop(OrthancPluginJobStopReason reason) | |
146 { | |
147 // Cancel the running download threads | |
148 runner_.reset(); | |
149 } | |
150 }; | |
151 | |
152 | |
153 class PullJob::LookupInstancesState : public IState | |
154 { | |
155 private: | |
156 const PullJob& job_; | |
157 JobInfo& info_; | |
158 | |
159 public: | |
160 LookupInstancesState(const PullJob& job, | |
161 JobInfo& info) : | |
162 job_(job), | |
163 info_(info) | |
164 { | |
165 info_.SetContent("Peer", job_.query_.GetPeer()); | |
166 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); | |
167 } | |
168 | |
169 virtual StateUpdate* Step() | |
170 { | |
171 Json::FastWriter writer; | |
172 const std::string lookup = writer.write(job_.query_.GetResources()); | |
173 | |
174 Json::Value answer; | |
175 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup)) | |
176 { | |
177 LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" | |
178 << job_.query_.GetPeer() | |
179 << "\" (check that it has the transfers accelerator plugin installed)"; | |
180 return StateUpdate::Failure(); | |
181 } | |
182 | |
183 if (answer.type() != Json::objectValue || | |
184 !answer.isMember(KEY_INSTANCES) || | |
185 !answer.isMember(KEY_ORIGINATOR_UUID) || | |
186 answer[KEY_INSTANCES].type() != Json::arrayValue || | |
187 answer[KEY_ORIGINATOR_UUID].type() != Json::stringValue) | |
188 { | |
189 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); | |
190 return StateUpdate::Failure(); | |
191 } | |
192 | |
193 if (job_.query_.HasOriginator() && | |
194 job_.query_.GetOriginator() != answer[KEY_ORIGINATOR_UUID].asString()) | |
195 { | |
196 LOG(ERROR) << "Invalid originator, check out the \"" | |
197 << KEY_PLUGIN_CONFIGURATION << "." << KEY_BIDIRECTIONAL_PEERS | |
198 << "\" configuration option"; | |
199 return StateUpdate::Failure(); | |
200 } | |
201 | |
202 TransferScheduler scheduler; | |
203 | |
204 for (Json::Value::ArrayIndex i = 0; i < answer[KEY_INSTANCES].size(); i++) | |
205 { | |
206 DicomInstanceInfo instance(answer[KEY_INSTANCES][i]); | |
207 scheduler.AddInstance(instance); | |
208 } | |
209 | |
210 if (scheduler.GetInstancesCount() == 0) | |
211 { | |
212 // We're already done: No instance to be retrieved | |
213 return StateUpdate::Success(); | |
214 } | |
215 else | |
216 { | |
217 return StateUpdate::Next(new PullBucketsState(job_, info_, scheduler)); | |
218 } | |
219 } | |
220 | |
221 virtual void Stop(OrthancPluginJobStopReason reason) | |
222 { | |
223 } | |
224 }; | |
225 | |
226 | |
227 StatefulOrthancJob::StateUpdate* PullJob::CreateInitialState(JobInfo& info) | |
228 { | |
229 return StateUpdate::Next(new LookupInstancesState(*this, info)); | |
230 } | |
231 | |
232 | |
233 PullJob::PullJob(OrthancPluginContext* context, | |
234 const TransferQuery& query, | |
235 size_t threadsCount, | |
236 size_t targetBucketSize) : | |
237 StatefulOrthancJob(JOB_TYPE_PULL), | |
238 context_(context), | |
239 query_(query), | |
240 threadsCount_(threadsCount), | |
241 targetBucketSize_(targetBucketSize), | |
242 peers_(context) | |
243 { | |
244 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) | |
245 { | |
246 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); | |
247 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); | |
248 } | |
249 | |
250 Json::Value serialized; | |
251 query.Serialize(serialized); | |
252 UpdateSerialized(serialized); | |
253 } | |
254 } |