view Framework/HttpQueries/HttpQueriesQueue.cpp @ 78:0898578d5708 OrthancTransfers-1.3 tip

closing OrthancTransfers-1.3
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 30 May 2024 22:45:33 +0200
parents f4e828607f02
children 1e396fb509ca
line wrap: on
line source

/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2021 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "HttpQueriesQueue.h"

#include <Logging.h>
#include <OrthancException.h>

namespace OrthancPlugins
{
  HttpQueriesQueue::Status HttpQueriesQueue::GetStatusInternal() const
  {
    if (successQueries_ == queries_.size())
    {
      return Status_Success;
    }
    else if (isFailure_)
    {
      return Status_Failure;
    }
    else
    {
      return Status_Running;
    }
  }


  HttpQueriesQueue::HttpQueriesQueue() :
    maxRetries_(0)
  {
    Reset();
  }

    
  HttpQueriesQueue::~HttpQueriesQueue()
  {
    for (size_t i = 0; i < queries_.size(); i++)
    {
      assert(queries_[i] != NULL);
      delete queries_[i];
    }
  }

    
  unsigned int HttpQueriesQueue::GetMaxRetries()
  {
    boost::mutex::scoped_lock lock(mutex_);
    return maxRetries_;
  }

    
  void HttpQueriesQueue::SetMaxRetries(unsigned int maxRetries)
  {
    boost::mutex::scoped_lock lock(mutex_);
    maxRetries_ = maxRetries;
  }

    
  void HttpQueriesQueue::Reserve(size_t size)
  {
    boost::mutex::scoped_lock lock(mutex_);
    queries_.reserve(size);
  }

    
  void HttpQueriesQueue::Reset()
  {
    boost::mutex::scoped_lock lock(mutex_);
    position_ = 0;
    downloadedSize_ = 0;
    uploadedSize_ = 0;
    successQueries_ = 0;
    isFailure_ = false;
  }
    

  void HttpQueriesQueue::Enqueue(IHttpQuery* query)
  {
    if (query == NULL)
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
    }
    else
    {
      boost::mutex::scoped_lock lock(mutex_);
      queries_.push_back(query);
    }
  }
    

  bool HttpQueriesQueue::ExecuteOneQuery(size_t& networkTraffic)
  {
    networkTraffic = 0;
      
    unsigned int maxRetries;
    IHttpQuery* query = NULL;

    {
      boost::mutex::scoped_lock lock(mutex_);

      maxRetries = maxRetries_;
        
      if (position_ == queries_.size() ||
          isFailure_)
      {
        return false;
      }
      else
      {
        query = queries_[position_];
        position_ ++;
      }
    }

    std::string body;

    if (query->GetMethod() == Orthanc::HttpMethod_Post ||
        query->GetMethod() == Orthanc::HttpMethod_Put)
    {
      query->ReadBody(body);              
    }       
    
    std::map<std::string, std::string> headers;
    query->GetHttpHeaders(headers);

    unsigned int retry = 0;

    for (;;)
    {
      MemoryBuffer answer;

      bool success;

      try
      {
        switch (query->GetMethod())
        {
          case Orthanc::HttpMethod_Get:
            success = peers_.DoGet(answer, query->GetPeer(), query->GetUri(), headers);
            break;

          case Orthanc::HttpMethod_Post:
            success = peers_.DoPost(answer, query->GetPeer(), query->GetUri(), body, headers);
            break;

          case Orthanc::HttpMethod_Put:
            success = peers_.DoPut(query->GetPeer(), query->GetUri(), body, headers);
            break;

          case Orthanc::HttpMethod_Delete:
            success = peers_.DoDelete(query->GetPeer(), query->GetUri(), headers);
            break;

          default:
            throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
        }
      }
      catch (Orthanc::OrthancException& e)
      {
        LOG(ERROR) << "Unhandled exception during an HTTP query to peer \"" 
                   << query->GetPeer() <<  " " << query->GetUri() + "\": " << e.What();
        success = false;
      }

      if (success)
      {
        size_t downloaded = 0;
        size_t uploaded = 0;

        if (query->GetMethod() == Orthanc::HttpMethod_Get ||
            query->GetMethod() == Orthanc::HttpMethod_Post)
        {
          query->HandleAnswer(answer.GetData(), answer.GetSize());
          downloaded = answer.GetSize();
        }

        if (query->GetMethod() == Orthanc::HttpMethod_Put ||
            query->GetMethod() == Orthanc::HttpMethod_Post)
        {
          uploaded = body.size();
        }
          
        networkTraffic = downloaded + uploaded;
            
        {
          boost::mutex::scoped_lock lock(mutex_);
          downloadedSize_ += downloaded;
          uploadedSize_ += uploaded;
          successQueries_ ++;

          if (successQueries_ == queries_.size())
          {
            completed_.notify_all();
          }

          return true;
        }
      }
      else
      {
        // Error: Let's retry
        retry ++;

        if (retry <= maxRetries)
        {
          // Wait 1 second before retrying
          boost::this_thread::sleep(boost::posix_time::seconds(1));
        }
        else
        {
          if (maxRetries > 0)
          {
            LOG(ERROR) << "Reached the maximum number of retries for a HTTP query to peer " << query->GetPeer() <<  " " << query->GetUri();
          }

          {
            boost::mutex::scoped_lock lock(mutex_);
            isFailure_ = true;
            completed_.notify_all();
          }

          return false;
        }
      }
    }
  }


  HttpQueriesQueue::Status HttpQueriesQueue::WaitComplete(unsigned int timeoutMS)
  {
    boost::mutex::scoped_lock lock(mutex_);

    Status status = GetStatusInternal();

    if (status == Status_Running)
    {
      completed_.timed_wait(lock, boost::posix_time::milliseconds(timeoutMS));
      return GetStatusInternal();
    }
    else
    {
      return status;
    }
  }


  void HttpQueriesQueue::WaitComplete()
  {
    boost::mutex::scoped_lock lock(mutex_);

    while (GetStatusInternal() == Status_Running)
    {
      completed_.timed_wait(lock, boost::posix_time::milliseconds(200));
    }
  }


  void HttpQueriesQueue::GetStatistics(size_t& scheduledQueriesCount,
                                       size_t& successQueriesCount,
                                       uint64_t& downloadedSize,
                                       uint64_t& uploadedSize)
  {
    boost::mutex::scoped_lock lock(mutex_);
    scheduledQueriesCount = queries_.size();
    successQueriesCount = successQueries_;
    downloadedSize = downloadedSize_;
    uploadedSize = uploadedSize_;
  }
}