changeset 10:c9e28e31262e

new option: MaxHttpRetries
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 04 Mar 2019 15:26:49 +0100
parents 7e207ade2f1a
children 4b9c2e0a92c8
files Framework/HttpQueries/DetectTransferPlugin.cpp Framework/HttpQueries/HttpQueriesQueue.cpp Framework/PullMode/PullJob.cpp Framework/PullMode/PullJob.h Framework/PushMode/PushJob.cpp Framework/PushMode/PushJob.h Framework/TransferToolbox.cpp Framework/TransferToolbox.h Plugin/Plugin.cpp Plugin/PluginContext.cpp Plugin/PluginContext.h
diffstat 11 files changed, 181 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/Framework/HttpQueries/DetectTransferPlugin.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/HttpQueries/DetectTransferPlugin.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -51,19 +51,32 @@
     Json::Reader reader;
     Json::Value value;
 
+    bool enabled = false;
+
     if (reader.parse(reinterpret_cast<const char*>(answer), 
                      reinterpret_cast<const char*>(answer) + size, value) &&
         value.type() == Json::arrayValue)
     {
+      // Loop over the plugins that are enabled on the remote peer
       for (Json::Value::ArrayIndex i = 0; i < value.size(); i++)
       {
         if (value[i].type() == Json::stringValue &&
             value[i].asString() == PLUGIN_NAME)
         {
           result_[peer_] = true;
+          enabled = true;
         }
       }
     }
+
+    if (enabled)
+    {
+      LOG(INFO) << "Peer \"" << peer_ << "\" has the transfers accelerator plugin enabled";
+    }
+    else
+    {
+      LOG(WARNING) << "Peer \"" << peer_ << "\" does *not* have the transfers accelerator plugin enabled";
+    }
   }
 
 
--- a/Framework/HttpQueries/HttpQueriesQueue.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/HttpQueries/HttpQueriesQueue.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -214,7 +214,7 @@
         // Error: Let's retry
         retry ++;
 
-        if (retry < maxRetries)
+        if (retry <= maxRetries)
         {
           // Wait 1 second before retrying
           boost::this_thread::sleep(boost::posix_time::seconds(1));
--- a/Framework/PullMode/PullJob.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/PullMode/PullJob.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -101,7 +101,8 @@
       scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_,
                                    baseUrl, job.query_.GetCompression());
       area_.reset(new DownloadArea(scheduler));
-        
+
+      queue_.SetMaxRetries(job.maxHttpRetries_);
       queue_.Reserve(buckets.size());
         
       for (size_t i = 0; i < buckets.size(); i++)
@@ -171,7 +172,7 @@
       const std::string lookup = writer.write(job_.query_.GetResources()); 
 
       Json::Value answer;
-      if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup))
+      if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_LOOKUP, lookup, job_.maxHttpRetries_))
       {
         LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" 
                    << job_.query_.GetPeer()
@@ -230,11 +231,13 @@
     
   PullJob::PullJob(const TransferQuery& query,
                    size_t threadsCount,
-                   size_t targetBucketSize) :
+                   size_t targetBucketSize,
+                   unsigned int maxHttpRetries) :
     StatefulOrthancJob(JOB_TYPE_PULL),
     query_(query),
     threadsCount_(threadsCount),
-    targetBucketSize_(targetBucketSize)
+    targetBucketSize_(targetBucketSize),
+    maxHttpRetries_(maxHttpRetries)
   {
     if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
     {
--- a/Framework/PullMode/PullJob.h	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/PullMode/PullJob.h	Mon Mar 04 15:26:49 2019 +0100
@@ -37,12 +37,14 @@
     size_t         targetBucketSize_;
     OrthancPeers   peers_;
     size_t         peerIndex_;
+    unsigned int   maxHttpRetries_;
 
     virtual StateUpdate* CreateInitialState(JobInfo& info);    
     
   public:
     PullJob(const TransferQuery& query,
             size_t threadsCount,
-            size_t targetBucketSize);
+            size_t targetBucketSize,
+            unsigned int maxHttpRetries);
   };
 }
--- a/Framework/PushMode/PushJob.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/PushMode/PushJob.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -52,19 +52,19 @@
 
     virtual StateUpdate* Step()
     {
-      std::string uri = transactionUri_;
-        
+      Json::Value answer;
+      bool success = false;
+
       if (isCommit_)
       {
-        uri += "/commit";
+        success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_);
       }
       else
       {
-        uri += "/discard";
+        success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_);
       }
         
-      Json::Value answer;
-      if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, ""))
+      if (!success)
       {
         if (isCommit_)
         {
@@ -130,6 +130,7 @@
       info_(info),
       transactionUri_(transactionUri)
     {
+      queue_.SetMaxRetries(job.maxHttpRetries_);
       queue_.Reserve(buckets.size());
         
       for (size_t i = 0; i < buckets.size(); i++)
@@ -212,7 +213,7 @@
     virtual StateUpdate* Step()
     {
       Json::Value answer;
-      if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_))
+      if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_))
       {
         LOG(ERROR) << "Cannot create a push transaction to peer \"" 
                    << job_.query_.GetPeer()
@@ -248,12 +249,14 @@
   PushJob::PushJob(const TransferQuery& query,
                    OrthancInstancesCache& cache,
                    size_t threadsCount,
-                   size_t targetBucketSize) :
+                   size_t targetBucketSize,
+                   unsigned int maxHttpRetries) :
     StatefulOrthancJob(JOB_TYPE_PUSH),
     cache_(cache),
     query_(query),
     threadsCount_(threadsCount),
-    targetBucketSize_(targetBucketSize)
+    targetBucketSize_(targetBucketSize),
+    maxHttpRetries_(maxHttpRetries)
   {
     if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
     {
--- a/Framework/PushMode/PushJob.h	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/PushMode/PushJob.h	Mon Mar 04 15:26:49 2019 +0100
@@ -38,13 +38,15 @@
     size_t                   targetBucketSize_;
     OrthancPeers             peers_;
     size_t                   peerIndex_;
-    
+    unsigned int             maxHttpRetries_;
+ 
     virtual StateUpdate* CreateInitialState(JobInfo& info);
     
   public:
     PushJob(const TransferQuery& query,
             OrthancInstancesCache& cache,
             size_t threadsCount,
-            size_t targetBucketSize);
+            size_t targetBucketSize,
+            unsigned int maxHttpRetries);
   };
 }
--- a/Framework/TransferToolbox.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/TransferToolbox.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -21,8 +21,10 @@
 
 #include <Core/Logging.h>
 #include <Core/OrthancException.h>
+#include <Plugins/Samples/Common/OrthancPluginCppWrapper.h>
 
 #include <boost/math/special_functions/round.hpp>
+#include <boost/thread/thread.hpp>
 
 
 namespace OrthancPlugins
@@ -73,4 +75,88 @@
         throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
     }
   }
+
+
+  bool DoPostPeer(Json::Value& answer,
+                  const OrthancPeers& peers,
+                  size_t peerIndex,
+                  const std::string& uri,
+                  const std::string& body,
+                  unsigned int maxRetries)
+  {
+    unsigned int retry = 0;
+
+    for (;;)
+    {
+      try
+      {
+        if (peers.DoPost(answer, peerIndex, uri, body))
+        {
+          return true;
+        }
+      }
+      catch (Orthanc::OrthancException&)
+      {
+      }
+
+      if (retry >= maxRetries)
+      {
+        return false;
+      }
+      else
+      {
+        // Wait 1 second before retrying
+        boost::this_thread::sleep(boost::posix_time::seconds(1));
+        retry++;
+      }
+    }
+  }
+
+
+  bool DoPostPeer(Json::Value& answer,
+                  const OrthancPeers& peers,
+                  const std::string& peerName,
+                  const std::string& uri,
+                  const std::string& body,
+                  unsigned int maxRetries)
+  {
+    size_t index;
+
+    return (peers.LookupName(index, peerName) &&
+            DoPostPeer(answer, peers, index, uri, body, maxRetries));
+  }
+
+
+  bool DoDeletePeer(const OrthancPeers& peers,
+                    size_t peerIndex,
+                    const std::string& uri,
+                    unsigned int maxRetries)
+  {
+    unsigned int retry = 0;
+
+    for (;;)
+    {
+      try
+      {
+        if (peers.DoDelete(peerIndex, uri))
+        {
+          return true;
+        }
+      }
+      catch (Orthanc::OrthancException&)
+      {
+      }
+      
+      if (retry >= maxRetries)
+      {
+        return false;
+      }
+      else
+      {
+        // Wait 1 second before retrying
+        boost::this_thread::sleep(boost::posix_time::seconds(1));
+        retry++;
+      }
+    }
+  }
 }
