changeset 106:c9356e42af99

added TransferMode for S3 (currently affected by https://github.com/aws/aws-sdk-cpp/issues/2319 since we are using version 1.9.45)
author Alain Mazy <am@osimis.io>
date Mon, 09 Oct 2023 15:10:26 +0200
parents 0585b5de6be2
children 87972cfe7ac5
files Aws/AwsS3StoragePlugin.cpp Aws/AwsStaticConfiguration.cmake Aws/CMakeLists.txt NEWS
diffstat 4 files changed, 180 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/Aws/AwsS3StoragePlugin.cpp	Mon Oct 09 10:18:58 2023 +0200
+++ b/Aws/AwsS3StoragePlugin.cpp	Mon Oct 09 15:10:26 2023 +0200
@@ -25,11 +25,18 @@
 #include <aws/s3/model/ListObjectsRequest.h>
 #include <aws/s3/model/DeleteObjectRequest.h>
 #include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/utils/HashingUtils.h>
+#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
 #include <aws/core/utils/memory/stl/AWSStringStream.h>
-#include <aws/core/utils/HashingUtils.h>
+#include <aws/core/utils/memory/AWSMemory.h>
+#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
+#include <aws/core/utils/StringUtils.h>
+#include <aws/transfer/TransferManager.h>
 #include <aws/crt/Api.h>
+#include <fstream>
 
 #include <boost/lexical_cast.hpp>
+#include <boost/interprocess/streams/bufferstream.hpp>
 #include <iostream>
 #include <fstream>
 
@@ -39,13 +46,16 @@
 {
 public:
 
-  Aws::S3::S3Client       client_;
   std::string             bucketName_;
   bool                    storageContainsUnknownFiles_;
+  bool                    useTransferManager_;
+  std::shared_ptr<Aws::S3::S3Client>               client_;
+  std::shared_ptr<Aws::Utils::Threading::Executor> executor_;
+  std::shared_ptr<Aws::Transfer::TransferManager>  transferManager_;
 
 public:
 
-  AwsS3StoragePlugin(const std::string& nameForLogs,  const Aws::S3::S3Client& client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles);
+  AwsS3StoragePlugin(const std::string& nameForLogs,  std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles, bool useTransferManager);
 
   virtual ~AwsS3StoragePlugin();
 
@@ -55,21 +65,21 @@
 };
 
 
-class Writer : public IStorage::IWriter
+class DirectWriter : public IStorage::IWriter
 {
   std::string             path_;
-  Aws::S3::S3Client       client_;
+  std::shared_ptr<Aws::S3::S3Client>       client_;
   std::string             bucketName_;
 
 public:
-  Writer(const Aws::S3::S3Client& client, const std::string& bucketName, const std::string& path)
+  DirectWriter(std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, const std::string& path)
     : path_(path),
       client_(client),
       bucketName_(bucketName)
   {
   }
 
-  virtual ~Writer()
+  virtual ~DirectWriter()
   {
   }
 
@@ -89,7 +99,7 @@
     putObjectRequest.SetBody(stream);
     putObjectRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(*stream)));
 
-    auto result = client_.PutObject(putObjectRequest);
+    auto result = client_->PutObject(putObjectRequest);
 
     if (!result.IsSuccess())
     {
@@ -99,15 +109,16 @@
 };
 
 
-class Reader : public IStorage::IReader
+class DirectReader : public IStorage::IReader
 {
-  Aws::S3::S3Client       client_;
+protected:
+  std::shared_ptr<Aws::S3::S3Client>       client_;
   std::string             bucketName_;
   std::list<std::string>  paths_;
   std::string             uuid_;
 
 public:
-  Reader(const Aws::S3::S3Client& client, const std::string& bucketName, const std::list<std::string>& paths, const char* uuid)
+  DirectReader(std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, const std::list<std::string>& paths, const char* uuid)
     : client_(client),
       bucketName_(bucketName),
       paths_(paths),
@@ -115,7 +126,7 @@
   {
   }
 
-  virtual ~Reader()
+  virtual ~DirectReader()
   {
 
   }
@@ -160,7 +171,7 @@
     listObjectRequest.SetBucket(bucketName_.c_str());
     listObjectRequest.SetPrefix(path.c_str());
 
-    auto result = client_.ListObjects(listObjectRequest);
+    auto result = client_->ListObjects(listObjectRequest);
 
     if (result.IsSuccess())
     {
@@ -232,7 +243,7 @@
     });
 
     // Get the object
-    auto result = client_.GetObject(getObjectRequest);
+    auto result = client_->GetObject(getObjectRequest);
     if (result.IsSuccess())
     {
     }
@@ -246,6 +257,111 @@
 
 
 
