Mercurial > hg > orthanc-transfers
changeset 10:c9e28e31262e
new option: MaxHttpRetries
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 04 Mar 2019 15:26:49 +0100 |
parents | 7e207ade2f1a |
children | 4b9c2e0a92c8 |
files | Framework/HttpQueries/DetectTransferPlugin.cpp Framework/HttpQueries/HttpQueriesQueue.cpp Framework/PullMode/PullJob.cpp Framework/PullMode/PullJob.h Framework/PushMode/PushJob.cpp Framework/PushMode/PushJob.h Framework/TransferToolbox.cpp Framework/TransferToolbox.h Plugin/Plugin.cpp Plugin/PluginContext.cpp Plugin/PluginContext.h |
diffstat | 11 files changed, 181 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/Framework/HttpQueries/DetectTransferPlugin.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/HttpQueries/DetectTransferPlugin.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -51,19 +51,32 @@ Json::Reader reader; Json::Value value; + bool enabled = false; + if (reader.parse(reinterpret_cast<const char*>(answer), reinterpret_cast<const char*>(answer) + size, value) && value.type() == Json::arrayValue) { + // Loop over the plugins that are enabled on the remote peer for (Json::Value::ArrayIndex i = 0; i < value.size(); i++) { if (value[i].type() == Json::stringValue && value[i].asString() == PLUGIN_NAME) { result_[peer_] = true; + enabled = true; } } } + + if (enabled) + { + LOG(INFO) << "Peer \"" << peer_ << "\" has the transfers accelerator plugin enabled"; + } + else + { + LOG(WARNING) << "Peer \"" << peer_ << "\" does *not* have the transfers accelerator plugin enabled"; + } }
--- a/Framework/HttpQueries/HttpQueriesQueue.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/HttpQueries/HttpQueriesQueue.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -214,7 +214,7 @@ // Error: Let's retry retry ++; - if (retry < maxRetries) + if (retry <= maxRetries) { // Wait 1 second before retrying boost::this_thread::sleep(boost::posix_time::seconds(1));
--- a/Framework/PullMode/PullJob.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PullMode/PullJob.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -101,7 +101,8 @@ scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_, baseUrl, job.query_.GetCompression()); area_.reset(new DownloadArea(scheduler)); - + + queue_.SetMaxRetries(job.maxHttpRetries_); queue_.Reserve(buckets.size()); for (size_t i = 0; i < buckets.size(); i++) @@ -171,7 +172,7 @@ const std::string lookup = writer.write(job_.query_.GetResources()); Json::Value answer; - if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup)) + if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_LOOKUP, lookup, job_.maxHttpRetries_)) { LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" << job_.query_.GetPeer() @@ -230,11 +231,13 @@ PullJob::PullJob(const TransferQuery& query, size_t threadsCount, - size_t targetBucketSize) : + size_t targetBucketSize, + unsigned int maxHttpRetries) : StatefulOrthancJob(JOB_TYPE_PULL), query_(query), threadsCount_(threadsCount), - targetBucketSize_(targetBucketSize) + targetBucketSize_(targetBucketSize), + maxHttpRetries_(maxHttpRetries) { if (!peers_.LookupName(peerIndex_, query_.GetPeer())) {
--- a/Framework/PullMode/PullJob.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PullMode/PullJob.h Mon Mar 04 15:26:49 2019 +0100 @@ -37,12 +37,14 @@ size_t targetBucketSize_; OrthancPeers peers_; size_t peerIndex_; + unsigned int maxHttpRetries_; virtual StateUpdate* CreateInitialState(JobInfo& info); public: PullJob(const TransferQuery& query, size_t threadsCount, - size_t targetBucketSize); + size_t targetBucketSize, + unsigned int maxHttpRetries); }; }
--- a/Framework/PushMode/PushJob.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PushMode/PushJob.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -52,19 +52,19 @@ virtual StateUpdate* Step() { - std::string uri = transactionUri_; - + Json::Value answer; + bool success = false; + if (isCommit_) { - uri += "/commit"; + success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_); } else { - uri += "/discard"; + success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_); } - Json::Value answer; - if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, "")) + if (!success) { if (isCommit_) { @@ -130,6 +130,7 @@ info_(info), transactionUri_(transactionUri) { + queue_.SetMaxRetries(job.maxHttpRetries_); queue_.Reserve(buckets.size()); for (size_t i = 0; i < buckets.size(); i++) @@ -212,7 +213,7 @@ virtual StateUpdate* Step() { Json::Value answer; - if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_)) + if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_)) { LOG(ERROR) << "Cannot create a push transaction to peer \"" << job_.query_.GetPeer() @@ -248,12 +249,14 @@ PushJob::PushJob(const TransferQuery& query, OrthancInstancesCache& cache, size_t threadsCount, - size_t targetBucketSize) : + size_t targetBucketSize, + unsigned int maxHttpRetries) : StatefulOrthancJob(JOB_TYPE_PUSH), cache_(cache), query_(query), threadsCount_(threadsCount), - targetBucketSize_(targetBucketSize) + targetBucketSize_(targetBucketSize), + maxHttpRetries_(maxHttpRetries) { if (!peers_.LookupName(peerIndex_, query_.GetPeer())) {
--- a/Framework/PushMode/PushJob.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PushMode/PushJob.h Mon Mar 04 15:26:49 2019 +0100 @@ -38,13 +38,15 @@ size_t targetBucketSize_; OrthancPeers peers_; size_t peerIndex_; - + unsigned int maxHttpRetries_; + virtual StateUpdate* CreateInitialState(JobInfo& info); public: PushJob(const TransferQuery& query, OrthancInstancesCache& cache, size_t threadsCount, - size_t targetBucketSize); + size_t targetBucketSize, + unsigned int maxHttpRetries); }; }
--- a/Framework/TransferToolbox.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/TransferToolbox.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -21,8 +21,10 @@ #include <Core/Logging.h> #include <Core/OrthancException.h> +#include <Plugins/Samples/Common/OrthancPluginCppWrapper.h> #include <boost/math/special_functions/round.hpp> +#include <boost/thread/thread.hpp> namespace OrthancPlugins @@ -73,4 +75,88 @@ throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } } + + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + const std::string& body, + unsigned int maxRetries) + { + unsigned int retry = 0; + + for (;;) + { + try + { + if (peers.DoPost(answer, peerIndex, uri, body)) + { + return true; + } + } + catch (Orthanc::OrthancException&) + { + } + + if (retry >= maxRetries) + { + return false; + } + else + { + // Wait 1 second before retrying + boost::this_thread::sleep(boost::posix_time::seconds(1)); + retry++; + } + } + } + + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + const std::string& peerName, + const std::string& uri, + const std::string& body, + unsigned int maxRetries) + { + size_t index; + + return (peers.LookupName(index, peerName) && + DoPostPeer(answer, peers, index, uri, body, maxRetries)); + } + + + bool DoDeletePeer(const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + unsigned int maxRetries) + { + unsigned int retry = 0; + + for (;;) + { + try + { + if (peers.DoDelete(peerIndex, uri)) + { + return true; + } + } + catch (Orthanc::OrthancException&) + { + } + + if (retry >= maxRetries) + { + return false; + } + else + { + // Wait 1 second before retrying + boost::this_thread::sleep(boost::posix_time::seconds(1)); + retry++; + } + } + } }
--- a/Framework/TransferToolbox.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/TransferToolbox.h Mon Mar 04 15:26:49 2019 +0100 @@ -21,6 +21,7 @@ #include <stdint.h> #include <string> +#include <json/value.h> static const unsigned int KB = 1024; static const unsigned int MB = 1024 * 1024; @@ -59,6 +60,8 @@ namespace OrthancPlugins { + class OrthancPeers; + enum BucketCompression { BucketCompression_None, @@ -72,4 +75,23 @@ BucketCompression StringToBucketCompression(const std::string& value); const char* EnumerationToString(BucketCompression compression); + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + const std::string& body, + unsigned int maxRetries); + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + const std::string& peerName, + const std::string& uri, + const std::string& body, + unsigned int maxRetries); + + bool DoDeletePeer(const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + unsigned int maxRetries); }
--- a/Plugin/Plugin.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Plugin/Plugin.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -274,8 +274,9 @@ OrthancPlugins::TransferQuery query(body); - SubmitJob(output, new OrthancPlugins::PullJob( - query, context.GetThreadsCount(), context.GetTargetBucketSize()), + SubmitJob(output, new OrthancPlugins::PullJob(query, context.GetThreadsCount(), + context.GetTargetBucketSize(), + context.GetMaxHttpRetries()), query.GetPriority()); } @@ -452,7 +453,7 @@ std::string s = writer.write(lookup); Json::Value answer; - if (peers.DoPost(answer, query.GetPeer(), URI_PULL, s) && + if (DoPostPeer(answer, peers, query.GetPeer(), URI_PULL, s, context.GetMaxHttpRetries()) && answer.type() == Json::objectValue && answer.isMember(KEY_ID) && answer.isMember(KEY_PATH) && @@ -479,7 +480,9 @@ else { SubmitJob(output, new OrthancPlugins::PushJob(query, context.GetCache(), - context.GetThreadsCount(), context.GetTargetBucketSize()), + context.GetThreadsCount(), + context.GetTargetBucketSize(), + context.GetMaxHttpRetries()), query.GetPriority()); } } @@ -520,14 +523,16 @@ { job.reset(new OrthancPlugins::PullJob(query, context.GetThreadsCount(), - context.GetTargetBucketSize())); + context.GetTargetBucketSize(), + context.GetMaxHttpRetries())); } else if (type == JOB_TYPE_PUSH) { job.reset(new OrthancPlugins::PushJob(query, context.GetCache(), context.GetThreadsCount(), - context.GetTargetBucketSize())); + context.GetTargetBucketSize(), + context.GetMaxHttpRetries())); } if (job.get() == NULL) @@ -637,6 +642,7 @@ size_t targetBucketSize = 4096; // In KB size_t maxPushTransactions = 4; size_t memoryCacheSize = 512; // In MB + unsigned int maxHttpRetries = 0; std::map<std::string, std::string> bidirectionalPeers; { @@ -650,13 +656,13 @@ threadsCount = plugin.GetUnsignedIntegerValue("Threads", threadsCount); targetBucketSize = plugin.GetUnsignedIntegerValue("BucketSize", targetBucketSize); memoryCacheSize = plugin.GetUnsignedIntegerValue("CacheSize", memoryCacheSize); - maxPushTransactions = plugin.GetUnsignedIntegerValue - ("MaxPushTransactions", maxPushTransactions); + maxPushTransactions = plugin.GetUnsignedIntegerValue("MaxPushTransactions", maxPushTransactions); + maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries); } } - OrthancPlugins::PluginContext::Initialize - (threadsCount, targetBucketSize * KB, maxPushTransactions, memoryCacheSize * MB); + OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB, maxPushTransactions, + memoryCacheSize * MB, maxHttpRetries); OrthancPlugins::RegisterRestCallback<ServeChunks> (std::string(URI_CHUNKS) + "/([.0-9a-f-]+)", true);
--- a/Plugin/PluginContext.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Plugin/PluginContext.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -27,11 +27,13 @@ PluginContext::PluginContext(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize) : + size_t memoryCacheSize, + unsigned int maxHttpRetries) : pushTransactions_(maxPushTransactions), semaphore_(threadsCount), threadsCount_(threadsCount), - targetBucketSize_(targetBucketSize) + targetBucketSize_(targetBucketSize), + maxHttpRetries_(maxHttpRetries) { pluginUuid_ = Orthanc::Toolbox::GenerateUuid(); @@ -58,10 +60,11 @@ void PluginContext::Initialize(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize) + size_t memoryCacheSize, + unsigned int maxHttpRetries) { GetSingleton().reset(new PluginContext(threadsCount, targetBucketSize, - maxPushTransactions, memoryCacheSize)); + maxPushTransactions, memoryCacheSize, maxHttpRetries)); }
--- a/Plugin/PluginContext.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Plugin/PluginContext.h Mon Mar 04 15:26:49 2019 +0100 @@ -40,12 +40,13 @@ // Configuration size_t threadsCount_; size_t targetBucketSize_; - + unsigned int maxHttpRetries_; PluginContext(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize); + size_t memoryCacheSize, + unsigned int maxHttpRetries); static std::auto_ptr<PluginContext>& GetSingleton(); @@ -80,10 +81,16 @@ return targetBucketSize_; } + unsigned int GetMaxHttpRetries() const + { + return maxHttpRetries_; + } + static void Initialize(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize); + size_t memoryCacheSize, + unsigned int maxHttpRetries); static PluginContext& GetInstance();