changeset 767:3327126233a6 pg-next-699

refactor reserver queue values to match the sqlite implementation
author Alain Mazy <am@orthanc.team>
date Fri, 28 Nov 2025 12:46:02 +0100
parents de9603f25df4
children a07d176a4b58
files Framework/Plugins/IndexBackend.cpp Framework/Plugins/IndexUnitTests.h PostgreSQL/NEWS PostgreSQL/Plugins/SQL/PrepareIndex.sql PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql PostgreSQL/UnitTests/UnitTestsMain.cpp TODO
diffstat 7 files changed, 80 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- a/Framework/Plugins/IndexBackend.cpp	Fri Nov 28 11:24:39 2025 +0100
+++ b/Framework/Plugins/IndexBackend.cpp	Fri Nov 28 12:46:02 2025 +0100
@@ -39,6 +39,15 @@
 
 namespace OrthancDatabases
 {
+  static int64_t GetSecondsSinceEpoch()
+  {
+    // https://www.boost.org/doc/libs/1_69_0/doc/html/date_time/examples.html#date_time.examples.seconds_since_epoch
+    static const boost::posix_time::ptime EPOCH(boost::gregorian::date(1970, 1, 1));
+    const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
+    return (now - EPOCH).total_seconds();
+  }
+
+
   static std::string ConvertWildcardToLike(const std::string& query)
   {
     std::string s = query;
@@ -4515,6 +4524,7 @@
       std::unique_ptr<DatabaseManager::CachedStatement> statement;
 
       std::string queueIdParameter = formatter.GenerateParameter(queueId);
+      std::string nowParameter = formatter.GenerateParameter(GetSecondsSinceEpoch());
 
       switch (manager.GetDialect())
       {
@@ -4523,18 +4533,17 @@
           {
             statement.reset(new DatabaseManager::CachedStatement(
                               STATEMENT_FROM_HERE, manager,
-                              "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MIN(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) RETURNING value) "
+                              "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MIN(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil <= " + nowParameter + ")) RETURNING value) "
                               "SELECT value FROM poppedRows"));
           }
           else
           {
             statement.reset(new DatabaseManager::CachedStatement(
                               STATEMENT_FROM_HERE, manager,
-                              "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MAX(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) RETURNING value) "
+                              "WITH poppedRows AS (DELETE FROM Queues WHERE id = (SELECT MAX(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil <= " + nowParameter + ")) RETURNING value) "
                               "SELECT value FROM poppedRows"));
           }
           break;
-
         default:
           throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented);
       }
@@ -4585,6 +4594,7 @@
 
       std::string queueIdParameter = formatter.GenerateParameter(queueId);
       std::string reserveTimeoutParameter = formatter.GenerateParameter(reserveTimeout);
+      std::string nowParameter = formatter.GenerateParameter(GetSecondsSinceEpoch());
 
       std::string minMax = (fromFront ? "MIN" : "MAX");
       std::string sql;
@@ -4592,8 +4602,8 @@
       switch (manager.GetDialect())
       {
         case Dialect_PostgreSQL:
-          sql = "WITH RowToUpdate AS (SELECT " + minMax + "(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil < now())) "
-                "  UPDATE Queues SET reservedUntil = now() + (" + reserveTimeoutParameter + "::text || ' seconds')::interval WHERE id IN (SELECT * FROM RowToUpdate) "
+          sql = "WITH RowToUpdate AS (SELECT " + minMax + "(id) FROM Queues WHERE queueId=" + queueIdParameter + " AND (reservedUntil IS NULL OR reservedUntil <= " + nowParameter + ")) "
+                "  UPDATE Queues SET reservedUntil = " + nowParameter + " + " + reserveTimeoutParameter + " WHERE id IN (SELECT * FROM RowToUpdate) "
                 "  RETURNING id, value;";
           break;
 
@@ -4629,13 +4639,16 @@
 
       std::string queueIdParameter = formatter.GenerateParameter(queueId);
       std::string valueIdParameter = formatter.GenerateParameter(valueId);
+      int64_t now = GetSecondsSinceEpoch();
+      std::string nowParameter = formatter.GenerateParameter(now);
 
       switch (manager.GetDialect())
       {
         case Dialect_PostgreSQL:
           statement.reset(new DatabaseManager::CachedStatement(
                             STATEMENT_FROM_HERE, manager,
-                            "DELETE FROM Queues WHERE queueId=" + queueIdParameter + " AND id=" + valueIdParameter));
+                            "DELETE FROM Queues WHERE queueId=" + queueIdParameter + " AND id=" + valueIdParameter +
+                            " AND reservedUntil IS NOT NULL AND " + nowParameter + " < reservedUntil RETURNING id"));
           break;
 
         default:
@@ -4643,6 +4656,10 @@
       }
 
       statement->Execute(formatter.GetDictionary());
