Mercurial > hg > orthanc-databases
changeset 767:3327126233a6 pg-next-699
refactor reserver queue values to match the sqlite implementation
| author | Alain Mazy <am@orthanc.team> |
|---|---|
| date | Fri, 28 Nov 2025 12:46:02 +0100 |
| parents | de9603f25df4 |
| children | a07d176a4b58 |
| files | Framework/Plugins/IndexBackend.cpp Framework/Plugins/IndexUnitTests.h PostgreSQL/NEWS PostgreSQL/Plugins/SQL/PrepareIndex.sql PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql PostgreSQL/UnitTests/UnitTestsMain.cpp TODO |
| diffstat | 7 files changed, 80 insertions(+), 40 deletions(-) [+] |
line wrap: on
line diff
--- a/Framework/Plugins/IndexBackend.cpp Fri Nov 28 11:24:39 2025 +0100 +++ b/Framework/Plugins/IndexBackend.cpp Fri Nov 28 12:46:02 2025 +0100 @@ -39,6 +39,15 @@ namespace OrthancDatabases { + static int64_t GetSecondsSinceEpoch() + { + // https://www.boost.org/doc/libs/1_69_0/doc/html/date_time/examples.html#date_time.examples.seconds_since_epoch + static const boost::posix_time::ptime EPOCH(boost::gregorian::date(1970, 1, 1)); + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + return (now - EPOCH).total_seconds(); + } + + static std::string ConvertWildcardToLike(const std::string& query) { std::string s = query; @@ -4515,6 +4524,7 @@ std::unique_ptr<DatabaseManager::CachedStatement> statement; std::string queueIdParameter = formatter.GenerateParameter(queueId); + std::string nowParameter = formatter.GenerateParameter(GetSecondsSinceEpoch()); switch (manager.GetDialect()) { @@ -4523,18 +4533,17 @@ { statement.reset(new DatabaseManager::CachedStatement( STATEMENT_FROM_HERE, manager, - "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MIN(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) RETURNING value) " + "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MIN(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil <= " + nowParameter + ")) RETURNING value) " "SELECT value FROM poppedRows")); } else { statement.reset(new DatabaseManager::CachedStatement( STATEMENT_FROM_HERE, manager, - "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MAX(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) RETURNING value) " + "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MAX(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil <= " + nowParameter + ")) RETURNING value) " "SELECT value FROM poppedRows")); } break; - default: throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); } @@ -4585,6 +4594,7 @@ std::string queueIdParameter = formatter.GenerateParameter(queueId); std::string reserveTimeoutParameter = formatter.GenerateParameter(reserveTimeout); + std::string nowParameter = formatter.GenerateParameter(GetSecondsSinceEpoch()); std::string minMax = (fromFront ? "MIN" : "MAX"); std::string sql; @@ -4592,8 +4602,8 @@ switch (manager.GetDialect()) { case Dialect_PostgreSQL: - sql = "WITH RowToUpdate AS (SELECT " + minMax + "(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) " - " UPDATE Queues SET reservedUntil = now() + (" + reserveTimeoutParameter + "::text || ' seconds')::interval WHERE id IN (SELECT * FROM RowToUpdate) " + sql = "WITH RowToUpdate AS (SELECT " + minMax + "(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil <= " + nowParameter + ")) " + " UPDATE Queues SET reservedUntil = " + nowParameter + " + " + reserveTimeoutParameter + " WHERE id IN (SELECT * FROM RowToUpdate) " " RETURNING id, value;"; break; @@ -4629,13 +4639,16 @@ std::string queueIdParameter = formatter.GenerateParameter(queueId); std::string valueIdParameter = formatter.GenerateParameter(valueId); + int64_t now = GetSecondsSinceEpoch(); + std::string nowParameter = formatter.GenerateParameter(now); switch (manager.GetDialect()) { case Dialect_PostgreSQL: statement.reset(new DatabaseManager::CachedStatement( STATEMENT_FROM_HERE, manager, - "DELETE FROM Queues WHERE queueId=" + queueIdParameter + " AND id=" + valueIdParameter)); + "DELETE FROM Queues WHERE queueId=" + queueIdParameter + " AND id=" + valueIdParameter + + " AND reservedUntil IS NOT NULL AND " + nowParameter + " < reservedUntil RETURNING id")); break; default: @@ -4643,6 +4656,10 @@ } statement->Execute(formatter.GetDictionary()); + if (statement->IsDone()) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource, "Unable to acknowledge a queue value. Has it expired ?"); + } } #endif
--- a/Framework/Plugins/IndexUnitTests.h Fri Nov 28 11:24:39 2025 +0100 +++ b/Framework/Plugins/IndexUnitTests.h Fri Nov 28 12:46:02 2025 +0100 @@ -1068,45 +1068,65 @@ #if ORTHANC_PLUGINS_HAS_RESERVE_QUEUE_VALUE == 1 { - manager->StartTransaction(TransactionType_ReadWrite); - - db.EnqueueValue(*manager, "test", "a"); - db.EnqueueValue(*manager, "test", "b"); - db.EnqueueValue(*manager, "test", "c"); - db.EnqueueValue(*manager, "test", "d"); - db.EnqueueValue(*manager, "test", "e"); - - - ASSERT_EQ(5u, db.GetQueueSize(*manager, "test")); - std::string value; uint64_t valueIdA, valueIdB, valueIdC, valueIdD, valueIdE, valueIdFail; - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdA, *manager, "test", true, 1)); - ASSERT_EQ("a", value); - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1)); - ASSERT_EQ("b", value); - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdE, *manager, "test", false, 1)); - ASSERT_EQ("e", value); - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1)); - ASSERT_EQ("d", value); + { + manager->StartTransaction(TransactionType_ReadWrite); + + db.EnqueueValue(*manager, "test", "a"); + db.EnqueueValue(*manager, "test", "b"); + db.EnqueueValue(*manager, "test", "c"); + db.EnqueueValue(*manager, "test", "d"); + db.EnqueueValue(*manager, "test", "e"); + + ASSERT_EQ(5u, db.GetQueueSize(*manager, "test")); - db.AcknowledgeQueueValue(*manager, "test", valueIdA); - db.AcknowledgeQueueValue(*manager, "test", valueIdE); - manager->CommitTransaction(); // NOW() is constant during a transaction -> we need to commit it to get a new NOW() value for the second part of the test + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdA, *manager, "test", true, 1000)); + ASSERT_EQ("a", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1)); + ASSERT_EQ("b", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdE, *manager, "test", false, 1)); + ASSERT_EQ("e", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1)); + ASSERT_EQ("d", value); + manager->CommitTransaction(); + } + + { + manager->StartTransaction(TransactionType_ReadWrite); + + db.AcknowledgeQueueValue(*manager, "test", valueIdA); + db.AcknowledgeQueueValue(*manager, "test", valueIdE); + manager->CommitTransaction(); + } Orthanc::SystemToolbox::USleep(2000000); // Wait 2 seconds -> b and d should be released - manager->StartTransaction(TransactionType_ReadWrite); - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1)); - ASSERT_EQ("b", value); - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1)); - ASSERT_EQ("d", value); - ASSERT_TRUE(db.ReserveQueueValue(value, valueIdC, *manager, "test", false, 1)); - ASSERT_EQ("c", value); - ASSERT_FALSE(db.ReserveQueueValue(value, valueIdFail, *manager, "test", false, 1)); + { + manager->StartTransaction(TransactionType_ReadWrite); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1)); + ASSERT_EQ("b", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1)); + ASSERT_EQ("d", value); + ASSERT_TRUE(db.ReserveQueueValue(value, valueIdC, *manager, "test", false, 1)); + ASSERT_EQ("c", value); + ASSERT_FALSE(db.ReserveQueueValue(value, valueIdFail, *manager, "test", false, 1)); - manager->CommitTransaction(); + manager->CommitTransaction(); + } + + Orthanc::SystemToolbox::USleep(2000000); // Wait 2 seconds -> b, c and d should be released + + // try to acknowledge a value after it has expired + { + manager->StartTransaction(TransactionType_ReadWrite); + + ASSERT_THROW(db.AcknowledgeQueueValue(*manager, "test", valueIdC), Orthanc::OrthancException); + + manager->CommitTransaction(); + } + } #endif
--- a/PostgreSQL/NEWS Fri Nov 28 11:24:39 2025 +0100 +++ b/PostgreSQL/NEWS Fri Nov 28 12:46:02 2025 +0100 @@ -31,6 +31,8 @@ * Added a new primary key column in the InvalidChildCounts and GlobalIntegersChanges tables. This new column is required for pg_repack to be able to reclaim space on these tables. +* Upgraded dependencies for static builds (notably on Windows and LSB): + - libpq 18.1 (replacing libpq 13.1) Release 9.0 (2025-08-13)
--- a/PostgreSQL/Plugins/SQL/PrepareIndex.sql Fri Nov 28 11:24:39 2025 +0100 +++ b/PostgreSQL/Plugins/SQL/PrepareIndex.sql Fri Nov 28 12:46:02 2025 +0100 @@ -803,7 +803,7 @@ id BIGSERIAL NOT NULL PRIMARY KEY, queueId TEXT NOT NULL, value BYTEA NOT NULL, - reservedUntil TIMESTAMP DEFAULT NULL + reservedUntil BIGINT DEFAULT NULL -- new in rev 699 ); CREATE INDEX IF NOT EXISTS QueuesIndex ON Queues (queueId, id);
--- a/PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql Fri Nov 28 11:24:39 2025 +0100 +++ b/PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql Fri Nov 28 12:46:02 2025 +0100 @@ -4,4 +4,4 @@ ALTER TABLE GlobalIntegersChanges ADD COLUMN pk BIGSERIAL PRIMARY KEY; -- Adding the queues timeout -ALTER TABLE Queues ADD COLUMN reservedUntil TIMESTAMP DEFAULT NULL; \ No newline at end of file +ALTER TABLE Queues ADD COLUMN reservedUntil BIGINT DEFAULT NULL; \ No newline at end of file
