Mercurial > hg > orthanc-databases
changeset 234:d1b124d116c1
PostgreSQL index plugin handles retries for collisions between multiple writers
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 08 Apr 2021 10:50:01 +0200 |
parents | 7d46c99523a2 |
children | f2b32d31fc99 |
files | Framework/Common/DatabaseManager.cpp Framework/MySQL/MySQLDatabase.cpp Framework/MySQL/MySQLTransaction.cpp Framework/Plugins/DatabaseBackendAdapterV3.cpp Framework/Plugins/DatabaseBackendAdapterV3.h Framework/Plugins/IndexBackend.cpp Framework/Plugins/IndexBackend.h Framework/PostgreSQL/PostgreSQLStatement.cpp Framework/PostgreSQL/PostgreSQLTransaction.cpp MySQL/Plugins/IndexPlugin.cpp MySQL/Plugins/MySQLIndex.cpp PostgreSQL/NEWS PostgreSQL/Plugins/IndexPlugin.cpp SQLite/Plugins/IndexPlugin.cpp |
diffstat | 14 files changed, 150 insertions(+), 45 deletions(-) [+] |
line wrap: on
line diff
--- a/Framework/Common/DatabaseManager.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/Common/DatabaseManager.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -58,7 +58,8 @@ void DatabaseManager::CloseIfUnavailable(Orthanc::ErrorCode e) { - if (e != Orthanc::ErrorCode_Success) + if (e != Orthanc::ErrorCode_Success && + e != Orthanc::ErrorCode_DatabaseCannotSerialize) { transaction_.reset(NULL); } @@ -209,7 +210,7 @@ { if (transaction_.get() == NULL) { - LOG(ERROR) << "Cannot rollback a non-existing transaction"; + LOG(INFO) << "Cannot rollback a non-existing transaction"; throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } else
--- a/Framework/MySQL/MySQLDatabase.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/MySQL/MySQLDatabase.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -69,6 +69,10 @@ { throw Orthanc::OrthancException(Orthanc::ErrorCode_DatabaseUnavailable); } + else if (error == ER_LOCK_DEADLOCK) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_DatabaseCannotSerialize); + } else { throw Orthanc::OrthancException(Orthanc::ErrorCode_Database); @@ -627,6 +631,7 @@ { std::unique_ptr<MySQLDatabase> db(new MySQLDatabase(parameters_)); db->Open(); + db->Execute("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE", false); return db.release(); }
--- a/Framework/MySQL/MySQLTransaction.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/MySQL/MySQLTransaction.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -58,7 +58,7 @@ { if (active_) { - LOG(WARNING) << "An active MySQL transaction was dismissed"; + LOG(INFO) << "An active MySQL transaction was dismissed"; try {
--- a/Framework/Plugins/DatabaseBackendAdapterV3.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/Plugins/DatabaseBackendAdapterV3.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -61,28 +61,42 @@ class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable { private: - std::unique_ptr<IndexBackend> backend_; - OrthancPluginContext* context_; - boost::mutex managerMutex_; - std::unique_ptr<DatabaseManager> manager_; - - DatabaseManager& GetManager() const + class ManagerReference : public Orthanc::IDynamicObject { - if (manager_.get() == NULL) + private: + DatabaseManager* manager_; + + public: + ManagerReference(DatabaseManager& manager) : + manager_(&manager) { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } - else + + DatabaseManager& GetManager() { + assert(manager_ != NULL); return *manager_; } - } + }; + + std::unique_ptr<IndexBackend> backend_; + OrthancPluginContext* context_; + boost::shared_mutex connectionsMutex_; + size_t countConnections_; + std::list<DatabaseManager*> connections_; + Orthanc::SharedMessageQueue availableConnections_; public: - Adapter(IndexBackend* backend) : - backend_(backend) + Adapter(IndexBackend* backend, + size_t countConnections) : + backend_(backend), + countConnections_(countConnections) { - if (backend == NULL) + if (countConnections == 0) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); + } + else if (backend == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); } @@ -92,6 +106,16 @@ } } + ~Adapter() + { + for (std::list<DatabaseManager*>::iterator + it = connections_.begin(); it != connections_.end(); ++it) + { + assert(*it != NULL); + delete *it; + } + } + OrthancPluginContext* GetContext() const { return context_; @@ -99,11 +123,28 @@ void OpenConnections() { - boost::mutex::scoped_lock lock(managerMutex_); + boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_); + + if (connections_.size() == 0) + { + assert(backend_.get() != NULL); + + std::unique_ptr<IDatabase> database(backend_->OpenDatabaseConnection()); + backend_->ConfigureDatabase(*database); + + connections_.push_back(new DatabaseManager(database.release())); - if (manager_.get() == NULL) - { - manager_.reset(IndexBackend::CreateSingleDatabaseManager(*backend_)); + for (size_t i = 1; i < countConnections_; i++) + { + connections_.push_back(new DatabaseManager(backend_->OpenDatabaseConnection())); + } + + for (std::list<DatabaseManager*>::iterator + it = connections_.begin(); it != connections_.end(); ++it) + { + assert(*it != NULL); + availableConnections_.Enqueue(new ManagerReference(**it)); + } } else { @@ -113,42 +154,66 @@ void CloseConnections() { - boost::mutex::scoped_lock lock(managerMutex_); + boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_); - if (manager_.get() == NULL) + if (connections_.size() != countConnections_) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } + else if (availableConnections_.GetSize() != countConnections_) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_Database, "Some connections are still in use, bug in the Orthanc core"); + } else { - manager_->Close(); - manager_.reset(NULL); + for (std::list<DatabaseManager*>::iterator + it = connections_.begin(); it != connections_.end(); ++it) + { + assert(*it != NULL); + (*it)->Close(); + } } } class DatabaseAccessor : public boost::noncopyable { private: - boost::mutex::scoped_lock lock_; - IndexBackend& backend_; - DatabaseManager& manager_; + boost::shared_lock<boost::shared_mutex> lock_; + Adapter& adapter_; + DatabaseManager* manager_; public: DatabaseAccessor(Adapter& adapter) : - lock_(adapter.managerMutex_), - backend_(*adapter.backend_), - manager_(adapter.GetManager()) + lock_(adapter.connectionsMutex_), + adapter_(adapter), + manager_(NULL) { + for (;;) + { + std::unique_ptr<Orthanc::IDynamicObject> manager(adapter.availableConnections_.Dequeue(100)); + if (manager.get() != NULL) + { + manager_ = &dynamic_cast<ManagerReference&>(*manager).GetManager(); + return; + } + } + } + + ~DatabaseAccessor() + { + assert(manager_ != NULL); + adapter_.availableConnections_.Enqueue(new ManagerReference(*manager_)); } IndexBackend& GetBackend() const { - return backend_; + return *adapter_.backend_; } DatabaseManager& GetManager() const { - return manager_; + assert(manager_ != NULL); + return *manager_; } }; }; @@ -1862,7 +1927,9 @@ } - void DatabaseBackendAdapterV3::Register(IndexBackend* backend) + void DatabaseBackendAdapterV3::Register(IndexBackend* backend, + size_t countConnections, + unsigned int maxDatabaseRetries) { if (isBackendInUse_) { @@ -1948,7 +2015,9 @@ OrthancPluginContext* context = backend->GetContext(); - if (OrthancPluginRegisterDatabaseBackendV3(context, ¶ms, sizeof(params), new Adapter(backend)) != OrthancPluginErrorCode_Success) + if (OrthancPluginRegisterDatabaseBackendV3( + context, ¶ms, sizeof(params), maxDatabaseRetries, + new Adapter(backend, countConnections)) != OrthancPluginErrorCode_Success) { throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend"); }
--- a/Framework/Plugins/DatabaseBackendAdapterV3.h Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/Plugins/DatabaseBackendAdapterV3.h Thu Apr 08 10:50:01 2021 +0200 @@ -57,7 +57,9 @@ virtual IDatabaseBackendOutput* CreateOutput() ORTHANC_OVERRIDE; }; - static void Register(IndexBackend* backend); + static void Register(IndexBackend* backend, + size_t countConnections, + unsigned int maxDatabaseRetries); static void Finalize(); };
--- a/Framework/Plugins/IndexBackend.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/Plugins/IndexBackend.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -2275,7 +2275,9 @@ } - void IndexBackend::Register(IndexBackend* backend) + void IndexBackend::Register(IndexBackend* backend, + size_t countConnections, + unsigned int maxDatabaseRetries) { if (backend == NULL) { @@ -2288,7 +2290,7 @@ # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 2) if (OrthancPluginCheckVersionAdvanced(backend->GetContext(), 1, 9, 2) == 1) { - OrthancDatabases::DatabaseBackendAdapterV3::Register(backend); + OrthancDatabases::DatabaseBackendAdapterV3::Register(backend, countConnections, maxDatabaseRetries); hasLoadedV3 = true; } # endif
--- a/Framework/Plugins/IndexBackend.h Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/Plugins/IndexBackend.h Thu Apr 08 10:50:01 2021 +0200 @@ -381,7 +381,15 @@ const char* hashSeries, const char* hashInstance); - static void Register(IndexBackend* backend); + /** + * "maxDatabaseRetries" is to handle + * "OrthancPluginErrorCode_DatabaseCannotSerialize" if there is a + * collision multiple writers. "countConnections" and + * "maxDatabaseRetries" are only used if Orthanc >= 1.9.2. + **/ + static void Register(IndexBackend* backend, + size_t countConnections, + unsigned int maxDatabaseRetries); static void Finalize();
--- a/Framework/PostgreSQL/PostgreSQLStatement.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/PostgreSQL/PostgreSQLStatement.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -278,7 +278,16 @@ 1); } - if (result == NULL) + if (PQtransactionStatus(reinterpret_cast<PGconn*>(database_.pg_)) == PQTRANS_INERROR) + { + if (result != NULL) + { + PQclear(result); + } + + throw Orthanc::OrthancException(Orthanc::ErrorCode_DatabaseCannotSerialize); + } + else if (result == NULL) { database_.ThrowException(true); }
--- a/Framework/PostgreSQL/PostgreSQLTransaction.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/Framework/PostgreSQL/PostgreSQLTransaction.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -42,7 +42,7 @@ { if (isOpen_) { - LOG(WARNING) << "PostgreSQL: An active PostgreSQL transaction was dismissed"; + LOG(INFO) << "PostgreSQL: An active PostgreSQL transaction was dismissed"; try {
--- a/MySQL/Plugins/IndexPlugin.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/MySQL/Plugins/IndexPlugin.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -62,8 +62,12 @@ try { + const size_t countConnections = 5; // TODO - PARAMETER + const unsigned int maxDatabaseRetries = 10; // TODO - PARAMETER + OrthancDatabases::MySQLParameters parameters(mysql, configuration); - OrthancDatabases::IndexBackend::Register(new OrthancDatabases::MySQLIndex(context, parameters)); + OrthancDatabases::IndexBackend::Register( + new OrthancDatabases::MySQLIndex(context, parameters), countConnections, maxDatabaseRetries); } catch (Orthanc::OrthancException& e) {
--- a/MySQL/Plugins/MySQLIndex.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/MySQL/Plugins/MySQLIndex.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -91,8 +91,6 @@ { MySQLDatabase::ClearDatabase(parameters_); } - - db.Execute("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE", false); { MySQLDatabase::TransientAdvisoryLock lock(db, MYSQL_LOCK_DATABASE_SETUP);
--- a/PostgreSQL/NEWS Tue Apr 06 15:07:27 2021 +0200 +++ b/PostgreSQL/NEWS Thu Apr 08 10:50:01 2021 +0200 @@ -1,6 +1,7 @@ Pending changes in the mainline =============================== +* Support of retries for collisions between multiple writers, from Orthanc SDK 1.9.2 * Support of "OrthancPluginRegisterStorageArea2()" from Orthanc SDK 1.9.0
--- a/PostgreSQL/Plugins/IndexPlugin.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/PostgreSQL/Plugins/IndexPlugin.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -56,8 +56,12 @@ try { + const size_t countConnections = 5; // TODO - PARAMETER + const unsigned int maxDatabaseRetries = 10; // TODO - PARAMETER + OrthancDatabases::PostgreSQLParameters parameters(postgresql); - OrthancDatabases::IndexBackend::Register(new OrthancDatabases::PostgreSQLIndex(context, parameters)); + OrthancDatabases::IndexBackend::Register( + new OrthancDatabases::PostgreSQLIndex(context, parameters), countConnections, maxDatabaseRetries); } catch (Orthanc::OrthancException& e) {
--- a/SQLite/Plugins/IndexPlugin.cpp Tue Apr 06 15:07:27 2021 +0200 +++ b/SQLite/Plugins/IndexPlugin.cpp Thu Apr 08 10:50:01 2021 +0200 @@ -60,7 +60,9 @@ { /* Register the SQLite index into Orthanc */ OrthancDatabases::IndexBackend::Register( - new OrthancDatabases::SQLiteIndex(context, "index.db")); // TODO parameter + new OrthancDatabases::SQLiteIndex(context, "index.db"), // TODO parameter + 1 /* only 1 connection is possible with SQLite */, + 0 /* no collision is possible, as SQLite has a global lock */); } catch (Orthanc::OrthancException& e) {