+      if (statement->IsDone())
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource, "Unable to acknowledge a queue value. Has it expired ?");
+      }
     }
 #endif
 
--- a/Framework/Plugins/IndexUnitTests.h	Fri Nov 28 11:24:39 2025 +0100
+++ b/Framework/Plugins/IndexUnitTests.h	Fri Nov 28 12:46:02 2025 +0100
@@ -1068,45 +1068,65 @@
 
 #if ORTHANC_PLUGINS_HAS_RESERVE_QUEUE_VALUE == 1
   {
-    manager->StartTransaction(TransactionType_ReadWrite);
-
-    db.EnqueueValue(*manager, "test", "a");
-    db.EnqueueValue(*manager, "test", "b");
-    db.EnqueueValue(*manager, "test", "c");
-    db.EnqueueValue(*manager, "test", "d");
-    db.EnqueueValue(*manager, "test", "e");
-
-
-    ASSERT_EQ(5u, db.GetQueueSize(*manager, "test"));
-
     std::string value;
     uint64_t valueIdA, valueIdB, valueIdC, valueIdD, valueIdE, valueIdFail;
 
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdA, *manager, "test", true, 1));
-    ASSERT_EQ("a", value);
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1));
-    ASSERT_EQ("b", value);
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdE, *manager, "test", false, 1));
-    ASSERT_EQ("e", value);
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1));
-    ASSERT_EQ("d", value);
+    {
+      manager->StartTransaction(TransactionType_ReadWrite);
+
+      db.EnqueueValue(*manager, "test", "a");
+      db.EnqueueValue(*manager, "test", "b");
+      db.EnqueueValue(*manager, "test", "c");
+      db.EnqueueValue(*manager, "test", "d");
+      db.EnqueueValue(*manager, "test", "e");
+
+      ASSERT_EQ(5u, db.GetQueueSize(*manager, "test"));
 
-    db.AcknowledgeQueueValue(*manager, "test", valueIdA);
-    db.AcknowledgeQueueValue(*manager, "test", valueIdE);
-    manager->CommitTransaction();  // NOW() is constant during a transaction -> we need to commit it to get a new NOW() value for the second part of the test
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdA, *manager, "test", true, 1000));
+      ASSERT_EQ("a", value);
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1));
+      ASSERT_EQ("b", value);
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdE, *manager, "test", false, 1));
+      ASSERT_EQ("e", value);
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1));
+      ASSERT_EQ("d", value);
+      manager->CommitTransaction(); 
+    }
+
+    {
+      manager->StartTransaction(TransactionType_ReadWrite);
+
+      db.AcknowledgeQueueValue(*manager, "test", valueIdA);
+      db.AcknowledgeQueueValue(*manager, "test", valueIdE);
+      manager->CommitTransaction(); 
+    }
 
     Orthanc::SystemToolbox::USleep(2000000);  // Wait 2 seconds -> b and d should be released
 
