diff Framework/Plugins/StorageBackend.cpp @ 269:567761f0c1ea

fix issue #151: support of retries in the storage area plugins to deal with multiple writers
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 21 Apr 2021 17:54:31 +0200
parents cd73e34d5411
children 5931c2ff22ca
line wrap: on
line diff
--- a/Framework/Plugins/StorageBackend.cpp	Wed Apr 21 11:14:40 2021 +0200
+++ b/Framework/Plugins/StorageBackend.cpp	Wed Apr 21 17:54:31 2021 +0200
@@ -32,6 +32,7 @@
 #include <Logging.h>
 #include <OrthancException.h>
 
+#include <boost/thread.hpp>
 #include <cassert>
 #include <limits>
 
@@ -56,8 +57,34 @@
 
 namespace OrthancDatabases
 {
-  StorageBackend::StorageBackend(IDatabaseFactory* factory) :
-    manager_(factory)
+  class StorageBackend::ReadWholeOperation : public StorageBackend::IDatabaseOperation
+  {
+  private:
+    IFileContentVisitor&      visitor_;
+    const char*               uuid_;
+    OrthancPluginContentType  type_;
+
+  public:
+    ReadWholeOperation(IFileContentVisitor& visitor,
+                       const char* uuid,
+                       OrthancPluginContentType type) :
+      visitor_(visitor),
+      uuid_(uuid),
+      type_(type)
+    {
+    }
+
+    virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+    {
+      accessor.ReadWhole(visitor_, uuid_, type_);
+    }
+  };
+
+
+  StorageBackend::StorageBackend(IDatabaseFactory* factory,
+                                 unsigned int maxRetries) :
+    manager_(factory),
+    maxRetries_(maxRetries)
   {
   }
   
@@ -233,8 +260,7 @@
       
     transaction.Commit();
   }
-
-
+  
 
   static OrthancPluginContext* context_ = NULL;
   static std::unique_ptr<StorageBackend>  backend_;
@@ -245,6 +271,33 @@
                                               int64_t size,
                                               OrthancPluginContentType type)
   {
+    class Operation : public StorageBackend::IDatabaseOperation
+    {
+    private:
+      const char*               uuid_;
+      const void*               content_;
+      int64_t                   size_;
+      OrthancPluginContentType  type_;
+      
+    public:
+      Operation(const char* uuid,
+                const void* content,
+                int64_t size,
+                OrthancPluginContentType type) :
+        uuid_(uuid),
+        content_(content),
+        size_(size),
+        type_(type)
+      {
+      }
+      
+      virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+      {
+        accessor.Create(uuid_, content_, size_, type_);
+      }
+    };
+
+
     try
     {
       if (backend_.get() == NULL)
@@ -253,8 +306,8 @@
       }
       else
       {
-        std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-        accessor->Create(uuid, content, static_cast<size_t>(size), type);
+        Operation operation(uuid, content, size, type);
+        backend_->Execute(operation);
         return OrthancPluginErrorCode_Success;
       }
     }
@@ -327,8 +380,8 @@
         Visitor visitor(target);
 
         {
-          std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-          accessor->ReadWhole(visitor, uuid, type);
+          StorageBackend::ReadWholeOperation operation(visitor, uuid, type);
+          backend_->Execute(operation);
         }
 
         return OrthancPluginErrorCode_Success;
@@ -389,6 +442,36 @@
     };
 
 
+    class Operation : public StorageBackend::IDatabaseOperation
+    {
+    private:
+      Visitor&                  visitor_;
+      const char*               uuid_;
+      OrthancPluginContentType  type_;
+      uint64_t                  start_;
+      size_t                    length_;
+
+    public:
+      Operation(Visitor& visitor,
+                const char* uuid,
+                OrthancPluginContentType type,
+                uint64_t start,
+                size_t length) :
+        visitor_(visitor),
+        uuid_(uuid),
+        type_(type),
+        start_(start),
+        length_(length)
+      {
+      }
+
+      virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+      {
+        accessor.ReadRange(visitor_, uuid_, type_, start_, length_);
+      }
+    };
+
+
     try
     {
       if (backend_.get() == NULL)
@@ -400,8 +483,8 @@
         Visitor visitor(target);
 
         {
-          std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-          accessor->ReadRange(visitor, uuid, type, start, target->size);
+          Operation operation(visitor, uuid, type, start, target->size);
+          backend_->Execute(operation);
         }
 
         return OrthancPluginErrorCode_Success;
@@ -511,8 +594,8 @@
         Visitor visitor(data, size);
 
         {
-          std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-          accessor->ReadWhole(visitor, uuid, type);
+          StorageBackend::ReadWholeOperation operation(visitor, uuid, type);
+          backend_->Execute(operation);
         }
 
         visitor.Release();
@@ -527,6 +610,27 @@
   static OrthancPluginErrorCode StorageRemove(const char* uuid,
                                               OrthancPluginContentType type)
   {
+    class Operation : public StorageBackend::IDatabaseOperation
+    {
+    private:
+      const char*               uuid_;
+      OrthancPluginContentType  type_;
+      
+    public:
+      Operation(const char* uuid,
+                OrthancPluginContentType type) :
+        uuid_(uuid),
+        type_(type)
+      {
+      }
+      
+      virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+      {
+        accessor.Remove(uuid_, type_);
+      }
+    };
+
+    
     try
     {
       if (backend_.get() == NULL)
@@ -535,8 +639,8 @@
       }
       else
       {
-        std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-        accessor->Remove(uuid, type);
+        Operation operation(uuid, type);
+        backend_->Execute(operation);
         return OrthancPluginErrorCode_Success;
       }
     }
@@ -552,9 +656,8 @@
     {
       throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
     }
-    
-    if (context_ != NULL ||
-        backend_.get() != NULL)
+    else if (context_ != NULL ||
+             backend_.get() != NULL)
     {
       // This function can only be invoked once in the plugin
       throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
@@ -570,6 +673,9 @@
 #  if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0)
       if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1)
       {
+        LOG(WARNING) << "The storage area plugin will retry up to " << backend_->GetMaxRetries()
+                     << " time(s) in the case of a collision";
+
         OrthancPluginStorageReadRange readRange = NULL;
         if (backend_->HasReadRange())
         {
@@ -661,4 +767,46 @@
       throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
     }
   }
+
+
+  void StorageBackend::Execute(IDatabaseOperation& operation)
+  {
+    std::unique_ptr<IAccessor> accessor(CreateAccessor());
+    if (accessor.get() == NULL)
+    {
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+    }
+    
+    unsigned int attempt = 0;
+    
+    for (;;)
+    {
+      try
+      {
+        operation.Execute(*accessor);
+        return;  // Success
+      }
+      catch (Orthanc::OrthancException& e)
+      {
+        if (e.GetErrorCode() == Orthanc::ErrorCode_DatabaseCannotSerialize)
+        {
+          if (attempt >= maxRetries_)
+          {
+            throw;
+          }
+          else
+          {
+            attempt++;
+            
+            // The "rand()" adds some jitter to de-synchronize writers
+            boost::this_thread::sleep(boost::posix_time::milliseconds(100 * attempt + 5 * (rand() % 10)));
+          }
+        }
+        else
+        {
+          throw;
+        }
+      }
+    }
+  }
 }