Mercurial > hg > orthanc-transfers
view Plugin/Plugin.cpp @ 9:7e207ade2f1a
preparing for 2019
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 24 Dec 2018 13:45:31 +0100 |
parents | 4c3437217518 |
children | c9e28e31262e |
line wrap: on
line source
/** * Transfers accelerator plugin for Orthanc * Copyright (C) 2018-2019 Osimis S.A., Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Affero General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. **/ #include "PluginContext.h" #include "../Framework/HttpQueries/DetectTransferPlugin.h" #include "../Framework/PullMode/PullJob.h" #include "../Framework/PushMode/PushJob.h" #include "../Framework/TransferScheduler.h" #include <EmbeddedResources.h> #include <Core/ChunkedBuffer.h> #include <Core/Compression/GzipCompressor.h> #include <Core/Logging.h> static bool DisplayPerformanceWarning() { (void) DisplayPerformanceWarning; // Disable warning about unused function LOG(WARNING) << "Performance warning in transfers accelerator: " << "Non-release build, runtime debug assertions are turned on"; return true; } static size_t ReadSizeArgument(const OrthancPluginHttpRequest* request, uint32_t index) { std::string value(request->getValues[index]); try { int tmp = boost::lexical_cast<int>(value); if (tmp >= 0) { return static_cast<size_t>(tmp); } } catch (boost::bad_lexical_cast&) { } LOG(ERROR) << "The \"" << request->getKeys[index] << "\" GET argument must be a positive integer: " << value; throw Orthanc::OrthancException(Orthanc::ErrorCode_BadParameterType); } void ServeChunks(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); if (request->method != OrthancPluginHttpMethod_Get) { OrthancPluginSendMethodNotAllowed(OrthancPlugins::GetGlobalContext(), output, "GET"); return; } assert(request->groupsCount == 1); std::vector<std::string> instances; Orthanc::Toolbox::TokenizeString(instances, std::string(request->groups[0]), '.'); size_t offset = 0; size_t requestedSize = 0; OrthancPlugins::BucketCompression compression = OrthancPlugins::BucketCompression_None; for (uint32_t i = 0; i < request->getCount; i++) { std::string key(request->getKeys[i]); if (key == "offset") { offset = ReadSizeArgument(request, i); } else if (key == "size") { requestedSize = ReadSizeArgument(request, i); } else if (key == "compression") { compression = OrthancPlugins::StringToBucketCompression(request->getValues[i]); } else { LOG(INFO) << "Ignored GET argument: " << key; } } // Limit the number of clients Orthanc::Semaphore::Locker lock(context.GetSemaphore()); Orthanc::ChunkedBuffer buffer; for (size_t i = 0; i < instances.size() && (requestedSize == 0 || buffer.GetNumBytes() < requestedSize); i++) { size_t instanceSize; std::string md5; // Ignored context.GetCache().GetInstanceInfo(instanceSize, md5, instances[i]); if (offset >= instanceSize) { offset -= instanceSize; } else { size_t toRead; if (requestedSize == 0) { toRead = instanceSize - offset; } else { toRead = requestedSize - buffer.GetNumBytes(); if (toRead > instanceSize - offset) { toRead = instanceSize - offset; } } std::string chunk; std::string md5; // Ignored context.GetCache().GetChunk(chunk, md5, instances[i], offset, toRead); buffer.AddChunk(chunk); offset = 0; assert(requestedSize == 0 || buffer.GetNumBytes() <= requestedSize); } } std::string chunk; buffer.Flatten(chunk); switch (compression) { case OrthancPlugins::BucketCompression_None: { OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, chunk.c_str(), chunk.size(), "application/octet-stream"); break; } case OrthancPlugins::BucketCompression_Gzip: { std::string compressed; Orthanc::GzipCompressor gzip; //gzip.SetCompressionLevel(9); Orthanc::IBufferCompressor::Compress(compressed, gzip, chunk); OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, compressed.c_str(), compressed.size(), "application/gzip"); break; } default: throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } } static bool ParsePostBody(Json::Value& body, OrthancPluginRestOutput* output, const OrthancPluginHttpRequest* request) { Json::Reader reader; if (request->method != OrthancPluginHttpMethod_Post) { OrthancPluginSendMethodNotAllowed(OrthancPlugins::GetGlobalContext(), output, "POST"); return false; } else if (reader.parse(request->body, request->body + request->bodySize, body)) { return true; } else { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); } } void LookupInstances(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); Json::Value resources; if (!ParsePostBody(resources, output, request)) { return; } OrthancPlugins::TransferScheduler scheduler; scheduler.ParseListOfResources(context.GetCache(), resources); Json::Value answer = Json::objectValue; answer[KEY_INSTANCES] = Json::arrayValue; answer[KEY_ORIGINATOR_UUID] = context.GetPluginUuid(); answer["CountInstances"] = static_cast<uint32_t>(scheduler.GetInstancesCount()); answer["TotalSize"] = boost::lexical_cast<std::string>(scheduler.GetTotalSize()); answer["TotalSizeMB"] = OrthancPlugins::ConvertToMegabytes(scheduler.GetTotalSize()); std::vector<OrthancPlugins::DicomInstanceInfo> instances; scheduler.ListInstances(instances); for (size_t i = 0; i < instances.size(); i++) { Json::Value instance; instances[i].Serialize(instance); answer[KEY_INSTANCES].append(instance); } Json::FastWriter writer; std::string s = writer.write(answer); OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } static void SubmitJob(OrthancPluginRestOutput* output, OrthancPlugins::OrthancJob* job, int priority) { std::string id = OrthancPlugins::OrthancJob::Submit(job, priority); Json::Value result = Json::objectValue; result[KEY_ID] = id; result[KEY_PATH] = std::string(URI_JOBS) + "/" + id; std::string s = result.toStyledString(); OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } void SchedulePull(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); Json::Value body; if (!ParsePostBody(body, output, request)) { return; } OrthancPlugins::TransferQuery query(body); SubmitJob(output, new OrthancPlugins::PullJob( query, context.GetThreadsCount(), context.GetTargetBucketSize()), query.GetPriority()); } void CreatePush(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); Json::Value query; if (!ParsePostBody(query, output, request)) { return; } if (query.type() != Json::objectValue || !query.isMember(KEY_BUCKETS) || !query.isMember(KEY_COMPRESSION) || !query.isMember(KEY_INSTANCES) || query[KEY_BUCKETS].type() != Json::arrayValue || query[KEY_COMPRESSION].type() != Json::stringValue || query[KEY_INSTANCES].type() != Json::arrayValue) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); } std::vector<OrthancPlugins::DicomInstanceInfo> instances; instances.reserve(query[KEY_INSTANCES].size()); for (Json::Value::ArrayIndex i = 0; i < query[KEY_INSTANCES].size(); i++) { OrthancPlugins::DicomInstanceInfo instance(query[KEY_INSTANCES][i]); instances.push_back(instance); } std::vector<OrthancPlugins::TransferBucket> buckets; buckets.reserve(query[KEY_BUCKETS].size()); for (Json::Value::ArrayIndex i = 0; i < query[KEY_BUCKETS].size(); i++) { OrthancPlugins::TransferBucket bucket(query[KEY_BUCKETS][i]); buckets.push_back(bucket); } OrthancPlugins::BucketCompression compression = OrthancPlugins::StringToBucketCompression(query[KEY_COMPRESSION].asString()); std::string id = context.GetActivePushTransactions().CreateTransaction (instances, buckets, compression); Json::Value result = Json::objectValue; result[KEY_ID] = id; result[KEY_PATH] = std::string(URI_PUSH) + "/" + id; std::string s = result.toStyledString(); OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } void StorePush(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); if (request->method != OrthancPluginHttpMethod_Put) { OrthancPluginSendMethodNotAllowed(OrthancPlugins::GetGlobalContext(), output, "PUT"); return; } assert(request->groupsCount == 2); std::string transaction(request->groups[0]); std::string chunk(request->groups[1]); size_t chunkIndex; try { chunkIndex = boost::lexical_cast<size_t>(chunk); } catch (boost::bad_lexical_cast&) { throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); } context.GetActivePushTransactions().Store (transaction, chunkIndex, request->body, request->bodySize); std::string s = "{}"; OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } void CommitPush(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); if (request->method != OrthancPluginHttpMethod_Post) { OrthancPluginSendMethodNotAllowed(OrthancPlugins::GetGlobalContext(), output, "POST"); return; } assert(request->groupsCount == 1); std::string transaction(request->groups[0]); context.GetActivePushTransactions().Commit(transaction); std::string s = "{}"; OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } void DiscardPush(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); if (request->method != OrthancPluginHttpMethod_Delete) { OrthancPluginSendMethodNotAllowed(OrthancPlugins::GetGlobalContext(), output, "DELETE"); return; } assert(request->groupsCount == 1); std::string transaction(request->groups[0]); context. GetActivePushTransactions().Discard(transaction); std::string s = "{}"; OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } void ScheduleSend(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); Json::Value body; if (!ParsePostBody(body, output, request)) { return; } OrthancPlugins::TransferQuery query(body); OrthancPlugins::OrthancPeers peers; std::string remoteSelf; // For pull mode bool pullMode = peers.LookupUserProperty(remoteSelf, query.GetPeer(), KEY_REMOTE_SELF); LOG(INFO) << "Sending resources to peer \"" << query.GetPeer() << "\" using " << (pullMode ? "pull" : "push") << " mode"; if (pullMode) { Json::Value lookup = Json::objectValue; lookup[KEY_RESOURCES] = query.GetResources(); lookup[KEY_COMPRESSION] = OrthancPlugins::EnumerationToString(query.GetCompression()); lookup[KEY_ORIGINATOR_UUID] = context.GetPluginUuid(); lookup[KEY_PEER] = remoteSelf; Json::FastWriter writer; std::string s = writer.write(lookup); Json::Value answer; if (peers.DoPost(answer, query.GetPeer(), URI_PULL, s) && answer.type() == Json::objectValue && answer.isMember(KEY_ID) && answer.isMember(KEY_PATH) && answer[KEY_ID].type() == Json::stringValue && answer[KEY_PATH].type() == Json::stringValue) { const std::string url = peers.GetPeerUrl(query.GetPeer()); Json::Value result = Json::objectValue; result[KEY_PEER] = query.GetPeer(); result[KEY_REMOTE_JOB] = answer[KEY_ID].asString(); result[KEY_URL] = url + answer[KEY_PATH].asString(); std::string s = result.toStyledString(); OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } else { LOG(ERROR) << "Cannot trigger send DICOM instances using pull mode to peer: " << query.GetPeer() << " (check out remote logs, and that transfer plugin is installed)"; throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol); } } else { SubmitJob(output, new OrthancPlugins::PushJob(query, context.GetCache(), context.GetThreadsCount(), context.GetTargetBucketSize()), query.GetPriority()); } } OrthancPluginJob* Unserializer(const char* jobType, const char* serialized) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); if (jobType == NULL || serialized == NULL) { return NULL; } std::string type(jobType); if (type != JOB_TYPE_PULL && type != JOB_TYPE_PUSH) { return NULL; } try { std::string tmp(serialized); Json::Value source; Json::Reader reader; if (reader.parse(tmp, source)) { OrthancPlugins::TransferQuery query(source); std::auto_ptr<OrthancPlugins::OrthancJob> job; if (type == JOB_TYPE_PULL) { job.reset(new OrthancPlugins::PullJob(query, context.GetThreadsCount(), context.GetTargetBucketSize())); } else if (type == JOB_TYPE_PUSH) { job.reset(new OrthancPlugins::PushJob(query, context.GetCache(), context.GetThreadsCount(), context.GetTargetBucketSize())); } if (job.get() == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } else { return OrthancPlugins::OrthancJob::Create(job.release()); } } else { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat); } } catch (Orthanc::OrthancException& e) { LOG(ERROR) << "Error while unserializing a job from the transfers accelerator plugin: " << e.What(); return NULL; } catch (...) { LOG(ERROR) << "Error while unserializing a job from the transfers accelerator plugin"; return NULL; } } void ServePeers(OrthancPluginRestOutput* output, const char* url, const OrthancPluginHttpRequest* request) { OrthancPlugins::PluginContext& context = OrthancPlugins::PluginContext::GetInstance(); if (request->method != OrthancPluginHttpMethod_Get) { OrthancPluginSendMethodNotAllowed(OrthancPlugins::GetGlobalContext(), output, "GET"); return; } OrthancPlugins::DetectTransferPlugin::Result detection; OrthancPlugins::DetectTransferPlugin::Apply (detection, context.GetThreadsCount(), 2 /* timeout */); Json::Value result = Json::objectValue; OrthancPlugins::OrthancPeers peers; for (OrthancPlugins::DetectTransferPlugin::Result::const_iterator it = detection.begin(); it != detection.end(); ++it) { if (it->second) { std::string remoteSelf; if (peers.LookupUserProperty(remoteSelf, it->first, KEY_REMOTE_SELF)) { result[it->first] = "bidirectional"; } else { result[it->first] = "installed"; } } else { result[it->first] = "disabled"; } } std::string s = result.toStyledString(); OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output, s.c_str(), s.size(), "application/json"); } extern "C" { ORTHANC_PLUGINS_API int32_t OrthancPluginInitialize(OrthancPluginContext* context) { Orthanc::Logging::Initialize(context); assert(DisplayPerformanceWarning()); OrthancPlugins::SetGlobalContext(context); /* Check the version of the Orthanc core */ if (OrthancPluginCheckVersion(context) == 0) { LOG(ERROR) << "Your version of Orthanc (" << context->orthancVersion << ") must be above " << ORTHANC_PLUGINS_MINIMAL_MAJOR_NUMBER << "." << ORTHANC_PLUGINS_MINIMAL_MINOR_NUMBER << "." << ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER << " to run this plugin"; return -1; } OrthancPluginSetDescription(context, "Accelerates transfers and provides " "storage commitment between Orthanc peers"); try { size_t threadsCount = 4; size_t targetBucketSize = 4096; // In KB size_t maxPushTransactions = 4; size_t memoryCacheSize = 512; // In MB std::map<std::string, std::string> bidirectionalPeers; { OrthancPlugins::OrthancConfiguration config; if (config.IsSection(KEY_PLUGIN_CONFIGURATION)) { OrthancPlugins::OrthancConfiguration plugin; config.GetSection(plugin, KEY_PLUGIN_CONFIGURATION); threadsCount = plugin.GetUnsignedIntegerValue("Threads", threadsCount); targetBucketSize = plugin.GetUnsignedIntegerValue("BucketSize", targetBucketSize); memoryCacheSize = plugin.GetUnsignedIntegerValue("CacheSize", memoryCacheSize); maxPushTransactions = plugin.GetUnsignedIntegerValue ("MaxPushTransactions", maxPushTransactions); } } OrthancPlugins::PluginContext::Initialize (threadsCount, targetBucketSize * KB, maxPushTransactions, memoryCacheSize * MB); OrthancPlugins::RegisterRestCallback<ServeChunks> (std::string(URI_CHUNKS) + "/([.0-9a-f-]+)", true); OrthancPlugins::RegisterRestCallback<LookupInstances> (URI_LOOKUP, true); OrthancPlugins::RegisterRestCallback<SchedulePull> (URI_PULL, true); OrthancPlugins::RegisterRestCallback<ScheduleSend> (URI_SEND, true); OrthancPlugins::RegisterRestCallback<ServePeers> (URI_PEERS, true); if (maxPushTransactions != 0) { // If no push transaction is allowed, their URIs are disabled OrthancPlugins::RegisterRestCallback<CreatePush> (URI_PUSH, true); OrthancPlugins::RegisterRestCallback<StorePush> (std::string(URI_PUSH) + "/([.0-9a-f-]+)/([0-9]+)", true); OrthancPlugins::RegisterRestCallback<CommitPush> (std::string(URI_PUSH) + "/([.0-9a-f-]+)/commit", true); OrthancPlugins::RegisterRestCallback<DiscardPush> (std::string(URI_PUSH) + "/([.0-9a-f-]+)", true); } OrthancPluginRegisterJobsUnserializer(context, Unserializer); /* Extend the default Orthanc Explorer with custom JavaScript */ std::string explorer; Orthanc::EmbeddedResources::GetFileResource (explorer, Orthanc::EmbeddedResources::ORTHANC_EXPLORER); OrthancPluginExtendOrthancExplorer(context, explorer.c_str()); } catch (Orthanc::OrthancException& e) { LOG(ERROR) << "Cannot initialize transfers accelerator plugin: " << e.What(); return -1; } return 0; } ORTHANC_PLUGINS_API void OrthancPluginFinalize() { LOG(WARNING) << "Transfers accelerator plugin is finalizing"; try { OrthancPlugins::PluginContext::Finalize(); } catch (Orthanc::OrthancException& e) { LOG(ERROR) << "Error while finalizing the transfers accelerator plugin: " << e.What(); } } ORTHANC_PLUGINS_API const char* OrthancPluginGetName() { return PLUGIN_NAME; } ORTHANC_PLUGINS_API const char* OrthancPluginGetVersion() { return ORTHANC_PLUGIN_VERSION; } }