changeset 6422:36aff9f84264 queues-timeout tip

integration mainline->queues-timeout
author Sebastien Jodogne <s.jodogne@gmail.com>
date Sat, 15 Nov 2025 12:28:27 +0100
parents b86411c11092 (diff) c557f6bdcbfd (current diff)
children
files NEWS OrthancServer/Plugins/Engine/OrthancPlugins.cpp OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp
diffstat 23 files changed, 739 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/NEWS	Sat Nov 15 11:49:39 2025 +0100
+++ b/NEWS	Sat Nov 15 12:28:27 2025 +0100
@@ -1,6 +1,8 @@
 Pending changes in the mainline
 ===============================
 
+TODO: finalize DicomIdentifiersIndex (upgrade from existing DB)
+
 General
 -------
 
@@ -74,6 +76,16 @@
   - Note: the previous "Database" configuration has now been renamed in "Directory" to better differentiate
     the "File" or "DB modes.
 
+Plugin SDK
+----------
+
+* New primitives for handling queues:
+  - OrthancPluginReserveQueueValue is a replacement for OrthancPluginDequeueValue.  This flavour
+    does not directly remove the value from the queue but reserves it for a certain duration.
+  - OrthancPluginAcknowledgeQueueValue removes a reserved value from a queue.
+  - With these 2 primitives, one can ensure not to loose any message in case Orthanc is stopped
+    while handling a message.
+
 
 Version 1.12.9 (2025-08-11)
 ===========================
--- a/OrthancFramework/Resources/Patches/OpenSSL-ConfigureHeaders.py	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancFramework/Resources/Patches/OpenSSL-ConfigureHeaders.py	Sat Nov 15 12:28:27 2025 +0100
@@ -92,43 +92,43 @@
 def Parse(match):
     s = ''
     
-    for t in re.findall('generate_stack_macros\("(.+?)"\)', match.group(1)):
+    for t in re.findall(r'generate_stack_macros\("(.+?)"\)', match.group(1)):
         s += (GENERATE_STACK_MACROS
               .replace('${nametype}', t)
               .replace('${realtype}', t)
               .replace('${plaintype}', t))
         
-    for t in re.findall('generate_const_stack_macros\("(.+?)"\)', match.group(1)):
+    for t in re.findall(r'generate_const_stack_macros\("(.+?)"\)', match.group(1)):
         s += (GENERATE_STACK_MACROS
               .replace('${nametype}', t)
               .replace('${realtype}', 'const %s' % t)
               .replace('${plaintype}', t))
 
-    for t in re.findall('generate_stack_string_macros\(\)', match.group(1)):
+    for t in re.findall(r'generate_stack_string_macros\(\)', match.group(1)):
         s += (GENERATE_STACK_MACROS
               .replace('${nametype}', 'OPENSSL_STRING')
               .replace('${realtype}', 'char')
               .replace('${plaintype}', 'char'))
 
-    for t in re.findall('generate_stack_const_string_macros\(\)', match.group(1)):
+    for t in re.findall(r'generate_stack_const_string_macros\(\)', match.group(1)):
         s += (GENERATE_STACK_MACROS
               .replace('${nametype}', 'OPENSSL_CSTRING')
               .replace('${realtype}', 'const char')
               .replace('${plaintype}', 'char'))
 
-    for t in re.findall('generate_stack_block_macros\(\)', match.group(1)):
+    for t in re.findall(r'generate_stack_block_macros\(\)', match.group(1)):
         s += (GENERATE_STACK_MACROS
               .replace('${nametype}', 'OPENSSL_BLOCK')
               .replace('${realtype}', 'void')
               .replace('${plaintype}', 'void'))
         
-    for t in re.findall('generate_lhash_macros\("(.+?)"\)', match.group(1)):
+    for t in re.findall(r'generate_lhash_macros\("(.+?)"\)', match.group(1)):
         s += GENERATE_LHASH_MACROS.replace('${type}', t)
 
-    for t in re.findall('\$config{rc4_int}', match.group(1)):
+    for t in re.findall(r'\$config{rc4_int}', match.group(1)):
         s += 'unsigned int'
 
-    for t in re.findall('oids_to_c::process_leaves\(.+?\)', match.group(1), re.MULTILINE | re.DOTALL):
+    for t in re.findall(r'oids_to_c::process_leaves\(.+?\)', match.group(1), re.MULTILINE | re.DOTALL):
         if not CURRENT_HEADER in OIDS:
             raise Exception('Unknown header: %s' % CURRENT_HEADER)
 
@@ -145,18 +145,18 @@
     directory = os.path.join(sys.argv[1], base)
     for source in os.listdir(directory):
         if source.endswith('.h.in'):
-            target = re.sub('\.h\.in$', '.h', source)
+            target = re.sub(r'\.h\.in$', '.h', source)
                             
             with open(os.path.join(directory, source), 'r') as f:
                 with open(os.path.join(directory, target), 'w') as g:
                     CURRENT_HEADER = source
-                    g.write(re.sub('{-(.*?)-}.*?$', Parse, f.read(),
+                    g.write(re.sub(r'{-(.*?)-}.*?$', Parse, f.read(),
                                    flags = re.MULTILINE | re.DOTALL))
 
 
 with open(os.path.join(sys.argv[1], 'providers/common/der/orthanc_oids_gen.c'), 'w') as f:
     for (header, content) in OIDS.items():
-        f.write('#include "prov/%s"\n' % re.sub('\.h\.in$', '.h', header))
+        f.write('#include "prov/%s"\n' % re.sub(r'\.h\.in$', '.h', header))
 
     f.write('\n')
         
--- a/OrthancServer/CMakeLists.txt	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/CMakeLists.txt	Sat Nov 15 12:28:27 2025 +0100
@@ -266,6 +266,7 @@
   INSTALL_REVISION_AND_CUSTOM_DATA  ${CMAKE_SOURCE_DIR}/Sources/Database/InstallRevisionAndCustomData.sql  
   INSTALL_DELETED_FILES             ${CMAKE_SOURCE_DIR}/Sources/Database/InstallDeletedFiles.sql
   INSTALL_KEY_VALUE_STORES_AND_QUEUES ${CMAKE_SOURCE_DIR}/Sources/Database/InstallKeyValueStoresAndQueues.sql
+  ADD_TIMEOUT_TO_QUEUES             ${CMAKE_SOURCE_DIR}/Sources/Database/AddTimeoutToQueues.sql
   )
 
 if (STANDALONE_BUILD)
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabase.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabase.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -1499,6 +1499,21 @@
       throw OrthancException(ErrorCode_InternalError);  // Not supported
     }
 
