Mercurial > hg > orthanc-databases
diff Framework/Plugins/DatabaseBackendAdapterV3.cpp @ 234:d1b124d116c1
PostgreSQL index plugin handles retries for collisions between multiple writers
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 08 Apr 2021 10:50:01 +0200 |
parents | 7d46c99523a2 |
children | f2b32d31fc99 |
line wrap: on
line diff
--- a/Framework/Plugins/DatabaseBackendAdapterV3.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/Plugins/DatabaseBackendAdapterV3.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -61,28 +61,42 @@ class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable { private: - std::unique_ptr<IndexBackend> backend_; - OrthancPluginContext* context_; - boost::mutex managerMutex_; - std::unique_ptr<DatabaseManager> manager_; - - DatabaseManager& GetManager() const + class ManagerReference : public Orthanc::IDynamicObject { - if (manager_.get() == NULL) + private: + DatabaseManager* manager_; + + public: + ManagerReference(DatabaseManager& manager) : + manager_(&manager) { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } - else + + DatabaseManager& GetManager() { + assert(manager_ != NULL); return *manager_; } - } + }; + + std::unique_ptr<IndexBackend> backend_; + OrthancPluginContext* context_; + boost::shared_mutex connectionsMutex_; + size_t countConnections_; + std::list<DatabaseManager*> connections_; + Orthanc::SharedMessageQueue availableConnections_; public: - Adapter(IndexBackend* backend) : - backend_(backend) + Adapter(IndexBackend* backend, + size_t countConnections) : + backend_(backend), + countConnections_(countConnections) { - if (backend == NULL) + if (countConnections == 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (backend == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); } @@ -92,6 +106,16 @@ } } + ~Adapter() + { + for (std::list<DatabaseManager*>::iterator + it = connections_.begin(); it != connections_.end(); ++it) + { + assert(*it != NULL); + delete *it; + } + } + OrthancPluginContext* GetContext() const { return context_; @@ -99,11 +123,28 @@ void OpenConnections() { - boost::mutex::scoped_lock lock(managerMutex_); + boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_); + + if (connections_.size() == 0) + { + assert(backend_.get() != NULL); + + std::unique_ptr<IDatabase> database(backend_->OpenDatabaseConnection()); + backend_->ConfigureDatabase(*database); + + connections_.push_back(new DatabaseManager(database.release())); - if (manager_.get() == NULL) - { - manager_.reset(IndexBackend::CreateSingleDatabaseManager(*backend_)); + for (size_t i = 1; i < countConnections_; i++) + { + connections_.push_back(new DatabaseManager(backend_->OpenDatabaseConnection())); + } + + for (std::list<DatabaseManager*>::iterator + it = connections_.begin(); it != connections_.end(); ++it) + { + assert(*it != NULL); + availableConnections_.Enqueue(new ManagerReference(**it)); + } } else { @@ -113,42 +154,66 @@ void CloseConnections() { - boost::mutex::scoped_lock lock(managerMutex_); + boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_); - if (manager_.get() == NULL) + if (connections_.size() != countConnections_) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } + else if (availableConnections_.GetSize() != countConnections_) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_Database, "Some connections are still in use, bug in the Orthanc core"); + } else { - manager_->Close(); - manager_.reset(NULL); + for (std::list<DatabaseManager*>::iterator + it = connections_.begin(); it != connections_.end(); ++it) + { + assert(*it != NULL); + (*it)->Close(); + } } } class DatabaseAccessor : public boost::noncopyable { private: - boost::mutex::scoped_lock lock_; - IndexBackend& backend_; - DatabaseManager& manager_; + boost::shared_lock<boost::shared_mutex> lock_; + Adapter& adapter_; + DatabaseManager* manager_; public: DatabaseAccessor(Adapter& adapter) : - lock_(adapter.managerMutex_), - backend_(*adapter.backend_), - manager_(adapter.GetManager()) + lock_(adapter.connectionsMutex_), + adapter_(adapter), + manager_(NULL) { + for (;;) + { + std::unique_ptr<Orthanc::IDynamicObject> manager(adapter.availableConnections_.Dequeue(100)); + if (manager.get() != NULL) + { + manager_ = &dynamic_cast<ManagerReference&>(*manager).GetManager(); + return; + } + } + } + + ~DatabaseAccessor() + { + assert(manager_ != NULL); + adapter_.availableConnections_.Enqueue(new ManagerReference(*manager_)); } IndexBackend& GetBackend() const { - return backend_; + return *adapter_.backend_; } DatabaseManager& GetManager() const { - return manager_; + assert(manager_ != NULL); + return *manager_; } }; }; @@ -1862,7 +1927,9 @@ } - void DatabaseBackendAdapterV3::Register(IndexBackend* backend) + void DatabaseBackendAdapterV3::Register(IndexBackend* backend, + size_t countConnections, + unsigned int maxDatabaseRetries) { if (isBackendInUse_) { @@ -1948,7 +2015,9 @@ OrthancPluginContext* context = backend->GetContext(); - if (OrthancPluginRegisterDatabaseBackendV3(context, ¶ms, sizeof(params), new Adapter(backend)) != OrthancPluginErrorCode_Success) + if (OrthancPluginRegisterDatabaseBackendV3( + context, ¶ms, sizeof(params), maxDatabaseRetries, + new Adapter(backend, countConnections)) != OrthancPluginErrorCode_Success) { throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend"); }