Mercurial > hg > orthanc-book
changeset 1190:7492a3915bf2
added sample for KVS and queues
author | Alain Mazy <am@orthanc.team> |
---|---|
date | Thu, 14 Aug 2025 18:30:44 +0200 |
parents | 801707730af7 |
children | 44700cfa7656 |
files | Sphinx/source/plugins/python.rst Sphinx/source/plugins/python/queues-and-kvs.py |
diffstat | 2 files changed, 97 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/Sphinx/source/plugins/python.rst Wed Aug 13 19:46:01 2025 +0200 +++ b/Sphinx/source/plugins/python.rst Thu Aug 14 18:30:44 2025 +0200 @@ -1006,6 +1006,16 @@ .. literalinclude:: python/set-stable-status.py :language: python +Using Key-Value Stores and Queues (new in 6.0) +................................... + +Starting from v 6.0, it is possible to store messages in queues that are stored +in the Orthanc DB or to store values in Key-Value Stores that are also stored in DB. + +.. literalinclude:: python/queues-and-kvs.py + :language: python + + .. _python_couchdb: Synchronizing Orthanc with CouchDB
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Sphinx/source/plugins/python/queues-and-kvs.py Thu Aug 14 18:30:44 2025 +0200 @@ -0,0 +1,87 @@ +import orthanc +import json +import threading +import time + + +worker_thread = None +is_worker_running = False + +def ProcessQueueMessages(): + global is_worker_running + + while is_worker_running: + # get messages from the queue named "instances-to-process" that is stored in Orthanc DB. + # Get the message from the FRONT for FIFO and from the BACK for a LIFO + message = orthanc.DequeueValue("instances-to-process", orthanc.QueueOrigin.FRONT) + + if message is None: + # no messages in the queue + time.sleep(1) + else: + payload = json.loads(message.decode('utf-8')) + resourceId = payload["resource-id"] + orthanc.LogInfo(f"processing resource {resourceId}") + + # get the value associated to the key resourceId in the "my-store" Key Value Store. + value = orthanc.GetKeyValue("my-store", resourceId) + if value is None: + orthanc.LogWarning(f"no value for this resource: {resourceId}") + else: + orthanc.LogInfo(f"Value for resource {resourceId} is {value.decode('utf-8')}") + orthanc.DeleteKeyValue("my-store", resourceId) + + +def OnChange(changeType, level, resource: str): + global worker_thread + global is_worker_running + + if changeType == orthanc.ChangeType.NEW_INSTANCE: + + processPayload = { + "resource-id": resource, + "my-data": "my-data" + } + + # Push a message into the queue named "instances-to-process". It must be a bytes object. + # The queue is persisted in the Orthanc database and is accessible to all Orthanc instances + # that are connected to the same database. + orthanc.EnqueueValue("instances-to-process", json.dumps(processPayload).encode('utf-8')) + orthanc.LogInfo(f"Enqueued value for resource {resource}") + + # Save information into a store named "my-store". + # The tuple (store-name, key) must be unique. + # The value must be a bytes object. + orthanc.StoreKeyValue("my-store", resource, f"my-data for {resource}".encode('utf-8')) + orthanc.LogInfo(f"Stored Key-Value for resource {resource}") + + elif changeType == orthanc.ChangeType.ORTHANC_STARTED: + + # start a thread to process the messages from a queue + worker_thread = threading.Thread(target=ProcessQueueMessages) + is_worker_running = True + worker_thread.start() + + elif changeType == orthanc.ChangeType.ORTHANC_STOPPED: + + is_worker_running = False + worker_thread.join() + +def OnRestKvsAndQueuesStatistics(output, uri, **request): + if request['method'] != 'GET': + output.SendMethodNotAllowed('GET') + else: + # show all values in the queue: + it = orthanc.CreateKeysValuesIterator("my-store") + values = {} + while it.Next(): + values[it.GetKey()] = it.GetValue().decode('utf-8') + + statistics = { + "instances-to-process-size" : orthanc.GetQueueSize("instances-to-process"), + "my-store-keys-values": values + } + output.AnswerBuffer(json.dumps(statistics), 'application/json') + +orthanc.RegisterOnChangeCallback(OnChange) +orthanc.RegisterRestCallback('/kvs-queues-statistics', OnRestKvsAndQueuesStatistics) \ No newline at end of file