# HG changeset patch # User Alain Mazy # Date 1696857026 -7200 # Node ID c9356e42af99ccd1fe28a7a842a0f3622a05d3ac # Parent 0585b5de6be2cea93ae38745dad79420b78d3bc9 added TransferMode for S3 (currently affected by https://github.com/aws/aws-sdk-cpp/issues/2319 since we are using version 1.9.45) diff -r 0585b5de6be2 -r c9356e42af99 Aws/AwsS3StoragePlugin.cpp --- a/Aws/AwsS3StoragePlugin.cpp Mon Oct 09 10:18:58 2023 +0200 +++ b/Aws/AwsS3StoragePlugin.cpp Mon Oct 09 15:10:26 2023 +0200 @@ -25,11 +25,18 @@ #include #include #include +#include +#include #include -#include +#include +#include +#include +#include #include +#include #include +#include #include #include @@ -39,13 +46,16 @@ { public: - Aws::S3::S3Client client_; std::string bucketName_; bool storageContainsUnknownFiles_; + bool useTransferManager_; + std::shared_ptr client_; + std::shared_ptr executor_; + std::shared_ptr transferManager_; public: - AwsS3StoragePlugin(const std::string& nameForLogs, const Aws::S3::S3Client& client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles); + AwsS3StoragePlugin(const std::string& nameForLogs, std::shared_ptr client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles, bool useTransferManager); virtual ~AwsS3StoragePlugin(); @@ -55,21 +65,21 @@ }; -class Writer : public IStorage::IWriter +class DirectWriter : public IStorage::IWriter { std::string path_; - Aws::S3::S3Client client_; + std::shared_ptr client_; std::string bucketName_; public: - Writer(const Aws::S3::S3Client& client, const std::string& bucketName, const std::string& path) + DirectWriter(std::shared_ptr client, const std::string& bucketName, const std::string& path) : path_(path), client_(client), bucketName_(bucketName) { } - virtual ~Writer() + virtual ~DirectWriter() { } @@ -89,7 +99,7 @@ putObjectRequest.SetBody(stream); 
putObjectRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(*stream))); - auto result = client_.PutObject(putObjectRequest); + auto result = client_->PutObject(putObjectRequest); if (!result.IsSuccess()) { @@ -99,15 +109,16 @@ }; -class Reader : public IStorage::IReader +class DirectReader : public IStorage::IReader { - Aws::S3::S3Client client_; +protected: + std::shared_ptr client_; std::string bucketName_; std::list paths_; std::string uuid_; public: - Reader(const Aws::S3::S3Client& client, const std::string& bucketName, const std::list& paths, const char* uuid) + DirectReader(std::shared_ptr client, const std::string& bucketName, const std::list& paths, const char* uuid) : client_(client), bucketName_(bucketName), paths_(paths), @@ -115,7 +126,7 @@ { } - virtual ~Reader() + virtual ~DirectReader() { } @@ -160,7 +171,7 @@ listObjectRequest.SetBucket(bucketName_.c_str()); listObjectRequest.SetPrefix(path.c_str()); - auto result = client_.ListObjects(listObjectRequest); + auto result = client_->ListObjects(listObjectRequest); if (result.IsSuccess()) { @@ -232,7 +243,7 @@ }); // Get the object - auto result = client_.GetObject(getObjectRequest); + auto result = client_->GetObject(getObjectRequest); if (result.IsSuccess()) { } @@ -246,6 +257,111 @@ +class TransferWriter : public IStorage::IWriter +{ + std::string path_; + std::shared_ptr transferManager_; + std::string bucketName_; + +public: + TransferWriter(std::shared_ptr transferManager, const std::string& bucketName, const std::string& path) + : path_(path), + transferManager_(transferManager), + bucketName_(bucketName) + { + } + + virtual ~TransferWriter() + { + } + + virtual void Write(const char* data, size_t size) + { + boost::interprocess::bufferstream buffer(const_cast(static_cast(data)), static_cast(size)); + std::shared_ptr body = Aws::MakeShared(ALLOCATION_TAG, buffer.rdbuf()); + + std::shared_ptr transferHandle = transferManager_->UploadFile(body, 
bucketName_, path_.c_str(), "application/binary", Aws::Map()); + transferHandle->WaitUntilFinished(); + + if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) + { + throw StoragePluginException(std::string("error while writing file ") + path_ + ": response code = " + boost::lexical_cast(static_cast(transferHandle->GetLastError().GetResponseCode())) + " " + transferHandle->GetLastError().GetMessage()); + } + } +}; + + +class TransferReader : public DirectReader +{ + std::shared_ptr transferManager_; + +public: + TransferReader(std::shared_ptr transferManager, std::shared_ptr client, const std::string& bucketName, const std::list& paths, const char* uuid) + : DirectReader(client, bucketName, paths, uuid), + transferManager_(transferManager) + { + } + + virtual ~TransferReader() + { + + } + + virtual void ReadWhole(char* data, size_t size) + { + std::string firstExceptionMessage; + + for (auto& path: paths_) + { + try + { + // The local variable 'streamBuffer' is captured by reference in a lambda. + // It must persist until all downloading by the 'transfer_manager' is complete. + Aws::Utils::Stream::PreallocatedStreamBuf streamBuffer(reinterpret_cast(data), size); + + std::shared_ptr downloadHandler = transferManager_->DownloadFile(bucketName_, path, [&]() { //Define a lambda expression for the callback method parameter to stream back the data. 
+ return Aws::New(ALLOCATION_TAG, &streamBuffer); + }); + + downloadHandler->WaitUntilFinished(); + + if (downloadHandler->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED) + { + return; + } + else if (firstExceptionMessage.empty()) + { + firstExceptionMessage = downloadHandler->GetLastError().GetMessage(); + } + // getObjectRequest.SetResponseStreamFactory( + // [data, size]() + // { + // std::unique_ptr + // istream(Aws::New(ALLOCATION_TAG)); + + // istream->rdbuf()->pubsetbuf(static_cast(data), + // size); + + // return istream.release(); + // }); + + + } + catch (StoragePluginException& ex) + { + if (firstExceptionMessage.empty()) + { + firstExceptionMessage = ex.what(); + } + //ignore to retry + } + } + throw StoragePluginException(firstExceptionMessage); + } + +}; + + const char* AwsS3StoragePluginFactory::GetStoragePluginName() @@ -340,26 +456,29 @@ configuration.caFile = caFile; } + bool useTransferManager = true; // new in v 2.3.0 + pluginSection.LookupBooleanValue(useTransferManager, "UseTransferManager"); + if (pluginSection.LookupStringValue(accessKey, "AccessKey") && pluginSection.LookupStringValue(secretKey, "SecretKey")) { OrthancPlugins::LogInfo("AWS S3 Storage: using credentials from the configuration file"); Aws::Auth::AWSCredentials credentials(accessKey.c_str(), secretKey.c_str()); - Aws::S3::S3Client client(credentials, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing); + std::shared_ptr client = Aws::MakeShared(ALLOCATION_TAG, credentials, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing); OrthancPlugins::LogInfo("AWS S3 storage initialized"); - return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles); + return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles, useTransferManager); } else { // when using default credentials, 
credentials are not checked at startup but only the first time you try to access the bucket ! OrthancPlugins::LogInfo("AWS S3 Storage: using default credentials provider"); - Aws::S3::S3Client client(configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing); + std::shared_ptr client = Aws::MakeShared(ALLOCATION_TAG, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing); OrthancPlugins::LogInfo("AWS S3 storage initialized"); - return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles); + return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles, useTransferManager); } } catch (const std::exception& e) @@ -379,17 +498,33 @@ } -AwsS3StoragePlugin::AwsS3StoragePlugin(const std::string& nameForLogs, const Aws::S3::S3Client& client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles) +AwsS3StoragePlugin::AwsS3StoragePlugin(const std::string& nameForLogs, std::shared_ptr client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles, bool useTransferManager) : BaseStorage(nameForLogs, enableLegacyStorageStructure), - client_(client), bucketName_(bucketName), - storageContainsUnknownFiles_(storageContainsUnknownFiles) + storageContainsUnknownFiles_(storageContainsUnknownFiles), + useTransferManager_(useTransferManager), + client_(client) { + if (useTransferManager_) + { + executor_ = Aws::MakeShared(ALLOCATION_TAG, 10); + Aws::Transfer::TransferManagerConfiguration transferConfig(executor_.get()); + transferConfig.s3Client = client_; + + transferManager_ = Aws::Transfer::TransferManager::Create(transferConfig); + } } IStorage::IWriter* AwsS3StoragePlugin::GetWriterForObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled) { - return new Writer(client_, 
bucketName_, GetPath(uuid, type, encryptionEnabled)); + if (useTransferManager_) + { + return new TransferWriter(transferManager_, bucketName_, GetPath(uuid, type, encryptionEnabled)); + } + else + { + return new DirectWriter(client_, bucketName_, GetPath(uuid, type, encryptionEnabled)); + } } IStorage::IReader* AwsS3StoragePlugin::GetReaderForObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled) @@ -401,7 +536,14 @@ paths.push_back(GetPath(uuid, type, encryptionEnabled, true)); } - return new Reader(client_, bucketName_, paths, uuid); + if (useTransferManager_) + { + return new TransferReader(transferManager_, client_, bucketName_, paths, uuid); + } + else + { + return new DirectReader(client_, bucketName_, paths, uuid); + } } void AwsS3StoragePlugin::DeleteObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled) @@ -422,7 +564,7 @@ deleteObjectRequest.SetBucket(bucketName_.c_str()); deleteObjectRequest.SetKey(path.c_str()); - auto result = client_.DeleteObject(deleteObjectRequest); + auto result = client_->DeleteObject(deleteObjectRequest); if (!result.IsSuccess() && firstExceptionMessage.empty()) { diff -r 0585b5de6be2 -r c9356e42af99 Aws/AwsStaticConfiguration.cmake --- a/Aws/AwsStaticConfiguration.cmake Mon Oct 09 10:18:58 2023 +0200 +++ b/Aws/AwsStaticConfiguration.cmake Mon Oct 09 15:10:26 2023 +0200 @@ -156,6 +156,7 @@ ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-core/source/utils/xml/ ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-s3/source/ ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-s3/source/model/ + ${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-transfer/source/transfer/ ) diff -r 0585b5de6be2 -r c9356e42af99 Aws/CMakeLists.txt --- a/Aws/CMakeLists.txt Mon Oct 09 10:18:58 2023 +0200 +++ b/Aws/CMakeLists.txt Mon Oct 09 15:10:26 2023 +0200 @@ -112,7 +112,7 @@ # Use vcpkg by Microsoft option(BUILD_SHARED_LIBS "Build shared libraries" ON) find_package(cryptopp CONFIG REQUIRED) - find_package(AWSSDK REQUIRED COMPONENTS s3) + 
find_package(AWSSDK REQUIRED COMPONENTS s3 transfer) set(CRYPTOPP_LIBRARIES cryptopp-static) else() diff -r 0585b5de6be2 -r c9356e42af99 NEWS --- a/NEWS Mon Oct 09 10:18:58 2023 +0200 +++ b/NEWS Mon Oct 09 15:10:26 2023 +0200 @@ -1,3 +1,15 @@ +Pending changes in the mainline +=============================== + +* AWS plugin: + * Added a new configuration "UseTransferManager" (true by default). + When set to true, the Transfer Manager mode is used to upload/download + whole files to/from the bucket. If set to false, the default "object" + mode is used. The Transfer Manager mode is supposed to be faster, + especially for large files. + + + 2023-07-20 - v 2.2.0 ====================