Mercurial > hg > orthanc-transfers
view Framework/TransferScheduler.cpp @ 4:1ed03945c057
showing unavailable peers
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 17 Sep 2018 14:42:57 +0200 |
parents | 95226b754d9e |
children | 4c3437217518 |
line wrap: on
line source
/** * Transfers accelerator plugin for Orthanc * Copyright (C) 2018 Osimis, Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Affero General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. **/ #include "TransferScheduler.h" #include <Core/Logging.h> #include <Core/OrthancException.h> #include <Plugins/Samples/Common/OrthancPluginCppWrapper.h> namespace OrthancPlugins { void TransferScheduler::AddResource(OrthancInstancesCache& cache, Orthanc::ResourceType level, const std::string& id) { Json::Value resource; std::string base; switch (level) { case Orthanc::ResourceType_Patient: base = "patients"; break; case Orthanc::ResourceType_Study: base = "studies"; break; case Orthanc::ResourceType_Series: base = "series"; break; default: throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } if (RestApiGet(resource, cache.GetContext(), "/" + base + "/" + id + "/instances", false)) { if (resource.type() != Json::arrayValue) { throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } for (Json::Value::ArrayIndex i = 0; i < resource.size(); i++) { if (resource[i].type() != Json::objectValue || !resource[i].isMember(KEY_ID) || resource[i][KEY_ID].type() != Json::stringValue) { throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } AddInstance(cache, resource[i][KEY_ID].asString()); } } else { std::string s = Orthanc::EnumerationToString(level); Orthanc::Toolbox::ToLowerCase(s); LOG(WARNING) << "Missing " << s << ": " << id; throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); } } void TransferScheduler::ComputeBucketsInternal(std::vector<TransferBucket>& target, size_t groupThreshold, size_t separateThreshold, const std::string& baseUrl, /* only needed in pull mode */ BucketCompression compression /* only needed in pull mode */) const { if (groupThreshold > separateThreshold || separateThreshold == 0) // (*) { throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } target.clear(); std::list<std::string> toGroup_; for (Instances::const_iterator it = instances_.begin(); it != instances_.end(); ++it) { size_t size = it->second.GetSize(); if (size < groupThreshold) { toGroup_.push_back(it->first); } else if (size < separateThreshold) { // Send the whole instance as it is TransferBucket bucket; bucket.AddChunk(it->second, 0, size); target.push_back(bucket); } else { // Divide this large instance as a set of chunks size_t chunksCount; if (size % separateThreshold == 0) { chunksCount = size / separateThreshold; } else { chunksCount = size / separateThreshold + 1; } assert(chunksCount != 0); // This follows from (*) size_t chunkSize = size / chunksCount; size_t offset = 0; for (size_t i = 0; i < chunksCount; i++, offset += chunkSize) { TransferBucket bucket; if (i == chunksCount - 1) { // The last chunk must contain all the remaining bytes // of the instance (correction of rounding effects) bucket.AddChunk(it->second, offset, size - offset); } else { bucket.AddChunk(it->second, offset, chunkSize); } target.push_back(bucket); } } } // Grouping the remaining small instances, preventing the // download URL from getting too long: "If you keep URLs under // 2000 characters, they'll work in virtually any combination of // client and server software." // https://stackoverflow.com/a/417184/881731 static const size_t MAX_URL_LENGTH = 2000 - 44 /* size of an Orthanc identifier (SHA-1) */; TransferBucket bucket; for (std::list<std::string>::const_iterator it = toGroup_.begin(); it != toGroup_.end(); ++it) { Instances::const_iterator instance = instances_.find(*it); assert(instance != instances_.end()); bucket.AddChunk(instance->second, 0, instance->second.GetSize()); bool full = (bucket.GetTotalSize() >= groupThreshold); if (!full && !baseUrl.empty()) { std::string uri; bucket.ComputePullUri(uri, compression); std::string url = baseUrl + uri; full = (url.length() >= MAX_URL_LENGTH); } if (full) { target.push_back(bucket); bucket.Clear(); } } if (bucket.GetChunksCount() > 0) { target.push_back(bucket); } } void TransferScheduler::AddInstance(OrthancInstancesCache& cache, const std::string& instanceId) { size_t size; std::string md5; cache.GetInstanceInfo(size, md5, instanceId); AddInstance(DicomInstanceInfo(instanceId, size, md5)); } void TransferScheduler::AddInstance(const DicomInstanceInfo& info) { instances_[info.GetId()] = info; } void TransferScheduler::ParseListOfResources(OrthancInstancesCache& cache, const Json::Value& resources) { if (resources.type() != Json::arrayValue) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); } for (Json::Value::ArrayIndex i = 0; i < resources.size(); i++) { if (resources[i].type() != Json::objectValue || !resources[i].isMember(KEY_LEVEL) || !resources[i].isMember(KEY_ID) || resources[i][KEY_LEVEL].type() != Json::stringValue || resources[i][KEY_ID].type() != Json::stringValue) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); } else { Orthanc::ResourceType level = Orthanc::StringToResourceType(resources[i][KEY_LEVEL].asCString()); switch (level) { case Orthanc::ResourceType_Patient: AddPatient(cache, resources[i][KEY_ID].asString()); break; case Orthanc::ResourceType_Study: AddStudy(cache, resources[i][KEY_ID].asString()); break; case Orthanc::ResourceType_Series: AddSeries(cache, resources[i][KEY_ID].asString()); break; case Orthanc::ResourceType_Instance: AddInstance(cache, resources[i][KEY_ID].asString()); break; default: throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } } } } void TransferScheduler::ListInstances(std::vector<DicomInstanceInfo>& target) const { target.clear(); target.reserve(instances_.size()); for (Instances::const_iterator it = instances_.begin(); it != instances_.end(); ++it) { assert(it->first == it->second.GetId()); target.push_back(it->second); } } size_t TransferScheduler::GetTotalSize() const { size_t size = 0; for (Instances::const_iterator it = instances_.begin(); it != instances_.end(); ++it) { size += it->second.GetSize(); } return size; } void TransferScheduler::ComputePullBuckets(std::vector<TransferBucket>& target, size_t groupThreshold, size_t separateThreshold, const std::string& baseUrl, BucketCompression compression) const { ComputeBucketsInternal(target, groupThreshold, separateThreshold, baseUrl, compression); } void TransferScheduler::FormatPushTransaction(Json::Value& target, std::vector<TransferBucket>& buckets, size_t groupThreshold, size_t separateThreshold, BucketCompression compression) const { ComputeBucketsInternal(buckets, groupThreshold, separateThreshold, "", BucketCompression_None); target = Json::objectValue; Json::Value tmp = Json::arrayValue; for (Instances::const_iterator it = instances_.begin(); it != instances_.end(); ++it) { Json::Value item; it->second.Serialize(item); tmp.append(item); } target[KEY_INSTANCES] = tmp; tmp = Json::arrayValue; for (size_t i = 0; i < buckets.size(); i++) { Json::Value item; buckets[i].Serialize(item); tmp.append(item); } target[KEY_BUCKETS] = tmp; switch (compression) { case BucketCompression_Gzip: target[KEY_COMPRESSION] = "gzip"; break; case BucketCompression_None: target[KEY_COMPRESSION] = "none"; break; default: throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } } }