diff Framework/PushMode/PushJob.cpp @ 44:f4e828607f02

Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
author Alain Mazy <am@osimis.io>
date Wed, 19 Oct 2022 21:12:57 +0200
parents 9708addb5a87
children 1e396fb509ca
line wrap: on
line diff
--- a/Framework/PushMode/PushJob.cpp	Tue Jul 12 17:49:40 2022 +0200
+++ b/Framework/PushMode/PushJob.cpp	Wed Oct 19 21:12:57 2022 +0200
@@ -53,21 +53,23 @@
     {
       Json::Value answer;
       bool success = false;
+      std::map<std::string, std::string> headers;
+      job_.query_.GetHttpHeaders(headers);
 
       if (isCommit_)
       {
-        success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_);
+        success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_, headers);
       }
       else
       {
-        success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_);
+        success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_, headers);
       }
         
       if (!success)
       {
         if (isCommit_)
         {
-          LOG(ERROR) << "Cannot commit push transaction on remote peer: "
+          LOG(ERROR) << "Cannot commit push transaction on remote peer: " // TODO: add job ID
                      << job_.query_.GetPeer();
         }
           
@@ -129,13 +131,16 @@
       info_(info),
       transactionUri_(transactionUri)
     {
+      std::map<std::string, std::string> headers;
+      job_.query_.GetHttpHeaders(headers);
+
       queue_.SetMaxRetries(job.maxHttpRetries_);
       queue_.Reserve(buckets.size());
         
       for (size_t i = 0; i < buckets.size(); i++)
       {
         queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
-                                           transactionUri_, i, job.query_.GetCompression()));
+                                           transactionUri_, i, job.query_.GetCompression(), headers));
       }
 
       UpdateInfo();
@@ -212,7 +217,10 @@
     virtual StateUpdate* Step()
     {
       Json::Value answer;
-      if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_))
+      std::map<std::string, std::string> headers;
+      job_.query_.GetHttpHeaders(headers);
+
+      if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_, headers))
       {
         LOG(ERROR) << "Cannot create a push transaction to peer \"" 
                    << job_.query_.GetPeer()