view OrthancServer/Resources/Samples/Python/Replicate.py @ 5924:79cd099a2e8d find-refactoring

archive jobs: added missing mutex
author Alain Mazy <am@orthanc.team>
date Mon, 16 Dec 2024 12:43:57 +0100
parents f7adfb22e20e
children
line wrap: on
line source

#!/usr/bin/python

# Orthanc - A Lightweight, RESTful DICOM Store
# Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
# Department, University Hospital of Liege, Belgium
# Copyright (C) 2017-2023 Osimis S.A., Belgium
# Copyright (C) 2024-2024 Orthanc Team SRL, Belgium
# Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import base64
import httplib2
import json
import re
import sys

# Splits the URL of an Orthanc server into its components. Capture groups:
#   1: the scheme ("http" or "https")
#   3: the username, 4: the password (both are None if the URL does not
#      contain a "username:password@" part)
#   5: everything that follows the optional credentials
URL_REGEX = re.compile('(http|https)://((.+?):(.+?)@|)(.*)')


# Exactly two arguments are expected (the source and the target URIs):
# otherwise, print the usage and abort.
if len(sys.argv) != 3:
    print("""
Script to copy the content of one Orthanc server to another Orthanc
server through their REST API.

Usage: %s [SourceURI] [TargetURI]
For instance: %s http://orthanc:password@127.0.0.1:8042/ http://127.0.0.1:8043/
""" % (sys.argv[0], sys.argv[0]))
    # "sys.exit()" is used instead of the bare "exit()" helper: the
    # latter is injected by the "site" module and is not guaranteed to
    # exist (e.g. with "python -S" or in embedded interpreters)
    sys.exit(-1)



def CreateHeaders(parsedUrl):
    """Build the HTTP headers that authenticate a request.

    parsedUrl -- regular expression match whose group 3 is the username
                 and whose group 4 is the password (both are None if
                 the URL embeds no credentials).

    Returns a dictionary that contains one "authorization" header for
    Basic HTTP Access Authentication if credentials are available, and
    that is empty otherwise.
    """
    headers = { }
    username = parsedUrl.group(3)
    password = parsedUrl.group(4)

    # "None" is a singleton, so it is tested by identity, not by equality
    if username is not None and password is not None:
        # This is a custom reimplementation of the
        # "Http.add_credentials()" method for Basic HTTP Access
        # Authentication (for some weird reason, this method does not
        # always work)
        # http://en.wikipedia.org/wiki/Basic_access_authentication

        # The ".encode()" and ".decode()" below fix the following issue:
        # https://groups.google.com/g/orthanc-users/c/KZ_iYOet5IQ/m/AUQWiSiAAQAJ
        credentials = (username + ':' + password)
        credentials_base64 = base64.b64encode(credentials.encode("ascii")).decode("ascii")
        headers['authorization'] = 'Basic ' + credentials_base64

    return headers


def GetBaseUrl(parsedUrl):
    """Rebuild the URL without its credentials.

    The scheme (group 1 of the match) and the part that follows the
    optional "username:password@" prefix (group 5) are glued back
    together.
    """
    # "group(1, 5)" returns both groups at once, as a tuple
    return '%s://%s' % parsedUrl.group(1, 5)


def DoGetString(url):
    """Issue a GET request and return the body of the answer.

    url -- URL of the resource, possibly embedding credentials
           ("http://username:password@host:port/path").

    Returns the body of the answer, as provided by httplib2.

    Raises an exception if the URL is neither a HTTP nor a HTTPS URL,
    or if the HTTP status of the answer is not 200.
    """
    # No "global" statement is needed, as "URL_REGEX" is only read
    parsedUrl = URL_REGEX.match(url)
    if parsedUrl is None:
        # Without this check, a malformed URL would fail below with an
        # obscure "AttributeError" on "None"
        raise ValueError('Malformed URL: expected "http://..." or "https://..."')

    baseUrl = GetBaseUrl(parsedUrl)
    headers = CreateHeaders(parsedUrl)

    h = httplib2.Http()
    resp, content = h.request(baseUrl, 'GET', headers = headers)

    if resp.status == 200:
        return content
    else:
        # The URL is reported without its credentials, so that the
        # password does not leak into the error message
        raise Exception('Unable to contact Orthanc at: %s (HTTP status %s)'
                        % (baseUrl, resp.status))
    

def DoPostDicom(url, body):
    """Issue a POST request whose body is a DICOM file.

    url  -- URL of the resource, possibly embedding credentials
            ("http://username:password@host:port/path").
    body -- content to be sent, with the "application/dicom" content type.

    Raises an exception if the URL is neither a HTTP nor a HTTPS URL,
    or if the HTTP status of the answer is not 200.
    """
    # No "global" statement is needed, as "URL_REGEX" is only read
    parsedUrl = URL_REGEX.match(url)
    if parsedUrl is None:
        # Without this check, a malformed URL would fail below with an
        # obscure "AttributeError" on "None"
        raise ValueError('Malformed URL: expected "http://..." or "https://..."')

    baseUrl = GetBaseUrl(parsedUrl)
    headers = CreateHeaders(parsedUrl)
    headers['content-type'] = 'application/dicom'

    h = httplib2.Http()
    # The body of the answer is not used, only its status
    resp, _ = h.request(baseUrl, 'POST',
                        body = body,
                        headers = headers)

    if resp.status != 200:
        # The URL is reported without its credentials, so that the
        # password does not leak into the error message
        raise Exception('Unable to contact Orthanc at: %s (HTTP status %s)'
                        % (baseUrl, resp.status))
    

def _DecodeJson(s):
    """Parse a JSON document given either as raw bytes or as text.

    s -- the serialized JSON document.

    Returns the corresponding Python value (list, dictionary, ...).
    """
    # On Python 3, raw bytes are first decoded to text, whereas a text
    # string is parsed as is (calling ".decode()" on it would raise an
    # "AttributeError"). On Python 2, "bytes" is an alias of "str": the
    # test below is then false, and the string is directly parsed.
    if isinstance(s, bytes) and not isinstance(s, str):
        s = s.decode()

    return json.loads(s)


def DoGetJson(url):
    """Fetch the resource at the given URL and parse its body as JSON."""
    body = DoGetString(url)
    return _DecodeJson(body)


# URIs of the two Orthanc servers, as provided on the command line
SOURCE = sys.argv[1]
TARGET = sys.argv[2]

# The base URIs may end with a slash (as in the usage example): it is
# removed, so that the requests do not contain a doubled slash
# ("http://127.0.0.1:8042//studies")
sourceBase = SOURCE.rstrip('/')
targetBase = TARGET.rstrip('/')

# Each instance of each study is downloaded as a DICOM file from the
# source server, then uploaded to the target server
for study in DoGetJson('%s/studies' % sourceBase):
    print('Sending study %s...' % study)
    for instance in DoGetJson('%s/studies/%s/instances' % (sourceBase, study)):
        dicom = DoGetString('%s/instances/%s/file' % (sourceBase, instance['ID']))
        DoPostDicom('%s/instances' % targetBase, dicom)