view Sphinx/source/plugins/python/queues-and-kvs.py @ 1194:c717ee80e8d9 default tip

updated citations
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 01 Sep 2025 07:48:44 +0200
parents 7492a3915bf2
children
line wrap: on
line source

import orthanc
import json
import threading
import time


# Background thread running ProcessQueueMessages; created by OnChange when
# Orthanc reports ORTHANC_STARTED, None until then.
worker_thread = None
# Loop flag polled by ProcessQueueMessages: set to True by OnChange on
# ORTHANC_STARTED and back to False on ORTHANC_STOPPED to end the worker loop.
is_worker_running = False

def ProcessQueueMessages():
    """Worker loop consuming the "instances-to-process" queue.

    Runs as long as the module-level flag ``is_worker_running`` is true.
    Each dequeued message is expected to be a UTF-8 encoded JSON object
    holding a "resource-id" entry.  For that resource, the value stored in
    the "my-store" Key Value Store is logged and then deleted.

    A message that cannot be decoded, parsed, or that has no "resource-id"
    is logged and skipped, so that a single malformed message does not
    terminate the worker thread.
    """
    global is_worker_running

    while is_worker_running:
        # get messages from the queue named "instances-to-process" that is stored in Orthanc DB.
        # Get the message from the FRONT for FIFO and from the BACK for a LIFO
        message = orthanc.DequeueValue("instances-to-process", orthanc.QueueOrigin.FRONT)

        if message is None:
            # no messages in the queue: wait a bit before polling again
            time.sleep(1)
            continue

        try:
            payload = json.loads(message.decode('utf-8'))
            resourceId = payload["resource-id"]
        except (ValueError, KeyError, TypeError) as error:
            # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
            # KeyError/TypeError cover a payload without "resource-id" or that is
            # not a JSON object.  The message is already removed from the queue,
            # so report it and carry on with the next one.
            orthanc.LogWarning(f"skipping malformed message from queue: {error!r}")
            continue

        orthanc.LogInfo(f"processing resource {resourceId}")

        # get the value associated to the key resourceId in the "my-store" Key Value Store.
        value = orthanc.GetKeyValue("my-store", resourceId)
        if value is None:
            orthanc.LogWarning(f"no value for this resource: {resourceId}")
        else:
            orthanc.LogInfo(f"Value for resource {resourceId} is {value.decode('utf-8')}")
            orthanc.DeleteKeyValue("my-store", resourceId)


def OnChange(changeType, level, resource: str):
    """Orthanc change callback driving the queue / key-value store sample.

    - NEW_INSTANCE: stores a value for ``resource`` in the "my-store"
      Key Value Store, then pushes a message referencing ``resource`` into
      the "instances-to-process" queue.
    - ORTHANC_STARTED: starts the worker thread running ProcessQueueMessages.
    - ORTHANC_STOPPED: asks the worker loop to stop and waits for the thread.

    Args:
        changeType: the change being reported, compared against
            ``orthanc.ChangeType`` values.
        level: not used by this callback.
        resource: identifier received with the change; used as the key in
            the store and as "resource-id" in the queued message.
    """
    global worker_thread
    global is_worker_running

    if changeType == orthanc.ChangeType.NEW_INSTANCE:

        # Save information into a store named "my-store".
        # The tuple (store-name, key) must be unique.
        # The value must be a bytes object.
        # This is done BEFORE enqueuing the message: the worker looks up this
        # key as soon as it dequeues the message, so the value must already
        # be there, otherwise it would be reported as missing and never deleted.
        orthanc.StoreKeyValue("my-store", resource, f"my-data for {resource}".encode('utf-8'))
        orthanc.LogInfo(f"Stored Key-Value for resource {resource}")

        processPayload = {
            "resource-id": resource,
            "my-data": "my-data"
        }

        # Push a message into the queue named "instances-to-process".  It must be a bytes object.
        # The queue is persisted in the Orthanc database and is accessible to all Orthanc instances
        # that are connected to the same database.
        orthanc.EnqueueValue("instances-to-process", json.dumps(processPayload).encode('utf-8'))
        orthanc.LogInfo(f"Enqueued value for resource {resource}")

    elif changeType == orthanc.ChangeType.ORTHANC_STARTED:

        # start a thread to process the messages from a queue
        worker_thread = threading.Thread(target=ProcessQueueMessages)
        is_worker_running = True
        worker_thread.start()

    elif changeType == orthanc.ChangeType.ORTHANC_STOPPED:

        # clear the flag so that the worker loop exits, then wait for the thread
        is_worker_running = False
        if worker_thread is not None:
            # the thread only exists if ORTHANC_STARTED was received beforehand
            worker_thread.join()

def OnRestKvsAndQueuesStatistics(output, uri, **request):
    """REST callback reporting the state of the sample queue and store.

    Only GET is accepted; any other method is answered through
    ``output.SendMethodNotAllowed('GET')``.  On GET, the answer is a JSON
    document with the size of the "instances-to-process" queue and all the
    keys/values currently held in the "my-store" Key Value Store.
    """
    if request['method'] != 'GET':
        output.SendMethodNotAllowed('GET')
        return

    # collect all the keys and values of the "my-store" Key Value Store:
    iterator = orthanc.CreateKeysValuesIterator("my-store")
    storeContent = {}
    while iterator.Next():
        decodedValue = iterator.GetValue().decode('utf-8')
        storeContent[iterator.GetKey()] = decodedValue

    queueSize = orthanc.GetQueueSize("instances-to-process")
    body = json.dumps({
        "instances-to-process-size": queueSize,
        "my-store-keys-values": storeContent,
    })
    output.AnswerBuffer(body, 'application/json')

# Register OnChange as the change callback, and serve the statistics
# handler on the '/kvs-queues-statistics' REST route.
orthanc.RegisterOnChangeCallback(OnChange)
orthanc.RegisterRestCallback('/kvs-queues-statistics', OnRestKvsAndQueuesStatistics)