Mercurial > hg > orthanc-transfers
diff Framework/PullMode/PullJob.cpp @ 0:95226b754d9e
initial release
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 17 Sep 2018 11:34:55 +0200 |
parents | |
children | 5e6de82bb10f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Framework/PullMode/PullJob.cpp Mon Sep 17 11:34:55 2018 +0200 @@ -0,0 +1,254 @@ +/** + * Transfers accelerator plugin for Orthanc + * Copyright (C) 2018 Osimis, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "PullJob.h" + +#include "BucketPullQuery.h" +#include "../HttpQueries/HttpQueriesRunner.h" +#include "../TransferScheduler.h" + +#include <Core/Logging.h> + +#include <json/writer.h> + + +namespace OrthancPlugins +{ + class PullJob::CommitState : public IState + { + private: + const PullJob& job_; + std::auto_ptr<DownloadArea> area_; + + public: + CommitState(const PullJob& job, + DownloadArea* area /* takes ownership */) : + job_(job), + area_(area) + { + } + + virtual StateUpdate* Step() + { + area_->Commit(job_.context_); + return StateUpdate::Success(); + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + } + }; + + + class PullJob::PullBucketsState : public IState + { + private: + const PullJob& job_; + JobInfo& info_; + HttpQueriesQueue queue_; + std::auto_ptr<DownloadArea> area_; + std::auto_ptr<HttpQueriesRunner> runner_; + + void UpdateInfo() + { + size_t scheduledQueriesCount, completedQueriesCount; + uint64_t uploadedSize, downloadedSize; + queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize); + + info_.SetContent("DownloadedSizeMB", ConvertToMegabytes(downloadedSize)); + info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount)); + + if (runner_.get() != NULL) + { + float speed; + runner_->GetSpeed(speed); + info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed)); + } + + // The "2" below corresponds to the "LookupInstancesState" + // and "CommitState" steps (which prevents division by zero) + info_.SetProgress(static_cast<float>(1 /* LookupInstancesState */ + completedQueriesCount) / + static_cast<float>(2 + scheduledQueriesCount)); + } + + public: + PullBucketsState(const PullJob& job, + JobInfo& info, + const TransferScheduler& scheduler) : + job_(job), + info_(info), + queue_(job.context_), + area_(new DownloadArea(scheduler)) + { + const std::string baseUrl = job.peers_.GetPeerUrl(job.query_.GetPeer()); + + std::vector<TransferBucket> buckets; + scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_, + baseUrl, job.query_.GetCompression()); + area_.reset(new DownloadArea(scheduler)); + + queue_.Reserve(buckets.size()); + + for (size_t i = 0; i < buckets.size(); i++) + { + queue_.Enqueue(new BucketPullQuery(*area_, buckets[i], job.query_.GetPeer(), job.query_.GetCompression())); + } + + info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount())); + info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize())); + UpdateInfo(); + } + + virtual StateUpdate* Step() + { + if (runner_.get() == NULL) + { + runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_)); + } + + HttpQueriesQueue::Status status = queue_.WaitComplete(200); + + UpdateInfo(); + + switch (status) + { + case HttpQueriesQueue::Status_Running: + return StateUpdate::Continue(); + + case HttpQueriesQueue::Status_Success: + return StateUpdate::Next(new CommitState(job_, area_.release())); + + case HttpQueriesQueue::Status_Failure: + return StateUpdate::Failure(); + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + // Cancel the running download threads + runner_.reset(); + } + }; + + + class PullJob::LookupInstancesState : public IState + { + private: + const PullJob& job_; + JobInfo& info_; + + public: + LookupInstancesState(const PullJob& job, + JobInfo& info) : + job_(job), + info_(info) + { + info_.SetContent("Peer", job_.query_.GetPeer()); + info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression())); + } + + virtual StateUpdate* Step() + { + Json::FastWriter writer; + const std::string lookup = writer.write(job_.query_.GetResources()); + + Json::Value answer; + if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup)) + { + LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" + << job_.query_.GetPeer() + << "\" (check that it has the transfers accelerator plugin installed)"; + return StateUpdate::Failure(); + } + + if (answer.type() != Json::objectValue || + !answer.isMember(KEY_INSTANCES) || + !answer.isMember(KEY_ORIGINATOR_UUID) || + answer[KEY_INSTANCES].type() != Json::arrayValue || + answer[KEY_ORIGINATOR_UUID].type() != Json::stringValue) + { + LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer(); + return StateUpdate::Failure(); + } + + if (job_.query_.HasOriginator() && + job_.query_.GetOriginator() != answer[KEY_ORIGINATOR_UUID].asString()) + { + LOG(ERROR) << "Invalid originator, check out the \"" + << KEY_PLUGIN_CONFIGURATION << "." << KEY_BIDIRECTIONAL_PEERS + << "\" configuration option"; + return StateUpdate::Failure(); + } + + TransferScheduler scheduler; + + for (Json::Value::ArrayIndex i = 0; i < answer[KEY_INSTANCES].size(); i++) + { + DicomInstanceInfo instance(answer[KEY_INSTANCES][i]); + scheduler.AddInstance(instance); + } + + if (scheduler.GetInstancesCount() == 0) + { + // We're already done: No instance to be retrieved + return StateUpdate::Success(); + } + else + { + return StateUpdate::Next(new PullBucketsState(job_, info_, scheduler)); + } + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + } + }; + + + StatefulOrthancJob::StateUpdate* PullJob::CreateInitialState(JobInfo& info) + { + return StateUpdate::Next(new LookupInstancesState(*this, info)); + } + + + PullJob::PullJob(OrthancPluginContext* context, + const TransferQuery& query, + size_t threadsCount, + size_t targetBucketSize) : + StatefulOrthancJob(JOB_TYPE_PULL), + context_(context), + query_(query), + threadsCount_(threadsCount), + targetBucketSize_(targetBucketSize), + peers_(context) + { + if (!peers_.LookupName(peerIndex_, query_.GetPeer())) + { + LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer(); + throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource); + } + + Json::Value serialized; + query.Serialize(serialized); + UpdateSerialized(serialized); + } +}