diff Framework/PushMode/PushJob.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 4c3437217518
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Framework/PushMode/PushJob.cpp	Mon Sep 17 11:34:55 2018 +0200
@@ -0,0 +1,272 @@
+/**
+ * Transfers accelerator plugin for Orthanc
+ * Copyright (C) 2018 Osimis, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Affero General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "PushJob.h"
+
+#include "BucketPushQuery.h"
+#include "../HttpQueries/HttpQueriesRunner.h"
+#include "../TransferScheduler.h"
+
+#include <Core/Logging.h>
+
+#include <json/writer.h>
+
+
+namespace OrthancPlugins
+{
+  class PushJob::FinalState : public IState
+  {
+  private:
+    const PushJob&  job_;
+    JobInfo&        info_;
+    std::string     transactionUri_;
+    bool            isCommit_;
+      
+  public:
+    FinalState(const PushJob& job,
+               JobInfo& info,
+               const std::string& transactionUri,
+               bool isCommit) :
+      job_(job),
+      info_(info),
+      transactionUri_(transactionUri),
+      isCommit_(isCommit)
+    {
+    }
+
+    virtual StateUpdate* Step()
+    {
+      std::string uri = transactionUri_;
+        
+      if (isCommit_)
+      {
+        uri += "/commit";
+      }
+      else
+      {
+        uri += "/discard";
+      }
+        
+      Json::Value answer;
+      if (!job_.peers_.DoPost(answer, job_.peerIndex_, uri, ""))
+      {
+        if (isCommit_)
+        {
+          LOG(ERROR) << "Cannot commit push transaction on remote peer: "
+                     << job_.query_.GetPeer();
+        }
+          
+        return StateUpdate::Failure();
+      } 
+      else if (isCommit_)
+      {
+        return StateUpdate::Success();
+      }
+      else
+      {
+        return StateUpdate::Failure();
+      }
+    }
+
+    virtual void Stop(OrthancPluginJobStopReason reason)
+    {
+    }
+  };
+
+
+  class PushJob::PushBucketsState : public IState
+  {
+  private:
+    const PushJob&                    job_;
+    JobInfo&                          info_;
+    std::string                       transactionUri_;
+    HttpQueriesQueue                  queue_;
+    std::auto_ptr<HttpQueriesRunner>  runner_;
+
+    void UpdateInfo()
+    {
+      size_t scheduledQueriesCount, completedQueriesCount;
+      uint64_t uploadedSize, downloadedSize;
+      queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize);
+
+      info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize));
+      info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount));
+
+      if (runner_.get() != NULL)
+      {
+        float speed;
+        runner_->GetSpeed(speed);
+        info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed));
+      }
+            
+      // The "2" below corresponds to the "CreateTransactionState"
+      // and "FinalState" steps (which prevents division by zero)
+      info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) / 
+                        static_cast<float>(2 + scheduledQueriesCount));
+    }
+
+  public:
+    PushBucketsState(const PushJob&  job,
+                     JobInfo& info,
+                     const std::string& transactionUri,
+                     const std::vector<TransferBucket>& buckets) :
+      job_(job),
+      info_(info),
+      transactionUri_(transactionUri),
+      queue_(job.context_)
+    {
+      queue_.Reserve(buckets.size());
+        
+      for (size_t i = 0; i < buckets.size(); i++)
+      {
+        queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
+                                           transactionUri_, i, job.query_.GetCompression()));
+      }
+
+      UpdateInfo();
+    }
+      
+    virtual StateUpdate* Step()
+    {
+      if (runner_.get() == NULL)
+      {
+        runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
+      }
+
+      HttpQueriesQueue::Status status = queue_.WaitComplete(200);
+
+      UpdateInfo();
+
+      switch (status)
+      {
+        case HttpQueriesQueue::Status_Running:
+          return StateUpdate::Continue();
+
+        case HttpQueriesQueue::Status_Success:
+          // Commit transaction on remote peer
+          return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true));
+
+        case HttpQueriesQueue::Status_Failure:
+          // Discard transaction on remote peer
+          return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false));
+
+        default:
+          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+      }        
+    }
+
+    virtual void Stop(OrthancPluginJobStopReason reason)
+    {
+      // Cancel the running download threads
+      runner_.reset();
+    }
+  };
+
+
+  class PushJob::CreateTransactionState : public IState
+  {
+  private:
+    const PushJob&                job_;
+    JobInfo&                      info_;
+    std::string                   createTransaction_;
+    std::vector<TransferBucket>   buckets_;
+
+  public:
+    CreateTransactionState(const PushJob& job,
+                           JobInfo& info) :
+      job_(job),
+      info_(info)
+    {
+      TransferScheduler scheduler;
+      scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources());
+
+      Json::Value push;      
+      scheduler.FormatPushTransaction(push, buckets_,
+                                      job.targetBucketSize_, 2 * job.targetBucketSize_,
+                                      job_.query_.GetCompression());
+
+      Json::FastWriter writer;
+      createTransaction_ = writer.write(push);
+
+      info_.SetContent("Peer", job_.query_.GetPeer());
+      info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
+      info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
+      info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
+    }
+
+    virtual StateUpdate* Step()
+    {
+      Json::Value answer;
+      if (!job_.peers_.DoPost(answer, job_.peerIndex_, URI_PUSH, createTransaction_))
+      {
+        LOG(ERROR) << "Cannot create a push transaction to peer \"" 
+                   << job_.query_.GetPeer()
+                   << "\" (check that it has the transfers accelerator plugin installed)";
+        return StateUpdate::Failure();
+      } 
+
+      if (answer.type() != Json::objectValue ||
+          !answer.isMember(KEY_PATH) ||
+          answer[KEY_PATH].type() != Json::stringValue)
+      {
+        LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
+        return StateUpdate::Failure();
+      }
+
+      std::string transactionUri = answer[KEY_PATH].asString();
+
+      return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_));
+    }
+
+    virtual void Stop(OrthancPluginJobStopReason reason)
+    {
+    }
+  };
+
+
+  StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info)
+  {
+    return StateUpdate::Next(new CreateTransactionState(*this, info));
+  }
+    
+    
+  PushJob::PushJob(OrthancPluginContext* context,
+                   const TransferQuery& query,
+                   OrthancInstancesCache& cache,
+                   size_t threadsCount,
+                   size_t targetBucketSize) :
+    StatefulOrthancJob(JOB_TYPE_PUSH),
+    context_(context),
+    cache_(cache),
+    query_(query),
+    threadsCount_(threadsCount),
+    targetBucketSize_(targetBucketSize),
+    peers_(context)
+  {
+    if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
+    {
+      LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
+    }
+
+    Json::Value serialized;
+    query.Serialize(serialized);
+    UpdateSerialized(serialized);
+  }
+}