diff Framework/PullMode/PullJob.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 5e6de82bb10f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Framework/PullMode/PullJob.cpp	Mon Sep 17 11:34:55 2018 +0200
@@ -0,0 +1,254 @@
+/**
+ * Transfers accelerator plugin for Orthanc
+ * Copyright (C) 2018 Osimis, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Affero General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "PullJob.h"
+
+#include "BucketPullQuery.h"
+#include "../HttpQueries/HttpQueriesRunner.h"
+#include "../TransferScheduler.h"
+
+#include <Core/Logging.h>
+
+#include <json/writer.h>
+
+
+namespace OrthancPlugins
+{
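+  // A pull job is implemented as a small state machine that is
+  // driven by StatefulOrthancJob. It successively runs through three
+  // states: LookupInstancesState (ask the remote peer which
+  // instances must be pulled), PullBucketsState (download those
+  // instances as transfer buckets), and CommitState (attach the
+  // downloaded instances to the local Orthanc store). The states are
+  // declared in reverse order of execution, as each state must know
+  // its successor.
+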
+  class PullJob::CommitState : public IState
+  {
+  private:
+    const PullJob&               job_;
+    std::auto_ptr<DownloadArea>  area_;
+
+  public:
+    CommitState(const PullJob& job,
+                DownloadArea* area /* takes ownership */) :
+      job_(job),
+      area_(area)
+    {
+    }
+
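+    // Single step: Write the content of the download area into the
+    // Orthanc store, then report that the whole job has succeeded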
+    virtual StateUpdate* Step()
+    {
+      area_->Commit(job_.context_);
+      return StateUpdate::Success();
+    }
+
+    virtual void Stop(OrthancPluginJobStopReason reason)
+    {
+    }
+  };
+
+
+  class PullJob::PullBucketsState : public IState
+  {
+  private:
+    const PullJob&                    job_;
+    JobInfo&                          info_;
+    HttpQueriesQueue                  queue_;
+    std::auto_ptr<DownloadArea>       area_;
+    std::auto_ptr<HttpQueriesRunner>  runner_;
+
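+    // Refreshes the public content of the job (statistics, network
+    // speed and progress) from the state of the download queue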
+    void UpdateInfo()
+    {
+      size_t scheduledQueriesCount, completedQueriesCount;
+      uint64_t downloadedSize, uploadedSize;
+      queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize);
+
+      info_.SetContent("DownloadedSizeMB", ConvertToMegabytes(downloadedSize));
+      info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount));
+
+      if (runner_.get() != NULL)
+      {
+        float speed;
+        runner_->GetSpeed(speed);
+        info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed));
+      }
+            
+      // The "2" below corresponds to the "LookupInstancesState"
+      // and "CommitState" steps (which prevents division by zero)
+      info_.SetProgress(static_cast<float>(1 /* LookupInstancesState */ + completedQueriesCount) / 
+                        static_cast<float>(2 + scheduledQueriesCount));
+    }
+
+  public:
+    PullBucketsState(const PullJob&  job,
+                     JobInfo& info,
+                     const TransferScheduler& scheduler) :
+      job_(job),
+      info_(info),
+      queue_(job.context_),
+      area_(new DownloadArea(scheduler))
+    {
+      const std::string baseUrl = job.peers_.GetPeerUrl(job.query_.GetPeer());
+
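+      // Split the instances into buckets whose size is close to the
+      // configured target size (and never above twice that target)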
+      std::vector<TransferBucket> buckets;
+      scheduler.ComputePullBuckets(buckets, job.targetBucketSize_, 2 * job.targetBucketSize_,
+                                   baseUrl, job.query_.GetCompression());
+        
+      queue_.Reserve(buckets.size());
+        
+      for (size_t i = 0; i < buckets.size(); i++)
+      {
+        queue_.Enqueue(new BucketPullQuery(*area_, buckets[i], job.query_.GetPeer(), job.query_.GetCompression()));
+      }
+
+      info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
+      info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
+      UpdateInfo();
+    }
+      
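+    // Each step waits for the download queue with a short timeout
+    // (200, presumably expressed in milliseconds), then refreshes
+    // the job information. The HTTP runner is created lazily during
+    // the first step, which actually starts the download threads.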
+    virtual StateUpdate* Step()
+    {
+      if (runner_.get() == NULL)
+      {
+        runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
+      }
+
+      HttpQueriesQueue::Status status = queue_.WaitComplete(200);
+
+      UpdateInfo();
+
+      switch (status)
+      {
+        case HttpQueriesQueue::Status_Running:
+          return StateUpdate::Continue();
+
+        case HttpQueriesQueue::Status_Success:
+          return StateUpdate::Next(new CommitState(job_, area_.release()));
+
+        case HttpQueriesQueue::Status_Failure:
+          return StateUpdate::Failure();
+
+        default:
+          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+      }        
+    }
+
+    virtual void Stop(OrthancPluginJobStopReason reason)
+    {
+      // Cancel the running download threads
+      runner_.reset();
+    }
+  };
+    
+
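+  // Initial state of the job: Ask the remote peer for the list of
+  // DICOM instances that match the query, and check that the answer
+  // is well-formed before scheduling the actual downloads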
+  class PullJob::LookupInstancesState : public IState
+  {
+  private:
+    const PullJob&  job_;
+    JobInfo&        info_;
+
+  public:
+    LookupInstancesState(const PullJob& job,
+                         JobInfo& info) :
+      job_(job),
+      info_(info)
+    {
+      info_.SetContent("Peer", job_.query_.GetPeer());
+      info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
+    }
+
+    virtual StateUpdate* Step()
+    {
+      Json::FastWriter writer;
+      const std::string lookup = writer.write(job_.query_.GetResources()); 
+
+      Json::Value answer;
+      if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_LOOKUP, lookup))
+      {
+        LOG(ERROR) << "Cannot retrieve the list of instances to pull from peer \"" 
+                   << job_.query_.GetPeer()
+                   << "\" (check that it has the transfers accelerator plugin installed)";
+        return StateUpdate::Failure();
+      } 
+
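+      // The answer must be a JSON object with two members:
+      // KEY_INSTANCES, an array describing each instance to be
+      // pulled, and KEY_ORIGINATOR_UUID, a string identifying the
+      // remote plugin (used by the originator check below)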
+      if (answer.type() != Json::objectValue ||
+          !answer.isMember(KEY_INSTANCES) ||
+          !answer.isMember(KEY_ORIGINATOR_UUID) ||
+          answer[KEY_INSTANCES].type() != Json::arrayValue ||
+          answer[KEY_ORIGINATOR_UUID].type() != Json::stringValue)
+      {
+        LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
+        return StateUpdate::Failure();
+      }
+
+      if (job_.query_.HasOriginator() &&
+          job_.query_.GetOriginator() != answer[KEY_ORIGINATOR_UUID].asString())
+      {
+        LOG(ERROR) << "Invalid originator, check out the \""
+                   << KEY_PLUGIN_CONFIGURATION << "." << KEY_BIDIRECTIONAL_PEERS
+                   << "\" configuration option";
+        return StateUpdate::Failure();
+      }
+
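+      // Populate the transfer scheduler with the instances that were
+      // reported by the remote peer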
+      TransferScheduler  scheduler;
+
+      for (Json::Value::ArrayIndex i = 0; i < answer[KEY_INSTANCES].size(); i++)
+      {
+        DicomInstanceInfo instance(answer[KEY_INSTANCES][i]);
+        scheduler.AddInstance(instance);
+      }
+
+      if (scheduler.GetInstancesCount() == 0)
+      {
+        // We are already done, as there is no instance to retrieve
+        return StateUpdate::Success();
+      }
+      else
+      {
+        return StateUpdate::Next(new PullBucketsState(job_, info_, scheduler));
+      }
+    }
+
+    virtual void Stop(OrthancPluginJobStopReason reason)
+    {
+    }
+  };
+
+
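+  // Entry point of the state machine: The job always starts by
+  // looking up the instances that are available on the remote peer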
+  StatefulOrthancJob::StateUpdate* PullJob::CreateInitialState(JobInfo& info)
+  {
+    return StateUpdate::Next(new LookupInstancesState(*this, info));
+  }
+    
+    
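+  // The constructor resolves the symbolic name of the peer into its
+  // index in the Orthanc configuration, and serializes the query so
+  // that the jobs engine can persist the job (e.g. to resume it
+  // after a restart of Orthanc)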
+  PullJob::PullJob(OrthancPluginContext* context,
+                   const TransferQuery& query,
+                   size_t threadsCount,
+                   size_t targetBucketSize) :
+    StatefulOrthancJob(JOB_TYPE_PULL),
+    context_(context),
+    query_(query),
+    threadsCount_(threadsCount),
+    targetBucketSize_(targetBucketSize),
+    peers_(context)
+  {
+    if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
+    {
+      LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
+    }
+
+    Json::Value serialized;
+    query.Serialize(serialized);
+    UpdateSerialized(serialized);
+  }
+}