changeset 269:567761f0c1ea

fix issue #151: support of retries in the storage area plugins to deal with multiple writers
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 21 Apr 2021 17:54:31 +0200
parents 9b003f265a8f
children 5931c2ff22ca
files Framework/Plugins/StorageBackend.cpp Framework/Plugins/StorageBackend.h MySQL/Plugins/MySQLStorageArea.cpp MySQL/Plugins/StoragePlugin.cpp PostgreSQL/NEWS PostgreSQL/Plugins/PostgreSQLStorageArea.cpp PostgreSQL/Plugins/StoragePlugin.cpp
diffstat 7 files changed, 202 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/Framework/Plugins/StorageBackend.cpp	Wed Apr 21 11:14:40 2021 +0200
+++ b/Framework/Plugins/StorageBackend.cpp	Wed Apr 21 17:54:31 2021 +0200
@@ -32,6 +32,7 @@
 #include <Logging.h>
 #include <OrthancException.h>
 
+#include <boost/thread.hpp>
 #include <cassert>
 #include <limits>
 
@@ -56,8 +57,34 @@
 
 namespace OrthancDatabases
 {
-  StorageBackend::StorageBackend(IDatabaseFactory* factory) :
-    manager_(factory)
+  class StorageBackend::ReadWholeOperation : public StorageBackend::IDatabaseOperation
+  {
+  private:
+    IFileContentVisitor&      visitor_;
+    const char*               uuid_;
+    OrthancPluginContentType  type_;
+
+  public:
+    ReadWholeOperation(IFileContentVisitor& visitor,
+                       const char* uuid,
+                       OrthancPluginContentType type) :
+      visitor_(visitor),
+      uuid_(uuid),
+      type_(type)
+    {
+    }
+
+    virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+    {
+      accessor.ReadWhole(visitor_, uuid_, type_);
+    }
+  };
+
+
+  StorageBackend::StorageBackend(IDatabaseFactory* factory,
+                                 unsigned int maxRetries) :
+    manager_(factory),
+    maxRetries_(maxRetries)
   {
   }
   
@@ -233,8 +260,7 @@
       
     transaction.Commit();
   }
-
-
+  
 
   static OrthancPluginContext* context_ = NULL;
   static std::unique_ptr<StorageBackend>  backend_;
@@ -245,6 +271,33 @@
                                               int64_t size,
                                               OrthancPluginContentType type)
   {
+    class Operation : public StorageBackend::IDatabaseOperation
+    {
+    private:
+      const char*               uuid_;
+      const void*               content_;
+      int64_t                   size_;
+      OrthancPluginContentType  type_;
+      
+    public:
+      Operation(const char* uuid,
+                const void* content,
+                int64_t size,
+                OrthancPluginContentType type) :
+        uuid_(uuid),
+        content_(content),
+        size_(size),
+        type_(type)
+      {
+      }
+      
+      virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+      {
+        accessor.Create(uuid_, content_, size_, type_);
+      }
+    };
+
+
     try
     {
       if (backend_.get() == NULL)
@@ -253,8 +306,8 @@
       }
       else
       {
-        std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-        accessor->Create(uuid, content, static_cast<size_t>(size), type);
+        Operation operation(uuid, content, size, type);
+        backend_->Execute(operation);
         return OrthancPluginErrorCode_Success;
       }
     }
@@ -327,8 +380,8 @@
         Visitor visitor(target);
 
         {
-          std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-          accessor->ReadWhole(visitor, uuid, type);
+          StorageBackend::ReadWholeOperation operation(visitor, uuid, type);
+          backend_->Execute(operation);
         }
 
         return OrthancPluginErrorCode_Success;
@@ -389,6 +442,36 @@
     };
 
 
