view Framework/PushMode/PushJob.cpp @ 86:e3202f8f3db8 default tip

fix build by removing C++11 primitives
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 20 May 2025 10:11:13 +0200
parents 3a460db3e6cb
children
line wrap: on
line source

/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2023 Osimis S.A., Belgium
 * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
 * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "PushJob.h"

#include "BucketPushQuery.h"
#include "../HttpQueries/HttpQueriesRunner.h"
#include "../TransferScheduler.h"

#include <boost/algorithm/string.hpp> // For boost::iequals and boost::split
#include <Compatibility.h> // For std::unique_ptr
#include <Logging.h>

namespace OrthancPlugins
{

  /**
   * Tells whether "name" is the "Set-Cookie" HTTP header. HTTP field
   * names are case-insensitive ASCII tokens, so a plain ASCII
   * lower-casing of each character is sufficient for the comparison.
   */
  static bool IsSetCookieHeader(const std::string& name)
  {
    static const char* const SET_COOKIE = "set-cookie";
    static const std::string::size_type SET_COOKIE_LENGTH = 10;

    if (name.size() != SET_COOKIE_LENGTH)
    {
      return false;
    }

    for (std::string::size_type i = 0; i < SET_COOKIE_LENGTH; i++)
    {
      char c = name[i];

      if (c >= 'A' && c <= 'Z')
      {
        c = static_cast<char>(c - 'A' + 'a');
      }

      if (c != SET_COOKIE[i])
      {
        return false;
      }
    }

    return true;
  }


  /**
   * Returns the "<cookie-name>=<cookie-value>" part of the value of a
   * "Set-Cookie" header, i.e. everything before the first ';' (the
   * attributes such as "Path" or "Expires" are dropped), with the
   * surrounding whitespace removed. The result is empty if the header
   * value carries no cookie pair.
   */
  static std::string ExtractCookiePair(const std::string& headerValue)
  {
    static const char* const WHITESPACE = " \t\r\n\v\f";

    // If there is no ';', "find()" returns "npos" and the whole value is kept
    const std::string pair = headerValue.substr(0, headerValue.find(';'));

    const std::string::size_type first = pair.find_first_not_of(WHITESPACE);
    if (first == std::string::npos)
    {
      return std::string();  // Empty or whitespace-only cookie pair
    }

    const std::string::size_type last = pair.find_last_not_of(WHITESPACE);
    return pair.substr(first, last - first + 1);
  }


  /**
   * This is a helper function to extract cookie name and value into a single
   * string from the response HTTP headers.  The extracted cookie string can
   * then be used to set the "Cookie" header in subsequent requests.
   *
   * The header names are matched case-insensitively. As the keys of the map
   * are case-sensitive, several "Set-Cookie" entries that differ only by
   * their case can coexist: their cookies are joined with "; ", following
   * the iteration order of the map. Entries whose cookie pair is empty (for
   * instance the value "; Path=/") are skipped, so that the returned string
   * never starts or ends with a dangling "; " separator. An empty string is
   * returned if no cookie is found.
   *
   * Orthanc appears not to support multiple "Set-Cookie" headers in the
   * response, so only the last `set-cookie` header is included in the headers
   * map. This means that if the server responds with multiple cookies, only the
   * last one will be extracted.
   *
   * This is a very simplified implementation for use by the Accelerator plugin.
   * It is not a substitute for a full cookie jar implementation. However, with
   * the Authentication plugin, a simplified implementation where any cookie
   * from the initial request can be used in the finite number of subsequent
   * requests is sufficient.
   */
  static std::string ExtractCookiesFromHeaders(const std::map<std::string, std::string> &headers)
  {
    std::string result;

    for (std::map<std::string, std::string>::const_iterator it = headers.begin(); it != headers.end(); ++it)
    {
      if (!IsSetCookieHeader(it->first))
      {
        continue;
      }

      // Set-Cookie headers are formatted as:
      //   Set-Cookie: <cookie-name>=<cookie-value>; <attributes>
      // or
      //   Set-Cookie: <cookie-name>=<cookie-value>
      // Only the cookie name and value are of interest.
      const std::string cookie = ExtractCookiePair(it->second);

      if (cookie.empty())
      {
        continue;  // Nothing to forward for this header
      }

      if (!result.empty())
      {
        result += "; ";
      }

      result += cookie;
    }

    return result;
  }

