Mercurial > hg > orthanc-transfers
diff Framework/TransferScheduler.cpp @ 0:95226b754d9e
initial release
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 17 Sep 2018 11:34:55 +0200 |
parents | |
children | 4c3437217518 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/TransferScheduler.cpp Mon Sep 17 11:34:55 2018 +0200 @@ -0,0 +1,347 @@ +/** + * Transfers accelerator plugin for Orthanc + * Copyright (C) 2018 Osimis, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "TransferScheduler.h" + +#include <Core/Logging.h> +#include <Core/OrthancException.h> +#include <Plugins/Samples/Common/OrthancPluginCppWrapper.h> + + +namespace OrthancPlugins +{ + void TransferScheduler::AddResource(OrthancInstancesCache& cache, + Orthanc::ResourceType level, + const std::string& id) + { + Json::Value resource; + + std::string base; + switch (level) + { + case Orthanc::ResourceType_Patient: + base = "patients"; + break; + + case Orthanc::ResourceType_Study: + base = "studies"; + break; + + case Orthanc::ResourceType_Series: + base = "series"; + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + + if (RestApiGet(resource, cache.GetContext(), "/" + base + "/" + id + "/instances", false)) + { + if (resource.type() != Json::arrayValue) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + + for (Json::Value::ArrayIndex i = 0; i < resource.size(); i++) + { + if (resource[i].type() != Json::objectValue || + !resource[i].isMember(KEY_ID) || + resource[i][KEY_ID].type() != Json::stringValue) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + + AddInstance(cache, resource[i][KEY_ID].asString()); + } + } + else + { + std::string s = Orthanc::EnumerationToString(level); + Orthanc::Toolbox::ToLowerCase(s); + LOG(WARNING) << "Missing " << s << ": " << id; + throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); + } + } + + + void TransferScheduler::ComputeBucketsInternal(std::vector<TransferBucket>& target, + size_t groupThreshold, + size_t separateThreshold, + const std::string& baseUrl, /* only needed in pull mode */ + BucketCompression compression /* only needed in pull mode */) const + { + if (groupThreshold > separateThreshold || + separateThreshold == 0) // (*) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + + target.clear(); + + std::list<std::string> toGroup_; + + for (Instances::const_iterator it = instances_.begin(); + it != instances_.end(); ++it) + { + size_t size = it->second.GetSize(); + + if (size < groupThreshold) + { + toGroup_.push_back(it->first); + } + else if (size < separateThreshold) + { + // Send the whole instance as it is + TransferBucket bucket; + bucket.AddChunk(it->second, 0, size); + target.push_back(bucket); + } + else + { + // Divide this large instance as a set of chunks + size_t chunksCount; + + if (size % separateThreshold == 0) + { + chunksCount = size / separateThreshold; + } + else + { + chunksCount = size / separateThreshold + 1; + } + + assert(chunksCount != 0); // This follows from (*) + + size_t chunkSize = size / chunksCount; + size_t offset = 0; + + for (size_t i = 0; i < chunksCount; i++, offset += chunkSize) + { + TransferBucket bucket; + + if (i == chunksCount - 1) + { + // The last chunk must contain all the remaining bytes + // of the instance (correction of rounding effects) + bucket.AddChunk(it->second, offset, size - offset); + } + else + { + bucket.AddChunk(it->second, offset, chunkSize); + } + + target.push_back(bucket); + } + } + } + + // Grouping the remaining small instances, preventing the + // download URL from getting too long: "If you keep URLs under + // 2000 characters, they'll work in virtually any combination of + // client and server software." + // https://stackoverflow.com/a/417184/881731 + + static const size_t MAX_URL_LENGTH = 2000 - 44 /* size of an Orthanc identifier (SHA-1) */; + + TransferBucket bucket; + + for (std::list<std::string>::const_iterator it = toGroup_.begin(); + it != toGroup_.end(); ++it) + { + Instances::const_iterator instance = instances_.find(*it); + assert(instance != instances_.end()); + + bucket.AddChunk(instance->second, 0, instance->second.GetSize()); + + bool full = (bucket.GetTotalSize() >= groupThreshold); + + if (!full && !baseUrl.empty()) + { + std::string uri; + bucket.ComputePullUri(uri, compression); + + std::string url = baseUrl + uri; + full = (url.length() >= MAX_URL_LENGTH); + } + + if (full) + { + target.push_back(bucket); + bucket.Clear(); + } + } + + if (bucket.GetChunksCount() > 0) + { + target.push_back(bucket); + } + } + + + void TransferScheduler::AddInstance(OrthancInstancesCache& cache, + const std::string& instanceId) + { + size_t size; + std::string md5; + cache.GetInstanceInfo(size, md5, instanceId); + + AddInstance(DicomInstanceInfo(instanceId, size, md5)); + } + + + void TransferScheduler::AddInstance(const DicomInstanceInfo& info) + { + instances_[info.GetId()] = info; + } + + + void TransferScheduler::ParseListOfResources(OrthancInstancesCache& cache, + const Json::Value& resources) + { + if (resources.type() != Json::arrayValue) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); + } + + for (Json::Value::ArrayIndex i = 0; i < resources.size(); i++) + { + if (resources[i].type() != Json::objectValue || + !resources[i].isMember(KEY_LEVEL) || + !resources[i].isMember(KEY_ID) || + resources[i][KEY_LEVEL].type() != Json::stringValue || + resources[i][KEY_ID].type() != Json::stringValue) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); + } + else + { + Orthanc::ResourceType level = Orthanc::StringToResourceType(resources[i][KEY_LEVEL].asCString()); + + switch (level) + { + case Orthanc::ResourceType_Patient: + AddPatient(cache, resources[i][KEY_ID].asString()); + break; + + case Orthanc::ResourceType_Study: + AddStudy(cache, resources[i][KEY_ID].asString()); + break; + + case Orthanc::ResourceType_Series: + AddSeries(cache, resources[i][KEY_ID].asString()); + break; + + case Orthanc::ResourceType_Instance: + AddInstance(cache, resources[i][KEY_ID].asString()); + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + } + } + } + + + void TransferScheduler::ListInstances(std::vector<DicomInstanceInfo>& target) const + { + target.clear(); + target.reserve(instances_.size()); + + for (Instances::const_iterator it = instances_.begin(); + it != instances_.end(); ++it) + { + assert(it->first == it->second.GetId()); + target.push_back(it->second); + } + } + + + size_t TransferScheduler::GetTotalSize() const + { + size_t size = 0; + + for (Instances::const_iterator it = instances_.begin(); + it != instances_.end(); ++it) + { + size += it->second.GetSize(); + } + + return size; + } + + + void TransferScheduler::ComputePullBuckets(std::vector<TransferBucket>& target, + size_t groupThreshold, + size_t separateThreshold, + const std::string& baseUrl, + BucketCompression compression) const + { + ComputeBucketsInternal(target, groupThreshold, separateThreshold, baseUrl, compression); + } + + + void TransferScheduler::FormatPushTransaction(Json::Value& target, + std::vector<TransferBucket>& buckets, + size_t groupThreshold, + size_t separateThreshold, + BucketCompression compression) const + { + ComputeBucketsInternal(buckets, groupThreshold, separateThreshold, "", BucketCompression_None); + + target = Json::objectValue; + + Json::Value tmp = Json::arrayValue; + + for (Instances::const_iterator it = instances_.begin(); + it != instances_.end(); ++it) + { + Json::Value item; + it->second.Serialize(item); + tmp.append(item); + } + + target[KEY_INSTANCES] = tmp; + + tmp = Json::arrayValue; + + for (size_t i = 0; i < buckets.size(); i++) + { + Json::Value item; + buckets[i].Serialize(item); + tmp.append(item); + } + + target[KEY_BUCKETS] = tmp; + + switch (compression) + { + case BucketCompression_Gzip: + target[KEY_COMPRESSION] = "gzip"; + break; + + case BucketCompression_None: + target[KEY_COMPRESSION] = "none"; + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + } +}