# HG changeset patch # User Alain Mazy # Date 1694012676 -7200 # Node ID 80ba6f1d521ca50aa17ffd526287c0588cfb8a4e # Parent 28fef24147fa615a23e71be7f28501601ddeb05d new tests for authorization plugin (native only) diff -r 28fef24147fa -r 80ba6f1d521c NewTests/Authorization/__init__.py diff -r 28fef24147fa -r 80ba6f1d521c NewTests/Authorization/auth_service.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/NewTests/Authorization/auth_service.py Wed Sep 06 17:04:36 2023 +0200 @@ -0,0 +1,43 @@ +from fastapi import FastAPI +import logging +from models import * +import pprint + +# Sample Authorization service that is started when the test starts. +# It does not check token validity and simply implements a set of basic users + +app = FastAPI() + + + +@app.post("/user/get-profile") +def get_user_profile(user_profile_request: UserProfileRequest): + logging.info("get user profile: " + user_profile_request.json()) + + if user_profile_request.token_key == "user-token-key": + if user_profile_request.token_value == "token-uploader": + p = UserProfileResponse( + name="uploader", + permissions=["upload", "edit-labels", "delete", "view"], + authorized_labels=["*"], + validity=60 + ) + return p + elif user_profile_request.token_value == "token-admin": + p = UserProfileResponse( + name="admin", + permissions=["all"], + authorized_labels=["*"], + validity=60 + ) + return p + elif user_profile_request.token_value == "token-user-a": + p = UserProfileResponse( + name="user-a", + permissions=["view"], + authorized_labels=["label_a"], + validity=60 + ) + return p + + diff -r 28fef24147fa -r 80ba6f1d521c NewTests/Authorization/models.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/NewTests/Authorization/models.py Wed Sep 06 17:04:36 2023 +0200 @@ -0,0 +1,150 @@ +# SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Optional, List +from pydantic import BaseModel, Field +from pydantic.datetime_parse import parse_datetime +from enum import Enum +from datetime import datetime + + +class StringDateTime(datetime): + @classmethod + def __get_validators__(cls): + yield parse_datetime + yield cls.validate + + @classmethod + def validate(cls, v: datetime): + return v.isoformat() + + +class Levels(str, Enum): + PATIENT = 'patient' + STUDY = 'study' + SERIES = 'series' + INSTANCE = 'instance' + + SYSTEM = 'system' + + +class Methods(str, Enum): + GET = 'get' + POST = 'post' + PUT = 'put' + DELETE = 'delete' + + +class DecoderErrorCodes(str, Enum): + EXPIRED = 'expired' + INVALID = 'invalid' + UNKNOWN = 'unknown' + + +class TokenType(str, Enum): + OSIMIS_VIEWER_PUBLICATION = 'osimis-viewer-publication' # a link to open the Osimis viewer valid for a long period + MEDDREAM_VIEWER_PUBLICATION = 'meddream-viewer-publication' # a link to open the MedDream viewer valid for a long period + STONE_VIEWER_PUBLICATION = 'stone-viewer-publication' # a link to open the Stone viewer valid for a long period + OHIF_VIEWER_PUBLICATION = 'ohif-viewer-publication' # a link to open the OHIF viewer valid for a long period + + MEDDREAM_INSTANT_LINK = 'meddream-instant-link' # a direct link to MedDream viewer that is valid only a few minutes to open the viewer directly + + # OSIMIS_VIEWER_INSTANT_LINK = 'osimis-viewer-instant-link' # a direct link to Osimis viewer that is valid only a few minutes to open the viewer directly + # STONE_VIEWER_INSTANT_LINK = 'stone-viewer-instant-link' # a direct link to Stone viewer that is valid only a few minutes to open the viewer directly + # + # DOWNLOAD_INSTANT_LINK = 'download-instant-link' # a link to download a study/series/instance directly + VIEWER_INSTANT_LINK = 'viewer-instant-link' # a link to a resource to be used directly. + DOWNLOAD_INSTANT_LINK = 'download-instant-link' # a link to a resource to be used directly. + + + INVALID = 'invalid' + +class OrthancResource(BaseModel): + dicom_uid: Optional[str] = Field(alias="dicom-uid", default=None) + orthanc_id: Optional[str] = Field(alias="orthanc-id", default=None) + url: Optional[str] = None # e.g. a download link /studies/.../archive + level: Levels + + class Config: # allow creating object from dict (used when deserializing the JWT) + allow_population_by_field_name = True + + +class TokenCreationRequest(BaseModel): + id: Optional[str] = None + resources: List[OrthancResource] + type: TokenType = Field(default=TokenType.INVALID) + expiration_date: Optional[StringDateTime] = Field(alias="expiration-date", default=None) + validity_duration: Optional[int] = Field(alias='validity-duration', default=None) # alternate way to provide an expiration_date, more convenient for instant-links since the duration is relative to the server time, not the client time ! + + class Config: # allow creating object from dict (used when deserializing the JWT) + allow_population_by_field_name = True + + +class TokenCreationResponse(BaseModel): + request: TokenCreationRequest + token: str + url: Optional[str] = None + + +class TokenValidationRequest(BaseModel): + dicom_uid: Optional[str] = Field(alias="dicom-uid", default=None) + orthanc_id: Optional[str] = Field(alias="orthanc-id", default=None) + token_key: Optional[str] = Field(alias="token-key", default=None) + token_value: Optional[str] = Field(alias="token-value", default=None) + server_id: Optional[str] = Field(alias="server-id", default=None) + level: Optional[Levels] + method: Methods + uri: Optional[str] +# labels: Optional[List[str]] + + +class TokenValidationResponse(BaseModel): + granted: bool + validity: int + + +class TokenDecoderRequest(BaseModel): + token_key: Optional[str] = Field(alias="token-key", default=None) + token_value: Optional[str] = Field(alias="token-value", default=None) + + +class TokenDecoderResponse(BaseModel): + token_type: Optional[TokenType] = Field(alias="token-type", default=None) + error_code: Optional[DecoderErrorCodes] = Field(alias="error-code", default=None) + redirect_url: Optional[str] = Field(alias="redirect-url", default=None) + + +class UserProfileRequest(BaseModel): + token_key: Optional[str] = Field(alias="token-key", default=None) + token_value: Optional[str] = Field(alias="token-value", default=None) + server_id: Optional[str] = Field(alias="server-id", default=None) + + +class UserPermissions(str, Enum): + ALL = 'all' + VIEW = 'view' + DOWNLOAD = 'download' + DELETE = 'delete' + SEND = 'send' + MODIFY = 'modify' + ANONYMIZE = 'anonymize' + UPLOAD = 'upload' + Q_R_REMOTE_MODALITIES = 'q-r-remote-modalities' + SETTINGS = 'settings' + API_VIEW = 'api-view' + EDIT_LABELS = 'edit-labels' + + SHARE = 'share' + + +class UserProfileResponse(BaseModel): + name: str + authorized_labels: List[str] = Field(alias="authorized-labels", default_factory=list) + # authorized_labels: List[str] = Field(default_factory=list) + permissions: List[UserPermissions] = Field(default_factory=list) + validity: int + + class Config: + use_enum_values = True + allow_population_by_field_name = True \ No newline at end of file diff -r 28fef24147fa -r 80ba6f1d521c NewTests/Authorization/test_authorization.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/NewTests/Authorization/test_authorization.py Wed Sep 06 17:04:36 2023 +0200 @@ -0,0 +1,204 @@ +import unittest +import time +import pprint +import subprocess +from helpers import OrthancTestCase, Helpers + +from orthanc_api_client import OrthancApiClient, generate_test_dicom_file +from orthanc_api_client import exceptions as orthanc_exceptions + +import logging +import pathlib +here = pathlib.Path(__file__).parent.resolve() + + + +class TestAuthorization(OrthancTestCase): + + label_a_study_id = None + label_b_study_id = None + no_label_study_id = None + auth_service_process = None + + @classmethod + def _terminate(cls): + cls.auth_service_process.terminate() + + @classmethod + def prepare(cls): + test_name = "Authorization" + storage_name = "authorization" + + print(f'-------------- preparing {test_name} tests') + + cls.clear_storage(storage_name=storage_name) + + config = { + "AuthenticationEnabled": False, + "Authorization": { + "WebServiceRootUrl": "http://localhost:8020/", + "StandardConfigurations": [ + "orthanc-explorer-2", + "stone-webviewer" + ], + "CheckedLevel": "studies", + "TokenHttpHeaders": ["user-token-key"], + "TokenGetArguments": ["resource-token-key"] + } + } + + config_path = cls.generate_configuration( + config_name=f"{test_name}", + storage_name=storage_name, + config=config, + plugins=Helpers.plugins + ) + + # Start the auth-service application as a subprocess and wait for it to start + cls.auth_service_process = subprocess.Popen(["uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"], cwd=here) + time.sleep(2) + + if Helpers.break_before_preparation: + print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++") + input("Press Enter to continue") + else: + cls.launch_orthanc_under_tests( + config_name=f"{test_name}", + storage_name=storage_name, + config=config, + plugins=Helpers.plugins + ) + + uploader = OrthancApiClient(cls.o._root_url, headers={"user-token-key": "token-uploader"}) + + uploader.delete_all_content() + + # upload a few studies and add labels + instances_ids = uploader.upload_file(here / "../../Database/Knix/Loc/IM-0001-0001.dcm") + cls.label_a_study_id = uploader.instances.get_parent_study_id(instances_ids[0]) + uploader.studies.add_label(cls.label_a_study_id, "label_a") + + instances_ids = uploader.upload_file(here / "../../Database/Brainix/Epi/IM-0001-0001.dcm") + cls.label_b_study_id = uploader.instances.get_parent_study_id(instances_ids[0]) + uploader.studies.add_label(cls.label_b_study_id, "label_b") + + instances_ids = uploader.upload_file(here / "../../Database/Comunix/Pet/IM-0001-0001.dcm") + cls.no_label_study_id = uploader.instances.get_parent_study_id(instances_ids[0]) + + + def test_admin_user(self): + + o = OrthancApiClient(self.o._root_url, headers={"user-token-key": "token-admin"}) + + # make sure we can access all these urls (they would throw if not) + system = o.get_system() + + # make sure we can access all studies + o.studies.get_tags(self.no_label_study_id) + o.studies.get_tags(self.label_a_study_id) + o.studies.get_tags(self.label_b_study_id) + + # make sure we can access series and instances of these studies + series_ids = o.studies.get_series_ids(self.label_a_study_id) + instances_ids = o.series.get_instances_ids(series_ids[0]) + o.instances.get_tags(instances_ids[0]) + + # make sure labels filtering still works + self.assertEqual(3, len(o.studies.find(query={}, + labels=[], + labels_constraint='Any'))) + + self.assertEqual(2, len(o.studies.find(query={}, + labels=['label_a', 'label_b'], + labels_constraint='Any'))) + + self.assertEqual(2, len(o.studies.find(query={}, + labels=['label_a'], + labels_constraint='None'))) + + all_labels = o.get_all_labels() + self.assertEqual(2, len(all_labels)) + + def test_user_a(self): + + o = OrthancApiClient(self.o._root_url, headers={"user-token-key": "token-user-a"}) + + # # make sure we can access all these urls (they would throw if not) + # system = o.get_system() + + all_labels = o.get_all_labels() + self.assertEqual(1, len(all_labels)) + self.assertEqual("label_a", all_labels[0]) + + # make sure we can access only the label_a studies + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + o.studies.get_tags(self.label_b_study_id) + self.assertEqual(403, ctx.exception.http_status_code) + + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + o.studies.get_tags(self.no_label_study_id) + self.assertEqual(403, ctx.exception.http_status_code) + + # should not raise + o.studies.get_tags(self.label_a_study_id) + + # make sure we can access series and instances of the label_a studies + series_ids = o.studies.get_series_ids(self.label_a_study_id) + instances_ids = o.series.get_instances_ids(series_ids[0]) + o.instances.get_tags(instances_ids[0]) + + # make sure we can not access series and instances of the label_b studies + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + series_ids = o.studies.get_series_ids(self.label_b_study_id) + self.assertEqual(403, ctx.exception.http_status_code) + + # make sure tools/find only returns the label_a studies + studies = o.studies.find(query={}, + labels=[], + labels_constraint='Any') + self.assertEqual(1, len(studies)) + self.assertEqual(self.label_a_study_id, studies[0].orthanc_id) + + # if searching Any of label_a & label_b, return only label_a + studies = o.studies.find(query={}, + labels=['label_a', 'label_b'], + labels_constraint='Any') + self.assertEqual(1, len(studies)) + self.assertEqual(self.label_a_study_id, studies[0].orthanc_id) + + # if searching Any of label_b, expect a Forbidden access + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + studies = o.studies.find(query={}, + labels=['label_b'], + labels_constraint='Any') + self.assertEqual(403, ctx.exception.http_status_code) + + # if searching None of label_b, expect a Forbidden access because we are not able to compute this filter + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + studies = o.studies.find(query={}, + labels=['label_b'], + labels_constraint='None') + self.assertEqual(403, ctx.exception.http_status_code) + + # if searching All of label_b, expect a Forbidden access because we are not able to compute this filter + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + studies = o.studies.find(query={}, + labels=['label_b'], + labels_constraint='All') + self.assertEqual(403, ctx.exception.http_status_code) + + studies = o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a + labels=[], + labels_constraint='Any') + self.assertEqual(1, len(studies)) + + studies = o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a + labels=['label_a'], + labels_constraint='Any') + self.assertEqual(1, len(studies)) + + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + studies = o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a + labels=['label_b'], + labels_constraint='Any') + self.assertEqual(403, ctx.exception.http_status_code) \ No newline at end of file diff -r 28fef24147fa -r 80ba6f1d521c NewTests/README --- a/NewTests/README Wed Sep 06 17:03:54 2023 +0200 +++ b/NewTests/README Wed Sep 06 17:04:36 2023 +0200 @@ -132,3 +132,15 @@ --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \ --break_after_preparation + +Authorization: +-------------- + +Run the Authorization tests with your locally build version and break before execution to allow you to start your debugger. + +python3 NewTests/main.py --pattern=Authorization.test_authorization.TestAuthorization.* \ + --orthanc_under_tests_exe=/home/alain/o/build/orthanc/Orthanc \ + --orthanc_under_tests_http_port=8043 \ + --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \ + --plugin=/home/alain/o/build/orthanc-authorization/libOrthancAuthorization.so \ + --break_before_preparation diff -r 28fef24147fa -r 80ba6f1d521c NewTests/helpers.py --- a/NewTests/helpers.py Wed Sep 06 17:03:54 2023 +0200 +++ b/NewTests/helpers.py Wed Sep 06 17:04:36 2023 +0200 @@ -82,12 +82,17 @@ def tearDownClass(cls): if not Helpers.break_after_preparation: cls.kill_orthanc() + cls._terminate() @classmethod def prepare(cls): pass # to override @classmethod + def terminate(cls): + pass # to override + + @classmethod def _prepare(cls): if not Helpers.skip_preparation: cls.prepare() @@ -243,7 +248,10 @@ @classmethod def kill_orthanc(cls): if Helpers.is_exe(): - cls._orthanc_process.kill() + if cls._orthanc_process: + cls._orthanc_process.kill() + else: + return else: subprocess.run(["docker", "stop", cls._orthanc_container_name]) output = cls.get_orthanc_process_output() diff -r 28fef24147fa -r 80ba6f1d521c NewTests/requirements.txt --- a/NewTests/requirements.txt Wed Sep 06 17:03:54 2023 +0200 +++ b/NewTests/requirements.txt Wed Sep 06 17:04:36 2023 +0200 @@ -1,1 +1,3 @@ -orthanc-tools==0.4.9 \ No newline at end of file +orthanc-api-client>=0.13.6 +orthanc-tools>=0.8.9 +uvicorn \ No newline at end of file