Mercurial > hg > orthanc-databases
diff Framework/Plugins/StorageBackend.cpp @ 269:567761f0c1ea
fix issue #151: support of retries in the storage area plugins to deal with multiple writers
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 21 Apr 2021 17:54:31 +0200 |
parents | cd73e34d5411 |
children | 5931c2ff22ca |
line wrap: on
line diff
--- a/Framework/Plugins/StorageBackend.cpp Wed Apr 21 11:14:40 2021 +0200 +++ b/Framework/Plugins/StorageBackend.cpp Wed Apr 21 17:54:31 2021 +0200 @@ -32,6 +32,7 @@ #include <Logging.h> #include <OrthancException.h> +#include <boost/thread.hpp> #include <cassert> #include <limits> @@ -56,8 +57,34 @@ namespace OrthancDatabases { - StorageBackend::StorageBackend(IDatabaseFactory* factory) : - manager_(factory) + class StorageBackend::ReadWholeOperation : public StorageBackend::IDatabaseOperation + { + private: + IFileContentVisitor& visitor_; + const char* uuid_; + OrthancPluginContentType type_; + + public: + ReadWholeOperation(IFileContentVisitor& visitor, + const char* uuid, + OrthancPluginContentType type) : + visitor_(visitor), + uuid_(uuid), + type_(type) + { + } + + virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE + { + accessor.ReadWhole(visitor_, uuid_, type_); + } + }; + + + StorageBackend::StorageBackend(IDatabaseFactory* factory, + unsigned int maxRetries) : + manager_(factory), + maxRetries_(maxRetries) { } @@ -233,8 +260,7 @@ transaction.Commit(); } - - + static OrthancPluginContext* context_ = NULL; static std::unique_ptr<StorageBackend> backend_; @@ -245,6 +271,33 @@ int64_t size, OrthancPluginContentType type) { + class Operation : public StorageBackend::IDatabaseOperation + { + private: + const char* uuid_; + const void* content_; + int64_t size_; + OrthancPluginContentType type_; + + public: + Operation(const char* uuid, + const void* content, + int64_t size, + OrthancPluginContentType type) : + uuid_(uuid), + content_(content), + size_(size), + type_(type) + { + } + + virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE + { + accessor.Create(uuid_, content_, size_, type_); + } + }; + + try { if (backend_.get() == NULL) @@ -253,8 +306,8 @@ } else { - std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); - accessor->Create(uuid, content, static_cast<size_t>(size), type); + Operation operation(uuid, content, size, type); + backend_->Execute(operation); return OrthancPluginErrorCode_Success; } } @@ -327,8 +380,8 @@ Visitor visitor(target); { - std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); - accessor->ReadWhole(visitor, uuid, type); + StorageBackend::ReadWholeOperation operation(visitor, uuid, type); + backend_->Execute(operation); } return OrthancPluginErrorCode_Success; @@ -389,6 +442,36 @@ }; + class Operation : public StorageBackend::IDatabaseOperation + { + private: + Visitor& visitor_; + const char* uuid_; + OrthancPluginContentType type_; + uint64_t start_; + size_t length_; + + public: + Operation(Visitor& visitor, + const char* uuid, + OrthancPluginContentType type, + uint64_t start, + size_t length) : + visitor_(visitor), + uuid_(uuid), + type_(type), + start_(start), + length_(length) + { + } + + virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE + { + accessor.ReadRange(visitor_, uuid_, type_, start_, length_); + } + }; + + try { if (backend_.get() == NULL) @@ -400,8 +483,8 @@ Visitor visitor(target); { - std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); - accessor->ReadRange(visitor, uuid, type, start, target->size); + Operation operation(visitor, uuid, type, start, target->size); + backend_->Execute(operation); } return OrthancPluginErrorCode_Success; @@ -511,8 +594,8 @@ Visitor visitor(data, size); { - std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); - accessor->ReadWhole(visitor, uuid, type); + StorageBackend::ReadWholeOperation operation(visitor, uuid, type); + backend_->Execute(operation); } visitor.Release(); @@ -527,6 +610,27 @@ static OrthancPluginErrorCode StorageRemove(const char* uuid, OrthancPluginContentType type) { + class Operation : public StorageBackend::IDatabaseOperation + { + private: + const char* uuid_; + OrthancPluginContentType type_; + + public: + Operation(const char* uuid, + OrthancPluginContentType type) : + uuid_(uuid), + type_(type) + { + } + + virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE + { + accessor.Remove(uuid_, type_); + } + }; + + try { if (backend_.get() == NULL) @@ -535,8 +639,8 @@ } else { - std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); - accessor->Remove(uuid, type); + Operation operation(uuid, type); + backend_->Execute(operation); return OrthancPluginErrorCode_Success; } } @@ -552,9 +656,8 @@ { throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); } - - if (context_ != NULL || - backend_.get() != NULL) + else if (context_ != NULL || + backend_.get() != NULL) { // This function can only be invoked once in the plugin throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); @@ -570,6 +673,9 @@ # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0) if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1) { + LOG(WARNING) << "The storage area plugin will retry up to " << backend_->GetMaxRetries() + << " time(s) in the case of a collision"; + OrthancPluginStorageReadRange readRange = NULL; if (backend_->HasReadRange()) { @@ -661,4 +767,46 @@ throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } } + + + void StorageBackend::Execute(IDatabaseOperation& operation) + { + std::unique_ptr<IAccessor> accessor(CreateAccessor()); + if (accessor.get() == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + + unsigned int attempt = 0; + + for (;;) + { + try + { + operation.Execute(*accessor); + return; // Success + } + catch (Orthanc::OrthancException& e) + { + if (e.GetErrorCode() == Orthanc::ErrorCode_DatabaseCannotSerialize) + { + if (attempt >= maxRetries_) + { + throw; + } + else + { + attempt++; + + // The "rand()" adds some jitter to de-synchronize writers + boost::this_thread::sleep(boost::posix_time::milliseconds(100 * attempt + 5 * (rand() % 10))); + } + } + else + { + throw; + } + } + } + } }