view Framework/PushMode/PushJob.cpp @ 30:3abebab5d004

sync
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 06 Nov 2020 18:06:55 +0100
parents dfc43678aecb
children cfeda58d0c8e
line wrap: on
line source

/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2020 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "PushJob.h"

#include "BucketPushQuery.h"
#include "../HttpQueries/HttpQueriesRunner.h"
#include "../TransferScheduler.h"

#include <Compatibility.h>  // For std::unique_ptr
#include <Logging.h>

#include <json/writer.h>


namespace OrthancPlugins
{
  class PushJob::FinalState : public IState
  {
  private:
    const PushJob&  job_;
    JobInfo&        info_;
    std::string     transactionUri_;
    bool            isCommit_;
      
  public:
    FinalState(const PushJob& job,
               JobInfo& info,
               const std::string& transactionUri,
               bool isCommit) :
      job_(job),
      info_(info),
      transactionUri_(transactionUri),
      isCommit_(isCommit)
    {
    }

    virtual StateUpdate* Step()
    {
      Json::Value answer;
      bool success = false;

      if (isCommit_)
      {
        success = DoPostPeer(answer, job_.peers_, job_.peerIndex_, transactionUri_ + "/commit", "", job_.maxHttpRetries_);
      }
      else
      {
        success = DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_, job_.maxHttpRetries_);
      }
        
      if (!success)
      {
        if (isCommit_)
        {
          LOG(ERROR) << "Cannot commit push transaction on remote peer: "
                     << job_.query_.GetPeer();
        }
          
        return StateUpdate::Failure();
      } 
      else if (isCommit_)
      {
        return StateUpdate::Success();
      }
      else
      {
        return StateUpdate::Failure();
      }
    }

    virtual void Stop(OrthancPluginJobStopReason reason)
    {
    }
  };


  class PushJob::PushBucketsState : public IState
  {
  private:
    const PushJob&                    job_;
    JobInfo&                          info_;
    std::string                       transactionUri_;
    HttpQueriesQueue                  queue_;
    std::unique_ptr<HttpQueriesRunner>  runner_;

    void UpdateInfo()
    {
      size_t scheduledQueriesCount, completedQueriesCount;
      uint64_t uploadedSize, downloadedSize;
      queue_.GetStatistics(scheduledQueriesCount, completedQueriesCount, downloadedSize, uploadedSize);

      info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploadedSize));
      info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completedQueriesCount));

      if (runner_.get() != NULL)
      {
        float speed;
        runner_->GetSpeed(speed);
        info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(speed));
      }
            
      // The "2" below corresponds to the "CreateTransactionState"
      // and "FinalState" steps (which prevents division by zero)
      info_.SetProgress(static_cast<float>(1 /* CreateTransactionState */ + completedQueriesCount) / 
                        static_cast<float>(2 + scheduledQueriesCount));
    }

  public:
    PushBucketsState(const PushJob&  job,
                     JobInfo& info,
                     const std::string& transactionUri,
                     const std::vector<TransferBucket>& buckets) :
      job_(job),
      info_(info),
      transactionUri_(transactionUri)
    {
      queue_.SetMaxRetries(job.maxHttpRetries_);
      queue_.Reserve(buckets.size());
        
      for (size_t i = 0; i < buckets.size(); i++)
      {
        queue_.Enqueue(new BucketPushQuery(job.cache_, buckets[i], job.query_.GetPeer(),
                                           transactionUri_, i, job.query_.GetCompression()));
      }

      UpdateInfo();
    }
      
    virtual StateUpdate* Step()
    {
      if (runner_.get() == NULL)
      {
        runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
      }

      HttpQueriesQueue::Status status = queue_.WaitComplete(200);

      UpdateInfo();

      switch (status)
      {
        case HttpQueriesQueue::Status_Running:
          return StateUpdate::Continue();

        case HttpQueriesQueue::Status_Success:
          // Commit transaction on remote peer
          return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, true));

        case HttpQueriesQueue::Status_Failure:
          // Discard transaction on remote peer
          return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, false));

        default:
          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
      }        
    }

    virtual void Stop(OrthancPluginJobStopReason reason)
    {
      // Cancel the running download threads
      runner_.reset();
    }
  };


  class PushJob::CreateTransactionState : public IState
  {
  private:
    const PushJob&                job_;
    JobInfo&                      info_;
    std::string                   createTransaction_;
    std::vector<TransferBucket>   buckets_;

  public:
    CreateTransactionState(const PushJob& job,
                           JobInfo& info) :
      job_(job),
      info_(info)
    {
      TransferScheduler scheduler;
      scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources());

      Json::Value push;      
      scheduler.FormatPushTransaction(push, buckets_,
                                      job.targetBucketSize_, 2 * job.targetBucketSize_,
                                      job_.query_.GetCompression());

      Json::FastWriter writer;
      createTransaction_ = writer.write(push);

      info_.SetContent("Peer", job_.query_.GetPeer());
      info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
      info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
      info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
    }

    virtual StateUpdate* Step()
    {
      Json::Value answer;
      if (!DoPostPeer(answer, job_.peers_, job_.peerIndex_, URI_PUSH, createTransaction_, job_.maxHttpRetries_))
      {
        LOG(ERROR) << "Cannot create a push transaction to peer \"" 
                   << job_.query_.GetPeer()
                   << "\" (check that it has the transfers accelerator plugin installed)";
        return StateUpdate::Failure();
      } 

      if (answer.type() != Json::objectValue ||
          !answer.isMember(KEY_PATH) ||
          answer[KEY_PATH].type() != Json::stringValue)
      {
        LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
        return StateUpdate::Failure();
      }

      std::string transactionUri = answer[KEY_PATH].asString();

      return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_));
    }

    virtual void Stop(OrthancPluginJobStopReason reason)
    {
    }
  };


  StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info)
  {
    return StateUpdate::Next(new CreateTransactionState(*this, info));
  }
    
    
  PushJob::PushJob(const TransferQuery& query,
                   OrthancInstancesCache& cache,
                   size_t threadsCount,
                   size_t targetBucketSize,
                   unsigned int maxHttpRetries) :
    StatefulOrthancJob(JOB_TYPE_PUSH),
    cache_(cache),
    query_(query),
    threadsCount_(threadsCount),
    targetBucketSize_(targetBucketSize),
    maxHttpRetries_(maxHttpRetries)
  {
    if (!peers_.LookupName(peerIndex_, query_.GetPeer()))
    {
      LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }

    Json::Value serialized;
    query.Serialize(serialized);
    UpdateSerialized(serialized);
  }
}