changeset 234:d1b124d116c1

PostgreSQL index plugin handles retries for collisions between multiple writers
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 08 Apr 2021 10:50:01 +0200
parents 7d46c99523a2
children f2b32d31fc99
files Framework/Common/DatabaseManager.cpp Framework/MySQL/MySQLDatabase.cpp Framework/MySQL/MySQLTransaction.cpp Framework/Plugins/DatabaseBackendAdapterV3.cpp Framework/Plugins/DatabaseBackendAdapterV3.h Framework/Plugins/IndexBackend.cpp Framework/Plugins/IndexBackend.h Framework/PostgreSQL/PostgreSQLStatement.cpp Framework/PostgreSQL/PostgreSQLTransaction.cpp MySQL/Plugins/IndexPlugin.cpp MySQL/Plugins/MySQLIndex.cpp PostgreSQL/NEWS PostgreSQL/Plugins/IndexPlugin.cpp SQLite/Plugins/IndexPlugin.cpp
diffstat 14 files changed, 150 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/Framework/Common/DatabaseManager.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/Common/DatabaseManager.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -58,7 +58,8 @@
     
   void DatabaseManager::CloseIfUnavailable(Orthanc::ErrorCode e)
   {
-    if (e != Orthanc::ErrorCode_Success)
+    if (e != Orthanc::ErrorCode_Success &&
+        e != Orthanc::ErrorCode_DatabaseCannotSerialize)
     {
       transaction_.reset(NULL);
     }
@@ -209,7 +210,7 @@
   {
     if (transaction_.get() == NULL)
     {
-      LOG(ERROR) << "Cannot rollback a non-existing transaction";
+      LOG(INFO) << "Cannot rollback a non-existing transaction";
       throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
     }
     else
--- a/Framework/MySQL/MySQLDatabase.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/MySQL/MySQLDatabase.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -69,6 +69,10 @@
       {
         throw Orthanc::OrthancException(Orthanc::ErrorCode_DatabaseUnavailable);
       }
+      else if (error == ER_LOCK_DEADLOCK)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_DatabaseCannotSerialize);
+      } 
       else
       {
         throw Orthanc::OrthancException(Orthanc::ErrorCode_Database);
@@ -627,6 +631,7 @@
       {
         std::unique_ptr<MySQLDatabase> db(new MySQLDatabase(parameters_));
         db->Open();
+        db->Execute("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE", false);
         return db.release();
       }
       
--- a/Framework/MySQL/MySQLTransaction.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/MySQL/MySQLTransaction.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -58,7 +58,7 @@
   {
     if (active_)
     {
-      LOG(WARNING) << "An active MySQL transaction was dismissed";
+      LOG(INFO) << "An active MySQL transaction was dismissed";
 
       try
       {
--- a/Framework/Plugins/DatabaseBackendAdapterV3.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/Plugins/DatabaseBackendAdapterV3.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -61,28 +61,42 @@
   class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable
   {
   private:
-    std::unique_ptr<IndexBackend>      backend_;
-    OrthancPluginContext*              context_;
-    boost::mutex                       managerMutex_;
-    std::unique_ptr<DatabaseManager>   manager_;
-
-    DatabaseManager& GetManager() const
+    class ManagerReference : public Orthanc::IDynamicObject
     {
-      if (manager_.get() == NULL)
+    private:
+      DatabaseManager*  manager_;
+
+    public:
+      ManagerReference(DatabaseManager& manager) :
+        manager_(&manager)
       {
-        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
       }
-      else
+
+      DatabaseManager& GetManager()
       {
+        assert(manager_ != NULL);
         return *manager_;
       }
-    }    
+    };
+    
+    std::unique_ptr<IndexBackend>  backend_;
+    OrthancPluginContext*          context_;
+    boost::shared_mutex            connectionsMutex_;
+    size_t                         countConnections_;
+    std::list<DatabaseManager*>    connections_;
+    Orthanc::SharedMessageQueue    availableConnections_;
 
   public:
-    Adapter(IndexBackend* backend) :
-      backend_(backend)
+    Adapter(IndexBackend* backend,
+            size_t countConnections) :
+      backend_(backend),
+      countConnections_(countConnections)
     {
-      if (backend == NULL)
+      if (countConnections == 0)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
+      }
+      else if (backend == NULL)
       {
         throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
       }
@@ -92,6 +106,16 @@
       }
     }
 
+    ~Adapter()
+    {
+      for (std::list<DatabaseManager*>::iterator
+             it = connections_.begin(); it != connections_.end(); ++it)
+      {
+        assert(*it != NULL);
+        delete *it;
+      }
+    }
+
     OrthancPluginContext* GetContext() const
     {
       return context_;
@@ -99,11 +123,28 @@
 
     void OpenConnections()
     {
-      boost::mutex::scoped_lock  lock(managerMutex_);
+      boost::unique_lock<boost::shared_mutex>  lock(connectionsMutex_);
+
+      if (connections_.size() == 0)
+      {
+        assert(backend_.get() != NULL);
+        
+        std::unique_ptr<IDatabase> database(backend_->OpenDatabaseConnection());
+        backend_->ConfigureDatabase(*database);
+
+        connections_.push_back(new DatabaseManager(database.release()));
 
-      if (manager_.get() == NULL)
-      {
-        manager_.reset(IndexBackend::CreateSingleDatabaseManager(*backend_));
+        for (size_t i = 1; i < countConnections_; i++)
+        {
+          connections_.push_back(new DatabaseManager(backend_->OpenDatabaseConnection()));
+        }
+
+        for (std::list<DatabaseManager*>::iterator
+               it = connections_.begin(); it != connections_.end(); ++it)
+        {
+          assert(*it != NULL);
+          availableConnections_.Enqueue(new ManagerReference(**it));
+        }        
       }
       else
       {
@@ -113,42 +154,66 @@
 
     void CloseConnections()
     {
-      boost::mutex::scoped_lock  lock(managerMutex_);
+      boost::unique_lock<boost::shared_mutex>  lock(connectionsMutex_);
 
-      if (manager_.get() == NULL)
+      if (connections_.size() != countConnections_)
       {
         throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
       }
+      else if (availableConnections_.GetSize() != countConnections_)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_Database, "Some connections are still in use, bug in the Orthanc core");
+      }
       else
       {
-        manager_->Close();
-        manager_.reset(NULL);
+        for (std::list<DatabaseManager*>::iterator
+               it = connections_.begin(); it != connections_.end(); ++it)
+        {
+          assert(*it != NULL);
+          (*it)->Close();
+        }
       }
     }
 
     class DatabaseAccessor : public boost::noncopyable
     {
     private:
-      boost::mutex::scoped_lock  lock_;
-      IndexBackend&              backend_;
-      DatabaseManager&           manager_;
+      boost::shared_lock<boost::shared_mutex>  lock_;
+      Adapter&                                 adapter_;
+      DatabaseManager*                         manager_;
       
     public:
       DatabaseAccessor(Adapter& adapter) :
-        lock_(adapter.managerMutex_),
-        backend_(*adapter.backend_),
-        manager_(adapter.GetManager())
+        lock_(adapter.connectionsMutex_),
+        adapter_(adapter),
+        manager_(NULL)
       {
+        for (;;)
+        {
+          std::unique_ptr<Orthanc::IDynamicObject> manager(adapter.availableConnections_.Dequeue(100));
+          if (manager.get() != NULL)
+          {
+            manager_ = &dynamic_cast<ManagerReference&>(*manager).GetManager();
+            return;
+          }
+        }
+      }
+
+      ~DatabaseAccessor()
+      {
+        assert(manager_ != NULL);
+        adapter_.availableConnections_.Enqueue(new ManagerReference(*manager_));
       }
 
       IndexBackend& GetBackend() const
       {
-        return backend_;
+        return *adapter_.backend_;
       }
 
       DatabaseManager& GetManager() const
       {
-        return manager_;
+        assert(manager_ != NULL);
+        return *manager_;
       }
     };
   };
@@ -1862,7 +1927,9 @@
   }
 
     
-  void DatabaseBackendAdapterV3::Register(IndexBackend* backend)
+  void DatabaseBackendAdapterV3::Register(IndexBackend* backend,
+                                          size_t countConnections,
+                                          unsigned int maxDatabaseRetries)
   {
     if (isBackendInUse_)
     {
@@ -1948,7 +2015,9 @@
 
     OrthancPluginContext* context = backend->GetContext();
  
-    if (OrthancPluginRegisterDatabaseBackendV3(context, &params, sizeof(params), new Adapter(backend)) != OrthancPluginErrorCode_Success)
+    if (OrthancPluginRegisterDatabaseBackendV3(
+          context, &params, sizeof(params), maxDatabaseRetries,
+          new Adapter(backend, countConnections)) != OrthancPluginErrorCode_Success)
     {
       throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend");
     }
--- a/Framework/Plugins/DatabaseBackendAdapterV3.h	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/Plugins/DatabaseBackendAdapterV3.h	Thu Apr 08 10:50:01 2021 +0200
@@ -57,7 +57,9 @@
       virtual IDatabaseBackendOutput* CreateOutput() ORTHANC_OVERRIDE;
     };
 
-    static void Register(IndexBackend* backend);
+    static void Register(IndexBackend* backend,
+                         size_t countConnections,
+                         unsigned int maxDatabaseRetries);
 
     static void Finalize();
   };
--- a/Framework/Plugins/IndexBackend.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/Plugins/IndexBackend.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -2275,7 +2275,9 @@
   }
 
 
-  void IndexBackend::Register(IndexBackend* backend)
+  void IndexBackend::Register(IndexBackend* backend,
+                              size_t countConnections,
+                              unsigned int maxDatabaseRetries)
   {
     if (backend == NULL)
     {
@@ -2288,7 +2290,7 @@
 #  if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 2)
     if (OrthancPluginCheckVersionAdvanced(backend->GetContext(), 1, 9, 2) == 1)
     {
-      OrthancDatabases::DatabaseBackendAdapterV3::Register(backend);
+      OrthancDatabases::DatabaseBackendAdapterV3::Register(backend, countConnections, maxDatabaseRetries);
       hasLoadedV3 = true;
     }
 #  endif
--- a/Framework/Plugins/IndexBackend.h	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/Plugins/IndexBackend.h	Thu Apr 08 10:50:01 2021 +0200
@@ -381,7 +381,15 @@
                                const char* hashSeries,
                                const char* hashInstance);
 
-    static void Register(IndexBackend* backend);
+    /**
+     * "maxDatabaseRetries" is to handle
+     * "OrthancPluginErrorCode_DatabaseCannotSerialize" if there is a
+     * collision multiple writers. "countConnections" and
+     * "maxDatabaseRetries" are only used if Orthanc >= 1.9.2.
+     **/
+    static void Register(IndexBackend* backend,
+                         size_t countConnections,
+                         unsigned int maxDatabaseRetries);
 
     static void Finalize();
 
--- a/Framework/PostgreSQL/PostgreSQLStatement.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/PostgreSQL/PostgreSQLStatement.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -278,7 +278,16 @@
                               1);
     }
 