+    virtual bool ReserveQueueValue(std::string& value,
+                                   uint64_t& valueId, 
+                                   const std::string& queueId,
+                                   QueueOrigin origin,
+                                   uint32_t releaseTimeout) ORTHANC_OVERRIDE
+    {
+      throw OrthancException(ErrorCode_InternalError);  // Not supported
+    }
+      
+    virtual void AcknowledgeQueueValue(const std::string& queueId,
+                                       uint64_t valueId) ORTHANC_OVERRIDE
+    {
+      throw OrthancException(ErrorCode_InternalError);  // Not supported
+    }
+
     virtual void GetAttachmentCustomData(std::string& customData,
                                          const std::string& attachmentUuid) ORTHANC_OVERRIDE
     {
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV3.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV3.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -1111,6 +1111,21 @@
       throw OrthancException(ErrorCode_InternalError);  // Not supported
     }
 
+    virtual bool ReserveQueueValue(std::string& value,
+                                   uint64_t& valueId, 
+                                   const std::string& queueId,
+                                   QueueOrigin origin,
+                                   uint32_t releaseTimeout) ORTHANC_OVERRIDE
+    {
+      throw OrthancException(ErrorCode_InternalError);  // Not supported
+    }
+      
+    virtual void AcknowledgeQueueValue(const std::string& queueId,
+                                       uint64_t valueId) ORTHANC_OVERRIDE
+    {
+      throw OrthancException(ErrorCode_InternalError);  // Not supported
+    }
+
     virtual void GetAttachmentCustomData(std::string& customData,
                                          const std::string& attachmentUuid) ORTHANC_OVERRIDE
     {
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV4.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV4.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -2051,6 +2051,73 @@
         throw OrthancException(ErrorCode_InternalError);
       }
     }
+
+    virtual bool ReserveQueueValue(std::string& value,
+                                   uint64_t& valueId, 
+                                   const std::string& queueId,
+                                   QueueOrigin origin,
+                                   uint32_t releaseTimeout) ORTHANC_OVERRIDE
+    {
+      if (database_.GetDatabaseCapabilities().HasExtendedQueuesSupport())
+      {
+        DatabasePluginMessages::TransactionRequest request;
+        request.mutable_reserve_queue_value()->set_queue_id(queueId);
+        request.mutable_reserve_queue_value()->set_release_timeout(releaseTimeout);
+
+        switch (origin)
+        {
+          case QueueOrigin_Back:
+            request.mutable_dequeue_value()->set_origin(DatabasePluginMessages::QUEUE_ORIGIN_BACK);
+            break;
+
+          case QueueOrigin_Front:
+            request.mutable_dequeue_value()->set_origin(DatabasePluginMessages::QUEUE_ORIGIN_FRONT);
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+
+        DatabasePluginMessages::TransactionResponse response;
+        ExecuteTransaction(response, DatabasePluginMessages::OPERATION_RESERVE_QUEUE_VALUE, request);
+
+        if (response.reserve_queue_value().found())
+        {
+          value = response.reserve_queue_value().value();
+          valueId = response.reserve_queue_value().value_id();
+          return true;
+        }
+        else
+        {
+          return false;
+        }
+      }
+      else
+      {
+        // This method shouldn't have been called
+        throw OrthancException(ErrorCode_InternalError);
+      }
+    }
+      
+    virtual void AcknowledgeQueueValue(const std::string& queueId,
+                                       uint64_t valueId) ORTHANC_OVERRIDE
+    {
+
+      if (database_.GetDatabaseCapabilities().HasExtendedQueuesSupport())
+      {
+        DatabasePluginMessages::TransactionRequest request;
+        request.mutable_acknowledge_queue_value()->set_queue_id(queueId);
+        request.mutable_acknowledge_queue_value()->set_value_id(valueId);
+
+        ExecuteTransaction(DatabasePluginMessages::OPERATION_ACKNOWLEDGE_QUEUE_VALUE, request);
+      }
+      else
+      {
+        // This method shouldn't have been called
+        throw OrthancException(ErrorCode_InternalError);
+      }
+    }
+
   };
 
 
@@ -2143,6 +2210,7 @@
       dbCapabilities_.SetHasFindSupport(systemInfo.supports_find());
       dbCapabilities_.SetKeyValueStoresSupport(systemInfo.supports_key_value_stores());
       dbCapabilities_.SetQueuesSupport(systemInfo.supports_queues());
+      dbCapabilities_.SetExtendedQueuesSupport(systemInfo.supports_extended_queues());
       dbCapabilities_.SetAttachmentCustomDataSupport(systemInfo.has_attachment_custom_data());
     }
 
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPlugins.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -4723,6 +4723,14 @@
     }
   }
 
+  static void CheckExtendedQueuesSupport(ServerContext& context)
+  {
+    if (!context.GetIndex().HasExtendedQueuesSupport())
+    {
+      throw OrthancException(ErrorCode_NotImplemented, "The database engine does not support queues (extended)");
+    }
+  }
+
   void OrthancPlugins::ApplyEnqueueValue(const _OrthancPluginEnqueueValue& parameters)
   {
     PImpl::ServerContextReference lock(*pimpl_);
@@ -4760,6 +4768,37 @@
     *parameters.size = lock.GetContext().GetIndex().GetQueueSize(parameters.queueId);
   }
 
+  void OrthancPlugins::ApplyReserveQueueValue(const _OrthancPluginReserveQueueValue& parameters)
+  {
+    PImpl::ServerContextReference lock(*pimpl_);
+
+    CheckExtendedQueuesSupport(lock.GetContext());
+
+    std::string value;
+    uint64_t valueId;
+
+    if (lock.GetContext().GetIndex().ReserveQueueValue(value, valueId, parameters.queueId, Plugins::Convert(parameters.origin), parameters.releaseTimeout))
+    {
+      CopyToMemoryBuffer(parameters.target, value);
+      *parameters.found = true;
+      *parameters.valueId = valueId;
+    }
+    else
+    {
+      *parameters.found = false;
+    }
+  }
+
+  void OrthancPlugins::ApplyAknowledgeQueueValue(const _OrthancPluginAcknowledgeQueueValue& parameters)
+  {
+    PImpl::ServerContextReference lock(*pimpl_);
+
+    CheckExtendedQueuesSupport(lock.GetContext());
+
+    lock.GetContext().GetIndex().AcknowledgeQueueValue(parameters.queueId, parameters.valueId);
+  }
+
+
   void OrthancPlugins::ApplySetStableStatus(const _OrthancPluginSetStableStatus& parameters)
   {
     PImpl::ServerContextReference lock(*pimpl_);
@@ -5920,6 +5959,20 @@
         return true;
       }
 
