comparison Framework/PushMode/PushJob.cpp @ 10:c9e28e31262e

new option: MaxHttpRetries
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 04 Mar 2019 15:26:49 +0100
parents 7e207ade2f1a
children b06103a50c95
comparison
equal deleted inserted replaced
9:7e207ade2f1a 10:c9e28e31262e
50 { 50 {
51 } 51 }
52 52
53 virtual StateUpdate* Step() 53 virtual StateUpdate* Step()
54 { 54 {
55 std::string uri = transactionUri_; 55 Json::Value answer;
56 bool success = false;
57
58 if (isCommit_)
59 {
60 success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_);
61 }
62 else
63 {
64 success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_);
65 }
56 66
57 if (isCommit_) 67 if (!success)
58 {
59 uri += "/commit";
60 }
61 else
62 {
63 uri += "/discard";
64 }
65
66 Json::Value answer;
67 if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, ""))
68 { 68 {
69 if (isCommit_) 69 if (isCommit_)
70 { 70 {
71 LOG(ERROR) << "Cannot commit push transaction on remote peer: " 71 LOG(ERROR) << "Cannot commit push transaction on remote peer: "
72 << job_.query_.GetPeer(); 72 << job_.query_.GetPeer();
128 const std::vector<TransferBucket>& buckets) : 128 const std::vector<TransferBucket>& buckets) :
129 job_(job), 129 job_(job),
130 info_(info), 130 info_(info),
131 transactionUri_(transactionUri) 131 transactionUri_(transactionUri)
132 { 132 {
133 queue_.SetMaxRetries(job.maxHttpRetries_);
133 queue_.Reserve(buckets.size()); 134 queue_.Reserve(buckets.size());
134 135
135 for (size_t i = 0; i < buckets.size(); i++) 136 for (size_t i = 0; i < buckets.size(); i++)
136 { 137 {
137 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(), 138 queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
210 } 211 }
211 212
212 virtual StateUpdate* Step() 213 virtual StateUpdate* Step()
213 { 214 {
214 Json::Value answer; 215 Json::Value answer;
215 if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_)) 216 if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_))
216 { 217 {
217 LOG(ERROR) << "Cannot create a push transaction to peer \"" 218 LOG(ERROR) << "Cannot create a push transaction to peer \""
218 << job_.query_.GetPeer() 219 << job_.query_.GetPeer()
219 << "\" (check that it has the transfers accelerator plugin installed)"; 220 << "\" (check that it has the transfers accelerator plugin installed)";
220 return StateUpdate::Failure(); 221 return StateUpdate::Failure();
246 247
247 248
248 PushJob::PushJob(const TransferQuery& query, 249 PushJob::PushJob(const TransferQuery& query,
249 OrthancInstancesCache& cache, 250 OrthancInstancesCache& cache,
250 size_t threadsCount, 251 size_t threadsCount,
251 size_t targetBucketSize) : 252 size_t targetBucketSize,
253 unsigned int maxHttpRetries) :
252 StatefulOrthancJob(JOB_TYPE_PUSH), 254 StatefulOrthancJob(JOB_TYPE_PUSH),
253 cache_(cache), 255 cache_(cache),
254 query_(query), 256 query_(query),
255 threadsCount_(threadsCount), 257 threadsCount_(threadsCount),
256 targetBucketSize_(targetBucketSize) 258 targetBucketSize_(targetBucketSize),
259 maxHttpRetries_(maxHttpRetries)
257 { 260 {
258 if (!peers_.LookupName(peerIndex_, query_.GetPeer())) 261 if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
259 { 262 {
260 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); 263 LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
261 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); 264 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);