--- a/Framework/TransferToolbox.h	Mon Dec 24 13:45:31 2018 +0100
+++ b/Framework/TransferToolbox.h	Mon Mar 04 15:26:49 2019 +0100
@@ -21,6 +21,7 @@
 
 #include <stdint.h>
 #include <string>
+#include <json/value.h>
 
 static const unsigned int KB = 1024;
 static const unsigned int MB = 1024 * 1024;
@@ -59,6 +60,8 @@
   
 namespace OrthancPlugins
 {
+  class OrthancPeers;
+  
   enum BucketCompression
   {
     BucketCompression_None,
@@ -72,4 +75,23 @@
   BucketCompression StringToBucketCompression(const std::string& value);
 
   const char* EnumerationToString(BucketCompression compression);
+
+  bool DoPostPeer(Json::Value& answer,
+                  const OrthancPeers& peers,
+                  size_t peerIndex,
+                  const std::string& uri,
+                  const std::string& body,
+                  unsigned int maxRetries);
+
+  bool DoPostPeer(Json::Value& answer,
+                  const OrthancPeers& peers,
+                  const std::string& peerName,
+                  const std::string& uri,
+                  const std::string& body,
+                  unsigned int maxRetries);
+
+  bool DoDeletePeer(const OrthancPeers& peers,
+                    size_t peerIndex,
+                    const std::string& uri,
+                    unsigned int maxRetries);
 }
--- a/Plugin/Plugin.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Plugin/Plugin.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -274,8 +274,9 @@
 
   OrthancPlugins::TransferQuery query(body);
 
-  SubmitJob(output, new OrthancPlugins::PullJob(
-              query, context.GetThreadsCount(), context.GetTargetBucketSize()),
+  SubmitJob(output, new OrthancPlugins::PullJob(query, context.GetThreadsCount(),
+                                                context.GetTargetBucketSize(),
+                                                context.GetMaxHttpRetries()),
             query.GetPriority());
 }
 
@@ -452,7 +453,7 @@
     std::string s = writer.write(lookup);
 
     Json::Value answer;
-    if (peers.DoPost(answer, query.GetPeer(), URI_PULL, s) &&
+    if (DoPostPeer(answer, peers, query.GetPeer(), URI_PULL, s, context.GetMaxHttpRetries()) &&
         answer.type() == Json::objectValue &&
         answer.isMember(KEY_ID) &&
         answer.isMember(KEY_PATH) &&
@@ -479,7 +480,9 @@
   else
   {
     SubmitJob(output, new OrthancPlugins::PushJob(query, context.GetCache(),
-                                                  context.GetThreadsCount(), context.GetTargetBucketSize()),
+                                                  context.GetThreadsCount(),
+                                                  context.GetTargetBucketSize(),
+                                                  context.GetMaxHttpRetries()),
               query.GetPriority());
   }
 }
@@ -520,14 +523,16 @@
       {
         job.reset(new OrthancPlugins::PullJob(query,
                                               context.GetThreadsCount(),
-                                              context.GetTargetBucketSize()));
+                                              context.GetTargetBucketSize(),
+                                              context.GetMaxHttpRetries()));
       }
       else if (type == JOB_TYPE_PUSH)
       {
         job.reset(new OrthancPlugins::PushJob(query,
                                               context.GetCache(),
                                               context.GetThreadsCount(),
-                                              context.GetTargetBucketSize()));
+                                              context.GetTargetBucketSize(),
+                                              context.GetMaxHttpRetries()));
       }
 
       if (job.get() == NULL)