+      case _OrthancPluginService_ReserveQueueValue:
+      {
+        const _OrthancPluginReserveQueueValue& p = *reinterpret_cast<const _OrthancPluginReserveQueueValue*>(parameters);
+        ApplyReserveQueueValue(p);
+        return true;
+      }
+
+      case _OrthancPluginService_AcknowledgeQueueValue:
+      {
+        const _OrthancPluginAcknowledgeQueueValue& p = *reinterpret_cast<const _OrthancPluginAcknowledgeQueueValue*>(parameters);
+        ApplyAknowledgeQueueValue(p);
+        return true;
+      }
+
       case _OrthancPluginService_SetStableStatus:
       {
         const _OrthancPluginSetStableStatus& p = *reinterpret_cast<const _OrthancPluginSetStableStatus*>(parameters);
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.h	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Engine/OrthancPlugins.h	Sat Nov 15 12:28:27 2025 +0100
@@ -249,6 +249,10 @@
 
     void ApplyGetQueueSize(const _OrthancPluginGetQueueSize& parameters);
 
+    void ApplyReserveQueueValue(const _OrthancPluginReserveQueueValue& parameters);
+
+    void ApplyAknowledgeQueueValue(const _OrthancPluginAcknowledgeQueueValue& parameters);
+
     void ApplySetStableStatus(const _OrthancPluginSetStableStatus& parameters);
 
     void ApplyEmitAuditLog(const _OrthancPluginEmitAuditLog& parameters);
--- a/OrthancServer/Plugins/Include/orthanc/OrthancCPlugin.h	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Include/orthanc/OrthancCPlugin.h	Sat Nov 15 12:28:27 2025 +0100
@@ -123,7 +123,7 @@
 
 #define ORTHANC_PLUGINS_MINIMAL_MAJOR_NUMBER     1
 #define ORTHANC_PLUGINS_MINIMAL_MINOR_NUMBER     12
-#define ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER  9
+#define ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER  10
 
 
 #if !defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE)
@@ -519,6 +519,8 @@
     _OrthancPluginService_GetQueueSize = 59,                        /* New in Orthanc 1.12.8 */
     _OrthancPluginService_SetStableStatus = 60,                     /* New in Orthanc 1.12.9 */
     _OrthancPluginService_EmitAuditLog = 61,                        /* New in Orthanc 1.12.9 */
+    _OrthancPluginService_ReserveQueueValue = 62,                   /* New in Orthanc 1.12.10 */
+    _OrthancPluginService_AcknowledgeQueueValue = 63,               /* New in Orthanc 1.12.10 */
 
     /* Registration of callbacks */
     _OrthancPluginService_RegisterRestCallback = 1000,
@@ -10412,9 +10414,12 @@
    * @param queueId A unique identifier identifying both the plugin and the queue.
    * @param origin The position from where the value is dequeued (back for LIFO, front for FIFO).
    * @return 0 if success, other value if error.
+   * @deprecated This function should not be used anymore because there is a risk of loosing
+   * a value if the consumer plugin crashes before it has processed the value. Use
+   * OrthancPluginReserveQueueValue() and OrthancPluginAcknowledgeQueueValue() if possible.
    **/
   ORTHANC_PLUGIN_SINCE_SDK("1.12.8")
-  ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginDequeueValue(
+  ORTHANC_PLUGIN_DEPRECATED ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginDequeueValue(
     OrthancPluginContext*         context,
     uint8_t*                      found,    /* out */
     OrthancPluginMemoryBuffer*    target,   /* out */
@@ -10438,7 +10443,7 @@
   } _OrthancPluginGetQueueSize;
 
   /**
-   * @brief Get the number of elements in a queue.
+   * @brief Get the number of elements that are currently stored in a queue.
    *
    * @param context The Orthanc plugin context, as received by OrthancPluginInitialize().
    * @param queueId A unique identifier identifying both the plugin and the queue.
@@ -10706,6 +10711,85 @@
   }
 
 
+  typedef struct
+  {
+    uint8_t*                      found;
+    OrthancPluginMemoryBuffer*    target;
+    const char*                   queueId;
+    OrthancPluginQueueOrigin      origin;
+    uint32_t                      releaseTimeout;
+    uint64_t*                     valueId;
+  } _OrthancPluginReserveQueueValue;
+
+  /**
+   * @brief Reserve a value from a queue, which means that the message is not
+   *        available to other consumers.
+   *        The value is either:
+   *        - removed from the queue when one calls OrthancPluginAcknowledgeQueueValue()
+   *        - or made available again for consumers after the releaseTimeout has expired.
+   *
+   * @param context The Orthanc plugin context, as received by OrthancPluginInitialize().
+   * @param found Pointer to a Boolean that is set to "true" iff. a value has been dequeued.
+   * @param target Memory buffer where to store the value that has been retrieved from the queue.
+   * It must be freed with OrthancPluginFreeMemoryBuffer().
+   * @param queueId A unique identifier identifying both the plugin and the queue.
+   * @param origin The position from where the value is dequeued (back for LIFO, front for FIFO).
+   * @param releaseTimeout Timeout in seconds. If the value is not acknowledged before this delay expires,
+   *                       the value is automatically released and made available for further calls to
+   *                       OrthancPluginDequeueValue() or OrthancPluginReserveQueueValue()
+   * @param valueId An opaque identifier for this value, to be subsequently provided to
+   * OrthancPluginAcknowledgeQueueValue().
+   * @return 0 if success, other value if error.
+   **/
+  ORTHANC_PLUGIN_SINCE_SDK("1.12.10")
+  ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginReserveQueueValue(
+    OrthancPluginContext*         context,
+    uint8_t*                      found,    /* out */
+    OrthancPluginMemoryBuffer*    target,   /* out */
+    uint64_t*                     valueId,  /* out */
+    const char*                   queueId,  /* in */
+    OrthancPluginQueueOrigin      origin,   /* in */
+    uint32_t                      releaseTimeout /* in */)
+  {
+    _OrthancPluginReserveQueueValue params;
+    params.found = found;
+    params.target = target;
+    params.queueId = queueId;
+    params.origin = origin;
+    params.valueId = valueId;
+    params.releaseTimeout = releaseTimeout;
+
+    return context->InvokeService(context, _OrthancPluginService_ReserveQueueValue, &params);
+  }
+
+  typedef struct
+  {
+    const char*                   queueId;
+    uint64_t                      valueId;
+  } _OrthancPluginAcknowledgeQueueValue;
+
+  /**
+   * @brief Acknowledge that a queue value has been properly consumed by the plugin
+   * and can be definitely removed from the queue.
+   *
+   * @param context The Orthanc plugin context, as received by OrthancPluginInitialize().
+   * @param queueId A unique identifier identifying both the plugin and the queue.
+   * @param valueId The opaque identifier for the value obtained from OrthancPluginReserveQueueValue().
+   * @return 0 if success, other value if error.
+   **/
+  ORTHANC_PLUGIN_SINCE_SDK("1.12.10")
+  ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginAcknowledgeQueueValue(
+    OrthancPluginContext*         context,
+    const char*                   queueId,  /* in */
+    uint64_t                      valueId  /* in */)
+  {
+    _OrthancPluginAcknowledgeQueueValue params;
+    params.queueId = queueId;
+    params.valueId = valueId;
+
+    return context->InvokeService(context, _OrthancPluginService_AcknowledgeQueueValue, &params);
+  }
+
 #ifdef  __cplusplus
 }
 #endif
--- a/OrthancServer/Plugins/Include/orthanc/OrthancDatabasePlugin.proto	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Include/orthanc/OrthancDatabasePlugin.proto	Sat Nov 15 12:28:27 2025 +0100
@@ -175,6 +175,7 @@
     bool supports_key_value_stores = 10;  // New in Orthanc 1.12.8
     bool supports_queues = 11;            // New in Orthanc 1.12.8
     bool has_attachment_custom_data = 12; // New in Orthanc 1.12.8
+    bool supports_extended_queues = 13;   // New in Orthanc 1.12.10
   }
 }
 
