Mercurial > hg > orthanc-transfers
diff Framework/PushMode/PushJob.cpp @ 0:95226b754d9e
initial release
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 17 Sep 2018 11:34:55 +0200 |
parents | |
children | 4c3437217518 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/PushMode/PushJob.cpp Mon Sep 17 11:34:55 2018 +0200 @@ -0,0 +1,272 @@ +/** + * Transfers accelerator plugin for Orthanc + * Copyright (C) 2018 Osimis, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "PushJob.h" + +#include "BucketPushQuery.h" +#include "../HttpQueries/HttpQueriesRunner.h" +#include "../TransferScheduler.h" + +#include <Core/Logging.h> + +#include <json/writer.h> + + +namespace OrthancPlugins +{ + class PushJob::FinalState : public IState + { + private: + const PushJob& job_; + JobInfo& info_; + std::string transactionUri_; + bool isCommit_; + + public: + FinalState(const PushJob& job, + JobInfo& info, + const std::string& transactionUri, + bool isCommit) : + job_(job), + info_(info), + transactionUri_(transactionUri), + isCommit_(isCommit) + { + } + + virtual StateUpdate* Step() + { + std::string uri = transactionUri_; + + if (isCommit_) + { + uri += "/commit"; + } + else + { + uri += "/discard"; + } + + Json::Value answer; + if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, "")) + { + if (isCommit_) + { + LOG(ERROR) << "Cannot commit push transaction on remote peer: " + << job_.query_.GetPeer(); + } + + return StateUpdate::Failure(); + } + else if (isCommit_) + { + return StateUpdate::Success(); + } + else + { + return StateUpdate::Failure(); + } + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + } + }; + + + class PushJob::PushBucketsState : public IState + { + private: + const PushJob& job_; + JobInfo& info_; + std::string transactionUri_; + HttpQueriesQueue queue_; + std::auto_ptr<HttpQueriesRunner> runner_; + + void UpdateInfo() + { + size_t scheduledQueriesCount, completedQueriesCount; + uint64_t uploadedSize, downloadedSize; + queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); + + info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize)); + info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); + + if (runner_.get() != NULL) + { + float speed; + runner_->GetSpeed(speed); + info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); + } + + // The "2" below corresponds to the "CreateTransactionState" + // and "FinalState" steps (which prevents division by zero) + info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) / + static_cast<float>(2 + scheduledQueriesCount)); + } + + public: + PushBucketsState(const PushJob& job, + JobInfo& info, + const std::string& transactionUri, + const std::vector<TransferBucket>& buckets) : + job_(job), + info_(info), + transactionUri_(transactionUri), + queue_(job.context_) + { + queue_.Reserve(buckets.size()); + + for (size_t i = 0; i < buckets.size(); i++) + { + queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(), + transactionUri_, i, job.query_.GetCompression())); + } + + UpdateInfo(); + } + + virtual StateUpdate* Step() + { + if (runner_.get() == NULL) + { + runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); + } + + HttpQueriesQueue::Status status = queue_.WaitComplete(200); + + UpdateInfo(); + + switch (status) + { + case HttpQueriesQueue::Status_Running: + return StateUpdate::Continue(); + + case HttpQueriesQueue::Status_Success: + // Commit transaction on remote peer + return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true)); + + case HttpQueriesQueue::Status_Failure: + // Discard transaction on remote peer + return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false)); + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + // Cancel the running download threads + runner_.reset(); + } + }; + + + class PushJob::CreateTransactionState : public IState + { + private: + const PushJob& job_; + JobInfo& info_; + std::string createTransaction_; + std::vector<TransferBucket> buckets_; + + public: + CreateTransactionState(const PushJob& job, + JobInfo& info) : + job_(job), + info_(info) + { + TransferScheduler scheduler; + scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources()); + + Json::Value push; + scheduler.FormatPushTransaction(push, buckets_, + job.targetBucketSize_, 2 * job.targetBucketSize_, + job_.query_.GetCompression()); + + Json::FastWriter writer; + createTransaction_ = writer.write(push); + + info_.SetContent("Peer", job_.query_.GetPeer()); + info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); + info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); + info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); + } + + virtual StateUpdate* Step() + { + Json::Value answer; + if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_)) + { + LOG(ERROR) << "Cannot create a push transaction to peer \"" + << job_.query_.GetPeer() + << "\" (check that it has the transfers accelerator plugin installed)"; + return StateUpdate::Failure(); + } + + if (answer.type() != Json::objectValue || + !answer.isMember(KEY_PATH) || + answer[KEY_PATH].type() != Json::stringValue) + { + LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); + return StateUpdate::Failure(); + } + + std::string transactionUri = answer[KEY_PATH].asString(); + + return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_)); + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + } + }; + + + StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info) + { + return StateUpdate::Next(new CreateTransactionState(*this, info)); + } + + + PushJob::PushJob(OrthancPluginContext* context, + const TransferQuery& query, + OrthancInstancesCache& cache, + size_t threadsCount, + size_t targetBucketSize) : + StatefulOrthancJob(JOB_TYPE_PUSH), + context_(context), + cache_(cache), + query_(query), + threadsCount_(threadsCount), + targetBucketSize_(targetBucketSize), + peers_(context) + { + if (!peers_.LookupName(peerIndex_, query_.GetPeer())) + { + LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); + throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); + } + + Json::Value serialized; + query.Serialize(serialized); + UpdateSerialized(serialized); + } +}