+    class Operation : public StorageBackend::IDatabaseOperation
+    {
+    private:
+      Visitor&                  visitor_;
+      const char*               uuid_;
+      OrthancPluginContentType  type_;
+      uint64_t                  start_;
+      size_t                    length_;
+
+    public:
+      Operation(Visitor& visitor,
+                const char* uuid,
+                OrthancPluginContentType type,
+                uint64_t start,
+                size_t length) :
+        visitor_(visitor),
+        uuid_(uuid),
+        type_(type),
+        start_(start),
+        length_(length)
+      {
+      }
+
+      virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+      {
+        accessor.ReadRange(visitor_, uuid_, type_, start_, length_);
+      }
+    };
+
+
     try
     {
       if (backend_.get() == NULL)
@@ -400,8 +483,8 @@
         Visitor visitor(target);
 
         {
-          std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-          accessor->ReadRange(visitor, uuid, type, start, target->size);
+          Operation operation(visitor, uuid, type, start, target->size);
+          backend_->Execute(operation);
         }
 
         return OrthancPluginErrorCode_Success;
@@ -511,8 +594,8 @@
         Visitor visitor(data, size);
 
         {
-          std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-          accessor->ReadWhole(visitor, uuid, type);
+          StorageBackend::ReadWholeOperation operation(visitor, uuid, type);
+          backend_->Execute(operation);
         }
 
         visitor.Release();
@@ -527,6 +610,27 @@
   static OrthancPluginErrorCode StorageRemove(const char* uuid,
                                               OrthancPluginContentType type)
   {
+    class Operation : public StorageBackend::IDatabaseOperation
+    {
+    private:
+      const char*               uuid_;
+      OrthancPluginContentType  type_;
+      
+    public:
+      Operation(const char* uuid,
+                OrthancPluginContentType type) :
+        uuid_(uuid),
+        type_(type)
+      {
+      }
+      
+      virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
+      {
+        accessor.Remove(uuid_, type_);
+      }
+    };
+
+    
     try
     {
       if (backend_.get() == NULL)
@@ -535,8 +639,8 @@
       }
       else
       {
-        std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor());
-        accessor->Remove(uuid, type);
+        Operation operation(uuid, type);
+        backend_->Execute(operation);
         return OrthancPluginErrorCode_Success;
       }
     }
@@ -552,9 +656,8 @@
     {
       throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
     }
-    
-    if (context_ != NULL ||
-        backend_.get() != NULL)
+    else if (context_ != NULL ||
+             backend_.get() != NULL)
     {
       // This function can only be invoked once in the plugin
       throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
@@ -570,6 +673,9 @@
 #  if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0)
       if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1)
       {
+        LOG(WARNING) << "The storage area plugin will retry up to " << backend_->GetMaxRetries()
+                     << " time(s) in the case of a collision";
+
         OrthancPluginStorageReadRange readRange = NULL;
         if (backend_->HasReadRange())
         {
@@ -661,4 +767,46 @@
       throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
     }
   }
+
+
+  void StorageBackend::Execute(IDatabaseOperation& operation)
+  {
+    std::unique_ptr<IAccessor> accessor(CreateAccessor());
+    if (accessor.get() == NULL)
+    {
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+    }
+    
+    unsigned int attempt = 0;
+    
+    for (;;)
+    {
+      try
+      {
+        operation.Execute(*accessor);
+        return;  // Success
+      }
+      catch (Orthanc::OrthancException& e)
+      {
+        if (e.GetErrorCode() == Orthanc::ErrorCode_DatabaseCannotSerialize)
+        {
+          if (attempt >= maxRetries_)
+          {
+            throw;
+          }
+          else
+          {
+            attempt++;
+            
+            // The "rand()" adds some jitter to de-synchronize writers
+            boost::this_thread::sleep(boost::posix_time::milliseconds(100 * attempt + 5 * (rand() % 10)));
+          }
+        }
+        else
+        {
+          throw;
+        }
+      }
+    }
+  }
 }
--- a/Framework/Plugins/StorageBackend.h	Wed Apr 21 11:14:40 2021 +0200
+++ b/Framework/Plugins/StorageBackend.h	Wed Apr 21 17:54:31 2021 +0200
@@ -71,11 +71,30 @@
                           OrthancPluginContentType type) = 0;
     };
     
+    /**
+     * This class is similar to
+     * "Orthanc::StatelessDatabaseOperations": It handles retries of
+     * transactions in the case of collision between multiple
+     * readers/writers.
+     **/
+    class IDatabaseOperation : public boost::noncopyable
+    {
+    public:
+      virtual ~IDatabaseOperation()
+      {
+      }
+
+      virtual void Execute(IAccessor& accessor) = 0;
+    };
+
+    class ReadWholeOperation;
+
   private:
     class StringVisitor;
     
     boost::mutex      mutex_;
     DatabaseManager   manager_;
+    unsigned int      maxRetries_;
 
   protected:
     class AccessorBase : public IAccessor