@@ -339,7 +340,8 @@
   OPERATION_GET_QUEUE_SIZE = 59;              // New in Orthanc 1.12.8
   OPERATION_GET_ATTACHMENT_CUSTOM_DATA = 60;  // New in Orthanc 1.12.8
   OPERATION_SET_ATTACHMENT_CUSTOM_DATA = 61;  // New in Orthanc 1.12.8
-
+  OPERATION_RESERVE_QUEUE_VALUE = 62;         // New in Orthanc 1.12.10
+  OPERATION_ACKNOWLEDGE_QUEUE_VALUE = 63;     // New in Orthanc 1.12.10
 }
 
 message Rollback {
@@ -1065,6 +1067,20 @@
   }
 }
 
+message ReserveQueueValue {
+  message Request {
+    string queue_id = 1;
+    QueueOrigin origin = 2;
+    uint32 release_timeout = 3;
+  }
+
+  message Response {
+    bool found = 1;
+    bytes value = 2;
+    uint64 value_id = 3;
+  }
+}
+
 message GetQueueSize {
   message Request {
     string queue_id = 1;
@@ -1075,6 +1091,17 @@
   }
 }
 
+message AcknowledgeQueueValue {
+  message Request {
+    string queue_id = 1;
+    uint64 value_id = 2;
+  }
+
+  message Response {
+  }
+}
+
+
 message GetAttachmentCustomData {
   message Request {
     string uuid = 1;
@@ -1162,6 +1189,8 @@
   GetQueueSize.Request                    get_queue_size = 159;
   GetAttachmentCustomData.Request         get_attachment_custom_data = 160;
   SetAttachmentCustomData.Request         set_attachment_custom_data = 161;
+  ReserveQueueValue.Request               reserve_queue_value = 162;
+  AcknowledgeQueueValue.Request           acknowledge_queue_value = 163;
 }
 
 message TransactionResponse {
@@ -1227,6 +1256,8 @@
   GetQueueSize.Response                    get_queue_size = 159;
   GetAttachmentCustomData.Response         get_attachment_custom_data = 160;
   SetAttachmentCustomData.Response         set_attachment_custom_data = 161;
+  ReserveQueueValue.Response               reserve_queue_value = 162;
+  AcknowledgeQueueValue.Response           acknowledge_queue_value = 163;
 }
 
 enum RequestType {
--- a/OrthancServer/Plugins/Include/orthanc/OrthancPluginCodeModel.json	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Include/orthanc/OrthancPluginCodeModel.json	Sat Nov 15 12:28:27 2025 +0100
@@ -2234,7 +2234,7 @@
                     "value": 2002
                 },
                 {
-                    "documentation": "The TCP port of the HTTP server is privileged or already in use",
+                    "documentation": "The TCP port of the HTTP server is privileged or already in use or one of the HTTP bind addresses does not exist",
                     "key": "HttpPortInUse",
                     "value": 2003
                 },
@@ -5486,6 +5486,37 @@
                 12,
                 9
             ]
+        },
+        {
+            "args": [
+                {
+                    "name": "arg0",
+                    "sdk_name": "queueId",
+                    "sdk_type": "const char *"
+                },
+                {
+                    "name": "arg1",
+                    "sdk_name": "valueId",
+                    "sdk_type": "uint64_t"
+                }
+            ],
+            "c_function": "OrthancPluginAcknowledgeQueueValue",
+            "documentation": {
+                "args": {
+                    "queueId": "A unique identifier identifying both the plugin and the queue.",
+                    "valueId": "The opaque identifier for the value obtained from OrthancPluginReserveQueueValue()."
+                },
+                "description": [],
+                "return": "0 if success, other value if error.",
+                "summary": "Acknowledge that a queue value has been properly consumed by the plugin and can be definitely removed from the queue."
+            },
+            "return_sdk_enumeration": "OrthancPluginErrorCode",
+            "return_sdk_type": "enumeration",
+            "since_sdk": [
+                1,
+                12,
+                10
+            ]
         }
     ],
     "unwrapped_functions": [
@@ -5538,6 +5569,7 @@
         "OrthancPluginGetQueueSize",
         "OrthancPluginSetStableStatus",
         "OrthancPluginRegisterHttpAuthentication",
-        "OrthancPluginRegisterAuditLogHandler"
+        "OrthancPluginRegisterAuditLogHandler",
+        "OrthancPluginReserveQueueValue"
     ]
 }
\ No newline at end of file
--- a/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -4643,4 +4643,45 @@
     }
   }
 #endif
+
+#if HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES == 1
+  bool Queue::ReserveInternal(std::string& value, uint64_t& valueId, OrthancPluginQueueOrigin origin, uint32_t releaseTimeout)
+  {
+    uint8_t found = false;
+    OrthancPlugins::MemoryBuffer valueBuffer;
+
+    OrthancPluginErrorCode code = OrthancPluginReserveQueueValue(OrthancPlugins::GetGlobalContext(), &found,
+                                                                 *valueBuffer, &valueId, queueId_.c_str(), origin, releaseTimeout);
+
+    if (code != OrthancPluginErrorCode_Success)
+    {
+      ORTHANC_PLUGINS_THROW_PLUGIN_ERROR_CODE(code);
+    }
+    else if (found)
+    {
+      valueBuffer.ToString(value);
+      return true;
+    }
+    else
+    {
+      return false;
+    }
+  }
+
+  bool Queue::ReserveBack(std::string& value, uint64_t& valueId, uint32_t releaseTimeout)
+  {
+    return ReserveInternal(value, valueId, OrthancPluginQueueOrigin_Back, releaseTimeout);
+  }
+    
+  bool Queue::ReserveFront(std::string& value, uint64_t& valueId, uint32_t releaseTimeout)
+  {
+    return ReserveInternal(value, valueId, OrthancPluginQueueOrigin_Front, releaseTimeout);
+  }
+
+  void Queue::Acknowledge(uint64_t valueId)
+  {
+    OrthancPluginAcknowledgeQueueValue(OrthancPlugins::GetGlobalContext(), queueId_.c_str(), valueId);
+  }
+#endif
+
 }