-    if (result == NULL)
+    if (PQtransactionStatus(reinterpret_cast<PGconn*>(database_.pg_)) == PQTRANS_INERROR)
+    {
+      if (result != NULL)
+      {
+        PQclear(result);
+      }
+      
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_DatabaseCannotSerialize);
+    }
+    else if (result == NULL)
     {
       database_.ThrowException(true);
     }
--- a/Framework/PostgreSQL/PostgreSQLTransaction.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/PostgreSQL/PostgreSQLTransaction.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -42,7 +42,7 @@
   {
     if (isOpen_)
     {
-      LOG(WARNING) << "PostgreSQL: An active PostgreSQL transaction was dismissed";
+      LOG(INFO) << "PostgreSQL: An active PostgreSQL transaction was dismissed";
 
       try
       {
--- a/MySQL/Plugins/IndexPlugin.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/MySQL/Plugins/IndexPlugin.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -62,8 +62,12 @@
 
     try
     {
+      const size_t countConnections = 5;  // TODO - PARAMETER
+      const unsigned int maxDatabaseRetries = 10;  // TODO - PARAMETER
+      
       OrthancDatabases::MySQLParameters parameters(mysql, configuration);
-      OrthancDatabases::IndexBackend::Register(new OrthancDatabases::MySQLIndex(context, parameters));
+      OrthancDatabases::IndexBackend::Register(
+        new OrthancDatabases::MySQLIndex(context, parameters), countConnections, maxDatabaseRetries);
     }
     catch (Orthanc::OrthancException& e)
     {
--- a/MySQL/Plugins/MySQLIndex.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/MySQL/Plugins/MySQLIndex.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -91,8 +91,6 @@
     {
       MySQLDatabase::ClearDatabase(parameters_);
     }
-    
-    db.Execute("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE", false);
 
     {
       MySQLDatabase::TransientAdvisoryLock lock(db, MYSQL_LOCK_DATABASE_SETUP);
--- a/PostgreSQL/NEWS	Tue Apr 06 15:07:27 2021 +0200
+++ b/PostgreSQL/NEWS	Thu Apr 08 10:50:01 2021 +0200
@@ -1,6 +1,7 @@
 Pending changes in the mainline
 ===============================
 
+* Support of retries for collisions between multiple writers, from Orthanc SDK 1.9.2
 * Support of "OrthancPluginRegisterStorageArea2()" from Orthanc SDK 1.9.0
 
 
--- a/PostgreSQL/Plugins/IndexPlugin.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/PostgreSQL/Plugins/IndexPlugin.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -56,8 +56,12 @@
 
     try
     {
+      const size_t countConnections = 5;  // TODO - PARAMETER
+      const unsigned int maxDatabaseRetries = 10;  // TODO - PARAMETER
+      
       OrthancDatabases::PostgreSQLParameters parameters(postgresql);
-      OrthancDatabases::IndexBackend::Register(new OrthancDatabases::PostgreSQLIndex(context, parameters));
+      OrthancDatabases::IndexBackend::Register(
+        new OrthancDatabases::PostgreSQLIndex(context, parameters), countConnections, maxDatabaseRetries);
     }
     catch (Orthanc::OrthancException& e)
     {
--- a/SQLite/Plugins/IndexPlugin.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/SQLite/Plugins/IndexPlugin.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -60,7 +60,9 @@
     {
       /* Register the SQLite index into Orthanc */
       OrthancDatabases::IndexBackend::Register(
-        new OrthancDatabases::SQLiteIndex(context, "index.db"));  // TODO parameter
+        new OrthancDatabases::SQLiteIndex(context, "index.db"),   // TODO parameter
+        1 /* only 1 connection is possible with SQLite */,
+        0 /* no collision is possible, as SQLite has a global lock */);
     }
     catch (Orthanc::OrthancException& e)
     {