+class TransferWriter : public IStorage::IWriter
+{
+  std::string             path_;
+  std::shared_ptr<Aws::Transfer::TransferManager>  transferManager_;
+  std::string             bucketName_;
+
+public:
+  TransferWriter(std::shared_ptr<Aws::Transfer::TransferManager> transferManager, const std::string& bucketName, const std::string& path)
+    : path_(path),
+      transferManager_(transferManager),
+      bucketName_(bucketName)
+  {
+  }
+
+  virtual ~TransferWriter()
+  {
+  }
+
+  virtual void Write(const char* data, size_t size)
+  {
+    boost::interprocess::bufferstream buffer(const_cast<char*>(static_cast<const char*>(data)), static_cast<size_t>(size));
+    std::shared_ptr<Aws::IOStream> body = Aws::MakeShared<Aws::IOStream>(ALLOCATION_TAG, buffer.rdbuf());
+
+    std::shared_ptr<Aws::Transfer::TransferHandle> transferHandle = transferManager_->UploadFile(body, bucketName_, path_.c_str(), "application/binary", Aws::Map<Aws::String, Aws::String>());
+    transferHandle->WaitUntilFinished();
+
+    if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED)
+    {
+      throw StoragePluginException(std::string("error while writing file ") + path_ + ": response code = " + boost::lexical_cast<std::string>(static_cast<int>(transferHandle->GetLastError().GetResponseCode())) + " " + transferHandle->GetLastError().GetMessage());
+    }
+  }
+};
+
+
+class TransferReader : public DirectReader
+{
+  std::shared_ptr<Aws::Transfer::TransferManager>  transferManager_;
+
+public:
+  TransferReader(std::shared_ptr<Aws::Transfer::TransferManager> transferManager, std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, const std::list<std::string>& paths, const char* uuid)
+    : DirectReader(client, bucketName, paths, uuid),
+      transferManager_(transferManager)
+  {
+  }
+
+  virtual ~TransferReader()
+  {
+
+  }
+
+  virtual void ReadWhole(char* data, size_t size)
+  {
+    std::string firstExceptionMessage;
+
+    for (auto& path: paths_)
+    {
+      try
+      {
+        // The local variable 'streamBuffer' is captured by reference in a lambda.
+        // It must persist until all downloading by the 'transfer_manager' is complete.
+        Aws::Utils::Stream::PreallocatedStreamBuf streamBuffer(reinterpret_cast<unsigned char*>(data), size);
+
+        std::shared_ptr<Aws::Transfer::TransferHandle> downloadHandler = transferManager_->DownloadFile(bucketName_, path, [&]() { //Define a lambda expression for the callback method parameter to stream back the data.
+                    return Aws::New<Aws::IOStream>(ALLOCATION_TAG, &streamBuffer);
+                });
+        
+        downloadHandler->WaitUntilFinished();
+
+        if (downloadHandler->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED)
+        {
+          return;
+        }
+        else if (firstExceptionMessage.empty())
+        {
+          firstExceptionMessage = downloadHandler->GetLastError().GetMessage();
+        }
+    // getObjectRequest.SetResponseStreamFactory(
+    //       [data, size]()
+    // {
+    //   std::unique_ptr<Aws::StringStream>
+    //       istream(Aws::New<Aws::StringStream>(ALLOCATION_TAG));
+
+    //   istream->rdbuf()->pubsetbuf(static_cast<char*>(data),
+    //                               size);
+
+    //   return istream.release();
+    // });
+
+
+      }
+      catch (StoragePluginException& ex)
+      {
+        if (firstExceptionMessage.empty())
+        {
+          firstExceptionMessage = ex.what();
+        }
+        //ignore to retry
+      }
+    }
+    throw StoragePluginException(firstExceptionMessage);
+  }
+
+};
+
+
 
 
 const char* AwsS3StoragePluginFactory::GetStoragePluginName()
@@ -340,26 +456,29 @@
       configuration.caFile = caFile;
     }
     
+    bool useTransferManager = true; // new in v 2.3.0
+    pluginSection.LookupBooleanValue(useTransferManager, "UseTransferManager");
+
     if (pluginSection.LookupStringValue(accessKey, "AccessKey") && pluginSection.LookupStringValue(secretKey, "SecretKey"))
     {
       OrthancPlugins::LogInfo("AWS S3 Storage: using credentials from the configuration file");
       Aws::Auth::AWSCredentials credentials(accessKey.c_str(), secretKey.c_str());
       
-      Aws::S3::S3Client client(credentials, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
+      std::shared_ptr<Aws::S3::S3Client> client = Aws::MakeShared<Aws::S3::S3Client>(ALLOCATION_TAG, credentials, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
       
       OrthancPlugins::LogInfo("AWS S3 storage initialized");
 
-      return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles);
+      return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles, useTransferManager);
     } 
     else
     {
       // when using default credentials, credentials are not checked at startup but only the first time you try to access the bucket !
       OrthancPlugins::LogInfo("AWS S3 Storage: using default credentials provider");
-      Aws::S3::S3Client client(configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
+      std::shared_ptr<Aws::S3::S3Client> client = Aws::MakeShared<Aws::S3::S3Client>(ALLOCATION_TAG, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
 
       OrthancPlugins::LogInfo("AWS S3 storage initialized");
 
-      return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles);
+      return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles, useTransferManager);
     }  
   }
   catch (const std::exception& e)
