# HG changeset patch # User Sebastien Jodogne # Date 1551709609 -3600 # Node ID c9e28e31262e9eeea0293dbb1e644f39c6a91fc8 # Parent 7e207ade2f1a06bae6771f44a0cfee4866b80b5b new option: MaxHttpRetries diff -r 7e207ade2f1a -r c9e28e31262e Framework/HttpQueries/DetectTransferPlugin.cpp --- a/Framework/HttpQueries/DetectTransferPlugin.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/HttpQueries/DetectTransferPlugin.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -51,19 +51,32 @@ Json::Reader reader; Json::Value value; + bool enabled = false; + if (reader.parse(reinterpret_cast(answer), reinterpret_cast(answer) + size, value) && value.type() == Json::arrayValue) { + // Loop over the plugins that are enabled on the remote peer for (Json::Value::ArrayIndex i = 0; i < value.size(); i++) { if (value[i].type() == Json::stringValue && value[i].asString() == PLUGIN_NAME) { result_[peer_] = true; + enabled = true; } } } + + if (enabled) + { + LOG(INFO) << "Peer \"" << peer_ << "\" has the transfers accelerator plugin enabled"; + } + else + { + LOG(WARNING) << "Peer \"" << peer_ << "\" does *not* have the transfers accelerator plugin enabled"; + } } diff -r 7e207ade2f1a -r c9e28e31262e Framework/HttpQueries/HttpQueriesQueue.cpp --- a/Framework/HttpQueries/HttpQueriesQueue.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/HttpQueries/HttpQueriesQueue.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -214,7 +214,7 @@ // Error: Let's retry retry ++; - if (retry < maxRetries) + if (retry <= maxRetries) { // Wait 1 second before retrying boost::this_thread::sleep(boost::posix_time::seconds(1)); diff -r 7e207ade2f1a -r c9e28e31262e Framework/PullMode/PullJob.cpp --- a/Framework/PullMode/PullJob.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PullMode/PullJob.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -101,7 +101,8 @@ scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_, baseUrl, job.query_.GetCompression()); area_.reset(new DownloadArea(scheduler)); - + + queue_.SetMaxRetries(job.maxHttpRetries_); queue_.Reserve(buckets.size()); for (size_t i = 0; i < buckets.size(); i++) @@ -171,7 +172,7 @@ const std::string lookup = writer.write(job_.query_.GetResources()); Json::Value answer; - if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup)) + if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_LOOKUP, lookup, job_.maxHttpRetries_)) { LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" << job_.query_.GetPeer() @@ -230,11 +231,13 @@ PullJob::PullJob(const TransferQuery& query, size_t threadsCount, - size_t targetBucketSize) : + size_t targetBucketSize, + unsigned int maxHttpRetries) : StatefulOrthancJob(JOB_TYPE_PULL), query_(query), threadsCount_(threadsCount), - targetBucketSize_(targetBucketSize) + targetBucketSize_(targetBucketSize), + maxHttpRetries_(maxHttpRetries) { if (!peers_.LookupName(peerIndex_, query_.GetPeer())) { diff -r 7e207ade2f1a -r c9e28e31262e Framework/PullMode/PullJob.h --- a/Framework/PullMode/PullJob.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PullMode/PullJob.h Mon Mar 04 15:26:49 2019 +0100 @@ -37,12 +37,14 @@ size_t targetBucketSize_; OrthancPeers peers_; size_t peerIndex_; + unsigned int maxHttpRetries_; virtual StateUpdate* CreateInitialState(JobInfo& info); public: PullJob(const TransferQuery& query, size_t threadsCount, - size_t targetBucketSize); + size_t targetBucketSize, + unsigned int maxHttpRetries); }; } diff -r 7e207ade2f1a -r c9e28e31262e Framework/PushMode/PushJob.cpp --- a/Framework/PushMode/PushJob.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PushMode/PushJob.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -52,19 +52,19 @@ virtual StateUpdate* Step() { - std::string uri = transactionUri_; - + Json::Value answer; + bool success = false; + if (isCommit_) { - uri += "/commit"; + success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_); } else { - uri += "/discard"; + success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_); } - Json::Value answer; - if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, "")) + if (!success) { if (isCommit_) { @@ -130,6 +130,7 @@ info_(info), transactionUri_(transactionUri) { + queue_.SetMaxRetries(job.maxHttpRetries_); queue_.Reserve(buckets.size()); for (size_t i = 0; i < buckets.size(); i++) @@ -212,7 +213,7 @@ virtual StateUpdate* Step() { Json::Value answer; - if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_)) + if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_)) { LOG(ERROR) << "Cannot create a push transaction to peer \"" << job_.query_.GetPeer() @@ -248,12 +249,14 @@ PushJob::PushJob(const TransferQuery& query, OrthancInstancesCache& cache, size_t threadsCount, - size_t targetBucketSize) : + size_t targetBucketSize, + unsigned int maxHttpRetries) : StatefulOrthancJob(JOB_TYPE_PUSH), cache_(cache), query_(query), threadsCount_(threadsCount), - targetBucketSize_(targetBucketSize) + targetBucketSize_(targetBucketSize), + maxHttpRetries_(maxHttpRetries) { if (!peers_.LookupName(peerIndex_, query_.GetPeer())) { diff -r 7e207ade2f1a -r c9e28e31262e Framework/PushMode/PushJob.h --- a/Framework/PushMode/PushJob.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/PushMode/PushJob.h Mon Mar 04 15:26:49 2019 +0100 @@ -38,13 +38,15 @@ size_t targetBucketSize_; OrthancPeers peers_; size_t peerIndex_; - + unsigned int maxHttpRetries_; + virtual StateUpdate* CreateInitialState(JobInfo& info); public: PushJob(const TransferQuery& query, OrthancInstancesCache& cache, size_t threadsCount, - size_t targetBucketSize); + size_t targetBucketSize, + unsigned int maxHttpRetries); }; } diff -r 7e207ade2f1a -r c9e28e31262e Framework/TransferToolbox.cpp --- a/Framework/TransferToolbox.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/TransferToolbox.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -21,8 +21,10 @@ #include #include +#include #include +#include namespace OrthancPlugins @@ -73,4 +75,88 @@ throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); } } + + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + const std::string& body, + unsigned int maxRetries) + { + unsigned int retry = 0; + + for (;;) + { + try + { + if (peers.DoPost(answer, peerIndex, uri, body)) + { + return true; + } + } + catch (Orthanc::OrthancException&) + { + } + + if (retry >= maxRetries) + { + return false; + } + else + { + // Wait 1 second before retrying + boost::this_thread::sleep(boost::posix_time::seconds(1)); + retry++; + } + } + } + + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + const std::string& peerName, + const std::string& uri, + const std::string& body, + unsigned int maxRetries) + { + size_t index; + + return (peers.LookupName(index, peerName) && + DoPostPeer(answer, peers, index, uri, body, maxRetries)); + } + + + bool DoDeletePeer(const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + unsigned int maxRetries) + { + unsigned int retry = 0; + + for (;;) + { + try + { + if (peers.DoDelete(peerIndex, uri)) + { + return true; + } + } + catch (Orthanc::OrthancException&) + { + } + + if (retry >= maxRetries) + { + return false; + } + else + { + // Wait 1 second before retrying + boost::this_thread::sleep(boost::posix_time::seconds(1)); + retry++; + } + } + } } diff -r 7e207ade2f1a -r c9e28e31262e Framework/TransferToolbox.h --- a/Framework/TransferToolbox.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Framework/TransferToolbox.h Mon Mar 04 15:26:49 2019 +0100 @@ -21,6 +21,7 @@ #include #include +#include static const unsigned int KB = 1024; static const unsigned int MB = 1024 * 1024; @@ -59,6 +60,8 @@ namespace OrthancPlugins { + class OrthancPeers; + enum BucketCompression { BucketCompression_None, @@ -72,4 +75,23 @@ BucketCompression StringToBucketCompression(const std::string& value); const char* EnumerationToString(BucketCompression compression); + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + const std::string& body, + unsigned int maxRetries); + + bool DoPostPeer(Json::Value& answer, + const OrthancPeers& peers, + const std::string& peerName, + const std::string& uri, + const std::string& body, + unsigned int maxRetries); + + bool DoDeletePeer(const OrthancPeers& peers, + size_t peerIndex, + const std::string& uri, + unsigned int maxRetries); } diff -r 7e207ade2f1a -r c9e28e31262e Plugin/Plugin.cpp --- a/Plugin/Plugin.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Plugin/Plugin.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -274,8 +274,9 @@ OrthancPlugins::TransferQuery query(body); - SubmitJob(output, new OrthancPlugins::PullJob( - query, context.GetThreadsCount(), context.GetTargetBucketSize()), + SubmitJob(output, new OrthancPlugins::PullJob(query, context.GetThreadsCount(), + context.GetTargetBucketSize(), + context.GetMaxHttpRetries()), query.GetPriority()); } @@ -452,7 +453,7 @@ std::string s = writer.write(lookup); Json::Value answer; - if (peers.DoPost(answer, query.GetPeer(), URI_PULL, s) && + if (DoPostPeer(answer, peers, query.GetPeer(), URI_PULL, s, context.GetMaxHttpRetries()) && answer.type() == Json::objectValue && answer.isMember(KEY_ID) && answer.isMember(KEY_PATH) && @@ -479,7 +480,9 @@ else { SubmitJob(output, new OrthancPlugins::PushJob(query, context.GetCache(), - context.GetThreadsCount(), context.GetTargetBucketSize()), + context.GetThreadsCount(), + context.GetTargetBucketSize(), + context.GetMaxHttpRetries()), query.GetPriority()); } } @@ -520,14 +523,16 @@ { job.reset(new OrthancPlugins::PullJob(query, context.GetThreadsCount(), - context.GetTargetBucketSize())); + context.GetTargetBucketSize(), + context.GetMaxHttpRetries())); } else if (type == JOB_TYPE_PUSH) { job.reset(new OrthancPlugins::PushJob(query, context.GetCache(), context.GetThreadsCount(), - context.GetTargetBucketSize())); + context.GetTargetBucketSize(), + context.GetMaxHttpRetries())); } if (job.get() == NULL) @@ -637,6 +642,7 @@ size_t targetBucketSize = 4096; // In KB size_t maxPushTransactions = 4; size_t memoryCacheSize = 512; // In MB + unsigned int maxHttpRetries = 0; std::map bidirectionalPeers; { @@ -650,13 +656,13 @@ threadsCount = plugin.GetUnsignedIntegerValue("Threads", threadsCount); targetBucketSize = plugin.GetUnsignedIntegerValue("BucketSize", targetBucketSize); memoryCacheSize = plugin.GetUnsignedIntegerValue("CacheSize", memoryCacheSize); - maxPushTransactions = plugin.GetUnsignedIntegerValue - ("MaxPushTransactions", maxPushTransactions); + maxPushTransactions = plugin.GetUnsignedIntegerValue("MaxPushTransactions", maxPushTransactions); + maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries); } } - OrthancPlugins::PluginContext::Initialize - (threadsCount, targetBucketSize * KB, maxPushTransactions, memoryCacheSize * MB); + OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB, maxPushTransactions, + memoryCacheSize * MB, maxHttpRetries); OrthancPlugins::RegisterRestCallback (std::string(URI_CHUNKS) + "/([.0-9a-f-]+)", true); diff -r 7e207ade2f1a -r c9e28e31262e Plugin/PluginContext.cpp --- a/Plugin/PluginContext.cpp Mon Dec 24 13:45:31 2018 +0100 +++ b/Plugin/PluginContext.cpp Mon Mar 04 15:26:49 2019 +0100 @@ -27,11 +27,13 @@ PluginContext::PluginContext(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize) : + size_t memoryCacheSize, + unsigned int maxHttpRetries) : pushTransactions_(maxPushTransactions), semaphore_(threadsCount), threadsCount_(threadsCount), - targetBucketSize_(targetBucketSize) + targetBucketSize_(targetBucketSize), + maxHttpRetries_(maxHttpRetries) { pluginUuid_ = Orthanc::Toolbox::GenerateUuid(); @@ -58,10 +60,11 @@ void PluginContext::Initialize(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize) + size_t memoryCacheSize, + unsigned int maxHttpRetries) { GetSingleton().reset(new PluginContext(threadsCount, targetBucketSize, - maxPushTransactions, memoryCacheSize)); + maxPushTransactions, memoryCacheSize, maxHttpRetries)); } diff -r 7e207ade2f1a -r c9e28e31262e Plugin/PluginContext.h --- a/Plugin/PluginContext.h Mon Dec 24 13:45:31 2018 +0100 +++ b/Plugin/PluginContext.h Mon Mar 04 15:26:49 2019 +0100 @@ -40,12 +40,13 @@ // Configuration size_t threadsCount_; size_t targetBucketSize_; - + unsigned int maxHttpRetries_; PluginContext(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize); + size_t memoryCacheSize, + unsigned int maxHttpRetries); static std::auto_ptr& GetSingleton(); @@ -80,10 +81,16 @@ return targetBucketSize_; } + unsigned int GetMaxHttpRetries() const + { + return maxHttpRetries_; + } + static void Initialize(size_t threadsCount, size_t targetBucketSize, size_t maxPushTransactions, - size_t memoryCacheSize); + size_t memoryCacheSize, + unsigned int maxHttpRetries); static PluginContext& GetInstance();