/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2020 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "TransferScheduler.h"

#include "../Resources/Orthanc/Plugins/OrthancPluginCppWrapper.h"

#include <Logging.h>
#include <OrthancException.h>


namespace OrthancPlugins
{
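  // Registers all the instances of one patient, study, or series by
  // querying the Orthanc REST API ("/{patients|studies|series}/{id}/instances"),
  // then adding each listed instance to the scheduler through the
  // instances cache.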
  void TransferScheduler::AddResource(OrthancInstancesCache& cache,
                                      Orthanc::ResourceType level,
                                      const std::string& id)
  {
    Json::Value resource;

    std::string base;
    switch (level)
    {
      case Orthanc::ResourceType_Patient:
        base = "patients";
        break;

      case Orthanc::ResourceType_Study:
        base = "studies";
        break;

      case Orthanc::ResourceType_Series:
        base = "series";
        break;

      default:
        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
    }

    if (RestApiGet(resource, "/" + base + "/" + id + "/instances", false))
    {
      if (resource.type() != Json::arrayValue)
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
      }

      for (Json::Value::ArrayIndex i = 0; i < resource.size(); i++)
      {
        if (resource[i].type() != Json::objectValue ||
            !resource[i].isMember(KEY_ID) ||
            resource[i][KEY_ID].type() != Json::stringValue)
        {
          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
        }

        AddInstance(cache, resource[i][KEY_ID].asString());
      }
    }
    else
    {
      std::string s = Orthanc::EnumerationToString(level);
      Orthanc::Toolbox::ToLowerCase(s);
      LOG(WARNING) << "Missing " << s << ": " << id;
      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }
  }


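  // Partitions the registered instances into transfer buckets:
  // instances smaller than "groupThreshold" are grouped into shared
  // buckets, instances between the two thresholds are each sent as
  // their own bucket, and instances of at least "separateThreshold"
  // bytes are split into roughly equal chunks. For example
  // (hypothetical figures, assuming separateThreshold is 10 MB), a
  // 25 MB instance yields 25/10 + 1 = 3 chunks of about 8.3 MB each,
  // the last chunk absorbing the rounding remainder.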
  void TransferScheduler::ComputeBucketsInternal(std::vector<TransferBucket>& target,
                                                 size_t groupThreshold,
                                                 size_t separateThreshold,
                                                 const std::string& baseUrl, /* only needed in pull mode */
                                                 BucketCompression compression /* only needed in pull mode */) const
  {
    if (groupThreshold > separateThreshold ||
        separateThreshold == 0)  // (*)
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
    }

    target.clear();

    std::list<std::string> toGroup_;

    for (Instances::const_iterator it = instances_.begin();
         it != instances_.end(); ++it)
    {
      size_t size = it->second.GetSize();

      if (size < groupThreshold)
      {
        toGroup_.push_back(it->first);
      }
      else if (size < separateThreshold)
      {
        // Send the whole instance as it is
        TransferBucket bucket;
        bucket.AddChunk(it->second, 0, size);
        target.push_back(bucket);
      }
      else
      {
        // Divide this large instance into a set of chunks
        size_t chunksCount;

        if (size % separateThreshold == 0)
        {
          chunksCount = size / separateThreshold;
        }
        else
        {
          chunksCount = size / separateThreshold + 1;
        }

        assert(chunksCount != 0);  // This follows from (*)

        size_t chunkSize = size / chunksCount;
        size_t offset = 0;

        for (size_t i = 0; i < chunksCount; i++, offset += chunkSize)
        {
          TransferBucket bucket;

          if (i == chunksCount - 1)
          {
            // The last chunk must contain all the remaining bytes
            // of the instance (correction of rounding effects)
            bucket.AddChunk(it->second, offset, size - offset);
          }
          else
          {
            bucket.AddChunk(it->second, offset, chunkSize);
          }

          target.push_back(bucket);
        }
      }
    }

    // Group the remaining small instances, while preventing the
    // download URL from getting too long: "If you keep URLs under
    // 2000 characters, they'll work in virtually any combination of
    // client and server software."
    // https://stackoverflow.com/a/417184/881731

    static const size_t MAX_URL_LENGTH = 2000 - 44 /* size of an Orthanc identifier (SHA-1) */;

    TransferBucket bucket;

    for (std::list<std::string>::const_iterator it = toGroup_.begin();
         it != toGroup_.end(); ++it)
    {
      Instances::const_iterator instance = instances_.find(*it);
      assert(instance != instances_.end());

      bucket.AddChunk(instance->second, 0, instance->second.GetSize());

      bool full = (bucket.GetTotalSize() >= groupThreshold);

      if (!full && !baseUrl.empty())
      {
        std::string uri;
        bucket.ComputePullUri(uri, compression);

        std::string url = baseUrl + uri;
        full = (url.length() >= MAX_URL_LENGTH);
      }

      if (full)
      {
        target.push_back(bucket);
        bucket.Clear();
      }
    }

    if (bucket.GetChunksCount() > 0)
    {
      target.push_back(bucket);
    }
  }


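  // Registers a single DICOM instance. The first overload retrieves
  // the instance size and MD5 checksum from the instances cache; the
  // second one directly stores a pre-built DicomInstanceInfo, indexed
  // by its Orthanc identifier.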
  void TransferScheduler::AddInstance(OrthancInstancesCache& cache,
                                      const std::string& instanceId)
  {
    size_t size;
    std::string md5;
    cache.GetInstanceInfo(size, md5, instanceId);

    AddInstance(DicomInstanceInfo(instanceId, size, md5));
  }


  void TransferScheduler::AddInstance(const DicomInstanceInfo& info)
  {
    instances_[info.GetId()] = info;
  }


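  // Parses a JSON array of the form [ { <level>: "...", <id>: "..." }, ... ],
  // where the actual key names are the KEY_LEVEL and KEY_ID constants
  // defined elsewhere in the plugin, and dispatches each entry to
  // AddPatient(), AddStudy(), AddSeries(), or AddInstance() according
  // to its resource level.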
  void TransferScheduler::ParseListOfResources(OrthancInstancesCache& cache,
                                               const Json::Value& resources)
  {
    if (resources.type() != Json::arrayValue)
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat);
    }

    for (Json::Value::ArrayIndex i = 0; i < resources.size(); i++)
    {
      if (resources[i].type() != Json::objectValue ||
          !resources[i].isMember(KEY_LEVEL) ||
          !resources[i].isMember(KEY_ID) ||
          resources[i][KEY_LEVEL].type() != Json::stringValue ||
          resources[i][KEY_ID].type() != Json::stringValue)
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat);
      }
      else
      {
        Orthanc::ResourceType level = Orthanc::StringToResourceType(resources[i][KEY_LEVEL].asCString());

        switch (level)
        {
          case Orthanc::ResourceType_Patient:
            AddPatient(cache, resources[i][KEY_ID].asString());
            break;

          case Orthanc::ResourceType_Study:
            AddStudy(cache, resources[i][KEY_ID].asString());
            break;

          case Orthanc::ResourceType_Series:
            AddSeries(cache, resources[i][KEY_ID].asString());
            break;

          case Orthanc::ResourceType_Instance:
            AddInstance(cache, resources[i][KEY_ID].asString());
            break;

          default:
            throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
        }
      }
    }
  }


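  // Copies the registered instances into a flat vector. The map key is
  // expected to always equal the instance identifier, as enforced by
  // the assertion below.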
  void TransferScheduler::ListInstances(std::vector<DicomInstanceInfo>& target) const
  {
    target.clear();
    target.reserve(instances_.size());

    for (Instances::const_iterator it = instances_.begin();
         it != instances_.end(); ++it)
    {
      assert(it->first == it->second.GetId());
      target.push_back(it->second);
    }
  }


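  // Returns the total size of all registered instances, as reported by
  // DicomInstanceInfo::GetSize().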
  size_t TransferScheduler::GetTotalSize() const
  {
    size_t size = 0;

    for (Instances::const_iterator it = instances_.begin();
         it != instances_.end(); ++it)
    {
      size += it->second.GetSize();
    }

    return size;
  }


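  // Public entry point for pull-style transfers: contrary to
  // FormatPushTransaction() below, the base URL and the compression
  // scheme are forwarded to ComputeBucketsInternal(), so that the
  // length of the generated pull URLs can be kept under control.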
  void TransferScheduler::ComputePullBuckets(std::vector<TransferBucket>& target,
                                             size_t groupThreshold,
                                             size_t separateThreshold,
                                             const std::string& baseUrl,
                                             BucketCompression compression) const
  {
    ComputeBucketsInternal(target, groupThreshold, separateThreshold, baseUrl, compression);
  }


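  // Serializes the state of the scheduler as the JSON description of a
  // push transaction. Based on the code below, the resulting object has
  // the following shape (key names shown symbolically; the actual
  // strings are the KEY_* constants defined elsewhere in the plugin):
  //
  //   {
  //     KEY_INSTANCES:   [ <serialized DicomInstanceInfo>, ... ],
  //     KEY_BUCKETS:     [ <serialized TransferBucket>, ... ],
  //     KEY_COMPRESSION: "gzip" | "none"
  //   }
  //
  // Note that the buckets themselves are computed with an empty base
  // URL and no compression, since no pull URL is generated in push mode.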
  void TransferScheduler::FormatPushTransaction(Json::Value& target,
                                                std::vector<TransferBucket>& buckets,
                                                size_t groupThreshold,
                                                size_t separateThreshold,
                                                BucketCompression compression) const
  {
    ComputeBucketsInternal(buckets, groupThreshold, separateThreshold, "", BucketCompression_None);

    target = Json::objectValue;

    Json::Value tmp = Json::arrayValue;

    for (Instances::const_iterator it = instances_.begin();
         it != instances_.end(); ++it)
    {
      Json::Value item;
      it->second.Serialize(item);
      tmp.append(item);
    }

    target[KEY_INSTANCES] = tmp;

    tmp = Json::arrayValue;

    for (size_t i = 0; i < buckets.size(); i++)
    {
      Json::Value item;
      buckets[i].Serialize(item);
      tmp.append(item);
    }

    target[KEY_BUCKETS] = tmp;

    switch (compression)
    {
      case BucketCompression_Gzip:
        target[KEY_COMPRESSION] = "gzip";
        break;

      case BucketCompression_None:
        target[KEY_COMPRESSION] = "none";
        break;

      default:
        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
    }
  }
}
348 } |