-    manager->StartTransaction(TransactionType_ReadWrite);
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1));
-    ASSERT_EQ("b", value);
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1));
-    ASSERT_EQ("d", value);
-    ASSERT_TRUE(db.ReserveQueueValue(value, valueIdC, *manager, "test", false, 1));
-    ASSERT_EQ("c", value);
-    ASSERT_FALSE(db.ReserveQueueValue(value, valueIdFail, *manager, "test", false, 1));
+    {
+      manager->StartTransaction(TransactionType_ReadWrite);
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdB, *manager, "test", true, 1));
+      ASSERT_EQ("b", value);
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdD, *manager, "test", false, 1));
+      ASSERT_EQ("d", value);
+      ASSERT_TRUE(db.ReserveQueueValue(value, valueIdC, *manager, "test", false, 1));
+      ASSERT_EQ("c", value);
+      ASSERT_FALSE(db.ReserveQueueValue(value, valueIdFail, *manager, "test", false, 1));
 
-    manager->CommitTransaction();
+      manager->CommitTransaction();
+    }
+
+    Orthanc::SystemToolbox::USleep(2000000);  // Wait 2 seconds -> b, c and d should be released
+
+    // try to acknowledge a value after it has expired
+    {
+      manager->StartTransaction(TransactionType_ReadWrite);
+
+      ASSERT_THROW(db.AcknowledgeQueueValue(*manager, "test", valueIdC), Orthanc::OrthancException);
+
+      manager->CommitTransaction();
+    }
+
   }
 #endif
 
--- a/PostgreSQL/NEWS	Fri Nov 28 11:24:39 2025 +0100
+++ b/PostgreSQL/NEWS	Fri Nov 28 12:46:02 2025 +0100
@@ -31,6 +31,8 @@
 * Added a new primary key column in the InvalidChildCounts and GlobalIntegersChanges
   tables.  This new column is required for pg_repack to be able to reclaim space on
   these tables.
+* Upgraded dependencies for static builds (notably on Windows and LSB):
+  - libpq 18.1 (replacing libpq 13.1)
 
 
 Release 9.0 (2025-08-13)
--- a/PostgreSQL/Plugins/SQL/PrepareIndex.sql	Fri Nov 28 11:24:39 2025 +0100
+++ b/PostgreSQL/Plugins/SQL/PrepareIndex.sql	Fri Nov 28 12:46:02 2025 +0100
@@ -803,7 +803,7 @@
        id BIGSERIAL NOT NULL PRIMARY KEY,
        queueId TEXT NOT NULL,
        value BYTEA NOT NULL,
-       reservedUntil TIMESTAMP DEFAULT NULL
+       reservedUntil BIGINT DEFAULT NULL -- new in rev 699
 );
 
 CREATE INDEX IF NOT EXISTS QueuesIndex ON Queues (queueId, id);
--- a/PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql	Fri Nov 28 11:24:39 2025 +0100
+++ b/PostgreSQL/Plugins/SQL/Upgrades/Rev6ToRev699.sql	Fri Nov 28 12:46:02 2025 +0100
@@ -4,4 +4,4 @@
 ALTER TABLE GlobalIntegersChanges ADD COLUMN pk BIGSERIAL PRIMARY KEY;
 
 -- Adding the queues timeout
-ALTER TABLE Queues ADD COLUMN reservedUntil TIMESTAMP DEFAULT NULL;
\ No newline at end of file
+ALTER TABLE Queues ADD COLUMN reservedUntil BIGINT DEFAULT NULL;
\ No newline at end of file
--- a/PostgreSQL/UnitTests/UnitTestsMain.cpp	Fri Nov 28 11:24:39 2025 +0100
+++ b/PostgreSQL/UnitTests/UnitTestsMain.cpp	Fri Nov 28 12:46:02 2025 +0100
@@ -38,7 +38,7 @@
 
 TEST(PostgreSQL, Version)
 {
-  ASSERT_STREQ("13.1", PG_VERSION);
+  ASSERT_STREQ("18.1", PG_VERSION);
 }
 #endif
 
--- a/TODO	Fri Nov 28 11:24:39 2025 +0100
+++ b/TODO	Fri Nov 28 12:46:02 2025 +0100
@@ -27,6 +27,7 @@
 PostgreSQL
 ----------
 
+* upgrade to libpq 18.1
 
 -----
 MySQL