--- a/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.h	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.h	Sat Nov 15 12:28:27 2025 +0100
@@ -142,6 +142,12 @@
 #  define HAS_ORTHANC_PLUGIN_QUEUES            0
 #endif
 
+#if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 12, 10)
+#  define HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES   1
+#else
+#  define HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES   0
+#endif
+
 
 // Macro to tag a function as having been deprecated
 #if (__cplusplus >= 201402L)  // C++14
@@ -1726,6 +1732,10 @@
 
     bool DequeueInternal(std::string& value, OrthancPluginQueueOrigin origin);
 
+#if HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES == 1
+    bool ReserveInternal(std::string& value, uint64_t& valueId, OrthancPluginQueueOrigin origin, uint32_t releaseTimeout);
+#endif
+
   public:
     explicit Queue(const std::string& queueId) :
       queueId_(queueId)
@@ -1745,17 +1755,29 @@
       Enqueue(value.empty() ? NULL : value.c_str(), value.size());
     }
 
+    // Use ReserveBack instead
+    ORTHANC_PLUGIN_DEPRECATED
     bool DequeueBack(std::string& value)
     {
       return DequeueInternal(value, OrthancPluginQueueOrigin_Back);
     }
 
+    // Use ReserveFront instead
+    ORTHANC_PLUGIN_DEPRECATED
     bool DequeueFront(std::string& value)
     {
       return DequeueInternal(value, OrthancPluginQueueOrigin_Front);
     }
 
     uint64_t GetSize();
+
+#if HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES == 1
+    bool ReserveBack(std::string& value, uint64_t& valueId, uint32_t releaseTimeout);
+    
+    bool ReserveFront(std::string& value, uint64_t& valueId, uint32_t releaseTimeout);
+
+    void Acknowledge(uint64_t valueId);
+#endif
   };
 #endif
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Sources/Database/AddTimeoutToQueues.sql	Sat Nov 15 12:28:27 2025 +0100
@@ -0,0 +1,23 @@
+-- Orthanc - A Lightweight, RESTful DICOM Store
+-- Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+-- Department, University Hospital of Liege, Belgium
+-- Copyright (C) 2017-2023 Osimis S.A., Belgium
+-- Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
+-- Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+--
+-- This program is free software: you can redistribute it and/or
+-- modify it under the terms of the GNU General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- This program is distributed in the hope that it will be useful, but
+-- WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+-- General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ALTER TABLE Queues
+ADD COLUMN reservedUntil TIMESTAMP DEFAULT NULL;
--- a/OrthancServer/Sources/Database/IDatabaseWrapper.h	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/Database/IDatabaseWrapper.h	Sat Nov 15 12:28:27 2025 +0100
@@ -59,6 +59,7 @@
       bool hasAttachmentCustomDataSupport_;
       bool hasKeyValueStoresSupport_;
       bool hasQueuesSupport_;
+      bool hasExtendedQueuesSupport_;
 
     public:
       Capabilities() :
@@ -72,7 +73,8 @@
         hasExtendedChanges_(false),
         hasAttachmentCustomDataSupport_(false),
         hasKeyValueStoresSupport_(false),
-        hasQueuesSupport_(false)
+        hasQueuesSupport_(false),
+        hasExtendedQueuesSupport_(false)
       {
       }
 
@@ -186,6 +188,16 @@
         return hasQueuesSupport_;
       }
 
+      void SetExtendedQueuesSupport(bool value)
+      {
+        hasExtendedQueuesSupport_ = value;
+      }
+
+      bool HasExtendedQueuesSupport() const
+      {
+        return hasExtendedQueuesSupport_;
+      }
+
     };
 
 
@@ -471,6 +483,17 @@
       // New in Orthanc 1.12.8, for statistics only
       virtual uint64_t GetQueueSize(const std::string& queueId) = 0;
 
+      // New in Orthanc 1.12.10
+      virtual bool ReserveQueueValue(std::string& value,
+                                     uint64_t& valueId, 
+                                     const std::string& queueId,
+                                     QueueOrigin origin,
+                                     uint32_t releaseTimeout) = 0;
+      
+      // New in Orthanc 1.12.10
+      virtual void AcknowledgeQueueValue(const std::string& queueId,
+                                         uint64_t valueId) = 0;
+
     };
 
 
--- a/OrthancServer/Sources/Database/InstallKeyValueStoresAndQueues.sql	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/Database/InstallKeyValueStoresAndQueues.sql	Sat Nov 15 12:28:27 2025 +0100
@@ -30,6 +30,7 @@
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        queueId TEXT NOT NULL,
        value BLOB NOT NULL
+       --- reservedUntil TIMESTAMP DEFAULT NULL  -- added in AddTimeoutToQueues.sql
 );
 
 CREATE INDEX QueuesIndex ON Queues (queueId, id);
--- a/OrthancServer/Sources/Database/PrepareDatabase.sql	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/Database/PrepareDatabase.sql	Sat Nov 15 12:28:27 2025 +0100
@@ -107,9 +107,11 @@
 
 -- The 3 following indexes were added in Orthanc 0.8.5 (database v5)
 CREATE INDEX DicomIdentifiersIndex1 ON DicomIdentifiers(id);
-CREATE INDEX DicomIdentifiersIndex2 ON DicomIdentifiers(tagGroup, tagElement);
+-- CREATE INDEX DicomIdentifiersIndex2 ON DicomIdentifiers(tagGroup, tagElement);  -- disabled in 1.12.10
 CREATE INDEX DicomIdentifiersIndexValues ON DicomIdentifiers(value COLLATE BINARY);
 
+CREATE INDEX DicomIdentifiersIndexGroupElementValue ON DicomIdentifiers(tagGroup, tagElement, value); -- added in 1.12.10
+
 CREATE INDEX ChangesIndex ON Changes(internalId);
 
 