  /**
   * Last state of a push job: it either commits the transaction on the
   * remote peer (POST on "<transactionUri>/commit"), or discards it
   * (through DoDeletePeer() on the transaction URI). The job only
   * succeeds if a commit was requested and if the commit went through.
   */
  class PushJob::FinalState : public IState
  {
  private:
    const PushJob&  job_;
    JobInfo&        info_;
    std::string     transactionUri_;
    bool            isCommit_;   // "true" to commit, "false" to discard the transaction

    /**
     * Content of the "Cookie" HTTP header to be sent along with the
     * request (possibly empty). It originates from the response
     * headers of the request that initialised the push job.
     */
    std::string     cookieHeader_;

    /**
     * Fills "target" with the HTTP headers of the query, plus the
     * "Cookie" header if some cookie was recorded.
     */
    void PrepareHeaders(std::map<std::string, std::string>& target) const
    {
      job_.query_.GetHttpHeaders(target);

      if (!cookieHeader_.empty())
      {
        target["Cookie"] = cookieHeader_;
      }
    }

  public:
    FinalState(const PushJob& job,
               JobInfo& info,
               const std::string& transactionUri,
               bool isCommit,
               const std::string &cookieHeader) :
      job_(job),
      info_(info),
      transactionUri_(transactionUri),
      isCommit_(isCommit),
      cookieHeader_(cookieHeader)
    {
    }

    virtual StateUpdate* Step()
    {
      std::map<std::string, std::string> httpHeaders;
      PrepareHeaders(httpHeaders);

      if (!isCommit_)
      {
        // Discarding the transaction: the job is reported as failed
        // whatever the outcome of the request, hence the ignored result
        static_cast<void>(DoDeletePeer(job_.peers_, job_.peerIndex_, transactionUri_,
                                       job_.maxHttpRetries_, httpHeaders));
        return StateUpdate::Failure();
      }

      Json::Value reply;
      const bool committed = DoPostPeer(reply, job_.peers_, job_.peerIndex_,
                                        transactionUri_ + "/commit", "",
                                        job_.maxHttpRetries_, httpHeaders,
                                        job_.commitTimeout_);

      if (committed)
      {
        return StateUpdate::Success();
      }

      LOG(ERROR) << "Cannot commit push transaction on remote peer: " // TODO: add job ID
                 << job_.query_.GetPeer();

      return StateUpdate::Failure();
    }

    // Nothing to release: this state owns no running resource
    virtual void Stop(OrthancPluginJobStopReason reason)
    {
    }
  };


  /**
   * Intermediate state of a push job: one HTTP query is enqueued per
   * bucket at construction time, then each call to Step() waits for
   * the queue and moves on to "FinalState" once the queue reports
   * either a success (commit) or a failure (discard).
   */
  class PushJob::PushBucketsState : public IState
  {
  private:
    const PushJob&                     job_;
    JobInfo&                           info_;
    std::string                        transactionUri_;
    HttpQueriesQueue                   queue_;
    std::unique_ptr<HttpQueriesRunner> runner_;   // Lazily created by the first Step()

    /**
     * Content of the "Cookie" HTTP header to be sent along with each
     * request (possibly empty). It originates from the response
     * headers of the request that initialised the push job.
     */
    std::string                        cookieHeader_;

    /**
     * Fills "target" with the HTTP headers shared by all the bucket
     * queries: the headers of the query, the binary content type,
     * and the "Cookie" header if some cookie was recorded.
     */
    void PrepareHeaders(std::map<std::string, std::string>& target) const
    {
      job_.query_.GetHttpHeaders(target);

      target["Content-Type"] = "application/octet-stream";

      if (!cookieHeader_.empty())
      {
        target["Cookie"] = cookieHeader_;
      }
    }

