view Framework/PushMode/ActivePushTransactions.cpp @ 37:9708addb5a87

added 'Resources' and 'Originator' in the jobs 'Content'
author Alain Mazy <am@osimis.io>
date Wed, 08 Jun 2022 17:11:13 +0200
parents 44a0430d7899
children 1e396fb509ca
line wrap: on
line source

/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2021 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "ActivePushTransactions.h"

#include "../DownloadArea.h"

#include <Compatibility.h>  // For std::unique_ptr
#include <Logging.h>


namespace OrthancPlugins
{
  class ActivePushTransactions::Transaction : public boost::noncopyable
  {
  private:
    DownloadArea                 area_;
    std::vector<TransferBucket>  buckets_;
    BucketCompression            compression_;

  public:
    Transaction(const std::vector<DicomInstanceInfo>& instances,
                const std::vector<TransferBucket>& buckets,
                BucketCompression compression) :
      area_(instances),
      buckets_(buckets),
      compression_(compression)
    {
    }

    DownloadArea& GetDownloadArea()
    {
      return area_;
    }

    BucketCompression GetCompression() const
    {
      return compression_;
    }

    const TransferBucket& GetBucket(size_t index) const
    {
      if (index >= buckets_.size())
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
      }
      else
      {
        return buckets_[index];
      }
    }

    void Store(size_t bucketIndex,
               const void* data,
               size_t size)
    {
      area_.WriteBucket(GetBucket(bucketIndex), data, size, compression_);
    }
  };
    

  void ActivePushTransactions::FinalizeTransaction(const std::string& transactionUuid,
                                                   bool commit)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    Content::iterator found = content_.find(transactionUuid);
    if (found == content_.end())
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }

    assert(found->second != NULL);
    if (commit)
    {
      found->second->GetDownloadArea().Commit();
    }

    delete found->second;
    content_.erase(found);
    index_.Invalidate(transactionUuid);
  }


  ActivePushTransactions::~ActivePushTransactions()
  {
    for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
    {
      LOG(WARNING) << "Discarding an uncommitted push transaction "
                   << "in the transfers accelerator: " << it->first;
        
      assert(it->second != NULL);
      delete it->second;
    }
  }
    

  void ActivePushTransactions::ListTransactions(std::vector<std::string>& target)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    target.clear();
    target.reserve(content_.size());

    for (Content::const_iterator it = content_.begin();
         it != content_.end(); ++it)
    {
      target.push_back(it->first);
    }
  }

  
  std::string ActivePushTransactions::CreateTransaction(const std::vector<DicomInstanceInfo>& instances,
                                                        const std::vector<TransferBucket>& buckets,
                                                        BucketCompression compression)
  {
    std::string uuid = Orthanc::Toolbox::GenerateUuid();
    std::unique_ptr<Transaction> tmp(new Transaction(instances, buckets, compression));

    LOG(INFO) << "Creating transaction to receive " << instances.size()
              << " instances (" << ConvertToMegabytes(tmp->GetDownloadArea().GetTotalSize())
              << "MB) in push mode: " << uuid;
      
    {
      boost::mutex::scoped_lock  lock(mutex_);

      // Drop the oldest active transaction, if not enough place
      if (content_.size() == maxSize_)
      {
        std::string oldest = index_.RemoveOldest();

        Content::iterator transaction = content_.find(oldest);
        assert(transaction != content_.end() &&
               transaction->second != NULL);

        delete transaction->second;
        content_.erase(transaction);

        LOG(WARNING) << "An inactive push transaction has been discarded: " << oldest;
      }

      index_.Add(uuid);
      content_[uuid] = tmp.release();
    }

    return uuid;
  }
    

  void ActivePushTransactions::Store(const std::string& transactionUuid,
                                     size_t bucketIndex,
                                     const void* data,
                                     size_t size)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    Content::iterator found = content_.find(transactionUuid);
    if (found == content_.end())
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }
      
    assert(found->second != NULL);

    index_.MakeMostRecent(transactionUuid);
      
    found->second->Store(bucketIndex, data, size);
  }
}