@@ -152,6 +154,9 @@
 -- new in Orthanc 1.12.8 ------------------------- equivalent to InstallKeyValueStoresAndQueues.sql
 ${INSTALL_KEY_VALUE_STORES_AND_QUEUES}
 
+-- new in Orthanc 1.12.10 ------------------------- equivalent to AddTimeoutToQueues.sql
+${ADD_TIMEOUT_TO_QUEUES}
+
 
 -- Track the fact that the "revision" column exists in the "Metadata" and "AttachedFiles"
 -- tables, and that the "customData" column exists in the "AttachedFiles" table
--- a/OrthancServer/Sources/Database/SQLiteDatabaseWrapper.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/Database/SQLiteDatabaseWrapper.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -2280,54 +2280,49 @@
       }
 
       SQLite::Statement s(db_, SQLITE_FROM_HERE,
-                          "INSERT INTO Queues (queueId, value) VALUES (?, ?)");
+                          "INSERT INTO Queues (queueId, value, reservedUntil) VALUES (?, ?, NULL)");
       s.BindString(0, queueId);
       s.BindBlob(1, value, valueSize);
       s.Run();
     }
 
-    // New in Orthanc 1.12.8
+    // New in Orthanc 1.12.8 (but obsolete, you should use ReserveQueueValue instead)
     virtual bool DequeueValue(std::string& value,
                               const std::string& queueId,
                               QueueOrigin origin) ORTHANC_OVERRIDE
     {
-      int64_t rowId;
-      std::unique_ptr<SQLite::Statement> s;
-
+      std::string order;
       switch (origin)
       {
-        case QueueOrigin_Front:
-          s.reset(new SQLite::Statement(db_, SQLITE_FROM_HERE, "SELECT id, value FROM Queues WHERE queueId=? ORDER BY id ASC LIMIT 1"));
+        case QueueOrigin_Back:
+          order = "DESC";
           break;
-
-        case QueueOrigin_Back:
-          s.reset(new SQLite::Statement(db_, SQLITE_FROM_HERE, "SELECT id, value FROM Queues WHERE queueId=? ORDER BY id DESC LIMIT 1"));
+        case QueueOrigin_Front:
+          order = "ASC";
           break;
-
         default:
           throw OrthancException(ErrorCode_InternalError);
       }
 
-      s->BindString(0, queueId);
-      if (!s->Step())
+      std::string sql = "WITH RowToDelete AS (SELECT id FROM Queues WHERE queueId=? AND (reservedUntil IS NULL OR reservedUntil < datetime('now')) ORDER BY id " + order + " LIMIT 1) "
+                        "DELETE FROM Queues WHERE id IN (SELECT id FROM RowToDelete) "
+                        "RETURNING value;";
+
+      SQLite::Statement s(db_, SQLITE_FROM_HERE_DYNAMIC(sql), sql);
+
+      s.BindString(0, queueId);
+      if (!s.Step())
       {
         // No value found
         return false;
       }
       else
       {
-        rowId = s->ColumnInt64(0);
-
-        if (!s->ColumnBlobAsString(1, &value))
+        if (!s.ColumnBlobAsString(0, &value))
         {
           throw OrthancException(ErrorCode_NotEnoughMemory);
         }
 
-        SQLite::Statement s2(db_, SQLITE_FROM_HERE,
-                            "DELETE FROM Queues WHERE id = ?");
-        s2.BindInt64(0, rowId);
-        s2.Run();
-
         return true;
       }    
     }
@@ -2340,6 +2335,77 @@
       s.Step();
       return s.ColumnInt64(0);
     }
+
+    // New in Orthanc 1.12.10
+    virtual bool ReserveQueueValue(std::string& value,
+                                   uint64_t& valueId,
+                                   const std::string& queueId,
+                                   QueueOrigin origin,
+                                   uint32_t releaseTimeout) ORTHANC_OVERRIDE
+    {
+      // { // list queue values
+      //   SQLite::Statement t(db_, SQLITE_FROM_HERE,
+      //                       "SELECT id, value, reservedUntil FROM Queues WHERE queueId=?");
+      //   t.BindString(0, queueId);
+      //   while (t.Step())
+      //   {
+      //     t.ColumnBlobAsString(1, &value);
+      //     LOG(INFO) << t.ColumnInt64(0) << " " << value << " " << t.ColumnString(2);
+      //   }
+      // }
+
+      std::string order;
+      switch (origin)
+      {
+        case QueueOrigin_Back:
+          order = "DESC";
+          break;
+        case QueueOrigin_Front:
+          order = "ASC";
+          break;
+        default:
+          throw OrthancException(ErrorCode_InternalError);
+      }
+
+      std::string sql = "WITH RowToUpdate AS (SELECT id FROM Queues WHERE queueId=? AND (reservedUntil IS NULL OR reservedUntil < datetime('now')) ORDER BY id " + order + " LIMIT 1) "
+                        "UPDATE Queues SET reservedUntil = datetime('now', ?) WHERE id IN (SELECT id FROM RowToUpdate) "
+                        "RETURNING id, value;";
+
+      SQLite::Statement s(db_, SQLITE_FROM_HERE_DYNAMIC(sql), sql);
+
+
+      s.BindString(0, queueId);
+      std::string timeout = "+" + boost::lexical_cast<std::string>(releaseTimeout) + " seconds";
+
+      s.BindString(1, timeout);
+      if (!s.Step())
+      {
+        // No value found
+        return false;
+      }
+      else
+      {
+        valueId = static_cast<uint64_t>(s.ColumnInt64(0));
+
+        if (!s.ColumnBlobAsString(1, &value))
+        {
+          throw OrthancException(ErrorCode_NotEnoughMemory);
+        }
+
+        return true;
+      }    
+
+    }
+    
+    // New in Orthanc 1.12.10
+    virtual void AcknowledgeQueueValue(const std::string& /* queueId */,
+                                        uint64_t valueId) ORTHANC_OVERRIDE
+    {
+      SQLite::Statement s(db_, SQLITE_FROM_HERE,
+                          "DELETE FROM Queues WHERE id = ?");
+      s.BindInt64(0, static_cast<int64_t>(valueId));
+      s.Run();
+    }    
   };
 
 
@@ -2586,6 +2652,7 @@
     dbCapabilities_.SetHasFindSupport(HasIntegratedFind());
     dbCapabilities_.SetKeyValueStoresSupport(true);
     dbCapabilities_.SetQueuesSupport(true);
+    dbCapabilities_.SetExtendedQueuesSupport(true);
     dbCapabilities_.SetAttachmentCustomDataSupport(true);
     db_.Open(path);
   }
@@ -2603,6 +2670,7 @@
     dbCapabilities_.SetHasFindSupport(HasIntegratedFind());
     dbCapabilities_.SetKeyValueStoresSupport(true);
     dbCapabilities_.SetQueuesSupport(true);