    /**
     * Publishes the statistics of the queue (and the speed reported
     * by the runner, once it exists) into the job information, and
     * refreshes the progress of the job.
     */
    void UpdateInfo()
    {
      size_t scheduled;
      size_t completed;
      uint64_t uploaded;
      uint64_t downloaded;
      queue_.GetStatistics(scheduled, completed, downloaded, uploaded);

      info_.SetContent("UploadedSizeMB", ConvertToMegabytes(uploaded));
      info_.SetContent("CompletedHttpQueries", static_cast<unsigned int>(completed));

      if (runner_.get() != NULL)
      {
        float kilobytesPerSecond;
        runner_->GetSpeed(kilobytesPerSecond);
        info_.SetContent("NetworkSpeedKBs", static_cast<unsigned int>(kilobytesPerSecond));
      }

      // Two extra steps are accounted for in the denominator
      // ("CreateTransactionState" and "FinalState"), which also
      // prevents a division by zero. The first of them is already
      // done, hence the "1" in the numerator.
      const float doneSteps = static_cast<float>(1 /* CreateTransactionState */ + completed);
      const float totalSteps = static_cast<float>(2 + scheduled);
      info_.SetProgress(doneSteps / totalSteps);
    }

  public:
    PushBucketsState(const PushJob&  job,
                     JobInfo& info,
                     const std::string& transactionUri,
                     const std::vector<TransferBucket>& buckets,
                     const std::string& cookieHeader) : 
      job_(job),
      info_(info),
      transactionUri_(transactionUri),
      cookieHeader_(cookieHeader)
    {
      std::map<std::string, std::string> httpHeaders;
      PrepareHeaders(httpHeaders);

      queue_.SetMaxRetries(job_.maxHttpRetries_);
      queue_.Reserve(buckets.size());

      // One query per bucket, the index of the bucket being part of the query
      for (size_t index = 0; index < buckets.size(); ++index)
      {
        queue_.Enqueue(new BucketPushQuery(job_.cache_, buckets[index], job_.query_.GetPeer(),
                                           transactionUri_, index, job_.query_.GetCompression(),
                                           httpHeaders));
      }

      UpdateInfo();
    }

    virtual StateUpdate* Step()
    {
      // The runner is only created once the job actually starts running
      if (runner_.get() == NULL)
      {
        runner_.reset(new HttpQueriesRunner(queue_, job_.threadsCount_));
      }

      const HttpQueriesQueue::Status status = queue_.WaitComplete(200);

      UpdateInfo();

      if (status == HttpQueriesQueue::Status_Running)
      {
        return StateUpdate::Continue();
      }

      if (status == HttpQueriesQueue::Status_Success ||
          status == HttpQueriesQueue::Status_Failure)
      {
        // Success: commit the transaction on the remote peer.
        // Failure: discard the transaction on the remote peer.
        const bool commit = (status == HttpQueriesQueue::Status_Success);
        return StateUpdate::Next(new FinalState(job_, info_, transactionUri_, commit, cookieHeader_));
      }

      // Unknown status of the queue
      throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
    }

    virtual void Stop(OrthancPluginJobStopReason reason)
    {
      // Destroying the runner cancels its running threads; a new
      // runner is created by Step() if the job is run again
      runner_.reset();
    }
  };


  /**
   * Initial state of a push job. The constructor splits the resources
   * of the query into buckets and prepares the JSON body describing
   * the push transaction; Step() then posts this body to the remote
   * peer and, on success, hands over to "PushBucketsState".
   */
  class PushJob::CreateTransactionState : public IState
  {
  private:
    const PushJob&                job_;
    JobInfo&                      info_;
    std::string                   createTransaction_;   // Serialized JSON body of the creation request
    std::vector<TransferBucket>   buckets_;

  public:
    CreateTransactionState(const PushJob& job,
                           JobInfo& info) :
      job_(job),
      info_(info)
    {
      TransferScheduler scheduler;
      scheduler.ParseListOfResources(job_.cache_, job_.query_.GetResources());

      // The maximum size of a bucket is set to twice its target size
      Json::Value transaction;
      scheduler.FormatPushTransaction(transaction, buckets_,
                                      job_.targetBucketSize_, 2 * job_.targetBucketSize_,
                                      job_.query_.GetCompression());

      Orthanc::Toolbox::WriteFastJson(createTransaction_, transaction);

      // Publish the static description of the job
      info_.SetContent("Resources", job_.query_.GetResources());
      info_.SetContent("Peer", job_.query_.GetPeer());
      info_.SetContent("Compression", EnumerationToString(job_.query_.GetCompression()));
      info_.SetContent("TotalInstances", static_cast<unsigned int>(scheduler.GetInstancesCount()));
      info_.SetContent("TotalSizeMB", ConvertToMegabytes(scheduler.GetTotalSize()));
    }

