diff Framework/Plugins/DatabaseBackendAdapterV3.cpp @ 234:d1b124d116c1

PostgreSQL index plugin handles retries for collisions between multiple writers
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 08 Apr 2021 10:50:01 +0200
parents 7d46c99523a2
children f2b32d31fc99
line wrap: on
line diff
--- a/Framework/Plugins/DatabaseBackendAdapterV3.cpp	Tue Apr 06 15:07:27 2021 +0200
+++ b/Framework/Plugins/DatabaseBackendAdapterV3.cpp	Thu Apr 08 10:50:01 2021 +0200
@@ -61,28 +61,42 @@
   class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable
   {
   private:
-    std::unique_ptr<IndexBackend>      backend_;
-    OrthancPluginContext*              context_;
-    boost::mutex                       managerMutex_;
-    std::unique_ptr<DatabaseManager>   manager_;
-
-    DatabaseManager& GetManager() const
+    class ManagerReference : public Orthanc::IDynamicObject
     {
-      if (manager_.get() == NULL)
+    private:
+      DatabaseManager*  manager_;
+
+    public:
+      ManagerReference(DatabaseManager& manager) :
+        manager_(&manager)
       {
-        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
       }
-      else
+
+      DatabaseManager& GetManager()
       {
+        assert(manager_ != NULL);
         return *manager_;
       }
-    }    
+    };
+    
+    std::unique_ptr<IndexBackend>  backend_;
+    OrthancPluginContext*          context_;
+    boost::shared_mutex            connectionsMutex_;
+    size_t                         countConnections_;
+    std::list<DatabaseManager*>    connections_;
+    Orthanc::SharedMessageQueue    availableConnections_;
 
   public:
-    Adapter(IndexBackend* backend) :
-      backend_(backend)
+    Adapter(IndexBackend* backend,
+            size_t countConnections) :
+      backend_(backend),
+      countConnections_(countConnections)
     {
-      if (backend == NULL)
+      if (countConnections == 0)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
+      }
+      else if (backend == NULL)
       {
         throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
       }
@@ -92,6 +106,16 @@
       }
     }
 
+    ~Adapter()
+    {
+      for (std::list<DatabaseManager*>::iterator
+             it = connections_.begin(); it != connections_.end(); ++it)
+      {
+        assert(*it != NULL);
+        delete *it;
+      }
+    }
+
     OrthancPluginContext* GetContext() const
     {
       return context_;
@@ -99,11 +123,28 @@
 
     void OpenConnections()
     {
-      boost::mutex::scoped_lock  lock(managerMutex_);
+      boost::unique_lock<boost::shared_mutex>  lock(connectionsMutex_);
+
+      if (connections_.size() == 0)
+      {
+        assert(backend_.get() != NULL);
+        
+        std::unique_ptr<IDatabase> database(backend_->OpenDatabaseConnection());
+        backend_->ConfigureDatabase(*database);
+
+        connections_.push_back(new DatabaseManager(database.release()));
 
-      if (manager_.get() == NULL)
-      {
-        manager_.reset(IndexBackend::CreateSingleDatabaseManager(*backend_));
+        for (size_t i = 1; i < countConnections_; i++)
+        {
+          connections_.push_back(new DatabaseManager(backend_->OpenDatabaseConnection()));
+        }
+
+        for (std::list<DatabaseManager*>::iterator
+               it = connections_.begin(); it != connections_.end(); ++it)
+        {
+          assert(*it != NULL);
+          availableConnections_.Enqueue(new ManagerReference(**it));
+        }        
       }
       else
       {
@@ -113,42 +154,66 @@
 
     void CloseConnections()
     {
-      boost::mutex::scoped_lock  lock(managerMutex_);
+      boost::unique_lock<boost::shared_mutex>  lock(connectionsMutex_);
 
-      if (manager_.get() == NULL)
+      if (connections_.size() != countConnections_)
       {
         throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
       }
+      else if (availableConnections_.GetSize() != countConnections_)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_Database, "Some connections are still in use, bug in the Orthanc core");
+      }
       else
       {
-        manager_->Close();
-        manager_.reset(NULL);
+        for (std::list<DatabaseManager*>::iterator
+               it = connections_.begin(); it != connections_.end(); ++it)
+        {
+          assert(*it != NULL);
+          (*it)->Close();
+        }
       }
     }
 
     class DatabaseAccessor : public boost::noncopyable
     {
     private:
-      boost::mutex::scoped_lock  lock_;
-      IndexBackend&              backend_;
-      DatabaseManager&           manager_;
+      boost::shared_lock<boost::shared_mutex>  lock_;
+      Adapter&                                 adapter_;
+      DatabaseManager*                         manager_;
       
     public:
       DatabaseAccessor(Adapter& adapter) :
-        lock_(adapter.managerMutex_),
-        backend_(*adapter.backend_),
-        manager_(adapter.GetManager())
+        lock_(adapter.connectionsMutex_),
+        adapter_(adapter),
+        manager_(NULL)
       {
+        for (;;)
+        {
+          std::unique_ptr<Orthanc::IDynamicObject> manager(adapter.availableConnections_.Dequeue(100));
+          if (manager.get() != NULL)
+          {
+            manager_ = &dynamic_cast<ManagerReference&>(*manager).GetManager();
+            return;
+          }
+        }
+      }
+
+      ~DatabaseAccessor()
+      {
+        assert(manager_ != NULL);
+        adapter_.availableConnections_.Enqueue(new ManagerReference(*manager_));
       }
 
       IndexBackend& GetBackend() const
       {
-        return backend_;
+        return *adapter_.backend_;
       }
 
       DatabaseManager& GetManager() const
       {
-        return manager_;
+        assert(manager_ != NULL);
+        return *manager_;
       }
     };
   };
@@ -1862,7 +1927,9 @@
   }
 
     
-  void DatabaseBackendAdapterV3::Register(IndexBackend* backend)
+  void DatabaseBackendAdapterV3::Register(IndexBackend* backend,
+                                          size_t countConnections,
+                                          unsigned int maxDatabaseRetries)
   {
     if (isBackendInUse_)
     {
@@ -1948,7 +2015,9 @@
 
     OrthancPluginContext* context = backend->GetContext();
  
-    if (OrthancPluginRegisterDatabaseBackendV3(context, &params, sizeof(params), new Adapter(backend)) != OrthancPluginErrorCode_Success)
+    if (OrthancPluginRegisterDatabaseBackendV3(
+          context, &params, sizeof(params), maxDatabaseRetries,
+          new Adapter(backend, countConnections)) != OrthancPluginErrorCode_Success)
     {
       throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend");
     }