+    dbCapabilities_.SetExtendedQueuesSupport(true);
     dbCapabilities_.SetAttachmentCustomDataSupport(true);
     db_.OpenInMemory();
   }
@@ -2678,6 +2746,7 @@
         InjectEmbeddedScript(query, "${INSTALL_LABELS_TABLE}", ServerResources::INSTALL_LABELS_TABLE);
         InjectEmbeddedScript(query, "${INSTALL_DELETED_FILES}", ServerResources::INSTALL_DELETED_FILES);
         InjectEmbeddedScript(query, "${INSTALL_KEY_VALUE_STORES_AND_QUEUES}", ServerResources::INSTALL_KEY_VALUE_STORES_AND_QUEUES);
+        InjectEmbeddedScript(query, "${ADD_TIMEOUT_TO_QUEUES}", ServerResources::ADD_TIMEOUT_TO_QUEUES);
 
         db_.Execute(query);
       }
@@ -2743,6 +2812,13 @@
           LOG(INFO) << "Installing the \"KeyValueStores\" and \"Queues\" tables";
           ExecuteEmbeddedScript(db_, ServerResources::INSTALL_KEY_VALUE_STORES_AND_QUEUES);
         }
+
+        // New in Orthanc 1.12.10
+        if (!db_.DoesColumnExist("Queues", "reservedUntil"))
+        {
+          LOG(INFO) << "Adding timeout column to the \"Queues\" table";
+          ExecuteEmbeddedScript(db_, ServerResources::ADD_TIMEOUT_TO_QUEUES);
+        }
       }
 
       transaction->Commit(0);
--- a/OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -3228,6 +3228,12 @@
     return db_.GetDatabaseCapabilities().HasQueuesSupport();
   }
 
+  bool StatelessDatabaseOperations::HasExtendedQueuesSupport()
+  {
+    boost::shared_lock<boost::shared_mutex> lock(mutex_);
+    return db_.GetDatabaseCapabilities().HasExtendedQueuesSupport();
+  }
+
   void StatelessDatabaseOperations::ExecuteCount(uint64_t& count,
                                                  const FindRequest& request)
   {
@@ -3574,6 +3580,92 @@
     return size;
   }
 
+  bool StatelessDatabaseOperations::ReserveQueueValue(std::string& value,
+                                                      uint64_t& valueId,
+                                                      const std::string& queueId,
+                                                      QueueOrigin origin,
+                                                      uint32_t releaseTimeout)
+  {
+    if (queueId.empty())
+    {
+      throw OrthancException(ErrorCode_ParameterOutOfRange);
+    }
+
+    class Operations : public IReadWriteOperations
+    {
+    private:
+      const std::string& queueId_;
+      uint64_t& valueId_;
+      std::string& value_;
+      QueueOrigin origin_;
+      uint32_t releaseTimeout_;
+      bool found_;
+
+    public:
+      Operations(std::string& value,
+                 uint64_t& valueId,
+                 const std::string& queueId,
+                 QueueOrigin origin,
+                 uint32_t releaseTimeout) :
+        queueId_(queueId),
+        valueId_(valueId),
+        value_(value),
+        origin_(origin),
+        releaseTimeout_(releaseTimeout),
+        found_(false)
+      {
+      }
+
+      bool HasFound()
+      {
+        return found_;
+      }
+
+      virtual void Apply(ReadWriteTransaction& transaction) ORTHANC_OVERRIDE
+      {
+        found_ = transaction.ReserveQueueValue(value_, valueId_, queueId_, origin_, releaseTimeout_);
+      }
+    };
+
+    Operations operations(value, valueId, queueId, origin, releaseTimeout);
+    Apply(operations);
+
+    return operations.HasFound();
+  }
+
+
+  void StatelessDatabaseOperations::AcknowledgeQueueValue(const std::string& queueId,
+                                                          uint64_t valueId)
+  {
+    if (queueId.empty())
+    {
+      throw OrthancException(ErrorCode_ParameterOutOfRange);
+    }
+
+    class Operations : public IReadWriteOperations
+    {
+    private:
+      const std::string& queueId_;
+      uint64_t valueId_;
+
+    public:
+      Operations(const std::string& queueId,
+                 uint64_t valueId) :
+        queueId_(queueId),
+        valueId_(valueId)
+      {
+      }
+
+      virtual void Apply(ReadWriteTransaction& transaction) ORTHANC_OVERRIDE
+      {
+        transaction.AcknowledgeQueueValue(queueId_, valueId_);
+      }
+    };
+
+    Operations operations(queueId, valueId);
+    Apply(operations);
+  }
+
 
   void StatelessDatabaseOperations::GetAttachmentCustomData(std::string& customData,
                                                             const std::string& attachmentUuid)
--- a/OrthancServer/Sources/Database/StatelessDatabaseOperations.h	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/Database/StatelessDatabaseOperations.h	Sat Nov 15 12:28:27 2025 +0100
@@ -485,6 +485,21 @@
         return transaction_.DequeueValue(value, queueId, origin);
       }
 
+      bool ReserveQueueValue(std::string& value,
+                             uint64_t& valueId,
+                             const std::string& queueId,
+                             QueueOrigin origin,
+                             uint32_t releaseTimeout)
+      {
+        return transaction_.ReserveQueueValue(value, valueId, queueId, origin, releaseTimeout);
+      }
+
+      void AcknowledgeQueueValue(const std::string& queueId,
+                                 uint64_t valueId)
+      {
+        return transaction_.AcknowledgeQueueValue(queueId, valueId);
+      }
+
       void SetAttachmentCustomData(const std::string& attachmentUuid,
                                       const void* customData,
                                       size_t customDataSize)
@@ -622,6 +637,8 @@
 
     bool HasQueuesSupport();
 
+    bool HasExtendedQueuesSupport();
+
     void GetExportedResources(Json::Value& target,
                               int64_t since,
                               uint32_t limit);
@@ -841,6 +858,15 @@
     
     uint64_t GetQueueSize(const std::string& queueId);
 