@@ -379,17 +498,33 @@
 }
 
 
-AwsS3StoragePlugin::AwsS3StoragePlugin(const std::string& nameForLogs, const Aws::S3::S3Client& client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles)
+AwsS3StoragePlugin::AwsS3StoragePlugin(const std::string& nameForLogs, std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles, bool useTransferManager)
   : BaseStorage(nameForLogs, enableLegacyStorageStructure),
-    client_(client),
     bucketName_(bucketName),
-    storageContainsUnknownFiles_(storageContainsUnknownFiles)
+    storageContainsUnknownFiles_(storageContainsUnknownFiles),
+    useTransferManager_(useTransferManager),
+    client_(client)
 {
+  if (useTransferManager_)
+  {
+    executor_ = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(ALLOCATION_TAG, 10);
+    Aws::Transfer::TransferManagerConfiguration transferConfig(executor_.get());
+    transferConfig.s3Client = client_;
+
+    transferManager_ = Aws::Transfer::TransferManager::Create(transferConfig);
+  }
 }
 
 IStorage::IWriter* AwsS3StoragePlugin::GetWriterForObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled)
 {
-  return new Writer(client_, bucketName_, GetPath(uuid, type, encryptionEnabled));
+  if (useTransferManager_)
+  {
+    return new TransferWriter(transferManager_, bucketName_, GetPath(uuid, type, encryptionEnabled));
+  }
+  else
+  {
+    return new DirectWriter(client_, bucketName_, GetPath(uuid, type, encryptionEnabled));
+  }
 }
 
 IStorage::IReader* AwsS3StoragePlugin::GetReaderForObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled)
@@ -401,7 +536,14 @@
     paths.push_back(GetPath(uuid, type, encryptionEnabled, true));
   }
 
-  return new Reader(client_, bucketName_, paths, uuid);
+  if (useTransferManager_)
+  {
+    return new TransferReader(transferManager_, client_, bucketName_, paths, uuid);
+  }
+  else
+  {
+    return new DirectReader(client_, bucketName_, paths, uuid);
+  }
 }
 
 void AwsS3StoragePlugin::DeleteObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled)
@@ -422,7 +564,7 @@
     deleteObjectRequest.SetBucket(bucketName_.c_str());
     deleteObjectRequest.SetKey(path.c_str());
 
-    auto result = client_.DeleteObject(deleteObjectRequest);
+    auto result = client_->DeleteObject(deleteObjectRequest);
 
     if (!result.IsSuccess() && firstExceptionMessage.empty())  
     {
--- a/Aws/AwsStaticConfiguration.cmake	Mon Oct 09 10:18:58 2023 +0200
+++ b/Aws/AwsStaticConfiguration.cmake	Mon Oct 09 15:10:26 2023 +0200
@@ -156,6 +156,7 @@
   ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-core/source/utils/xml/
   ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-s3/source/
   ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-s3/source/model/
+  ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-transfer/source/transfer/
   )
 
 
--- a/Aws/CMakeLists.txt	Mon Oct 09 10:18:58 2023 +0200
+++ b/Aws/CMakeLists.txt	Mon Oct 09 15:10:26 2023 +0200
@@ -112,7 +112,7 @@
   # Use vcpkg by Microsoft
   option(BUILD_SHARED_LIBS "Build shared libraries" ON)
   find_package(cryptopp CONFIG REQUIRED)
-  find_package(AWSSDK REQUIRED COMPONENTS s3)
+  find_package(AWSSDK REQUIRED COMPONENTS s3 transfer)
   set(CRYPTOPP_LIBRARIES cryptopp-static)
   
 else()
--- a/NEWS	Mon Oct 09 10:18:58 2023 +0200
+++ b/NEWS	Mon Oct 09 15:10:26 2023 +0200
@@ -1,3 +1,15 @@
+Pending changes in the mainline
+===============================
+
+* AWS plugin:
+  * Added a new configuration "UseTransferMode" (true by default).
+    When set to true, the Transfer Manager mode is used to upload/download
+    whole files to/from the bucket.  If set to false, the default "object"
+    mode is used.  The Transfer Manager mode is supposed to be faster,
+    especially for large files.
+
+
+
 2023-07-20 - v 2.2.0
 ====================