Mercurial > hg > orthanc
changeset 6422:36aff9f84264 queues-timeout tip
integration mainline->queues-timeout
| author | Sebastien Jodogne <s.jodogne@gmail.com> |
|---|---|
| date | Sat, 15 Nov 2025 12:28:27 +0100 |
| parents | b86411c11092 (diff) c557f6bdcbfd (current diff) |
| children | |
| files | NEWS OrthancServer/Plugins/Engine/OrthancPlugins.cpp OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp |
| diffstat | 23 files changed, 739 insertions(+), 66 deletions(-) [+] |
line wrap: on
line diff
--- a/NEWS Sat Nov 15 11:49:39 2025 +0100 +++ b/NEWS Sat Nov 15 12:28:27 2025 +0100 @@ -1,6 +1,8 @@ Pending changes in the mainline =============================== +TODO: finalize DicomIdentifiersIndex (upgrade from existing DB) + General ------- @@ -74,6 +76,16 @@ - Note: the previous "Database" configuration has now been renamed in "Directory" to better differentiate the "File" or "DB modes. +Plugin SDK +---------- + +* New primitives for handling queues: + - OrthancPluginReserveQueueValue is a replacement for OrthancPluginDequeueValue. This flavour + does not directly remove the value from the queue but reserves it for a certain duration. + - OrthancPluginAcknowledgeQueueValue removes a reserved value from a queue. + - With these 2 primitives, one can ensure not to loose any message in case Orthanc is stopped + while handling a message. + Version 1.12.9 (2025-08-11) ===========================
--- a/OrthancFramework/Resources/Patches/OpenSSL-ConfigureHeaders.py Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancFramework/Resources/Patches/OpenSSL-ConfigureHeaders.py Sat Nov 15 12:28:27 2025 +0100 @@ -92,43 +92,43 @@ def Parse(match): s = '' - for t in re.findall('generate_stack_macros\("(.+?)"\)', match.group(1)): + for t in re.findall(r'generate_stack_macros\("(.+?)"\)', match.group(1)): s += (GENERATE_STACK_MACROS .replace('${nametype}', t) .replace('${realtype}', t) .replace('${plaintype}', t)) - for t in re.findall('generate_const_stack_macros\("(.+?)"\)', match.group(1)): + for t in re.findall(r'generate_const_stack_macros\("(.+?)"\)', match.group(1)): s += (GENERATE_STACK_MACROS .replace('${nametype}', t) .replace('${realtype}', 'const %s' % t) .replace('${plaintype}', t)) - for t in re.findall('generate_stack_string_macros\(\)', match.group(1)): + for t in re.findall(r'generate_stack_string_macros\(\)', match.group(1)): s += (GENERATE_STACK_MACROS .replace('${nametype}', 'OPENSSL_STRING') .replace('${realtype}', 'char') .replace('${plaintype}', 'char')) - for t in re.findall('generate_stack_const_string_macros\(\)', match.group(1)): + for t in re.findall(r'generate_stack_const_string_macros\(\)', match.group(1)): s += (GENERATE_STACK_MACROS .replace('${nametype}', 'OPENSSL_CSTRING') .replace('${realtype}', 'const char') .replace('${plaintype}', 'char')) - for t in re.findall('generate_stack_block_macros\(\)', match.group(1)): + for t in re.findall(r'generate_stack_block_macros\(\)', match.group(1)): s += (GENERATE_STACK_MACROS .replace('${nametype}', 'OPENSSL_BLOCK') .replace('${realtype}', 'void') .replace('${plaintype}', 'void')) - for t in re.findall('generate_lhash_macros\("(.+?)"\)', match.group(1)): + for t in re.findall(r'generate_lhash_macros\("(.+?)"\)', match.group(1)): s += GENERATE_LHASH_MACROS.replace('${type}', t) - for t in re.findall('\$config{rc4_int}', match.group(1)): + for t in re.findall(r'\$config{rc4_int}', match.group(1)): s += 'unsigned int' - for t in re.findall('oids_to_c::process_leaves\(.+?\)', match.group(1), re.MULTILINE | re.DOTALL): + for t in re.findall(r'oids_to_c::process_leaves\(.+?\)', match.group(1), re.MULTILINE | re.DOTALL): if not CURRENT_HEADER in OIDS: raise Exception('Unknown header: %s' % CURRENT_HEADER) @@ -145,18 +145,18 @@ directory = os.path.join(sys.argv[1], base) for source in os.listdir(directory): if source.endswith('.h.in'): - target = re.sub('\.h\.in$', '.h', source) + target = re.sub(r'\.h\.in$', '.h', source) with open(os.path.join(directory, source), 'r') as f: with open(os.path.join(directory, target), 'w') as g: CURRENT_HEADER = source - g.write(re.sub('{-(.*?)-}.*?$', Parse, f.read(), + g.write(re.sub(r'{-(.*?)-}.*?$', Parse, f.read(), flags = re.MULTILINE | re.DOTALL)) with open(os.path.join(sys.argv[1], 'providers/common/der/orthanc_oids_gen.c'), 'w') as f: for (header, content) in OIDS.items(): - f.write('#include "prov/%s"\n' % re.sub('\.h\.in$', '.h', header)) + f.write('#include "prov/%s"\n' % re.sub(r'\.h\.in$', '.h', header)) f.write('\n')
--- a/OrthancServer/CMakeLists.txt Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/CMakeLists.txt Sat Nov 15 12:28:27 2025 +0100 @@ -266,6 +266,7 @@ INSTALL_REVISION_AND_CUSTOM_DATA ${CMAKE_SOURCE_DIR}/Sources/Database/InstallRevisionAndCustomData.sql INSTALL_DELETED_FILES ${CMAKE_SOURCE_DIR}/Sources/Database/InstallDeletedFiles.sql INSTALL_KEY_VALUE_STORES_AND_QUEUES ${CMAKE_SOURCE_DIR}/Sources/Database/InstallKeyValueStoresAndQueues.sql + ADD_TIMEOUT_TO_QUEUES ${CMAKE_SOURCE_DIR}/Sources/Database/AddTimeoutToQueues.sql ) if (STANDALONE_BUILD)
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabase.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabase.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -1499,6 +1499,21 @@ throw OrthancException(ErrorCode_InternalError); // Not supported } + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + virtual void GetAttachmentCustomData(std::string& customData, const std::string& attachmentUuid) ORTHANC_OVERRIDE {
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV3.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV3.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -1111,6 +1111,21 @@ throw OrthancException(ErrorCode_InternalError); // Not supported } + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE + { + throw OrthancException(ErrorCode_InternalError); // Not supported + } + virtual void GetAttachmentCustomData(std::string& customData, const std::string& attachmentUuid) ORTHANC_OVERRIDE {
--- a/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV4.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Engine/OrthancPluginDatabaseV4.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -2051,6 +2051,73 @@ throw OrthancException(ErrorCode_InternalError); } } + + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + if (database_.GetDatabaseCapabilities().HasExtendedQueuesSupport()) + { + DatabasePluginMessages::TransactionRequest request; + request.mutable_reserve_queue_value()->set_queue_id(queueId); + request.mutable_reserve_queue_value()->set_release_timeout(releaseTimeout); + + switch (origin) + { + case QueueOrigin_Back: + request.mutable_dequeue_value()->set_origin(DatabasePluginMessages::QUEUE_ORIGIN_BACK); + break; + + case QueueOrigin_Front: + request.mutable_dequeue_value()->set_origin(DatabasePluginMessages::QUEUE_ORIGIN_FRONT); + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + DatabasePluginMessages::TransactionResponse response; + ExecuteTransaction(response, DatabasePluginMessages::OPERATION_RESERVE_QUEUE_VALUE, request); + + if (response.reserve_queue_value().found()) + { + value = response.reserve_queue_value().value(); + valueId = response.reserve_queue_value().value_id(); + return true; + } + else + { + return false; + } + } + else + { + // This method shouldn't have been called + throw OrthancException(ErrorCode_InternalError); + } + } + + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) ORTHANC_OVERRIDE + { + + if (database_.GetDatabaseCapabilities().HasExtendedQueuesSupport()) + { + DatabasePluginMessages::TransactionRequest request; + request.mutable_acknowledge_queue_value()->set_queue_id(queueId); + request.mutable_acknowledge_queue_value()->set_value_id(valueId); + + ExecuteTransaction(DatabasePluginMessages::OPERATION_ACKNOWLEDGE_QUEUE_VALUE, request); + } + else + { + // This method shouldn't have been called + throw OrthancException(ErrorCode_InternalError); + } + } + }; @@ -2143,6 +2210,7 @@ dbCapabilities_.SetHasFindSupport(systemInfo.supports_find()); dbCapabilities_.SetKeyValueStoresSupport(systemInfo.supports_key_value_stores()); dbCapabilities_.SetQueuesSupport(systemInfo.supports_queues()); + dbCapabilities_.SetExtendedQueuesSupport(systemInfo.supports_extended_queues()); dbCapabilities_.SetAttachmentCustomDataSupport(systemInfo.has_attachment_custom_data()); }
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Engine/OrthancPlugins.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -4723,6 +4723,14 @@ } } + static void CheckExtendedQueuesSupport(ServerContext& context) + { + if (!context.GetIndex().HasExtendedQueuesSupport()) + { + throw OrthancException(ErrorCode_NotImplemented, "The database engine does not support queues (extended)"); + } + } + void OrthancPlugins::ApplyEnqueueValue(const _OrthancPluginEnqueueValue& parameters) { PImpl::ServerContextReference lock(*pimpl_); @@ -4760,6 +4768,37 @@ *parameters.size = lock.GetContext().GetIndex().GetQueueSize(parameters.queueId); } + void OrthancPlugins::ApplyReserveQueueValue(const _OrthancPluginReserveQueueValue& parameters) + { + PImpl::ServerContextReference lock(*pimpl_); + + CheckExtendedQueuesSupport(lock.GetContext()); + + std::string value; + uint64_t valueId; + + if (lock.GetContext().GetIndex().ReserveQueueValue(value, valueId, parameters.queueId, Plugins::Convert(parameters.origin), parameters.releaseTimeout)) + { + CopyToMemoryBuffer(parameters.target, value); + *parameters.found = true; + *parameters.valueId = valueId; + } + else + { + *parameters.found = false; + } + } + + void OrthancPlugins::ApplyAknowledgeQueueValue(const _OrthancPluginAcknowledgeQueueValue& parameters) + { + PImpl::ServerContextReference lock(*pimpl_); + + CheckExtendedQueuesSupport(lock.GetContext()); + + lock.GetContext().GetIndex().AcknowledgeQueueValue(parameters.queueId, parameters.valueId); + } + + void OrthancPlugins::ApplySetStableStatus(const _OrthancPluginSetStableStatus& parameters) { PImpl::ServerContextReference lock(*pimpl_); @@ -5920,6 +5959,20 @@ return true; } + case _OrthancPluginService_ReserveQueueValue: + { + const _OrthancPluginReserveQueueValue& p = *reinterpret_cast<const _OrthancPluginReserveQueueValue*>(parameters); + ApplyReserveQueueValue(p); + return true; + } + + case _OrthancPluginService_AcknowledgeQueueValue: + { + const _OrthancPluginAcknowledgeQueueValue& p = *reinterpret_cast<const _OrthancPluginAcknowledgeQueueValue*>(parameters); + ApplyAknowledgeQueueValue(p); + return true; + } + case _OrthancPluginService_SetStableStatus: { const _OrthancPluginSetStableStatus& p = *reinterpret_cast<const _OrthancPluginSetStableStatus*>(parameters);
--- a/OrthancServer/Plugins/Engine/OrthancPlugins.h Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Engine/OrthancPlugins.h Sat Nov 15 12:28:27 2025 +0100 @@ -249,6 +249,10 @@ void ApplyGetQueueSize(const _OrthancPluginGetQueueSize& parameters); + void ApplyReserveQueueValue(const _OrthancPluginReserveQueueValue& parameters); + + void ApplyAknowledgeQueueValue(const _OrthancPluginAcknowledgeQueueValue& parameters); + void ApplySetStableStatus(const _OrthancPluginSetStableStatus& parameters); void ApplyEmitAuditLog(const _OrthancPluginEmitAuditLog& parameters);
--- a/OrthancServer/Plugins/Include/orthanc/OrthancCPlugin.h Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Include/orthanc/OrthancCPlugin.h Sat Nov 15 12:28:27 2025 +0100 @@ -123,7 +123,7 @@ #define ORTHANC_PLUGINS_MINIMAL_MAJOR_NUMBER 1 #define ORTHANC_PLUGINS_MINIMAL_MINOR_NUMBER 12 -#define ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER 9 +#define ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER 10 #if !defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) @@ -519,6 +519,8 @@ _OrthancPluginService_GetQueueSize = 59, /* New in Orthanc 1.12.8 */ _OrthancPluginService_SetStableStatus = 60, /* New in Orthanc 1.12.9 */ _OrthancPluginService_EmitAuditLog = 61, /* New in Orthanc 1.12.9 */ + _OrthancPluginService_ReserveQueueValue = 62, /* New in Orthanc 1.12.10 */ + _OrthancPluginService_AcknowledgeQueueValue = 63, /* New in Orthanc 1.12.10 */ /* Registration of callbacks */ _OrthancPluginService_RegisterRestCallback = 1000, @@ -10412,9 +10414,12 @@ * @param queueId A unique identifier identifying both the plugin and the queue. * @param origin The position from where the value is dequeued (back for LIFO, front for FIFO). * @return 0 if success, other value if error. + * @deprecated This function should not be used anymore because there is a risk of loosing + * a value if the consumer plugin crashes before it has processed the value. Use + * OrthancPluginReserveQueueValue() and OrthancPluginAcknowledgeQueueValue() if possible. **/ ORTHANC_PLUGIN_SINCE_SDK("1.12.8") - ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginDequeueValue( + ORTHANC_PLUGIN_DEPRECATED ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginDequeueValue( OrthancPluginContext* context, uint8_t* found, /* out */ OrthancPluginMemoryBuffer* target, /* out */ @@ -10438,7 +10443,7 @@ } _OrthancPluginGetQueueSize; /** - * @brief Get the number of elements in a queue. + * @brief Get the number of elements that are currently stored in a queue. * * @param context The Orthanc plugin context, as received by OrthancPluginInitialize(). * @param queueId A unique identifier identifying both the plugin and the queue. @@ -10706,6 +10711,85 @@ } + typedef struct + { + uint8_t* found; + OrthancPluginMemoryBuffer* target; + const char* queueId; + OrthancPluginQueueOrigin origin; + uint32_t releaseTimeout; + uint64_t* valueId; + } _OrthancPluginReserveQueueValue; + + /** + * @brief Reserve a value from a queue, which means that the message is not + * available to other consumers. + * The value is either: + * - removed from the queue when one calls OrthancPluginAcknowledgeQueueValue() + * - or made available again for consumers after the releaseTimeout has expired. + * + * @param context The Orthanc plugin context, as received by OrthancPluginInitialize(). + * @param found Pointer to a Boolean that is set to "true" iff. a value has been dequeued. + * @param target Memory buffer where to store the value that has been retrieved from the queue. + * It must be freed with OrthancPluginFreeMemoryBuffer(). + * @param queueId A unique identifier identifying both the plugin and the queue. + * @param origin The position from where the value is dequeued (back for LIFO, front for FIFO). + * @param releaseTimeout Timeout in seconds. If the value is not acknowledged before this delay expires, + * the value is automatically released and made available for further calls to + * OrthancPluginDequeueValue() or OrthancPluginReserveQueueValue() + * @param valueId An opaque identifier for this value, to be subsequently provided to + * OrthancPluginAcknowledgeQueueValue(). + * @return 0 if success, other value if error. + **/ + ORTHANC_PLUGIN_SINCE_SDK("1.12.10") + ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginReserveQueueValue( + OrthancPluginContext* context, + uint8_t* found, /* out */ + OrthancPluginMemoryBuffer* target, /* out */ + uint64_t* valueId, /* out */ + const char* queueId, /* in */ + OrthancPluginQueueOrigin origin, /* in */ + uint32_t releaseTimeout /* in */) + { + _OrthancPluginReserveQueueValue params; + params.found = found; + params.target = target; + params.queueId = queueId; + params.origin = origin; + params.valueId = valueId; + params.releaseTimeout = releaseTimeout; + + return context->InvokeService(context, _OrthancPluginService_ReserveQueueValue, ¶ms); + } + + typedef struct + { + const char* queueId; + uint64_t valueId; + } _OrthancPluginAcknowledgeQueueValue; + + /** + * @brief Acknowledge that a queue value has been properly consumed by the plugin + * and can be definitely removed from the queue. + * + * @param context The Orthanc plugin context, as received by OrthancPluginInitialize(). + * @param queueId A unique identifier identifying both the plugin and the queue. + * @param valueId The opaque identifier for the value obtained from OrthancPluginReserveQueueValue(). + * @return 0 if success, other value if error. + **/ + ORTHANC_PLUGIN_SINCE_SDK("1.12.10") + ORTHANC_PLUGIN_INLINE OrthancPluginErrorCode OrthancPluginAcknowledgeQueueValue( + OrthancPluginContext* context, + const char* queueId, /* in */ + uint64_t valueId /* in */) + { + _OrthancPluginAcknowledgeQueueValue params; + params.queueId = queueId; + params.valueId = valueId; + + return context->InvokeService(context, _OrthancPluginService_AcknowledgeQueueValue, ¶ms); + } + #ifdef __cplusplus } #endif
--- a/OrthancServer/Plugins/Include/orthanc/OrthancDatabasePlugin.proto Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Include/orthanc/OrthancDatabasePlugin.proto Sat Nov 15 12:28:27 2025 +0100 @@ -175,6 +175,7 @@ bool supports_key_value_stores = 10; // New in Orthanc 1.12.8 bool supports_queues = 11; // New in Orthanc 1.12.8 bool has_attachment_custom_data = 12; // New in Orthanc 1.12.8 + bool supports_extended_queues = 13; // New in Orthanc 1.12.10 } } @@ -339,7 +340,8 @@ OPERATION_GET_QUEUE_SIZE = 59; // New in Orthanc 1.12.8 OPERATION_GET_ATTACHMENT_CUSTOM_DATA = 60; // New in Orthanc 1.12.8 OPERATION_SET_ATTACHMENT_CUSTOM_DATA = 61; // New in Orthanc 1.12.8 - + OPERATION_RESERVE_QUEUE_VALUE = 62; // New in Orthanc 1.12.10 + OPERATION_ACKNOWLEDGE_QUEUE_VALUE = 63; // New in Orthanc 1.12.10 } message Rollback { @@ -1065,6 +1067,20 @@ } } +message ReserveQueueValue { + message Request { + string queue_id = 1; + QueueOrigin origin = 2; + uint32 release_timeout = 3; + } + + message Response { + bool found = 1; + bytes value = 2; + uint64 value_id = 3; + } +} + message GetQueueSize { message Request { string queue_id = 1; @@ -1075,6 +1091,17 @@ } } +message AcknowledgeQueueValue { + message Request { + string queue_id = 1; + uint64 value_id = 2; + } + + message Response { + } +} + + message GetAttachmentCustomData { message Request { string uuid = 1; @@ -1162,6 +1189,8 @@ GetQueueSize.Request get_queue_size = 159; GetAttachmentCustomData.Request get_attachment_custom_data = 160; SetAttachmentCustomData.Request set_attachment_custom_data = 161; + ReserveQueueValue.Request reserve_queue_value = 162; + AcknowledgeQueueValue.Request acknowledge_queue_value = 163; } message TransactionResponse { @@ -1227,6 +1256,8 @@ GetQueueSize.Response get_queue_size = 159; GetAttachmentCustomData.Response get_attachment_custom_data = 160; SetAttachmentCustomData.Response set_attachment_custom_data = 161; + ReserveQueueValue.Response reserve_queue_value = 162; + AcknowledgeQueueValue.Response acknowledge_queue_value = 163; } enum RequestType {
--- a/OrthancServer/Plugins/Include/orthanc/OrthancPluginCodeModel.json Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Include/orthanc/OrthancPluginCodeModel.json Sat Nov 15 12:28:27 2025 +0100 @@ -2234,7 +2234,7 @@ "value": 2002 }, { - "documentation": "The TCP port of the HTTP server is privileged or already in use", + "documentation": "The TCP port of the HTTP server is privileged or already in use or one of the HTTP bind addresses does not exist", "key": "HttpPortInUse", "value": 2003 }, @@ -5486,6 +5486,37 @@ 12, 9 ] + }, + { + "args": [ + { + "name": "arg0", + "sdk_name": "queueId", + "sdk_type": "const char *" + }, + { + "name": "arg1", + "sdk_name": "valueId", + "sdk_type": "uint64_t" + } + ], + "c_function": "OrthancPluginAcknowledgeQueueValue", + "documentation": { + "args": { + "queueId": "A unique identifier identifying both the plugin and the queue.", + "valueId": "The opaque identifier for the value obtained from OrthancPluginReserveQueueValue()." + }, + "description": [], + "return": "0 if success, other value if error.", + "summary": "Acknowledge that a queue value has been properly consumed by the plugin and can be definitely removed from the queue." + }, + "return_sdk_enumeration": "OrthancPluginErrorCode", + "return_sdk_type": "enumeration", + "since_sdk": [ + 1, + 12, + 10 + ] } ], "unwrapped_functions": [ @@ -5538,6 +5569,7 @@ "OrthancPluginGetQueueSize", "OrthancPluginSetStableStatus", "OrthancPluginRegisterHttpAuthentication", - "OrthancPluginRegisterAuditLogHandler" + "OrthancPluginRegisterAuditLogHandler", + "OrthancPluginReserveQueueValue" ] } \ No newline at end of file
--- a/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -4643,4 +4643,45 @@ } } #endif + +#if HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES == 1 + bool Queue::ReserveInternal(std::string& value, uint64_t& valueId, OrthancPluginQueueOrigin origin, uint32_t releaseTimeout) + { + uint8_t found = false; + OrthancPlugins::MemoryBuffer valueBuffer; + + OrthancPluginErrorCode code = OrthancPluginReserveQueueValue(OrthancPlugins::GetGlobalContext(), &found, + *valueBuffer, &valueId, queueId_.c_str(), origin, releaseTimeout); + + if (code != OrthancPluginErrorCode_Success) + { + ORTHANC_PLUGINS_THROW_PLUGIN_ERROR_CODE(code); + } + else if (found) + { + valueBuffer.ToString(value); + return true; + } + else + { + return false; + } + } + + bool Queue::ReserveBack(std::string& value, uint64_t& valueId, uint32_t releaseTimeout) + { + return ReserveInternal(value, valueId, OrthancPluginQueueOrigin_Back, releaseTimeout); + } + + bool Queue::ReserveFront(std::string& value, uint64_t& valueId, uint32_t releaseTimeout) + { + return ReserveInternal(value, valueId, OrthancPluginQueueOrigin_Front, releaseTimeout); + } + + void Queue::Acknowledge(uint64_t valueId) + { + OrthancPluginAcknowledgeQueueValue(OrthancPlugins::GetGlobalContext(), queueId_.c_str(), valueId); + } +#endif + }
--- a/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.h Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Plugins/Samples/Common/OrthancPluginCppWrapper.h Sat Nov 15 12:28:27 2025 +0100 @@ -142,6 +142,12 @@ # define HAS_ORTHANC_PLUGIN_QUEUES 0 #endif +#if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 12, 10) +# define HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES 1 +#else +# define HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES 0 +#endif + // Macro to tag a function as having been deprecated #if (__cplusplus >= 201402L) // C++14 @@ -1726,6 +1732,10 @@ bool DequeueInternal(std::string& value, OrthancPluginQueueOrigin origin); +#if HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES == 1 + bool ReserveInternal(std::string& value, uint64_t& valueId, OrthancPluginQueueOrigin origin, uint32_t releaseTimeout); +#endif + public: explicit Queue(const std::string& queueId) : queueId_(queueId) @@ -1745,17 +1755,29 @@ Enqueue(value.empty() ? NULL : value.c_str(), value.size()); } + // Use ReserveBack instead + ORTHANC_PLUGIN_DEPRECATED bool DequeueBack(std::string& value) { return DequeueInternal(value, OrthancPluginQueueOrigin_Back); } + // Use ReserveFront instead + ORTHANC_PLUGIN_DEPRECATED bool DequeueFront(std::string& value) { return DequeueInternal(value, OrthancPluginQueueOrigin_Front); } uint64_t GetSize(); + +#if HAS_ORTHANC_PLUGIN_EXTENDED_QUEUES == 1 + bool ReserveBack(std::string& value, uint64_t& valueId, uint32_t releaseTimeout); + + bool ReserveFront(std::string& value, uint64_t& valueId, uint32_t releaseTimeout); + + void Acknowledge(uint64_t valueId); +#endif }; #endif }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Sources/Database/AddTimeoutToQueues.sql Sat Nov 15 12:28:27 2025 +0100 @@ -0,0 +1,23 @@ +-- Orthanc - A Lightweight, RESTful DICOM Store +-- Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics +-- Department, University Hospital of Liege, Belgium +-- Copyright (C) 2017-2023 Osimis S.A., Belgium +-- Copyright (C) 2024-2025 Orthanc Team SRL, Belgium +-- Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium +-- +-- This program is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- This program is distributed in the hope that it will be useful, but +-- WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +-- General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with this program. If not, see <http://www.gnu.org/licenses/>. + + +ALTER TABLE Queues +ADD COLUMN reservedUntil TIMESTAMP DEFAULT NULL;
--- a/OrthancServer/Sources/Database/IDatabaseWrapper.h Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/Database/IDatabaseWrapper.h Sat Nov 15 12:28:27 2025 +0100 @@ -59,6 +59,7 @@ bool hasAttachmentCustomDataSupport_; bool hasKeyValueStoresSupport_; bool hasQueuesSupport_; + bool hasExtendedQueuesSupport_; public: Capabilities() : @@ -72,7 +73,8 @@ hasExtendedChanges_(false), hasAttachmentCustomDataSupport_(false), hasKeyValueStoresSupport_(false), - hasQueuesSupport_(false) + hasQueuesSupport_(false), + hasExtendedQueuesSupport_(false) { } @@ -186,6 +188,16 @@ return hasQueuesSupport_; } + void SetExtendedQueuesSupport(bool value) + { + hasExtendedQueuesSupport_ = value; + } + + bool HasExtendedQueuesSupport() const + { + return hasExtendedQueuesSupport_; + } + }; @@ -471,6 +483,17 @@ // New in Orthanc 1.12.8, for statistics only virtual uint64_t GetQueueSize(const std::string& queueId) = 0; + // New in Orthanc 1.12.10 + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) = 0; + + // New in Orthanc 1.12.10 + virtual void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) = 0; + };
--- a/OrthancServer/Sources/Database/InstallKeyValueStoresAndQueues.sql Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/Database/InstallKeyValueStoresAndQueues.sql Sat Nov 15 12:28:27 2025 +0100 @@ -30,6 +30,7 @@ id INTEGER PRIMARY KEY AUTOINCREMENT, queueId TEXT NOT NULL, value BLOB NOT NULL + --- reservedUntil TIMESTAMP DEFAULT NULL -- added in AddTimeoutToQueues.sql ); CREATE INDEX QueuesIndex ON Queues (queueId, id);
--- a/OrthancServer/Sources/Database/PrepareDatabase.sql Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/Database/PrepareDatabase.sql Sat Nov 15 12:28:27 2025 +0100 @@ -107,9 +107,11 @@ -- The 3 following indexes were added in Orthanc 0.8.5 (database v5) CREATE INDEX DicomIdentifiersIndex1 ON DicomIdentifiers(id); -CREATE INDEX DicomIdentifiersIndex2 ON DicomIdentifiers(tagGroup, tagElement); +-- CREATE INDEX DicomIdentifiersIndex2 ON DicomIdentifiers(tagGroup, tagElement); -- disabled in 1.12.10 CREATE INDEX DicomIdentifiersIndexValues ON DicomIdentifiers(value COLLATE BINARY); +CREATE INDEX DicomIdentifiersIndexGroupElementValue ON DicomIdentifiers(tagGroup, tagElement, value); -- added in 1.12.10 + CREATE INDEX ChangesIndex ON Changes(internalId); @@ -152,6 +154,9 @@ -- new in Orthanc 1.12.8 ------------------------- equivalent to InstallKeyValueStoresAndQueues.sql ${INSTALL_KEY_VALUE_STORES_AND_QUEUES} +-- new in Orthanc 1.12.10 ------------------------- equivalent to AddTimeoutToQueues.sql +${ADD_TIMEOUT_TO_QUEUES} + -- Track the fact that the "revision" column exists in the "Metadata" and "AttachedFiles" -- tables, and that the "customData" column exists in the "AttachedFiles" table
--- a/OrthancServer/Sources/Database/SQLiteDatabaseWrapper.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/Database/SQLiteDatabaseWrapper.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -2280,54 +2280,49 @@ } SQLite::Statement s(db_, SQLITE_FROM_HERE, - "INSERT INTO Queues (queueId, value) VALUES (?, ?)"); + "INSERT INTO Queues (queueId, value, reservedUntil) VALUES (?, ?, NULL)"); s.BindString(0, queueId); s.BindBlob(1, value, valueSize); s.Run(); } - // New in Orthanc 1.12.8 + // New in Orthanc 1.12.8 (but obsolete, you should use ReserveQueueValue instead) virtual bool DequeueValue(std::string& value, const std::string& queueId, QueueOrigin origin) ORTHANC_OVERRIDE { - int64_t rowId; - std::unique_ptr<SQLite::Statement> s; - + std::string order; switch (origin) { - case QueueOrigin_Front: - s.reset(new SQLite::Statement(db_, SQLITE_FROM_HERE, "SELECT id, value FROM Queues WHERE queueId=? ORDER BY id ASC LIMIT 1")); + case QueueOrigin_Back: + order = "DESC"; break; - - case QueueOrigin_Back: - s.reset(new SQLite::Statement(db_, SQLITE_FROM_HERE, "SELECT id, value FROM Queues WHERE queueId=? ORDER BY id DESC LIMIT 1")); + case QueueOrigin_Front: + order = "ASC"; break; - default: throw OrthancException(ErrorCode_InternalError); } - s->BindString(0, queueId); - if (!s->Step()) + std::string sql = "WITH RowToDelete AS (SELECT id FROM Queues WHERE queueId=? AND (reservedUntil IS NULL OR reservedUntil < datetime('now')) ORDER BY id " + order + " LIMIT 1) " + "DELETE FROM Queues WHERE id IN (SELECT id FROM RowToDelete) " + "RETURNING value;"; + + SQLite::Statement s(db_, SQLITE_FROM_HERE_DYNAMIC(sql), sql); + + s.BindString(0, queueId); + if (!s.Step()) { // No value found return false; } else { - rowId = s->ColumnInt64(0); - - if (!s->ColumnBlobAsString(1, &value)) + if (!s.ColumnBlobAsString(0, &value)) { throw OrthancException(ErrorCode_NotEnoughMemory); } - SQLite::Statement s2(db_, SQLITE_FROM_HERE, - "DELETE FROM Queues WHERE id = ?"); - s2.BindInt64(0, rowId); - s2.Run(); - return true; } } @@ -2340,6 +2335,77 @@ s.Step(); return s.ColumnInt64(0); } + + // New in Orthanc 1.12.10 + virtual bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) ORTHANC_OVERRIDE + { + // { // list queue values + // SQLite::Statement t(db_, SQLITE_FROM_HERE, + // "SELECT id, value, reservedUntil FROM Queues WHERE queueId=?"); + // t.BindString(0, queueId); + // while (t.Step()) + // { + // t.ColumnBlobAsString(1, &value); + // LOG(INFO) << t.ColumnInt64(0) << " " << value << " " << t.ColumnString(2); + // } + // } + + std::string order; + switch (origin) + { + case QueueOrigin_Back: + order = "DESC"; + break; + case QueueOrigin_Front: + order = "ASC"; + break; + default: + throw OrthancException(ErrorCode_InternalError); + } + + std::string sql = "WITH RowToUpdate AS (SELECT id FROM Queues WHERE queueId=? AND (reservedUntil IS NULL OR reservedUntil < datetime('now')) ORDER BY id " + order + " LIMIT 1) " + "UPDATE Queues SET reservedUntil = datetime('now', ?) WHERE id IN (SELECT id FROM RowToUpdate) " + "RETURNING id, value;"; + + SQLite::Statement s(db_, SQLITE_FROM_HERE_DYNAMIC(sql), sql); + + + s.BindString(0, queueId); + std::string timeout = "+" + boost::lexical_cast<std::string>(releaseTimeout) + " seconds"; + + s.BindString(1, timeout); + if (!s.Step()) + { + // No value found + return false; + } + else + { + valueId = static_cast<uint64_t>(s.ColumnInt64(0)); + + if (!s.ColumnBlobAsString(1, &value)) + { + throw OrthancException(ErrorCode_NotEnoughMemory); + } + + return true; + } + + } + + // New in Orthanc 1.12.10 + virtual void AcknowledgeQueueValue(const std::string& /* queueId */, + uint64_t valueId) ORTHANC_OVERRIDE + { + SQLite::Statement s(db_, SQLITE_FROM_HERE, + "DELETE FROM Queues WHERE id = ?"); + s.BindInt64(0, static_cast<int64_t>(valueId)); + s.Run(); + } }; @@ -2586,6 +2652,7 @@ dbCapabilities_.SetHasFindSupport(HasIntegratedFind()); dbCapabilities_.SetKeyValueStoresSupport(true); dbCapabilities_.SetQueuesSupport(true); + dbCapabilities_.SetExtendedQueuesSupport(true); dbCapabilities_.SetAttachmentCustomDataSupport(true); db_.Open(path); } @@ -2603,6 +2670,7 @@ dbCapabilities_.SetHasFindSupport(HasIntegratedFind()); dbCapabilities_.SetKeyValueStoresSupport(true); dbCapabilities_.SetQueuesSupport(true); + dbCapabilities_.SetExtendedQueuesSupport(true); dbCapabilities_.SetAttachmentCustomDataSupport(true); db_.OpenInMemory(); } @@ -2678,6 +2746,7 @@ InjectEmbeddedScript(query, "${INSTALL_LABELS_TABLE}", ServerResources::INSTALL_LABELS_TABLE); InjectEmbeddedScript(query, "${INSTALL_DELETED_FILES}", ServerResources::INSTALL_DELETED_FILES); InjectEmbeddedScript(query, "${INSTALL_KEY_VALUE_STORES_AND_QUEUES}", ServerResources::INSTALL_KEY_VALUE_STORES_AND_QUEUES); + InjectEmbeddedScript(query, "${ADD_TIMEOUT_TO_QUEUES}", ServerResources::ADD_TIMEOUT_TO_QUEUES); db_.Execute(query); } @@ -2743,6 +2812,13 @@ LOG(INFO) << "Installing the \"KeyValueStores\" and \"Queues\" tables"; ExecuteEmbeddedScript(db_, ServerResources::INSTALL_KEY_VALUE_STORES_AND_QUEUES); } + + // New in Orthanc 1.12.10 + if (!db_.DoesColumnExist("Queues", "reservedUntil")) + { + LOG(INFO) << "Adding timeout column to the \"Queues\" table"; + ExecuteEmbeddedScript(db_, ServerResources::ADD_TIMEOUT_TO_QUEUES); + } } transaction->Commit(0);
--- a/OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/Database/StatelessDatabaseOperations.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -3228,6 +3228,12 @@ return db_.GetDatabaseCapabilities().HasQueuesSupport(); } + bool StatelessDatabaseOperations::HasExtendedQueuesSupport() + { + boost::shared_lock<boost::shared_mutex> lock(mutex_); + return db_.GetDatabaseCapabilities().HasExtendedQueuesSupport(); + } + void StatelessDatabaseOperations::ExecuteCount(uint64_t& count, const FindRequest& request) { @@ -3574,6 +3580,92 @@ return size; } + bool StatelessDatabaseOperations::ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) + { + if (queueId.empty()) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + class Operations : public IReadWriteOperations + { + private: + const std::string& queueId_; + uint64_t& valueId_; + std::string& value_; + QueueOrigin origin_; + uint32_t releaseTimeout_; + bool found_; + + public: + Operations(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) : + queueId_(queueId), + valueId_(valueId), + value_(value), + origin_(origin), + releaseTimeout_(releaseTimeout), + found_(false) + { + } + + bool HasFound() + { + return found_; + } + + virtual void Apply(ReadWriteTransaction& transaction) ORTHANC_OVERRIDE + { + found_ = transaction.ReserveQueueValue(value_, valueId_, queueId_, origin_, releaseTimeout_); + } + }; + + Operations operations(value, valueId, queueId, origin, releaseTimeout); + Apply(operations); + + return operations.HasFound(); + } + + + void StatelessDatabaseOperations::AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) + { + if (queueId.empty()) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + class Operations : public IReadWriteOperations + { + private: + const std::string& queueId_; + uint64_t valueId_; + + public: + Operations(const std::string& queueId, + uint64_t valueId) : + queueId_(queueId), + valueId_(valueId) + { + } + + virtual void Apply(ReadWriteTransaction& transaction) ORTHANC_OVERRIDE + { + transaction.AcknowledgeQueueValue(queueId_, valueId_); + } + }; + + Operations operations(queueId, valueId); + Apply(operations); + } + void StatelessDatabaseOperations::GetAttachmentCustomData(std::string& customData, const std::string& attachmentUuid)
--- a/OrthancServer/Sources/Database/StatelessDatabaseOperations.h Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/Database/StatelessDatabaseOperations.h Sat Nov 15 12:28:27 2025 +0100 @@ -485,6 +485,21 @@ return transaction_.DequeueValue(value, queueId, origin); } + bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout) + { + return transaction_.ReserveQueueValue(value, valueId, queueId, origin, releaseTimeout); + } + + void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId) + { + return transaction_.AcknowledgeQueueValue(queueId, valueId); + } + void SetAttachmentCustomData(const std::string& attachmentUuid, const void* customData, size_t customDataSize) @@ -622,6 +637,8 @@ bool HasQueuesSupport(); + bool HasExtendedQueuesSupport(); + void GetExportedResources(Json::Value& target, int64_t since, uint32_t limit); @@ -841,6 +858,15 @@ uint64_t GetQueueSize(const std::string& queueId); + bool ReserveQueueValue(std::string& value, + uint64_t& valueId, + const std::string& queueId, + QueueOrigin origin, + uint32_t releaseTimeout); + + void AcknowledgeQueueValue(const std::string& queueId, + uint64_t valueId); + class KeysValuesIterator : public boost::noncopyable { private:
--- a/OrthancServer/Sources/OrthancRestApi/OrthancRestSystem.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/Sources/OrthancRestApi/OrthancRestSystem.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -97,6 +97,7 @@ static const char* const HAS_EXTENDED_CHANGES = "HasExtendedChanges"; static const char* const HAS_KEY_VALUE_STORES = "HasKeyValueStores"; static const char* const HAS_QUEUES = "HasQueues"; + static const char* const HAS_EXTENDED_QUEUES = "HasExtendedQueues"; static const char* const HAS_EXTENDED_FIND = "HasExtendedFind"; static const char* const READ_ONLY = "ReadOnly"; @@ -213,6 +214,7 @@ result[CAPABILITIES][HAS_EXTENDED_FIND] = OrthancRestApi::GetIndex(call).HasFindSupport(); result[CAPABILITIES][HAS_KEY_VALUE_STORES] = OrthancRestApi::GetIndex(call).HasKeyValueStoresSupport(); result[CAPABILITIES][HAS_QUEUES] = OrthancRestApi::GetIndex(call).HasQueuesSupport(); + result[CAPABILITIES][HAS_EXTENDED_QUEUES] = OrthancRestApi::GetIndex(call).HasExtendedQueuesSupport(); call.GetOutput().AnswerJson(result); }
--- a/OrthancServer/UnitTestsSources/ServerIndexTests.cpp Sat Nov 15 11:49:39 2025 +0100 +++ b/OrthancServer/UnitTestsSources/ServerIndexTests.cpp Sat Nov 15 12:28:27 2025 +0100 @@ -1319,6 +1319,79 @@ db.Close(); } +TEST(SQLiteDatabaseWrapper, QueuesExtended) +{ + SQLiteDatabaseWrapper db; // The SQLite DB is in memory + db.Open(); + + { + StatelessDatabaseOperations op(db, false); + op.SetTransactionContextFactory(new DummyTransactionContextFactory); + + ASSERT_EQ(0u, op.GetQueueSize("test")); + op.EnqueueValue("test", "a"); + ASSERT_EQ(1u, op.GetQueueSize("test")); + op.EnqueueValue("test", "b"); + ASSERT_EQ(2u, op.GetQueueSize("test")); + op.EnqueueValue("test", "c"); + ASSERT_EQ(3u, op.GetQueueSize("test")); + + std::string s; + uint64_t valueId0, valueId1, valueId2, valueId3; + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 100)); + ASSERT_EQ("c", s); + ASSERT_EQ(3u, op.GetQueueSize("test")); // the reserved values are still counted ! + + ASSERT_TRUE(op.ReserveQueueValue(s, valueId1, "test", QueueOrigin_Back, 100)); + ASSERT_EQ("b", s); + + ASSERT_TRUE(op.ReserveQueueValue(s, valueId2, "test", QueueOrigin_Back, 100)); + ASSERT_EQ("a", s); + ASSERT_EQ(3u, op.GetQueueSize("test")); + + ASSERT_FALSE(op.ReserveQueueValue(s, valueId3, "test", QueueOrigin_Back, 100)); + + op.AcknowledgeQueueValue("test", valueId1); + ASSERT_EQ(2u, op.GetQueueSize("test")); + op.AcknowledgeQueueValue("test", valueId2); + op.AcknowledgeQueueValue("test", valueId0); + ASSERT_EQ(0u, op.GetQueueSize("test")); + + op.EnqueueValue("test", "d"); + op.EnqueueValue("test", "e"); + op.EnqueueValue("test", "f"); + op.EnqueueValue("test", "g"); + op.EnqueueValue("test", "h"); + + // reserve 2 values front and back and acknowledge only "e" & "f" + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("d", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("e", s); + op.AcknowledgeQueueValue("test", valueId0); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + ASSERT_EQ("h", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + ASSERT_EQ("g", s); + op.AcknowledgeQueueValue("test", valueId0); + + SystemToolbox::USleep(2000000); // the granularity being the second, we might have to wait up to 2 seconds + + // "d", "f" and "h" remains + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("d", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Front, 1)); + ASSERT_EQ("f", s); + ASSERT_TRUE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + ASSERT_EQ("h", s); + + // the queue is empty at this point since "f" has been reserved already + ASSERT_FALSE(op.ReserveQueueValue(s, valueId0, "test", QueueOrigin_Back, 1)); + } + + db.Close(); +} + TEST_F(DatabaseWrapperTest, BinaryCustomData) {
--- a/TODO Sat Nov 15 11:49:39 2025 +0100 +++ b/TODO Sat Nov 15 12:28:27 2025 +0100 @@ -329,32 +329,6 @@ can not cross the C/C++ frontier safely -> we need a OrthancPluginRegisterStorageArea3 with a return value. Ex: install DelayedDeletion + S3 storage. Right now, the second plugin to load is just ignored with an error message in the logs. -* Queues: There is currently a risk of loosing messages from the Queues: - message = orthanc.DequeueValue("instances-to-process", orthanc.QueueOrigin.FRONT) - # consider Orthanc is interrupted here (hard shutdown) - proccess(message) - The message will never be processed ... - We should have an acknowledge/commit mechanism e.g: - message, messageId = orthanc.DequeueValue2("instances-to-process", orthanc.QueueOrigin.FRONT, 5) # where 5 is a "timeout" - # At this point, the message is still in the queue but will not be dequeued by other consumers. - # If the message is not acknowledged within 5 seconds, it will get back into the queue. - process(message) - orthanc.AcknowledgeQueue("instances-to-process", messageId) - # This requires adding a new "timeout" column in the DB with the reservation_expiration timestamp. - - Note by SJ: Introducing an acknowledgement would greatly complexify - the SDK and the core of Orthanc. I would favor another approach by - introducing 2 functions in the SDK: - - OrthancPluginQueuePeekFront(context, found, target, queueId): Same behavior as - OrthancPluginDequeueValue(), but limited to the back of the queue, and doesn't modify the queue - - OrthancPluginQueuePopFront(context, target): Remove the front element from the queue - As long as OrthancPluginQueuePopFront() is not called, no message would be lost. - Note that we cannot propose "OrthancPluginQueuePeekBack()" or - "OrthancPluginQueuePeek(..., OrthancPluginQueueOrigin_Back)" (i.e., LIFO stacks), as a - concurrent call to "OrthancPluginEnqueueValue()" would alter the back of the queue. - - Note by AM: With PeekFront: 2 consumers on 2 Orthanc instances might consume the same message no ? - * Add OrthancPluginSetStableStatus2() which would take an additional OrthancPluginResourceType argument to avoid possible ambiguity about the resource of interest