+    bool ReserveQueueValue(std::string& value,
+                           uint64_t& valueId,
+                           const std::string& queueId,
+                           QueueOrigin origin,
+                           uint32_t releaseTimeout);
+
+    void AcknowledgeQueueValue(const std::string& queueId,
+                               uint64_t valueId);
+
     class KeysValuesIterator : public boost::noncopyable
     {
     private:
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestSystem.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestSystem.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -97,6 +97,7 @@
     static const char* const HAS_EXTENDED_CHANGES = "HasExtendedChanges";
     static const char* const HAS_KEY_VALUE_STORES = "HasKeyValueStores";
     static const char* const HAS_QUEUES = "HasQueues";
+    static const char* const HAS_EXTENDED_QUEUES = "HasExtendedQueues";
     static const char* const HAS_EXTENDED_FIND = "HasExtendedFind";
     static const char* const READ_ONLY = "ReadOnly";
 
@@ -213,6 +214,7 @@
     result[CAPABILITIES][HAS_EXTENDED_FIND] = OrthancRestApi::GetIndex(call).HasFindSupport();
     result[CAPABILITIES][HAS_KEY_VALUE_STORES] = OrthancRestApi::GetIndex(call).HasKeyValueStoresSupport();
     result[CAPABILITIES][HAS_QUEUES] = OrthancRestApi::GetIndex(call).HasQueuesSupport();
+    result[CAPABILITIES][HAS_EXTENDED_QUEUES] = OrthancRestApi::GetIndex(call).HasExtendedQueuesSupport();
     
     call.GetOutput().AnswerJson(result);
   }
--- a/OrthancServer/UnitTestsSources/ServerIndexTests.cpp	Sat Nov 15 11:49:39 2025 +0100
+++ b/OrthancServer/UnitTestsSources/ServerIndexTests.cpp	Sat Nov 15 12:28:27 2025 +0100
@@ -1319,6 +1319,79 @@
   db.Close();
 }
 
+TEST(SQLiteDatabaseWrapper, QueuesExtended)
+{
+  SQLiteDatabaseWrapper db;   // The SQLite DB is in memory
+  db.Open();
+
+  {
+    StatelessDatabaseOperations op(db, false);
+    op.SetTransactionContextFactory(new DummyTransactionContextFactory);
+
+    ASSERT_EQ(0u, op.GetQueueSize("test"));
+    op.EnqueueValue("test", "a");
+    ASSERT_EQ(1u, op.GetQueueSize("test"));
+    op.EnqueueValue("test", "b");
+    ASSERT_EQ(2u, op.GetQueueSize("test"));
+    op.EnqueueValue("test", "c");
+    ASSERT_EQ(3u, op.GetQueueSize("test"));
+
+    std::string s;
+    uint64_t valueId0, valueId1, valueId2, valueId3;
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 100));  
+    ASSERT_EQ("c", s);
+    ASSERT_EQ(3u, op.GetQueueSize("test"));  // the reserved values are still counted !
+    
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId1, "test", QueueOrigin_Back, 100));  
+    ASSERT_EQ("b", s);
+    
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId2, "test", QueueOrigin_Back, 100));  
+    ASSERT_EQ("a", s);
+    ASSERT_EQ(3u, op.GetQueueSize("test"));
+
+    ASSERT_FALSE(op.ReserveQueueValue(s, valueId3, "test", QueueOrigin_Back, 100));
+
+    op.AcknowledgeQueueValue("test", valueId1);
+    ASSERT_EQ(2u, op.GetQueueSize("test"));
+    op.AcknowledgeQueueValue("test", valueId2);
+    op.AcknowledgeQueueValue("test", valueId0);
+    ASSERT_EQ(0u, op.GetQueueSize("test"));
+
+    op.EnqueueValue("test", "d");
+    op.EnqueueValue("test", "e");
+    op.EnqueueValue("test", "f");
+    op.EnqueueValue("test", "g");
+    op.EnqueueValue("test", "h");
+
+    // reserve 2 values front and back and acknowledge only "e" & "f"
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1));  
+    ASSERT_EQ("d", s);
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1));  
+    ASSERT_EQ("e", s);
+    op.AcknowledgeQueueValue("test", valueId0);
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1));  
+    ASSERT_EQ("h", s);
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1));  
+    ASSERT_EQ("g", s);
+    op.AcknowledgeQueueValue("test", valueId0);
+
+    SystemToolbox::USleep(2000000); // the granularity being the second, we might have to wait up to 2 seconds
+
+    // "d", "f" and "h" remains
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1));  
+    ASSERT_EQ("d", s);
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1));  
+    ASSERT_EQ("f", s);
+    ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1));  
+    ASSERT_EQ("h", s);
+    
+    // the queue is empty at this point since "f" has been reserved already
+    ASSERT_FALSE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1));  
+  }
+
+  db.Close();
+}
+
 
 TEST_F(DatabaseWrapperTest, BinaryCustomData)
 {
--- a/TODO	Sat Nov 15 11:49:39 2025 +0100
+++ b/TODO	Sat Nov 15 12:28:27 2025 +0100
@@ -329,32 +329,6 @@
   can not cross the C/C++ frontier safely -> we need a OrthancPluginRegisterStorageArea3 with a return value.
   Ex: install DelayedDeletion + S3 storage.  Right now, the second plugin to load is just ignored with an error
   message in the logs.
-* Queues: There is currently a risk of loosing messages from the Queues:
-    message = orthanc.DequeueValue("instances-to-process", orthanc.QueueOrigin.FRONT)
-    # consider Orthanc is interrupted here (hard shutdown)
-    proccess(message)
-  The message will never be processed ...
-  We should have an acknowledge/commit mechanism e.g:
-    message, messageId = orthanc.DequeueValue2("instances-to-process", orthanc.QueueOrigin.FRONT, 5) # where 5 is a "timeout"
-    # At this point, the message is still in the queue but will not be dequeued by other consumers.
-    # If the message is not acknowledged within 5 seconds, it will get back into the queue.
-    process(message)
-    orthanc.AcknowledgeQueue("instances-to-process", messageId)
-    # This requires adding a new "timeout" column in the DB with the reservation_expiration timestamp.
-
-  Note by SJ: Introducing an acknowledgement would greatly complexify
-  the SDK and the core of Orthanc. I would favor another approach by
-  introducing 2 functions in the SDK:
-  - OrthancPluginQueuePeekFront(context, found, target, queueId): Same behavior as
-    OrthancPluginDequeueValue(), but limited to the back of the queue, and doesn't modify the queue
-  - OrthancPluginQueuePopFront(context, target): Remove the front element from the queue
-  As long as OrthancPluginQueuePopFront() is not called, no message would be lost.
-  Note that we cannot propose "OrthancPluginQueuePeekBack()" or
-  "OrthancPluginQueuePeek(..., OrthancPluginQueueOrigin_Back)" (i.e., LIFO stacks), as a
-  concurrent call to "OrthancPluginEnqueueValue()" would alter the back of the queue.
-
-  Note by AM: With PeekFront: 2 consumers on 2 Orthanc instances might consume the same message no ?
-
 * Add OrthancPluginSetStableStatus2() which would take an additional OrthancPluginResourceType
   argument to avoid possible ambiguity about the resource of interest