Mercurial repository "hg / orthanc-transfers" — annotated view of
Framework/PullMode/PullJob.cpp at changeset 8:4c3437217518
("fix for compatibility with simplified OrthancPluginCppWrapper").
Author: Sebastien Jodogne <s.jodogne@gmail.com>
Date:   Wed, 05 Dec 2018 09:16:51 +0100
Parent changeset: 5e6de82bb10f — child changeset: 7e207ade2f1a
/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018 Osimis, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/
18 | |
19 | |
20 #include "PullJob.h" | |
21 | |
22 #include "BucketPullQuery.h" | |
23 #include "../HttpQueries/HttpQueriesRunner.h" | |
24 #include "../TransferScheduler.h" | |
25 | |
26 #include <Core/Logging.h> | |
27 | |
28 #include <json/writer.h> | |
29 | |
30 | |
31 namespace OrthancPlugins | |
32 { | |
33 class PullJob::CommitState : public IState | |
34 { | |
35 private: | |
36 const PullJob& job_; | |
37 std::auto_ptr<DownloadArea> area_; | |
38 | |
39 public: | |
40 CommitState(const PullJob& job, | |
41 DownloadArea* area /* takes ownership */) : | |
42 job_(job), | |
43 area_(area) | |
44 { | |
45 } | |
46 | |
47 virtual StateUpdate* Step() | |
48 { | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
49 area_->Commit(); |
0 | 50 return StateUpdate::Success(); |
51 } | |
52 | |
53 virtual void Stop(OrthancPluginJobStopReason reason) | |
54 { | |
55 } | |
56 }; | |
57 | |
58 | |
59 class PullJob::PullBucketsState : public IState | |
60 { | |
61 private: | |
62 const PullJob& job_; | |
63 JobInfo& info_; | |
64 HttpQueriesQueue queue_; | |
65 std::auto_ptr<DownloadArea> area_; | |
66 std::auto_ptr<HttpQueriesRunner> runner_; | |
67 | |
68 void UpdateInfo() | |
69 { | |
70 size_t scheduledQueriesCount, completedQueriesCount; | |
71 uint64_t uploadedSize, downloadedSize; | |
72 queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); | |
73 | |
74 info_.SetContent("DownloadedSizeMB", ConvertToMegabytes(downloadedSize)); | |
75 info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); | |
76 | |
77 if (runner_.get() != NULL) | |
78 { | |
79 float speed; | |
80 runner_->GetSpeed(speed); | |
81 info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); | |
82 } | |
83 | |
84 // The "2" below corresponds to the "LookupInstancesState" | |
85 // and "CommitState" steps (which prevents division by zero) | |
86 info_.SetProgress(static_cast<float>(1 /* LookupInstancesState */ + completedQueriesCount) / | |
87 static_cast<float>(2 + scheduledQueriesCount)); | |
88 } | |
89 | |
90 public: | |
91 PullBucketsState(const PullJob& job, | |
92 JobInfo& info, | |
93 const TransferScheduler& scheduler) : | |
94 job_(job), | |
95 info_(info), | |
96 area_(new DownloadArea(scheduler)) | |
97 { | |
98 const std::string baseUrl = job.peers_.GetPeerUrl(job.query_.GetPeer()); | |
99 | |
100 std::vector<TransferBucket> buckets; | |
101 scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_, | |
102 baseUrl, job.query_.GetCompression()); | |
103 area_.reset(new DownloadArea(scheduler)); | |
104 | |
105 queue_.Reserve(buckets.size()); | |
106 | |
107 for (size_t i = 0; i < buckets.size(); i++) | |
108 { | |
109 queue_.Enqueue(new BucketPullQuery(*area_, buckets[i], job.query_.GetPeer(), job.query_.GetCompression())); | |
110 } | |
111 | |
112 info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); | |
113 info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); | |
114 UpdateInfo(); | |
115 } | |
116 | |
117 virtual StateUpdate* Step() | |
118 { | |
119 if (runner_.get() == NULL) | |
120 { | |
121 runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); | |
122 } | |
123 | |
124 HttpQueriesQueue::Status status = queue_.WaitComplete(200); | |
125 | |
126 UpdateInfo(); | |
127 | |
128 switch (status) | |
129 { | |
130 case HttpQueriesQueue::Status_Running: | |
131 return StateUpdate::Continue(); | |
132 | |
133 case HttpQueriesQueue::Status_Success: | |
134 return StateUpdate::Next(new CommitState(job_, area_.release())); | |
135 | |
136 case HttpQueriesQueue::Status_Failure: | |
137 return StateUpdate::Failure(); | |
138 | |
139 default: | |
140 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
141 } | |
142 } | |
143 | |
144 virtual void Stop(OrthancPluginJobStopReason reason) | |
145 { | |
146 // Cancel the running download threads | |
147 runner_.reset(); | |
148 } | |
149 }; | |
150 | |
151 | |
152 class PullJob::LookupInstancesState : public IState | |
153 { | |
154 private: | |
155 const PullJob& job_; | |
156 JobInfo& info_; | |
157 | |
158 public: | |
159 LookupInstancesState(const PullJob& job, | |
160 JobInfo& info) : | |
161 job_(job), | |
162 info_(info) | |
163 { | |
164 info_.SetContent("Peer", job_.query_.GetPeer()); | |
165 info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); | |
166 } | |
167 | |
168 virtual StateUpdate* Step() | |
169 { | |
170 Json::FastWriter writer; | |
171 const std::string lookup = writer.write(job_.query_.GetResources()); | |
172 | |
173 Json::Value answer; | |
174 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup)) | |
175 { | |
176 LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" | |
177 << job_.query_.GetPeer() | |
178 << "\" (check that it has the transfers accelerator plugin installed)"; | |
179 return StateUpdate::Failure(); | |
180 } | |
181 | |
182 if (answer.type() != Json::objectValue || | |
183 !answer.isMember(KEY_INSTANCES) || | |
184 !answer.isMember(KEY_ORIGINATOR_UUID) || | |
185 answer[KEY_INSTANCES].type() != Json::arrayValue || | |
186 answer[KEY_ORIGINATOR_UUID].type() != Json::stringValue) | |
187 { | |
188 LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); | |
189 return StateUpdate::Failure(); | |
190 } | |
191 | |
192 if (job_.query_.HasOriginator() && | |
193 job_.query_.GetOriginator() != answer[KEY_ORIGINATOR_UUID].asString()) | |
194 { | |
5
5e6de82bb10f
use of user properties instead of BidirectionalPeers option
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
195 LOG(ERROR) << "Invalid originator, check out the \"" << KEY_REMOTE_SELF |
5e6de82bb10f
use of user properties instead of BidirectionalPeers option
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
0
diff
changeset
|
196 << "\" configuration option of peer: " << job_.query_.GetPeer(); |
0 | 197 return StateUpdate::Failure(); |
198 } | |
199 | |
200 TransferScheduler scheduler; | |
201 | |
202 for (Json::Value::ArrayIndex i = 0; i < answer[KEY_INSTANCES].size(); i++) | |
203 { | |
204 DicomInstanceInfo instance(answer[KEY_INSTANCES][i]); | |
205 scheduler.AddInstance(instance); | |
206 } | |
207 | |
208 if (scheduler.GetInstancesCount() == 0) | |
209 { | |
210 // We're already done: No instance to be retrieved | |
211 return StateUpdate::Success(); | |
212 } | |
213 else | |
214 { | |
215 return StateUpdate::Next(new PullBucketsState(job_, info_, scheduler)); | |
216 } | |
217 } | |
218 | |
219 virtual void Stop(OrthancPluginJobStopReason reason) | |
220 { | |
221 } | |
222 }; | |
223 | |
224 | |
225 StatefulOrthancJob::StateUpdate* PullJob::CreateInitialState(JobInfo& info) | |
226 { | |
227 return StateUpdate::Next(new LookupInstancesState(*this, info)); | |
228 } | |
229 | |
230 | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
231 PullJob::PullJob(const TransferQuery& query, |
0 | 232 size_t threadsCount, |
233 size_t targetBucketSize) : | |
234 StatefulOrthancJob(JOB_TYPE_PULL), | |
235 query_(query), | |
236 threadsCount_(threadsCount), | |
8
4c3437217518
fix for compatibility with simplified OrthancPluginCppWrapper
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
5
diff
changeset
|
237 targetBucketSize_(targetBucketSize) |
0 | 238 { |
239 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) | |
240 { | |
241 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); | |
242 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); | |
243 } | |
244 | |
245 Json::Value serialized; | |
246 query.Serialize(serialized); | |
247 UpdateSerialized(serialized); | |
248 } | |
249 } |