comparison Framework/PushMode/PushJob.cpp @ 44:f4e828607f02

Added 'SenderTransferID' option that is added as an HTTP header in outgoing requests in PushMode
author Alain Mazy <am@osimis.io>
date Wed, 19 Oct 2022 21:12:57 +0200
parents 9708addb5a87
children 1e396fb509ca
comparison
equal deleted inserted replaced
43:c3fefbb11321 44:f4e828607f02
51 51
52 virtual StateUpdate* Step() 52 virtual StateUpdate* Step()
53 { 53 {
54 Json::Value answer; 54 Json::Value answer;
55 bool success = false; 55 bool success = false;
56 std::map<std::string, std::string> headers;
57 job_.query_.GetHttpHeaders(headers);
56 58
57 if (isCommit_) 59 if (isCommit_)
58 { 60 {
59 success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_); 61 success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_, headers);
60 } 62 }
61 else 63 else
62 { 64 {
63 success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_); 65 success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_, headers);
64 } 66 }
65 67
66 if (!success) 68 if (!success)
67 { 69 {
68 if (isCommit_) 70 if (isCommit_)
69 { 71 {
70 LOG(ERROR) << "Cannot commit push transaction on remote peer: " 72 LOG(ERROR) << "Cannot commit push transaction on remote peer: " // TODO: add job ID
71 << job_.query_.GetPeer(); 73 << job_.query_.GetPeer();
72 } 74 }
73 75
74 return StateUpdate::Failure(); 76 return StateUpdate::Failure();
75 } 77 }
127 const std::vector<TransferBucket>& buckets) : 129 const std::vector<TransferBucket>& buckets) :
128 job_(job), 130 job_(job),
129 info_(info), 131 info_(info),
130 transactionUri_(transactionUri) 132 transactionUri_(transactionUri)
131 { 133 {
134 std::map<std::string, std::string> headers;
135 job_.query_.GetHttpHeaders(headers);
136
132 queue_.SetMaxRetries(job.maxHttpRetries_); 137 queue_.SetMaxRetries(job.maxHttpRetries_);
133 queue_.Reserve(buckets.size()); 138 queue_.Reserve(buckets.size());
134 139
135 for (size_t i = 0; i < buckets.size(); i++) 140 for (size_t i = 0; i < buckets.size(); i++)
136 { 141 {
137 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(), 142 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
138 transactionUri_, i, job.query_.GetCompression())); 143 transactionUri_, i, job.query_.GetCompression(), headers));
139 } 144 }
140 145
141 UpdateInfo(); 146 UpdateInfo();
142 } 147 }
143 148
210 } 215 }
211 216
212 virtual StateUpdate* Step() 217 virtual StateUpdate* Step()
213 { 218 {
214 Json::Value answer; 219 Json::Value answer;
215 if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_)) 220 std::map<std::string, std::string> headers;
221 job_.query_.GetHttpHeaders(headers);
222
223 if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_, headers))
216 { 224 {
217 LOG(ERROR) << "Cannot create a push transaction to peer \"" 225 LOG(ERROR) << "Cannot create a push transaction to peer \""
218 << job_.query_.GetPeer() 226 << job_.query_.GetPeer()
219 << "\" (check that it has the transfers accelerator plugin installed)"; 227 << "\" (check that it has the transfers accelerator plugin installed)";
220 return StateUpdate::Failure(); 228 return StateUpdate::Failure();