comparison Framework/DownloadArea.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 4c3437217518
comparison
equal deleted inserted replaced
-1:000000000000 0:95226b754d9e
1 /**
2 * Transfers accelerator plugin for Orthanc
3 * Copyright (C) 2018 Osimis, Belgium
4 *
5 * This program is free software: you can redistribute it and/or
6 * modify it under the terms of the GNU Affero General Public License
7 * as published by the Free Software Foundation, either version 3 of
8 * the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 **/
18
19
20 #include "DownloadArea.h"
21
22 #include <Core/Compression/GzipCompressor.h>
23 #include <Core/Logging.h>
24 #include <Core/SystemToolbox.h>
25 #include <Plugins/Samples/Common/OrthancPluginCppWrapper.h>
26
27 #include <boost/filesystem.hpp>
28 #include <boost/filesystem/fstream.hpp>
29
30 namespace OrthancPlugins
31 {
32 class DownloadArea::Instance::Writer : public boost::noncopyable
33 {
34 private:
35 boost::filesystem::ofstream stream_;
36
37 public:
38 Writer(Orthanc::TemporaryFile& f,
39 bool create)
40 {
41 if (create)
42 {
43 // Create the file.
44 stream_.open(f.GetPath(), std::ofstream::out | std::ofstream::binary);
45 }
46 else
47 {
48 // Open the existing file to modify it. The "in" mode is
49 // necessary, otherwise previous content is lost by
50 // truncation (as an ofstream defaults to std::ios::trunc,
51 // the flag to truncate the existing content).
52 stream_.open(f.GetPath(), std::ofstream::in | std::ofstream::out | std::ofstream::binary);
53 }
54
55 if (!stream_.good())
56 {
57 throw Orthanc::OrthancException(Orthanc::ErrorCode_CannotWriteFile);
58 }
59 }
60
61 void Write(size_t offset,
62 const void* data,
63 size_t size)
64 {
65 stream_.seekp(offset);
66 stream_.write(reinterpret_cast<const char*>(data), size);
67 }
68 };
69
70
71 DownloadArea::Instance::Instance(const DicomInstanceInfo& info) :
72 info_(info)
73 {
74 Writer writer(file_, true);
75
76 // Create a sparse file of expected size
77 if (info_.GetSize() != 0)
78 {
79 writer.Write(info_.GetSize() - 1, "", 1);
80 }
81 }
82
83
84 void DownloadArea::Instance::WriteChunk(size_t offset,
85 const void* data,
86 size_t size)
87 {
88 if (offset + size > info_.GetSize())
89 {
90 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
91 }
92 else if (size > 0)
93 {
94 Writer writer(file_, false);
95 writer.Write(offset, data, size);
96 }
97 }
98
99
100 void DownloadArea::Instance::Commit(OrthancPluginContext* context,
101 bool simulate) const
102 {
103 std::string content;
104 Orthanc::SystemToolbox::ReadFile(content, file_.GetPath());
105
106 std::string md5;
107 Orthanc::Toolbox::ComputeMD5(md5, content);
108
109 if (md5 == info_.GetMD5())
110 {
111 if (!simulate)
112 {
113 if (context == NULL)
114 {
115 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
116 }
117
118 Json::Value result;
119 if (!RestApiPost(result, context, "/instances",
120 content.empty() ? NULL : content.c_str(), content.size(),
121 false))
122 {
123 LOG(ERROR) << "Cannot import a transfered DICOM instance into Orthanc: "
124 << info_.GetId();
125 throw Orthanc::OrthancException(Orthanc::ErrorCode_CorruptedFile);
126 }
127 }
128 }
129 else
130 {
131 LOG(ERROR) << "Bad MD5 sum in a transfered DICOM instance: " << info_.GetId();
132 throw Orthanc::OrthancException(Orthanc::ErrorCode_CorruptedFile);
133 }
134 }
135
136
137 void DownloadArea::Clear()
138 {
139 for (Instances::iterator it = instances_.begin();
140 it != instances_.end(); ++it)
141 {
142 if (it->second != NULL)
143 {
144 delete it->second;
145 it->second = NULL;
146 }
147 }
148
149 instances_.clear();
150 }
151
152
153 DownloadArea::Instance& DownloadArea::LookupInstance(const std::string& id)
154 {
155 Instances::iterator it = instances_.find(id);
156
157 if (it == instances_.end())
158 {
159 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
160 }
161 else if (it->first != id ||
162 it->second == NULL)
163 {
164 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
165 }
166 else
167 {
168 return *it->second;
169 }
170 }
171
172
173 void DownloadArea::WriteUncompressedBucket(const TransferBucket& bucket,
174 const void* data,
175 size_t size)
176 {
177 if (size != bucket.GetTotalSize())
178 {
179 throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol);
180 }
181
182 if (size == 0)
183 {
184 return;
185 }
186
187 size_t pos = 0;
188
189 for (size_t i = 0; i < bucket.GetChunksCount(); i++)
190 {
191 size_t chunkSize = bucket.GetChunkSize(i);
192 size_t offset = bucket.GetChunkOffset(i);
193
194 if (pos + chunkSize > size)
195 {
196 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
197 }
198
199 Instance& instance = LookupInstance(bucket.GetChunkInstanceId(i));
200 instance.WriteChunk(offset, reinterpret_cast<const char*>(data) + pos, chunkSize);
201
202 pos += chunkSize;
203 }
204
205 if (pos != size)
206 {
207 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
208 }
209 }
210
211
212 void DownloadArea::Setup(const std::vector<DicomInstanceInfo>& instances)
213 {
214 totalSize_ = 0;
215
216 for (size_t i = 0; i < instances.size(); i++)
217 {
218 const std::string& id = instances[i].GetId();
219
220 assert(instances_.find(id) == instances_.end());
221 instances_[id] = new Instance(instances[i]);
222
223 totalSize_ += instances[i].GetSize();
224 }
225 }
226
227
228 void DownloadArea::CommitInternal(OrthancPluginContext* context,
229 bool simulate)
230 {
231 boost::mutex::scoped_lock lock(mutex_);
232
233 for (Instances::iterator it = instances_.begin();
234 it != instances_.end(); ++it)
235 {
236 if (it->second != NULL)
237 {
238 it->second->Commit(context, simulate);
239 delete it->second;
240 it->second = NULL;
241 }
242 else
243 {
244 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
245 }
246 }
247 }
248
249
250 DownloadArea::DownloadArea(const TransferScheduler& scheduler)
251 {
252 std::vector<DicomInstanceInfo> instances;
253 scheduler.ListInstances(instances);
254 Setup(instances);
255 }
256
257
258 void DownloadArea::WriteBucket(const TransferBucket& bucket,
259 const void* data,
260 size_t size,
261 BucketCompression compression)
262 {
263 boost::mutex::scoped_lock lock(mutex_);
264
265 switch (compression)
266 {
267 case BucketCompression_None:
268 WriteUncompressedBucket(bucket, data, size);
269 break;
270
271 case BucketCompression_Gzip:
272 {
273 std::string uncompressed;
274 Orthanc::GzipCompressor compressor;
275 compressor.Uncompress(uncompressed, data, size);
276 WriteUncompressedBucket(bucket, uncompressed.c_str(), uncompressed.size());
277 break;
278 }
279
280 default:
281 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
282 }
283 }
284
285
286 void DownloadArea::WriteInstance(const std::string& instanceId,
287 const void* data,
288 size_t size)
289 {
290 std::string md5;
291 Orthanc::Toolbox::ComputeMD5(md5, data, size);
292
293 {
294 boost::mutex::scoped_lock lock(mutex_);
295
296 Instances::const_iterator it = instances_.find(instanceId);
297 if (it == instances_.end() ||
298 it->second == NULL ||
299 it->second->GetInfo().GetId() != instanceId ||
300 it->second->GetInfo().GetSize() != size ||
301 it->second->GetInfo().GetMD5() != md5)
302 {
303 throw Orthanc::OrthancException(Orthanc::ErrorCode_CorruptedFile);
304 }
305 else
306 {
307 it->second->WriteChunk(0, data, size);
308 }
309 }
310 }
311
312
313 void DownloadArea::CheckMD5()
314 {
315 LOG(INFO) << "Checking MD5 sum without committing (testing)";
316 CommitInternal(NULL, true);
317 }
318
319
320 void DownloadArea::Commit(OrthancPluginContext* context)
321 {
322 LOG(INFO) << "Importing transfered DICOM files from the temporary download area into Orthanc";
323 CommitInternal(context, false);
324 }
325 }