@@ -118,7 +137,8 @@
     virtual bool HasReadRange() const = 0;
 
   public:
-    StorageBackend(IDatabaseFactory* factory);  // Takes ownership
+    StorageBackend(IDatabaseFactory* factory /* takes ownership */,
+                   unsigned int maxRetries);
 
     virtual ~StorageBackend()
     {
@@ -147,5 +167,12 @@
                                   OrthancPluginContentType type,
                                   uint64_t start,
                                   size_t length);
+
+    unsigned int GetMaxRetries() const
+    {
+      return maxRetries_;
+    }
+
+    void Execute(IDatabaseOperation& operation);
   };
 }
--- a/MySQL/Plugins/MySQLStorageArea.cpp	Wed Apr 21 11:14:40 2021 +0200
+++ b/MySQL/Plugins/MySQLStorageArea.cpp	Wed Apr 21 17:54:31 2021 +0200
@@ -88,7 +88,8 @@
 
   MySQLStorageArea::MySQLStorageArea(const MySQLParameters& parameters,
                                      bool clearAll) :
-    StorageBackend(MySQLDatabase::CreateDatabaseFactory(parameters))
+    StorageBackend(MySQLDatabase::CreateDatabaseFactory(parameters),
+                   parameters.GetMaxConnectionRetries())
   {
     {
       AccessorBase accessor(*this);
--- a/MySQL/Plugins/StoragePlugin.cpp	Wed Apr 21 11:14:40 2021 +0200
+++ b/MySQL/Plugins/StoragePlugin.cpp	Wed Apr 21 17:54:31 2021 +0200
@@ -63,8 +63,8 @@
     try
     {
       OrthancDatabases::MySQLParameters parameters(mysql, configuration);
-      OrthancDatabases::StorageBackend::Register
-        (context, new OrthancDatabases::MySQLStorageArea(parameters, false /* don't clear database */));
+      OrthancDatabases::StorageBackend::Register(
+        context, new OrthancDatabases::MySQLStorageArea(parameters, false /* don't clear database */));
     }
     catch (Orthanc::OrthancException& e)
     {
--- a/PostgreSQL/NEWS	Wed Apr 21 11:14:40 2021 +0200
+++ b/PostgreSQL/NEWS	Wed Apr 21 17:54:31 2021 +0200
@@ -8,6 +8,7 @@
 * Support of range reads for the storage area, from Orthanc SDK 1.9.0
 * Fix issue #193 (LSB binaries crash with PostgreSQL + SSL) by changeset
   in OrthancFramework: https://hg.orthanc-server.com/orthanc/rev/9a9118406484
+* Fix issue #151 (Storage failures when running with two instances and PG_LOCK=false)
 
 
 Release 3.3 (2020-12-14)
--- a/PostgreSQL/Plugins/PostgreSQLStorageArea.cpp	Wed Apr 21 11:14:40 2021 +0200
+++ b/PostgreSQL/Plugins/PostgreSQLStorageArea.cpp	Wed Apr 21 17:54:31 2021 +0200
@@ -71,7 +71,8 @@
 
   PostgreSQLStorageArea::PostgreSQLStorageArea(const PostgreSQLParameters& parameters,
                                                bool clearAll) :
-    StorageBackend(PostgreSQLDatabase::CreateDatabaseFactory(parameters))
+    StorageBackend(PostgreSQLDatabase::CreateDatabaseFactory(parameters),
+                   parameters.GetMaxConnectionRetries())
   {
     {
       AccessorBase accessor(*this);
--- a/PostgreSQL/Plugins/StoragePlugin.cpp	Wed Apr 21 11:14:40 2021 +0200
+++ b/PostgreSQL/Plugins/StoragePlugin.cpp	Wed Apr 21 17:54:31 2021 +0200
@@ -59,8 +59,8 @@
     try
     {
       OrthancDatabases::PostgreSQLParameters parameters(postgresql);
-      OrthancDatabases::StorageBackend::Register
-        (context, new OrthancDatabases::PostgreSQLStorageArea(parameters, false /* don't clear database */));
+      OrthancDatabases::StorageBackend::Register(
+        context, new OrthancDatabases::PostgreSQLStorageArea(parameters, false /* don't clear database */));
     }
     catch (Orthanc::OrthancException& e)
     {