@@ -637,6 +642,7 @@
       size_t targetBucketSize = 4096;  // In KB
       size_t maxPushTransactions = 4;
       size_t memoryCacheSize = 512;    // In MB
+      unsigned int maxHttpRetries = 0;
       std::map<std::string, std::string> bidirectionalPeers;
     
       {
@@ -650,13 +656,13 @@
           threadsCount = plugin.GetUnsignedIntegerValue("Threads", threadsCount);
           targetBucketSize = plugin.GetUnsignedIntegerValue("BucketSize", targetBucketSize);
           memoryCacheSize = plugin.GetUnsignedIntegerValue("CacheSize", memoryCacheSize);
-          maxPushTransactions = plugin.GetUnsignedIntegerValue
-            ("MaxPushTransactions", maxPushTransactions);
+          maxPushTransactions = plugin.GetUnsignedIntegerValue("MaxPushTransactions", maxPushTransactions);
+          maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries);
         }
       }
 
-      OrthancPlugins::PluginContext::Initialize
-        (threadsCount, targetBucketSize * KB, maxPushTransactions, memoryCacheSize * MB);
+      OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB, maxPushTransactions,
+                                                memoryCacheSize * MB, maxHttpRetries);
     
       OrthancPlugins::RegisterRestCallback<ServeChunks>
         (std::string(URI_CHUNKS) + "/([.0-9a-f-]+)", true);
--- a/Plugin/PluginContext.cpp	Mon Dec 24 13:45:31 2018 +0100
+++ b/Plugin/PluginContext.cpp	Mon Mar 04 15:26:49 2019 +0100
@@ -27,11 +27,13 @@
   PluginContext::PluginContext(size_t threadsCount,
                                size_t targetBucketSize,
                                size_t maxPushTransactions,
-                               size_t memoryCacheSize) :
+                               size_t memoryCacheSize,
+                               unsigned int maxHttpRetries) :
     pushTransactions_(maxPushTransactions),
     semaphore_(threadsCount),
     threadsCount_(threadsCount),
-    targetBucketSize_(targetBucketSize)
+    targetBucketSize_(targetBucketSize),
+    maxHttpRetries_(maxHttpRetries)
   {
     pluginUuid_ = Orthanc::Toolbox::GenerateUuid();
 
@@ -58,10 +60,11 @@
   void PluginContext::Initialize(size_t threadsCount,
                                  size_t targetBucketSize,
                                  size_t maxPushTransactions,
-                                 size_t memoryCacheSize)
+                                 size_t memoryCacheSize,
+                                 unsigned int maxHttpRetries)
   {
     GetSingleton().reset(new PluginContext(threadsCount, targetBucketSize,
-                                           maxPushTransactions, memoryCacheSize));
+                                           maxPushTransactions, memoryCacheSize, maxHttpRetries));
   }
 
   
--- a/Plugin/PluginContext.h	Mon Dec 24 13:45:31 2018 +0100
+++ b/Plugin/PluginContext.h	Mon Mar 04 15:26:49 2019 +0100
@@ -40,12 +40,13 @@
     // Configuration
     size_t                   threadsCount_;
     size_t                   targetBucketSize_;
-
+    unsigned int             maxHttpRetries_;
   
     PluginContext(size_t threadsCount,
                   size_t targetBucketSize,
                   size_t maxPushTransactions,
-                  size_t memoryCacheSize);
+                  size_t memoryCacheSize,
+                  unsigned int maxHttpRetries);
 
     static std::auto_ptr<PluginContext>& GetSingleton();
   
@@ -80,10 +81,16 @@
       return targetBucketSize_;
     }
 
+    unsigned int GetMaxHttpRetries() const
+    {
+      return maxHttpRetries_;
+    }
+
     static void Initialize(size_t threadsCount,
                            size_t targetBucketSize,
                            size_t maxPushTransactions,
-                           size_t memoryCacheSize);
+                           size_t memoryCacheSize,
+                           unsigned int maxHttpRetries);
   
     static PluginContext& GetInstance();