Mercurial > hg > orthanc-databases
changeset 751:2bf57bc22a30 pg-next-699 tip
added support for ReserveQueueValue + AcknowledgeQueueValue
author | Alain Mazy <am@orthanc.team> |
---|---|
date | Thu, 09 Oct 2025 17:20:42 +0200 |
parents | e7a353b083fb |
children | |
files | Framework/Plugins/DatabaseBackendAdapterV4.cpp Framework/Plugins/IDatabaseBackend.h Framework/Plugins/IndexBackend.cpp Framework/Plugins/IndexBackend.h Framework/Plugins/IndexUnitTests.h Framework/Plugins/MessagesToolbox.h MySQL/Plugins/MySQLIndex.h Odbc/Plugins/OdbcIndex.h PostgreSQL/NEWS PostgreSQL/Plugins/PostgreSQLIndex.h PostgreSQL/Plugins/SQL/PrepareIndex.sql PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql SQLite/Plugins/SQLiteIndex.cpp SQLite/Plugins/SQLiteIndex.h |
diffstat | 14 files changed, 313 insertions(+), 102 deletions(-) [+] |
line wrap: on
line diff
--- a/Framework/Plugins/DatabaseBackendAdapterV4.cpp Thu Oct 09 17:14:39 2025 +0200 +++ b/Framework/Plugins/DatabaseBackendAdapterV4.cpp Thu Oct 09 17:20:42 2025 +0200 @@ -464,6 +464,10 @@ response.mutable_get_system_information()->set_supports_queues(accessor.GetBackend().HasQueues()); #endif +#if ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES == 1 + response.mutable_get_system_information()->set_supports_extended_queues(accessor.GetBackend().HasExtendedQueues()); +#endif + #if ORTHANC_PLUGINS_HAS_KEY_VALUE_STORES == 1 response.mutable_get_system_information()->set_supports_key_value_stores(accessor.GetBackend().HasKeyValueStores()); #endif @@ -1391,6 +1395,37 @@ #endif +#if ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES == 1 + case Orthanc::DatabasePluginMessages::OPERATION_RESERVE_QUEUE_VALUE: + { + std::string value; + uint64_t valueId; + bool found = backend.ReserveQueueValue(value, valueId, manager, + request.reserve_queue_value().queue_id(), + request.reserve_queue_value().origin() == Orthanc::DatabasePluginMessages::QUEUE_ORIGIN_FRONT, + request.reserve_queue_value().release_timeout()); + response.mutable_reserve_queue_value()->set_found(found); + + if (found) + { + response.mutable_reserve_queue_value()->set_value(value); + response.mutable_reserve_queue_value()->set_value_id(valueId); + } + + break; + } + + case Orthanc::DatabasePluginMessages::OPERATION_ACKNOWLEDGE_QUEUE_VALUE: + { + backend.AcknowledgeQueueValue(manager, + request.acknowledge_queue_value().queue_id(), + request.acknowledge_queue_value().value_id()); + + break; + } + +#endif + #if ORTHANC_PLUGINS_HAS_ATTACHMENTS_CUSTOM_DATA == 1 case Orthanc::DatabasePluginMessages::OPERATION_GET_ATTACHMENT_CUSTOM_DATA: {
--- a/Framework/Plugins/IDatabaseBackend.h Thu Oct 09 17:14:39 2025 +0200 +++ b/Framework/Plugins/IDatabaseBackend.h Thu Oct 09 17:20:42 2025 +0200 @@ -139,6 +139,8 @@ virtual bool HasQueues() const = 0; + virtual bool HasExtendedQueues() const = 0; + virtual bool HasAuditLogs() const = 0; virtual void AddAttachment(DatabaseManager& manager, @@ -525,6 +527,20 @@ const std::string& queueId) = 0; #endif +#if ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES == 1 + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + DatabaseManager& manager, + const std::string& queueId, + bool fromFront, + uint32_t reserveTimeout) = 0; + + virtual void AcknowledgeQueueValue(DatabaseManager& manager, + const std::string& queueId, + uint64_t valueId) = 0; + +#endif + #if ORTHANC_PLUGINS_HAS_ATTACHMENTS_CUSTOM_DATA == 1 virtual void GetAttachmentCustomData(std::string& customData, DatabaseManager& manager,
--- a/Framework/Plugins/IndexBackend.cpp Thu Oct 09 17:14:39 2025 +0200 +++ b/Framework/Plugins/IndexBackend.cpp Thu Oct 09 17:20:42 2025 +0200 @@ -4505,30 +4505,38 @@ } - bool IndexBackend::DequeueValueSQLite(std::string& value, - DatabaseManager& manager, - const std::string& queueId, - bool fromFront) + bool IndexBackend::DequeueValue(std::string& value, + DatabaseManager& manager, + const std::string& queueId, + bool fromFront) { - assert(manager.GetDialect() == Dialect_SQLite); - LookupFormatter formatter(manager.GetDialect()); std::unique_ptr<DatabaseManager::CachedStatement> statement; std::string queueIdParameter = formatter.GenerateParameter(queueId); - if (fromFront) + switch (manager.GetDialect()) { - statement.reset(new DatabaseManager::CachedStatement( - STATEMENT_FROM_HERE, manager, - "SELECT id, value FROM Queues WHERE queueId=" + queueIdParameter + " ORDER BY id ASC LIMIT 1")); - } - else - { - statement.reset(new DatabaseManager::CachedStatement( - STATEMENT_FROM_HERE, manager, - "SELECT id, value FROM Queues WHERE queueId=" + queueIdParameter + " ORDER BY id DESC LIMIT 1")); + case Dialect_PostgreSQL: + if (fromFront) + { + statement.reset(new DatabaseManager::CachedStatement( + STATEMENT_FROM_HERE, manager, + "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MIN(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) RETURNING value) " + "SELECT value FROM poppedRows")); + } + else + { + statement.reset(new DatabaseManager::CachedStatement( + STATEMENT_FROM_HERE, manager, + "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MAX(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) RETURNING value) " + "SELECT value FROM poppedRows")); + } + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); } statement->Execute(formatter.GetDictionary()); @@ -4539,83 +4547,12 @@ } else { - statement->SetResultFieldType(0, ValueType_Integer64); - statement->SetResultFieldType(1, ValueType_BinaryString); - - value = statement->ReadString(1); - - { - DatabaseManager::CachedStatement s2(STATEMENT_FROM_HERE, manager, - "DELETE FROM Queues WHERE id=${id}"); - - s2.SetParameterType("id", ValueType_Integer64); - - Dictionary args; - args.SetIntegerValue("id", statement->ReadInteger64(0)); - - s2.Execute(args); - } - + statement->SetResultFieldType(0, ValueType_BinaryString); + value = statement->ReadString(0); return true; } } - - bool IndexBackend::DequeueValue(std::string& value, - DatabaseManager& manager, - const std::string& queueId, - bool fromFront) - { - if (manager.GetDialect() == Dialect_SQLite) - { - return DequeueValueSQLite(value, manager, queueId, fromFront); - } - else - { - LookupFormatter formatter(manager.GetDialect()); - - std::unique_ptr<DatabaseManager::CachedStatement> statement; - - std::string queueIdParameter = formatter.GenerateParameter(queueId); - - switch (manager.GetDialect()) - { - case Dialect_PostgreSQL: - if (fromFront) - { - statement.reset(new DatabaseManager::CachedStatement( - STATEMENT_FROM_HERE, manager, - "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MIN(id) FROM Queues WHERE queueId=" + queueIdParameter + ") RETURNING value) " - "SELECT value FROM poppedRows")); - } - else - { - statement.reset(new DatabaseManager::CachedStatement( - STATEMENT_FROM_HERE, manager, - "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MAX(id) FROM Queues WHERE queueId=" + queueIdParameter + ") RETURNING value) " - "SELECT value FROM poppedRows")); - } - break; - - default: - throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); - } - - statement->Execute(formatter.GetDictionary()); - - if (statement->IsDone()) - { - return false; - } - else - { - statement->SetResultFieldType(0, ValueType_BinaryString); - value = statement->ReadString(0); - return true; - } - } - } - uint64_t IndexBackend::GetQueueSize(DatabaseManager& manager, const std::string& queueId) { @@ -4636,6 +4573,79 @@ } #endif +#if ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES == 1 + bool IndexBackend::ReserveQueueValue(std::string& value, + uint64_t& valueId, + DatabaseManager& manager, + const std::string& queueId, + bool fromFront, + uint32_t reserveTimeout) + { + LookupFormatter formatter(manager.GetDialect()); + + std::string queueIdParameter = formatter.GenerateParameter(queueId); + std::string reserveTimeoutParameter = formatter.GenerateParameter(reserveTimeout); + + std::string minMax = (fromFront ? "MIN" : "MAX"); + std::string sql; + + switch (manager.GetDialect()) + { + case Dialect_PostgreSQL: + sql = "WITH RowToUpdate AS (SELECT " + minMax + "(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) " + " UPDATE Queues SET reservedUntil = now() + (" + reserveTimeoutParameter + "::text || ' seconds')::interval WHERE id IN (SELECT * FROM RowToUpdate) " + " RETURNING id, value;"; + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); + } + + DatabaseManager::CachedStatement statement(STATEMENT_FROM_HERE_DYNAMIC(sql), manager, sql); + statement.Execute(formatter.GetDictionary()); + + if (statement.IsDone()) + { + return false; + } + else + { + statement.SetResultFieldType(0, ValueType_Integer64); + valueId = statement.ReadInteger64(0); + + statement.SetResultFieldType(1, ValueType_BinaryString); + value = statement.ReadString(1); + return true; + } + } + + void IndexBackend::AcknowledgeQueueValue(DatabaseManager& manager, + const std::string& queueId, + uint64_t valueId) + { + LookupFormatter formatter(manager.GetDialect()); + + std::unique_ptr<DatabaseManager::CachedStatement> statement; + + std::string queueIdParameter = formatter.GenerateParameter(queueId); + std::string valueIdParameter = formatter.GenerateParameter(valueId); + + switch (manager.GetDialect()) + { + case Dialect_PostgreSQL: + statement.reset(new DatabaseManager::CachedStatement( + STATEMENT_FROM_HERE, manager, + "DELETE FROM Queues WHERE queueId=" + queueIdParameter + " AND id=" + valueIdParameter)); + break; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); + } + + statement->Execute(formatter.GetDictionary()); + } +#endif + #if ORTHANC_PLUGINS_HAS_ATTACHMENTS_CUSTOM_DATA == 1 void IndexBackend::GetAttachmentCustomData(std::string& customData, DatabaseManager& manager,
--- a/Framework/Plugins/IndexBackend.h Thu Oct 09 17:14:39 2025 +0200 +++ b/Framework/Plugins/IndexBackend.h Thu Oct 09 17:20:42 2025 +0200 @@ -84,13 +84,6 @@ const Dictionary& args, uint32_t limit); -#if ORTHANC_PLUGINS_HAS_QUEUES == 1 - bool DequeueValueSQLite(std::string& value, - DatabaseManager& manager, - const std::string& queueId, - bool fromFront); -#endif - public: explicit IndexBackend(OrthancPluginContext* context, bool readOnly, @@ -510,6 +503,20 @@ #endif +#if ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES == 1 + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + DatabaseManager& manager, + const std::string& queueId, + bool fromFront, + uint32_t reserveTimeout) ORTHANC_OVERRIDE; + + virtual void AcknowledgeQueueValue(DatabaseManager& manager, + const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE; + +#endif + #if ORTHANC_PLUGINS_HAS_ATTACHMENTS_CUSTOM_DATA == 1 virtual void GetAttachmentCustomData(std::string& customData, DatabaseManager& manager,
--- a/Framework/Plugins/IndexUnitTests.h Thu Oct 09 17:14:39 2025 +0200 +++ b/Framework/Plugins/IndexUnitTests.h Thu Oct 09 17:20:42 2025 +0200 @@ -28,6 +28,7 @@ #include "GlobalProperties.h" #include <Compatibility.h> // For std::unique_ptr<> +#include <SystemToolbox.h> #include <gtest/gtest.h> #include <list> @@ -1065,5 +1066,49 @@ } #endif +#if ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES == 1 + { + manager->StartTransaction(TransactionType_ReadWrite); + + db.EnqueueValue(*manager, "test", "a"); + db.EnqueueValue(*manager, "test", "b"); + db.EnqueueValue(*manager, "test", "c"); + db.EnqueueValue(*manager, "test", "d"); + db.EnqueueValue(*manager, "test", "e"); + + + ASSERT_EQ(5u, db.GetQueueSize(*manager, "test")); + + std::string value; + uint64_t valueIdA, valueIdB, valueIdC, valueIdD, valueIdE, valueIdFail; + + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdA, *manager, "test", true, 1)); + ASSERT_EQ("a", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1)); + ASSERT_EQ("b", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdE, *manager, "test", false, 1)); + ASSERT_EQ("e", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1)); + ASSERT_EQ("d", value); + + db.AcknowledgeQueueValue(*manager, "test", valueIdA); + db.AcknowledgeQueueValue(*manager, "test", valueIdE); + manager->CommitTransaction(); // NOW() is constant during a transaction -> we need to commit it to get a new NOW() value for the second part of the test + + Orthanc::SystemToolbox::USleep(2000000); // Wait 2 seconds -> b and d should be released + + manager->StartTransaction(TransactionType_ReadWrite); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1)); + ASSERT_EQ("b", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1)); + ASSERT_EQ("d", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdC, *manager, "test", false, 1)); + ASSERT_EQ("c", value); + ASSERT_FALSE(db.ReserveQueueValue(value, valueIdFail, *manager, "test", false, 1)); + + manager->CommitTransaction(); + } +#endif + manager->Close(); }
--- a/Framework/Plugins/MessagesToolbox.h Thu Oct 09 17:14:39 2025 +0200 +++ b/Framework/Plugins/MessagesToolbox.h Thu Oct 09 17:20:42 2025 +0200 @@ -54,6 +54,7 @@ #define ORTHANC_PLUGINS_HAS_ATTACHMENTS_CUSTOM_DATA 0 #define ORTHANC_PLUGINS_HAS_KEY_VALUE_STORES 0 #define ORTHANC_PLUGINS_HAS_QUEUES 0 +#define ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES 0 #define ORTHANC_PLUGINS_HAS_AUDIT_LOGS 0 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) @@ -75,6 +76,13 @@ # endif #endif +#if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) +# if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 12, 10) +# undef ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES +# define ORTHANC_PLUGINS_HAS_EXTENDED_QUEUES 1 +# endif +#endif + #include <Enumerations.h>
--- a/MySQL/Plugins/MySQLIndex.h Thu Oct 09 17:14:39 2025 +0200 +++ b/MySQL/Plugins/MySQLIndex.h Thu Oct 09 17:20:42 2025 +0200 @@ -76,6 +76,11 @@ return false; } + virtual bool HasExtendedQueues() const ORTHANC_OVERRIDE + { + return false; + } + virtual bool HasAuditLogs() const ORTHANC_OVERRIDE { return false;
--- a/Odbc/Plugins/OdbcIndex.h Thu Oct 09 17:14:39 2025 +0200 +++ b/Odbc/Plugins/OdbcIndex.h Thu Oct 09 17:20:42 2025 +0200 @@ -88,6 +88,11 @@ return false; } + virtual bool HasExtendedQueues() const ORTHANC_OVERRIDE + { + return false; + } + virtual bool HasAuditLogs() const ORTHANC_OVERRIDE { return false;
--- a/PostgreSQL/NEWS Thu Oct 09 17:14:39 2025 +0200 +++ b/PostgreSQL/NEWS Thu Oct 09 17:20:42 2025 +0200 @@ -3,9 +3,9 @@ DB schema revision: 6 Minimum plugin SDK (for build): 1.12.5 -Optimal plugin SDK (for build): 1.12.9 +Optimal plugin SDK (for build): 1.12.10 (TODO: update once released !) Minimum Orthanc runtime: 1.12.5 -Optimal Orthanc runtime: 1.12.9 +Optimal Orthanc runtime: 1.12.10 Minimal Postgresql Server version: 9 Optimal Postgresql Server version: 11+ @@ -20,10 +20,9 @@ schema or that you install the pg_trgm extension manually in the public schema. The plugin now calls 'SET search_path TO $schema' when opening a new connection to the DB. +* SDK: Added support for ReserveQueueValue and AcknowledgeQueueValue (new in SDK 1.12.10) Maintenance: -* Now verifying the DatabasePatchLevel (revision) in another transaction than - the one that upgrades the schema. * Added a new primary key column in the InvalidChildCounts and GlobalIntegersChanges tables. This new column is required for pg_repack to be able to reclaim space on these tables.
--- a/PostgreSQL/Plugins/PostgreSQLIndex.h Thu Oct 09 17:14:39 2025 +0200 +++ b/PostgreSQL/Plugins/PostgreSQLIndex.h Thu Oct 09 17:20:42 2025 +0200 @@ -87,6 +87,11 @@ return true; } + virtual bool HasExtendedQueues() const ORTHANC_OVERRIDE + { + return true; + } + virtual bool HasAuditLogs() const ORTHANC_OVERRIDE { return true;
--- a/PostgreSQL/Plugins/SQL/PrepareIndex.sql Thu Oct 09 17:14:39 2025 +0200 +++ b/PostgreSQL/Plugins/SQL/PrepareIndex.sql Thu Oct 09 17:20:42 2025 +0200 @@ -802,7 +802,8 @@ CREATE TABLE IF NOT EXISTS Queues ( id BIGSERIAL NOT NULL PRIMARY KEY, queueId TEXT NOT NULL, - value BYTEA NOT NULL + value BYTEA NOT NULL, + reservedUntil TIMESTAMP DEFAULT NULL ); CREATE INDEX IF NOT EXISTS QueuesIndex ON Queues (queueId, id);
--- a/PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql Thu Oct 09 17:14:39 2025 +0200 +++ b/PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql Thu Oct 09 17:20:42 2025 +0200 @@ -1,4 +1,7 @@ -- Adding a PK to these 2 table to allow pg_repack to process these tables, enabling reclaiming disk space and defragmenting the tables. ALTER TABLE InvalidChildCounts ADD COLUMN pk BIGSERIAL PRIMARY KEY; -ALTER TABLE GlobalIntegersChanges ADD COLUMN pk BIGSERIAL PRIMARY KEY; \ No newline at end of file +ALTER TABLE GlobalIntegersChanges ADD COLUMN pk BIGSERIAL PRIMARY KEY; + +-- Adding the queues timeout +ALTER TABLE Queues ADD COLUMN reservedUntil TIMESTAMP DEFAULT NULL; \ No newline at end of file
--- a/SQLite/Plugins/SQLiteIndex.cpp Thu Oct 09 17:14:39 2025 +0200 +++ b/SQLite/Plugins/SQLiteIndex.cpp Thu Oct 09 17:20:42 2025 +0200 @@ -299,4 +299,62 @@ throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); } #endif + +#if ORTHANC_PLUGINS_HAS_QUEUES == 1 + + bool IndexBackend::DequeueValue(std::string& value, + DatabaseManager& manager, + const std::string& queueId, + bool fromFront) + { + assert(manager.GetDialect() == Dialect_SQLite); + + LookupFormatter formatter(manager.GetDialect()); + + std::unique_ptr<DatabaseManager::CachedStatement> statement; + + std::string queueIdParameter = formatter.GenerateParameter(queueId); + + if (fromFront) + { + statement.reset(new DatabaseManager::CachedStatement( + STATEMENT_FROM_HERE, manager, + "SELECT id, value FROM Queues WHERE queueId=" + queueIdParameter + " ORDER BY id ASC LIMIT 1")); + } + else + { + statement.reset(new DatabaseManager::CachedStatement( + STATEMENT_FROM_HERE, manager, + "SELECT id, value FROM Queues WHERE queueId=" + queueIdParameter + " ORDER BY id DESC LIMIT 1")); + } + + statement->Execute(formatter.GetDictionary()); + + if (statement->IsDone()) + { + return false; + } + else + { + statement->SetResultFieldType(0, ValueType_Integer64); + statement->SetResultFieldType(1, ValueType_BinaryString); + + value = statement->ReadString(1); + + { + DatabaseManager::CachedStatement s2(STATEMENT_FROM_HERE, manager, + "DELETE FROM Queues WHERE id=${id}"); + + s2.SetParameterType("id", ValueType_Integer64); + + Dictionary args; + args.SetIntegerValue("id", statement->ReadInteger64(0)); + + s2.Execute(args); + } + + return true; + } + } +#endif }
--- a/SQLite/Plugins/SQLiteIndex.h Thu Oct 09 17:14:39 2025 +0200 +++ b/SQLite/Plugins/SQLiteIndex.h Thu Oct 09 17:20:42 2025 +0200 @@ -70,6 +70,11 @@ return true; } + virtual bool HasExtendedQueues() const ORTHANC_OVERRIDE + { + return false; + } + virtual bool HasAuditLogs() const ORTHANC_OVERRIDE { return false; @@ -108,5 +113,14 @@ { return false; } + +#if ORTHANC_PLUGINS_HAS_QUEUES == 1 + +virtual bool DequeueValue(std::string& value, + DatabaseManager& manager, + const std::string& queueId, + bool fromFront) ORTHANC_OVERRIDE; + +#endif }; }