comparison Framework/TransferScheduler.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 4c3437217518
comparison
equal deleted inserted replaced
-1:000000000000 0:95226b754d9e
1 /**
2 * Transfers accelerator plugin for Orthanc
3 * Copyright (C) 2018 Osimis, Belgium
4 *
5 * This program is free software: you can redistribute it and/or
6 * modify it under the terms of the GNU Affero General Public License
7 * as published by the Free Software Foundation, either version 3 of
8 * the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 **/
18
19
20 #include "TransferScheduler.h"
21
22 #include <Core/Logging.h>
23 #include <Core/OrthancException.h>
24 #include <Plugins/Samples/Common/OrthancPluginCppWrapper.h>
25
26
27 namespace OrthancPlugins
28 {
29 void TransferScheduler::AddResource(OrthancInstancesCache& cache,
30 Orthanc::ResourceType level,
31 const std::string& id)
32 {
33 Json::Value resource;
34
35 std::string base;
36 switch (level)
37 {
38 case Orthanc::ResourceType_Patient:
39 base = "patients";
40 break;
41
42 case Orthanc::ResourceType_Study:
43 base = "studies";
44 break;
45
46 case Orthanc::ResourceType_Series:
47 base = "series";
48 break;
49
50 default:
51 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
52 }
53
54 if (RestApiGet(resource, cache.GetContext(), "/" + base + "/" + id + "/instances", false))
55 {
56 if (resource.type() != Json::arrayValue)
57 {
58 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
59 }
60
61 for (Json::Value::ArrayIndex i = 0; i < resource.size(); i++)
62 {
63 if (resource[i].type() != Json::objectValue ||
64 !resource[i].isMember(KEY_ID) ||
65 resource[i][KEY_ID].type() != Json::stringValue)
66 {
67 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
68 }
69
70 AddInstance(cache, resource[i][KEY_ID].asString());
71 }
72 }
73 else
74 {
75 std::string s = Orthanc::EnumerationToString(level);
76 Orthanc::Toolbox::ToLowerCase(s);
77 LOG(WARNING) << "Missing " << s << ": " << id;
78 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
79 }
80 }
81
82
83 void TransferScheduler::ComputeBucketsInternal(std::vector<TransferBucket>& target,
84 size_t groupThreshold,
85 size_t separateThreshold,
86 const std::string& baseUrl, /* only needed in pull mode */
87 BucketCompression compression /* only needed in pull mode */) const
88 {
89 if (groupThreshold > separateThreshold ||
90 separateThreshold == 0) // (*)
91 {
92 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
93 }
94
95 target.clear();
96
97 std::list<std::string> toGroup_;
98
99 for (Instances::const_iterator it = instances_.begin();
100 it != instances_.end(); ++it)
101 {
102 size_t size = it->second.GetSize();
103
104 if (size < groupThreshold)
105 {
106 toGroup_.push_back(it->first);
107 }
108 else if (size < separateThreshold)
109 {
110 // Send the whole instance as it is
111 TransferBucket bucket;
112 bucket.AddChunk(it->second, 0, size);
113 target.push_back(bucket);
114 }
115 else
116 {
117 // Divide this large instance as a set of chunks
118 size_t chunksCount;
119
120 if (size % separateThreshold == 0)
121 {
122 chunksCount = size / separateThreshold;
123 }
124 else
125 {
126 chunksCount = size / separateThreshold + 1;
127 }
128
129 assert(chunksCount != 0); // This follows from (*)
130
131 size_t chunkSize = size / chunksCount;
132 size_t offset = 0;
133
134 for (size_t i = 0; i < chunksCount; i++, offset += chunkSize)
135 {
136 TransferBucket bucket;
137
138 if (i == chunksCount - 1)
139 {
140 // The last chunk must contain all the remaining bytes
141 // of the instance (correction of rounding effects)
142 bucket.AddChunk(it->second, offset, size - offset);
143 }
144 else
145 {
146 bucket.AddChunk(it->second, offset, chunkSize);
147 }
148
149 target.push_back(bucket);
150 }
151 }
152 }
153
154 // Grouping the remaining small instances, preventing the
155 // download URL from getting too long: "If you keep URLs under
156 // 2000 characters, they'll work in virtually any combination of
157 // client and server software."
158 // https://stackoverflow.com/a/417184/881731
159
160 static const size_t MAX_URL_LENGTH = 2000 - 44 /* size of an Orthanc identifier (SHA-1) */;
161
162 TransferBucket bucket;
163
164 for (std::list<std::string>::const_iterator it = toGroup_.begin();
165 it != toGroup_.end(); ++it)
166 {
167 Instances::const_iterator instance = instances_.find(*it);
168 assert(instance != instances_.end());
169
170 bucket.AddChunk(instance->second, 0, instance->second.GetSize());
171
172 bool full = (bucket.GetTotalSize() >= groupThreshold);
173
174 if (!full && !baseUrl.empty())
175 {
176 std::string uri;
177 bucket.ComputePullUri(uri, compression);
178
179 std::string url = baseUrl + uri;
180 full = (url.length() >= MAX_URL_LENGTH);
181 }
182
183 if (full)
184 {
185 target.push_back(bucket);
186 bucket.Clear();
187 }
188 }
189
190 if (bucket.GetChunksCount() > 0)
191 {
192 target.push_back(bucket);
193 }
194 }
195
196
197 void TransferScheduler::AddInstance(OrthancInstancesCache& cache,
198 const std::string& instanceId)
199 {
200 size_t size;
201 std::string md5;
202 cache.GetInstanceInfo(size, md5, instanceId);
203
204 AddInstance(DicomInstanceInfo(instanceId, size, md5));
205 }
206
207
208 void TransferScheduler::AddInstance(const DicomInstanceInfo& info)
209 {
210 instances_[info.GetId()] = info;
211 }
212
213
214 void TransferScheduler::ParseListOfResources(OrthancInstancesCache& cache,
215 const Json::Value& resources)
216 {
217 if (resources.type() != Json::arrayValue)
218 {
219 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat);
220 }
221
222 for (Json::Value::ArrayIndex i = 0; i < resources.size(); i++)
223 {
224 if (resources[i].type() != Json::objectValue ||
225 !resources[i].isMember(KEY_LEVEL) ||
226 !resources[i].isMember(KEY_ID) ||
227 resources[i][KEY_LEVEL].type() != Json::stringValue ||
228 resources[i][KEY_ID].type() != Json::stringValue)
229 {
230 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat);
231 }
232 else
233 {
234 Orthanc::ResourceType level = Orthanc::StringToResourceType(resources[i][KEY_LEVEL].asCString());
235
236 switch (level)
237 {
238 case Orthanc::ResourceType_Patient:
239 AddPatient(cache, resources[i][KEY_ID].asString());
240 break;
241
242 case Orthanc::ResourceType_Study:
243 AddStudy(cache, resources[i][KEY_ID].asString());
244 break;
245
246 case Orthanc::ResourceType_Series:
247 AddSeries(cache, resources[i][KEY_ID].asString());
248 break;
249
250 case Orthanc::ResourceType_Instance:
251 AddInstance(cache, resources[i][KEY_ID].asString());
252 break;
253
254 default:
255 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
256 }
257 }
258 }
259 }
260
261
262 void TransferScheduler::ListInstances(std::vector<DicomInstanceInfo>& target) const
263 {
264 target.clear();
265 target.reserve(instances_.size());
266
267 for (Instances::const_iterator it = instances_.begin();
268 it != instances_.end(); ++it)
269 {
270 assert(it->first == it->second.GetId());
271 target.push_back(it->second);
272 }
273 }
274
275
276 size_t TransferScheduler::GetTotalSize() const
277 {
278 size_t size = 0;
279
280 for (Instances::const_iterator it = instances_.begin();
281 it != instances_.end(); ++it)
282 {
283 size += it->second.GetSize();
284 }
285
286 return size;
287 }
288
289
290 void TransferScheduler::ComputePullBuckets(std::vector<TransferBucket>& target,
291 size_t groupThreshold,
292 size_t separateThreshold,
293 const std::string& baseUrl,
294 BucketCompression compression) const
295 {
296 ComputeBucketsInternal(target, groupThreshold, separateThreshold, baseUrl, compression);
297 }
298
299
300 void TransferScheduler::FormatPushTransaction(Json::Value& target,
301 std::vector<TransferBucket>& buckets,
302 size_t groupThreshold,
303 size_t separateThreshold,
304 BucketCompression compression) const
305 {
306 ComputeBucketsInternal(buckets, groupThreshold, separateThreshold, "", BucketCompression_None);
307
308 target = Json::objectValue;
309
310 Json::Value tmp = Json::arrayValue;
311
312 for (Instances::const_iterator it = instances_.begin();
313 it != instances_.end(); ++it)
314 {
315 Json::Value item;
316 it->second.Serialize(item);
317 tmp.append(item);
318 }
319
320 target[KEY_INSTANCES] = tmp;
321
322 tmp = Json::arrayValue;
323
324 for (size_t i = 0; i < buckets.size(); i++)
325 {
326 Json::Value item;
327 buckets[i].Serialize(item);
328 tmp.append(item);
329 }
330
331 target[KEY_BUCKETS] = tmp;
332
333 switch (compression)
334 {
335 case BucketCompression_Gzip:
336 target[KEY_COMPRESSION] = "gzip";
337 break;
338
339 case BucketCompression_None:
340 target[KEY_COMPRESSION] = "none";
341 break;
342
343 default:
344 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
345 }
346 }
347 }