Mercurial > hg > orthanc
changeset 6335:29c0b2294188 queues-timeout
OrthancPluginReserveQueueValue + OrthancPluginAcknowledgeQueueValue
line wrap: on
line diff
--- a/NEWS Mon Oct 06 09:51:39 2025 +0200 +++ b/NEWS Tue Oct 07 10:13:06 2025 +0200 @@ -42,6 +42,16 @@ * Upgraded dependencies for static builds: - civetweb 1.16, including patch for CVE-2025-55763 +Plugin SDK +---------- + +* New primitives for handling queues: + - OrthancPluginReserveQueueValue is a replacement for OrthancPluginDequeueValue. This flavour + does not directly remove the value from the queue but reserves it for a certain duration. + - OrthancPluginAcknowledgeQueueValue removes a reserved value from a queue. + - With these 2 primitives, one can ensure not to loose any message in case Orthanc is stopped + while handling a message. + Version 1.12.9 (2025-08-11) ===========================
--- a/OrthancServer/CMakeLists.txt Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/CMakeLists.txt Tue Oct 07 10:13:06 2025 +0200 @@ -266,6 +266,7 @@ INSTALL_REVISION_AND_CUSTOM_DATA ${CMAKE_SOURCE_DIR}/Sources/Database/InstallRevisionAndCustomData.sql INSTALL_DELETED_FILES ${CMAKE_SOURCE_DIR}/Sources/Database/InstallDeletedFiles.sql INSTALL_KEY_VALUE_STORES_AND_QUEUES ${CMAKE_SOURCE_DIR}/Sources/Database/InstallKeyValueStoresAndQueues.sql + ADD_TIMEOUT_TO_QUEUES ${CMAKE_SOURCE_DIR}/Sources/Database/AddTimeoutToQueues.sql ) if (STANDALONE_BUILD)
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabase.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabase.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -1499,6 +1499,21 @@ throw OrthancException(ErrorCode_InternalError); // Not supported } + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + virtual void GetAttachmentCustomData(std::string& customData, const std::string& attachmentUuid) ORTHANC_OVERRIDE {
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV3.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV3.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -1111,6 +1111,21 @@ throw OrthancException(ErrorCode_InternalError); // Not supported } + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + virtual void GetAttachmentCustomData(std::string& customData, const std::string& attachmentUuid) ORTHANC_OVERRIDE {
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV4.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV4.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -2051,6 +2051,73 @@ throw OrthancException(ErrorCode_InternalError); } } + + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + if (database_.GetDatabaseCapabilities().HasExtendedQueuesSupport()) + { + DatabasePluginMessages::TransactionRequest request; + request.mutable_reserve_queue_value()->set_queue_id(queueId); + request.mutable_reserve_queue_value()->set_release_timeout(releaseTimeout); + + switch (origin) + { + case QueueOrigin_Back: + request.mutable_dequeue_value()->set_origin(DatabasePluginMessages::QUEUE_ORIGIN_BACK); + break; + + case QueueOrigin_Front: + request.mutable_dequeue_value()->set_origin(DatabasePluginMessages::QUEUE_ORIGIN_FRONT); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + DatabasePluginMessages::TransactionResponse response; + ExecuteTransaction(response, DatabasePluginMessages::OPERATION_RESERVE_QUEUE_VALUE, request); + + if (response.reserve_queue_value().found()) + { + value = response.reserve_queue_value().value(); + valueId = response.reserve_queue_value().value_id(); + return true; + } + else + { + return false; + } + } + else + { + // This method shouldn't have been called + throw OrthancException(ErrorCode_InternalError); + } + } + + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE + { + + if (database_.GetDatabaseCapabilities().HasExtendedQueuesSupport()) + { + DatabasePluginMessages::TransactionRequest request; + request.mutable_acknowledge_queue_value()->set_queue_id(queueId); + request.mutable_acknowledge_queue_value()->set_value_id(valueId); + + ExecuteTransaction(DatabasePluginMessages::OPERATION_ACKNOWLEDGE_QUEUE_VALUE, request); + } + else + { + // This method shouldn't have been called + throw OrthancException(ErrorCode_InternalError); + } + } + }; @@ -2143,6 +2210,7 @@ dbCapabilities_.SetHasFindSupport(systemInfo.supports_find()); dbCapabilities_.SetKeyValueStoresSupport(systemInfo.supports_key_value_stores()); dbCapabilities_.SetQueuesSupport(systemInfo.supports_queues()); + dbCapabilities_.SetExtendedQueuesSupport(systemInfo.supports_extended_queues()); dbCapabilities_.SetAttachmentCustomDataSupport(systemInfo.has_attachment_custom_data()); }
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Engine/OrthancPlugins.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -4725,6 +4725,14 @@ } } + static void CheckExtendedQueuesSupport(ServerContext& context) + { + if (!context.GetIndex().HasExtendedQueuesSupport()) + { + throw OrthancException(ErrorCode_NotImplemented, "The database engine does not support queues (extended)"); + } + } + void OrthancPlugins::ApplyEnqueueValue(const _OrthancPluginEnqueueValue& parameters) { PImpl::ServerContextReference lock(*pimpl_); @@ -4762,6 +4770,37 @@ *parameters.size = lock.GetContext().GetIndex().GetQueueSize(parameters.queueId); } + void OrthancPlugins::ApplyReserveQueueValue(const _OrthancPluginReserveQueueValue& parameters) + { + PImpl::ServerContextReference lock(*pimpl_); + + CheckExtendedQueuesSupport(lock.GetContext()); + + std::string value; + uint64_t valueId; + + if (lock.GetContext().GetIndex().ReserveQueueValue(value, valueId, parameters.queueId, Plugins::Convert(parameters.origin), parameters.releaseTimeout)) + { + CopyToMemoryBuffer(parameters.target, value); + *parameters.found = true; + *parameters.valueId = valueId; + } + else + { + *parameters.found = false; + } + } + + void OrthancPlugins::ApplyAknowledgeQueueValue(const _OrthancPluginAcknowledgeQueueValue& parameters) + { + PImpl::ServerContextReference lock(*pimpl_); + + CheckExtendedQueuesSupport(lock.GetContext()); + + lock.GetContext().GetIndex().AcknowledgeQueueValue(parameters.queueId, parameters.valueId); + } + + void OrthancPlugins::ApplySetStableStatus(const _OrthancPluginSetStableStatus& parameters) { PImpl::ServerContextReference lock(*pimpl_); @@ -5922,6 +5961,20 @@ return true; } + case _OrthancPluginService_ReserveQueueValue: + { + const _OrthancPluginReserveQueueValue& p = *reinterpret_cast<const _OrthancPluginReserveQueueValue*>(parameters); + ApplyReserveQueueValue(p); + return true; + } + + case _OrthancPluginService_AcknowledgeQueueValue: + { + const _OrthancPluginAcknowledgeQueueValue& p = *reinterpret_cast<const _OrthancPluginAcknowledgeQueueValue*>(parameters); + ApplyAknowledgeQueueValue(p); + return true; + } + case _OrthancPluginService_SetStableStatus: { const _OrthancPluginSetStableStatus& p = *reinterpret_cast<const _OrthancPluginSetStableStatus*>(parameters);
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.h Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Engine/OrthancPlugins.h Tue Oct 07 10:13:06 2025 +0200 @@ -249,6 +249,10 @@ void ApplyGetQueueSize(const _OrthancPluginGetQueueSize& parameters); + void ApplyReserveQueueValue(const _OrthancPluginReserveQueueValue& parameters); + + void ApplyAknowledgeQueueValue(const _OrthancPluginAcknowledgeQueueValue& parameters); + void ApplySetStableStatus(const _OrthancPluginSetStableStatus& parameters); void ApplyEmitAuditLog(const _OrthancPluginEmitAuditLog& parameters);
--- a/OrthancServer/Plugins/Include/orthanc/OrthancCPlugin.h Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Include/orthanc/OrthancCPlugin.h Tue Oct 07 10:13:06 2025 +0200 @@ -519,6 +519,8 @@ _OrthancPluginService_GetQueueSize = 59, /* New in Orthanc 1.12.8 */ _OrthancPluginService_SetStableStatus = 60, /* New in Orthanc 1.12.9 */ _OrthancPluginService_EmitAuditLog = 61, /* New in Orthanc 1.12.9 */ + _OrthancPluginService_ReserveQueueValue = 62, /* New in Orthanc 1.12.10 */ + _OrthancPluginService_AcknowledgeQueueValue = 63, /* New in Orthanc 1.12.10 */ /* Registration of callbacks */ _OrthancPluginService_RegisterRestCallback = 1000, @@ -10412,9 +10414,11 @@ * @param queueId A unique identifier identifying both the plugin and the queue. * @param origin The position from where the value is dequeued (back for LIFO, front for FIFO). * @return 0 if success, other value if error. - **/ + * @deprecated This function should not be used anymore because two consumers may consume the same + * value. Use OrthancPluginReserveQueueValue and OrthancPluginAcknowledgeQueueValue instead. + **/ ORTHANC_PLUGIN_SINCE_SDK("1.12.8") - ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginDequeueValue( + ORTHANC_PLUGIN_DEPRECATED ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginDequeueValue( OrthancPluginContext* context, uint8_t* found, /* out */ OrthancPluginMemoryBuffer* target, /* out */ @@ -10706,6 +10710,83 @@ } + typedef struct + { + uint8_t* found; + OrthancPluginMemoryBuffer* target; + const char* queueId; + OrthancPluginQueueOrigin origin; + uint32_t releaseTimeout; + uint64_t* valueId; + } _OrthancPluginReserveQueueValue; + + /** + * @brief Reserve a value from a queue which means the message is not + * available for other consumers. + * The value is either: + * - removed from the queue when one calls OrthancPluginAcknowledgeQueueValue + * - or made available again for consumers after the releaseTimeout has expired + * + * @param context The Orthanc plugin context, as received by OrthancPluginInitialize(). + * @param found Pointer to a Boolean that is set to "true" iff. a value has been dequeued. + * @param target Memory buffer where to store the value that has been retrieved from the queue. + * It must be freed with OrthancPluginFreeMemoryBuffer(). + * @param queueId A unique identifier identifying both the plugin and the queue. + * @param origin The position from where the value is dequeued (back for LIFO, front for FIFO). + * @param releaseTimeout Timeout in seconds. If the value is not acknowledge within this + * timeout, the value is automatically released and made available for further + * calls to OrthancPluginDequeueValue or OrthancPluginReserveQueueValue + * @param valueId An opaque id to use in OrthancPluginAcknowledgeQueueValue + * @return 0 if success, other value if error. + **/ + ORTHANC_PLUGIN_SINCE_SDK("1.12.10") + ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginReserveQueueValue( + OrthancPluginContext* context, + uint8_t* found, /* out */ + OrthancPluginMemoryBuffer* target, /* out */ + uint64_t* valueId, /* out */ + const char* queueId, /* in */ + OrthancPluginQueueOrigin origin, /* in */ + uint32_t releaseTimeout /* in */) + { + _OrthancPluginReserveQueueValue params; + params.found = found; + params.target = target; + params.queueId = queueId; + params.origin = origin; + params.valueId = valueId; + params.releaseTimeout = releaseTimeout; + + return context->InvokeService(context, _OrthancPluginService_ReserveQueueValue, ¶ms); + } + + typedef struct + { + const char* queueId; + uint64_t valueId; + } _OrthancPluginAcknowledgeQueueValue; + + /** + * @brief Acknowledge that a queue value has been consumed and definitely removes it from the queue. + * + * @param context The Orthanc plugin context, as received by OrthancPluginInitialize(). + * @param queueId A unique identifier identifying both the plugin and the queue. + * @param valueId An opaque id obtained from OrthancPluginReserveQueueValue + * @return 0 if success, other value if error. + **/ + ORTHANC_PLUGIN_SINCE_SDK("1.12.10") + ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginAcknowledgeQueueValue( + OrthancPluginContext* context, + const char* queueId, /* in */ + uint64_t valueId /* in */) + { + _OrthancPluginAcknowledgeQueueValue params; + params.queueId = queueId; + params.valueId = valueId; + + return context->InvokeService(context, _OrthancPluginService_AcknowledgeQueueValue, ¶ms); + } + #ifdef __cplusplus } #endif
--- a/OrthancServer/Plugins/Include/orthanc/OrthancDatabasePlugin.proto Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Plugins/Include/orthanc/OrthancDatabasePlugin.proto Tue Oct 07 10:13:06 2025 +0200 @@ -175,6 +175,7 @@ bool supports_key_value_stores = 10; // New in Orthanc 1.12.8 bool supports_queues = 11; // New in Orthanc 1.12.8 bool has_attachment_custom_data = 12; // New in Orthanc 1.12.8 + bool supports_extended_queues = 13; // New in Orthanc 1.12.10 } } @@ -339,7 +340,8 @@ OPERATION_GET_QUEUE_SIZE = 59; // New in Orthanc 1.12.8 OPERATION_GET_ATTACHMENT_CUSTOM_DATA = 60; // New in Orthanc 1.12.8 OPERATION_SET_ATTACHMENT_CUSTOM_DATA = 61; // New in Orthanc 1.12.8 - + OPERATION_RESERVE_QUEUE_VALUE = 62; // New in Orthanc 1.12.10 + OPERATION_ACKNOWLEDGE_QUEUE_VALUE = 63; // New in Orthanc 1.12.10 } message Rollback { @@ -1065,6 +1067,20 @@ } } +message ReserveQueueValue { + message Request { + string queue_id = 1; + QueueOrigin origin = 2; + uint32 release_timeout = 3; + } + + message Response { + bool found = 1; + bytes value = 2; + uint64 value_id = 3; + } +} + message GetQueueSize { message Request { string queue_id = 1; @@ -1075,6 +1091,17 @@ } } +message AcknowledgeQueueValue { + message Request { + string queue_id = 1; + uint64 value_id = 2; + } + + message Response { + } +} + + message GetAttachmentCustomData { message Request { string uuid = 1; @@ -1162,6 +1189,8 @@ GetQueueSize.Request get_queue_size = 159; GetAttachmentCustomData.Request get_attachment_custom_data = 160; SetAttachmentCustomData.Request set_attachment_custom_data = 161; + ReserveQueueValue.Request reserve_queue_value = 162; + AcknowledgeQueueValue.Request acknowledge_queue_value = 163; } message TransactionResponse { @@ -1227,6 +1256,8 @@ GetQueueSize.Response get_queue_size = 159; GetAttachmentCustomData.Response get_attachment_custom_data = 160; SetAttachmentCustomData.Response set_attachment_custom_data = 161; + ReserveQueueValue.Response reserve_queue_value = 162; + AcknowledgeQueueValue.Response acknowledge_queue_value = 163; } enum RequestType {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Sources/Database/AddTimeoutToQueues.sql Tue Oct 07 10:13:06 2025 +0200 @@ -0,0 +1,23 @@ +-- Orthanc - A Lightweight, RESTful DICOM Store +-- Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics +-- Department, University Hospital of Liege, Belgium +-- Copyright (C) 2017-2023 Osimis S.A., Belgium +-- Copyright (C) 2024-2025 Orthanc Team SRL, Belgium +-- Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium +-- +-- This program is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- This program is distributed in the hope that it will be useful, but +-- WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +-- General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with this program. If not, see <http://www.gnu.org/licenses/>. + + +ALTER TABLE Queues +ADD COLUMN reservedUntil TIMESTAMP DEFAULT NULL;
--- a/OrthancServer/Sources/Database/IDatabaseWrapper.h Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/Database/IDatabaseWrapper.h Tue Oct 07 10:13:06 2025 +0200 @@ -59,6 +59,7 @@ bool hasAttachmentCustomDataSupport_; bool hasKeyValueStoresSupport_; bool hasQueuesSupport_; + bool hasExtendedQueuesSupport_; public: Capabilities() : @@ -72,7 +73,8 @@ hasExtendedChanges_(false), hasAttachmentCustomDataSupport_(false), hasKeyValueStoresSupport_(false), - hasQueuesSupport_(false) + hasQueuesSupport_(false), + hasExtendedQueuesSupport_(false) { } @@ -186,6 +188,16 @@ return hasQueuesSupport_; } + void SetExtendedQueuesSupport(bool value) + { + hasExtendedQueuesSupport_ = value; + } + + bool HasExtendedQueuesSupport() const + { + return hasExtendedQueuesSupport_; + } + }; @@ -471,6 +483,17 @@ // New in Orthanc 1.12.8, for statistics only virtual uint64_t GetQueueSize(const std::string& queueId) = 0; + // New in Orthanc 1.12.10 + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) = 0; + + // New in Orthanc 1.12.10 + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) = 0; + };
--- a/OrthancServer/Sources/Database/InstallKeyValueStoresAndQueues.sql Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/Database/InstallKeyValueStoresAndQueues.sql Tue Oct 07 10:13:06 2025 +0200 @@ -30,6 +30,7 @@ id INTEGER PRIMARY KEY AUTOINCREMENT, queueId TEXT NOT NULL, value BLOB NOT NULL + --- reservedUntil TIMESTAMP DEFAULT NULL -- added in AddTimeoutToQueues.sql ); CREATE INDEX QueuesIndex ON Queues (queueId, id);
--- a/OrthancServer/Sources/Database/PrepareDatabase.sql Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/Database/PrepareDatabase.sql Tue Oct 07 10:13:06 2025 +0200 @@ -152,6 +152,9 @@ -- new in Orthanc 1.12.8 ------------------------- equivalent to InstallKeyValueStoresAndQueues.sql ${INSTALL_KEY_VALUE_STORES_AND_QUEUES} +-- new in Orthanc 1.12.10 ------------------------- equivalent to AddTimeoutToQueues.sql +${ADD_TIMEOUT_TO_QUEUES} + -- Track the fact that the "revision" column exists in the "Metadata" and "AttachedFiles" -- tables, and that the "customData" column exists in the "AttachedFiles" table
--- a/OrthancServer/Sources/Database/SQLiteDatabaseWrapper.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/Database/SQLiteDatabaseWrapper.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -2280,54 +2280,49 @@ } SQLite::Statement s(db_, SQLITE_FROM_HERE, - "INSERT INTO Queues (queueId, value) VALUES (?, ?)"); + "INSERT INTO Queues (queueId, value, reservedUntil) VALUES (?, ?, NULL)"); s.BindString(0, queueId); s.BindBlob(1, value, valueSize); s.Run(); } - // New in Orthanc 1.12.8 + // New in Orthanc 1.12.8 (but obsolete, you should use ReserveQueueValue instead) virtual bool DequeueValue(std::string& value, const std::string& queueId, QueueOrigin origin) ORTHANC_OVERRIDE { - int64_t rowId; - std::unique_ptr<SQLite::Statement> s; - + std::string order; switch (origin) { - case QueueOrigin_Front: - s.reset(new SQLite::Statement(db_, SQLITE_FROM_HERE, "SELECT id, value FROM Queues WHERE queueId=? ORDER BY id ASC LIMIT 1")); + case QueueOrigin_Back: + order = "DESC"; break; - - case QueueOrigin_Back: - s.reset(new SQLite::Statement(db_, SQLITE_FROM_HERE, "SELECT id, value FROM Queues WHERE queueId=? ORDER BY id DESC LIMIT 1")); + case QueueOrigin_Front: + order = "ASC"; break; - default: throw OrthancException(ErrorCode_InternalError); } - s->BindString(0, queueId); - if (!s->Step()) + std::string sql = "WITH RowToDelete AS (SELECT id FROM Queues WHERE queueId=? AND (reservedUntil IS NULL OR reservedUntil < datetime('now')) ORDER BY id " + order + " LIMIT 1) " + "DELETE FROM Queues WHERE id IN (SELECT id FROM RowToDelete) " + "RETURNING value;"; + + SQLite::Statement s(db_, SQLITE_FROM_HERE_DYNAMIC(sql), sql); + + s.BindString(0, queueId); + if (!s.Step()) { // No value found return false; } else { - rowId = s->ColumnInt64(0); - - if (!s->ColumnBlobAsString(1, &value)) + if (!s.ColumnBlobAsString(0, &value)) { throw OrthancException(ErrorCode_NotEnoughMemory); } - SQLite::Statement s2(db_, SQLITE_FROM_HERE, - "DELETE FROM Queues WHERE id = ?"); - s2.BindInt64(0, rowId); - s2.Run(); - return true; } } @@ -2340,6 +2335,77 @@ s.Step(); return s.ColumnInt64(0); } + + // New in Orthanc 1.12.10 + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + // { // list queue values + // SQLite::Statement t(db_, SQLITE_FROM_HERE, + // "SELECT id, value, reservedUntil FROM Queues WHERE queueId=?"); + // t.BindString(0, queueId); + // while (t.Step()) + // { + // t.ColumnBlobAsString(1, &value); + // LOG(INFO) << t.ColumnInt64(0) << " " << value << " " << t.ColumnString(2); + // } + // } + + std::string order; + switch (origin) + { + case QueueOrigin_Back: + order = "DESC"; + break; + case QueueOrigin_Front: + order = "ASC"; + break; + default: + throw OrthancException(ErrorCode_InternalError); + } + + std::string sql = "WITH RowToUpdate AS (SELECT id FROM Queues WHERE queueId=? AND (reservedUntil IS NULL OR reservedUntil < datetime('now')) ORDER BY id " + order + " LIMIT 1) " + "UPDATE Queues SET reservedUntil = datetime('now', ?) WHERE id IN (SELECT id FROM RowToUpdate) " + "RETURNING id, value;"; + + SQLite::Statement s(db_, SQLITE_FROM_HERE_DYNAMIC(sql), sql); + + + s.BindString(0, queueId); + std::string timeout = "+" + boost::lexical_cast<std::string>(releaseTimeout) + " seconds"; + + s.BindString(1, timeout); + if (!s.Step()) + { + // No value found + return false; + } + else + { + valueId = static_cast<uint64_t>(s.ColumnInt64(0)); + + if (!s.ColumnBlobAsString(1, &value)) + { + throw OrthancException(ErrorCode_NotEnoughMemory); + } + + return true; + } + + } + + // New in Orthanc 1.12.10 + virtual void AcknowledgeQueueValue(const std::string& /* queueId */, + uint64_t valueId) ORTHANC_OVERRIDE + { + SQLite::Statement s(db_, SQLITE_FROM_HERE, + "DELETE FROM Queues WHERE id = ?"); + s.BindInt64(0, static_cast<int64_t>(valueId)); + s.Run(); + } }; @@ -2678,6 +2744,7 @@ InjectEmbeddedScript(query, "${INSTALL_LABELS_TABLE}", ServerResources::INSTALL_LABELS_TABLE); InjectEmbeddedScript(query, "${INSTALL_DELETED_FILES}", ServerResources::INSTALL_DELETED_FILES); InjectEmbeddedScript(query, "${INSTALL_KEY_VALUE_STORES_AND_QUEUES}", ServerResources::INSTALL_KEY_VALUE_STORES_AND_QUEUES); + InjectEmbeddedScript(query, "${ADD_TIMEOUT_TO_QUEUES}", ServerResources::ADD_TIMEOUT_TO_QUEUES); db_.Execute(query); } @@ -2743,6 +2810,13 @@ LOG(INFO) << "Installing the \"KeyValueStores\" and \"Queues\" tables"; ExecuteEmbeddedScript(db_, ServerResources::INSTALL_KEY_VALUE_STORES_AND_QUEUES); } + + // New in Orthanc 1.12.10 + if (!db_.DoesColumnExist("Queues", "reservedUntil")) + { + LOG(INFO) << "Adding timeout column to the \"Queues\" table"; + ExecuteEmbeddedScript(db_, ServerResources::ADD_TIMEOUT_TO_QUEUES); + } } transaction->Commit(0);
--- a/OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -3226,6 +3226,12 @@ return db_.GetDatabaseCapabilities().HasQueuesSupport(); } + bool StatelessDatabaseOperations::HasExtendedQueuesSupport() + { + boost::shared_lock<boost::shared_mutex> lock(mutex_); + return db_.GetDatabaseCapabilities().HasExtendedQueuesSupport(); + } + void StatelessDatabaseOperations::ExecuteCount(uint64_t& count, const FindRequest& request) { @@ -3572,6 +3578,92 @@ return size; } + bool StatelessDatabaseOperations::ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) + { + if (queueId.empty()) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + class Operations : public IReadWriteOperations + { + private: + const std::string& queueId_; + uint64_t& valueId_; + std::string& value_; + QueueOrigin origin_; + uint32_t releaseTimeout_; + bool found_; + + public: + Operations(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) : + queueId_(queueId), + valueId_(valueId), + value_(value), + origin_(origin), + releaseTimeout_(releaseTimeout), + found_(false) + { + } + + bool HasFound() + { + return found_; + } + + virtual void Apply(ReadWriteTransaction& transaction) ORTHANC_OVERRIDE + { + found_ = transaction.ReserveQueueValue(value_, valueId_, queueId_, origin_, releaseTimeout_); + } + }; + + Operations operations(value, valueId, queueId, origin, releaseTimeout); + Apply(operations); + + return operations.HasFound(); + } + + + void StatelessDatabaseOperations::AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) + { + if (queueId.empty()) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + class Operations : public IReadWriteOperations + { + private: + const std::string& queueId_; + uint64_t valueId_; + + public: + Operations(const std::string& queueId, + uint64_t valueId) : + queueId_(queueId), + valueId_(valueId) + { + } + + virtual void Apply(ReadWriteTransaction& transaction) ORTHANC_OVERRIDE + { + transaction.AcknowledgeQueueValue(queueId_, valueId_); + } + }; + + Operations operations(queueId, valueId); + Apply(operations); + } + void StatelessDatabaseOperations::GetAttachmentCustomData(std::string& customData, const std::string& attachmentUuid)
--- a/OrthancServer/Sources/Database/StatelessDatabaseOperations.h Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/Database/StatelessDatabaseOperations.h Tue Oct 07 10:13:06 2025 +0200 @@ -485,6 +485,21 @@ return transaction_.DequeueValue(value, queueId, origin); } + bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) + { + return transaction_.ReserveQueueValue(value, valueId, queueId, origin, releaseTimeout); + } + + void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) + { + return transaction_.AcknowledgeQueueValue(queueId, valueId); + } + void SetAttachmentCustomData(const std::string& attachmentUuid, const void* customData, size_t customDataSize) @@ -622,6 +637,8 @@ bool HasQueuesSupport(); + bool HasExtendedQueuesSupport(); + void GetExportedResources(Json::Value& target, int64_t since, uint32_t limit); @@ -841,6 +858,15 @@ uint64_t GetQueueSize(const std::string& queueId); + bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout); + + void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId); + class KeysValuesIterator : public boost::noncopyable { private:
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestSystem.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestSystem.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -97,6 +97,7 @@ static const char* const HAS_EXTENDED_CHANGES = "HasExtendedChanges"; static const char* const HAS_KEY_VALUE_STORES = "HasKeyValueStores"; static const char* const HAS_QUEUES = "HasQueues"; + static const char* const HAS_EXTENDED_QUEUES = "HasExtendedQueues"; static const char* const HAS_EXTENDED_FIND = "HasExtendedFind"; static const char* const READ_ONLY = "ReadOnly"; @@ -213,6 +214,7 @@ result[CAPABILITIES][HAS_EXTENDED_FIND] = OrthancRestApi::GetIndex(call).HasFindSupport(); result[CAPABILITIES][HAS_KEY_VALUE_STORES] = OrthancRestApi::GetIndex(call).HasKeyValueStoresSupport(); result[CAPABILITIES][HAS_QUEUES] = OrthancRestApi::GetIndex(call).HasQueuesSupport(); + result[CAPABILITIES][HAS_EXTENDED_QUEUES] = OrthancRestApi::GetIndex(call).HasExtendedQueuesSupport(); call.GetOutput().AnswerJson(result); }
--- a/OrthancServer/UnitTestsSources/ServerIndexTests.cpp Mon Oct 06 09:51:39 2025 +0200 +++ b/OrthancServer/UnitTestsSources/ServerIndexTests.cpp Tue Oct 07 10:13:06 2025 +0200 @@ -1319,6 +1319,79 @@ db.Close(); } +TEST(SQLiteDatabaseWrapper, QueuesExtended) +{ + SQLiteDatabaseWrapper db; // The SQLite DB is in memory + db.Open(); + + { + StatelessDatabaseOperations op(db, false); + op.SetTransactionContextFactory(new DummyTransactionContextFactory); + + ASSERT_EQ(0u, op.GetQueueSize("test")); + op.EnqueueValue("test", "a"); + ASSERT_EQ(1u, op.GetQueueSize("test")); + op.EnqueueValue("test", "b"); + ASSERT_EQ(2u, op.GetQueueSize("test")); + op.EnqueueValue("test", "c"); + ASSERT_EQ(3u, op.GetQueueSize("test")); + + std::string s; + uint64_t valueId0, valueId1, valueId2, valueId3; + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 100)); + ASSERT_EQ("c", s); + ASSERT_EQ(3u, op.GetQueueSize("test")); // the reserved values are still counted ! + + ASSERT_TRUE(op.ReserveQueueValue(s, valueId1, "test", QueueOrigin_Back, 100)); + ASSERT_EQ("b", s); + + ASSERT_TRUE(op.ReserveQueueValue(s, valueId2, "test", QueueOrigin_Back, 100)); + ASSERT_EQ("a", s); + ASSERT_EQ(3u, op.GetQueueSize("test")); + + ASSERT_FALSE(op.ReserveQueueValue(s, valueId3, "test", QueueOrigin_Back, 100)); + + op.AcknowledgeQueueValue("test", valueId1); + ASSERT_EQ(2u, op.GetQueueSize("test")); + op.AcknowledgeQueueValue("test", valueId2); + op.AcknowledgeQueueValue("test", valueId0); + ASSERT_EQ(0u, op.GetQueueSize("test")); + + op.EnqueueValue("test", "d"); + op.EnqueueValue("test", "e"); + op.EnqueueValue("test", "f"); + op.EnqueueValue("test", "g"); + op.EnqueueValue("test", "h"); + + // reserve 2 values front and back and acknowledge only "e" & "f" + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("d", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("e", s); + op.AcknowledgeQueueValue("test", valueId0); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + ASSERT_EQ("h", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + ASSERT_EQ("g", s); + op.AcknowledgeQueueValue("test", valueId0); + + SystemToolbox::USleep(2000000); // the granularity being the second, we might have to wait up to 2 seconds + + // "d", "f" and "h" remains + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("d", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("f", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + ASSERT_EQ("h", s); + + // the queue is empty at this point since "f" has been reserved already + ASSERT_FALSE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + } + + db.Close(); +} + TEST_F(DatabaseWrapperTest, BinaryCustomData) {
--- a/TODO Mon Oct 06 09:51:39 2025 +0200 +++ b/TODO Tue Oct 07 10:13:06 2025 +0200 @@ -315,11 +315,11 @@ proccess(message) The message will never be processed ... We should have an acknowledge/commit mechanism e.g: - message, messageId = orthanc.DequeueValue2("instances-to-process", orthanc.QueueOrigin.FRONT, 5) # where 5 is a "timeout" - # At this point, the message is still in the queue but will not be dequeued by other consumers. + message, messageId = orthanc.DequeueValue2("instances-to-process", orthanc.QueueOrigin.FRONT, 5) # where 5 is an "acknowledge timeout" + # At this point, the message is still in the queue but will not be visible for other consumers. # If the message is not acknowledged within 5 seconds, it will get back into the queue. process(message) - orthanc.AcknowledgeQueue("instances-to-process", messageId) + orthanc.AcknowledgeQueueMessage("instances-to-process", messageId) # This requires adding a new "timeout" column in the DB with the reservation_expiration timestamp. Note by SJ: Introducing an acknowledgement would greatly complexify