    virtual StateUpdate* Step()
    {
      std::map<std::string, std::string> requestHeaders;
      job_.query_.GetHttpHeaders(requestHeaders);
      requestHeaders["Content-Type"] = "application/json";

      Json::Value reply;
      std::map<std::string, std::string> replyHeaders;

      const bool created = DoPostPeer(reply, replyHeaders, job_.peers_, job_.peerIndex_, URI_PUSH,
                                      createTransaction_, job_.maxHttpRetries_, requestHeaders,
                                      job_.commitTimeout_);

      if (!created)
      {
        LOG(ERROR) << "Cannot create a push transaction to peer \"" 
                   << job_.query_.GetPeer()
                   << "\" (check that it has the transfers accelerator plugin installed)";
        return StateUpdate::Failure();
      }

      // The peer must answer with a JSON object whose KEY_PATH member
      // is a string (the URI of the newly created transaction)
      const bool hasPath = (reply.type() == Json::objectValue &&
                            reply.isMember(KEY_PATH) &&
                            reply[KEY_PATH].type() == Json::stringValue);

      if (!hasPath)
      {
        LOG(ERROR) << "Bad network protocol from peer: " << job_.query_.GetPeer();
        return StateUpdate::Failure();
      }

      const std::string transactionUri = reply[KEY_PATH].asString();

      /**
       * Some load balancers, such as the AWS Application Load Balancer,
       * rely on sticky session cookies: the load balancer sets a cookie
       * in its response, and routes to the same backend server the
       * subsequent requests that include this cookie.
       *
       * This matters for the transfers accelerator plugin because push
       * transactions are stateful: all the requests (initialisation,
       * each bucket, and commit) must reach the same backend server,
       * otherwise the transfer job fails.
       *
       * To support cookie-based sticky sessions, the cookies are
       * extracted from the response to the initialisation request, and
       * are forwarded to the subsequent states of the job
       * (PushBucketsState and FinalState), so that they are included in
       * the headers of each of their requests.
       *
       * Currently, the map of response headers only contains one
       * "Set-Cookie" header (the last one). This is a limitation of
       * the current HttpClient implementation: the subsequent requests
       * of this transfer job only carry the cookie of the last
       * "Set-Cookie" header of the initial response.
       */
      const std::string cookieHeader = ExtractCookiesFromHeaders(replyHeaders);

      return StateUpdate::Next(new PushBucketsState(job_, info_, transactionUri, buckets_, cookieHeader));
    }

    // Nothing to release: this state owns no running resource
    virtual void Stop(OrthancPluginJobStopReason reason)
    {
    }
  };


  /**
   * Creates the first state of the job, which is in charge of creating
   * the push transaction on the remote peer. The "info" object is
   * handed to the state, which stores a reference to it.
   */
  StatefulOrthancJob::StateUpdate* PushJob::CreateInitialState(JobInfo& info)
  {
    CreateTransactionState* const initialState = new CreateTransactionState(*this, info);
    return StateUpdate::Next(initialState);
  }
    
    
  /**
   * Constructs a push job for the given query. The name of the target
   * peer is resolved into an index within "peers_"; an exception with
   * ErrorCode_UnknownResource is thrown if this peer is unknown.
   * The serialization of the query is then recorded through
   * UpdateSerialized().
   */
  PushJob::PushJob(const TransferQuery& query,
                   OrthancInstancesCache& cache,
                   size_t threadsCount,
                   size_t targetBucketSize,
                   unsigned int maxHttpRetries,
                   unsigned int commitTimeout) :
    StatefulOrthancJob(JOB_TYPE_PUSH),
    cache_(cache),
    query_(query),
    threadsCount_(threadsCount),
    targetBucketSize_(targetBucketSize),
    maxHttpRetries_(maxHttpRetries),
    commitTimeout_(commitTimeout)
  {
    const bool isKnownPeer = peers_.LookupName(peerIndex_, query_.GetPeer());

    if (isKnownPeer)
    {
      Json::Value serializedQuery;
      query.Serialize(serializedQuery);
      UpdateSerialized(serializedQuery);
    }
    else
    {
      LOG(ERROR) << "Unknown Orthanc peer: " << query_.GetPeer();
      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }
  }
}