changeset 576:80ba6f1d521c

new tests for authorization plugin (native only)
author Alain Mazy <am@osimis.io>
date Wed, 06 Sep 2023 17:04:36 +0200
parents 28fef24147fa
children 0649a19df194
files NewTests/Authorization/__init__.py NewTests/Authorization/auth_service.py NewTests/Authorization/models.py NewTests/Authorization/test_authorization.py NewTests/README NewTests/helpers.py NewTests/requirements.txt
diffstat 6 files changed, 421 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/NewTests/Authorization/auth_service.py	Wed Sep 06 17:04:36 2023 +0200
@@ -0,0 +1,43 @@
+from fastapi import FastAPI
+import logging
+from models import *
+import pprint
+
+# Sample Authorization service that is started when the test starts.
+# It does not check token validity and simply implements a set of basic users
+
+app = FastAPI()
+
+
+
+@app.post("/user/get-profile") 
+def get_user_profile(user_profile_request: UserProfileRequest):
+    logging.info("get user profile: " + user_profile_request.json())
+
+    if user_profile_request.token_key == "user-token-key":
+        if user_profile_request.token_value == "token-uploader":
+            p = UserProfileResponse(
+                name="uploader",
+                permissions=["upload", "edit-labels", "delete", "view"],
+                authorized_labels=["*"],
+                validity=60
+            )
+            return p
+        elif user_profile_request.token_value == "token-admin":
+            p = UserProfileResponse(
+                name="admin",
+                permissions=["all"],
+                authorized_labels=["*"],
+                validity=60
+            )
+            return p
+        elif user_profile_request.token_value == "token-user-a":
+            p = UserProfileResponse(
+                name="user-a",
+                permissions=["view"],
+                authorized_labels=["label_a"],
+                validity=60
+            )
+            return p
+            
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/NewTests/Authorization/models.py	Wed Sep 06 17:04:36 2023 +0200
@@ -0,0 +1,150 @@
+# SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL <info@orthanc.team>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from typing import Optional, List
+from pydantic import BaseModel, Field
+from pydantic.datetime_parse import parse_datetime
+from enum import Enum
+from datetime import datetime
+
+
+class StringDateTime(datetime):
+    @classmethod
+    def __get_validators__(cls):
+        yield parse_datetime
+        yield cls.validate
+
+    @classmethod
+    def validate(cls, v: datetime):
+        return v.isoformat()
+
+
+class Levels(str, Enum):
+    PATIENT = 'patient'
+    STUDY = 'study'
+    SERIES = 'series'
+    INSTANCE = 'instance'
+
+    SYSTEM = 'system'
+
+
+class Methods(str, Enum):
+    GET = 'get'
+    POST = 'post'
+    PUT = 'put'
+    DELETE = 'delete'
+
+
+class DecoderErrorCodes(str, Enum):
+    EXPIRED = 'expired'
+    INVALID = 'invalid'
+    UNKNOWN = 'unknown'
+
+
+class TokenType(str, Enum):
+    OSIMIS_VIEWER_PUBLICATION = 'osimis-viewer-publication'  # a link to open the Osimis viewer valid for a long period
+    MEDDREAM_VIEWER_PUBLICATION = 'meddream-viewer-publication'  # a link to open the MedDream viewer valid for a long period
+    STONE_VIEWER_PUBLICATION = 'stone-viewer-publication'  # a link to open the Stone viewer valid for a long period
+    OHIF_VIEWER_PUBLICATION = 'ohif-viewer-publication'  # a link to open the OHIF viewer valid for a long period
+
+    MEDDREAM_INSTANT_LINK = 'meddream-instant-link'  # a direct link to MedDream viewer that is valid only a few minutes to open the viewer directly
+
+    # OSIMIS_VIEWER_INSTANT_LINK = 'osimis-viewer-instant-link'  # a direct link to Osimis viewer that is valid only a few minutes to open the viewer directly
+    # STONE_VIEWER_INSTANT_LINK = 'stone-viewer-instant-link'  # a direct link to Stone viewer that is valid only a few minutes to open the viewer directly
+    #
+    # DOWNLOAD_INSTANT_LINK = 'download-instant-link'  # a link to download a study/series/instance directly
+    VIEWER_INSTANT_LINK = 'viewer-instant-link'             # a link to a resource to be used directly.
+    DOWNLOAD_INSTANT_LINK = 'download-instant-link'         # a link to a resource to be used directly.
+
+
+    INVALID = 'invalid'
+
+class OrthancResource(BaseModel):
+    dicom_uid: Optional[str] = Field(alias="dicom-uid", default=None)
+    orthanc_id: Optional[str] = Field(alias="orthanc-id", default=None)
+    url: Optional[str] = None                                                       # e.g. a download link /studies/.../archive
+    level: Levels
+
+    class Config:  # allow creating object from dict (used when deserializing the JWT)
+        allow_population_by_field_name = True
+
+
+class TokenCreationRequest(BaseModel):
+    id: Optional[str] = None
+    resources: List[OrthancResource]
+    type: TokenType = Field(default=TokenType.INVALID)
+    expiration_date: Optional[StringDateTime] = Field(alias="expiration-date", default=None)
+    validity_duration: Optional[int] = Field(alias='validity-duration', default=None)            # alternate way to provide an expiration_date, more convenient for instant-links since the duration is relative to the server time, not the client time !
+
+    class Config:  # allow creating object from dict (used when deserializing the JWT)
+        allow_population_by_field_name = True
+
+
+class TokenCreationResponse(BaseModel):
+    request: TokenCreationRequest
+    token: str
+    url: Optional[str] = None
+
+
+class TokenValidationRequest(BaseModel):
+    dicom_uid: Optional[str] = Field(alias="dicom-uid", default=None)
+    orthanc_id: Optional[str] = Field(alias="orthanc-id", default=None)
+    token_key: Optional[str] = Field(alias="token-key", default=None)
+    token_value: Optional[str] = Field(alias="token-value", default=None)
+    server_id: Optional[str] = Field(alias="server-id", default=None)
+    level: Optional[Levels]
+    method: Methods
+    uri: Optional[str]
+#     labels: Optional[List[str]]
+
+
+class TokenValidationResponse(BaseModel):
+    granted: bool
+    validity: int
+
+
+class TokenDecoderRequest(BaseModel):
+    token_key: Optional[str] = Field(alias="token-key", default=None)
+    token_value: Optional[str] = Field(alias="token-value", default=None)
+
+
+class TokenDecoderResponse(BaseModel):
+    token_type: Optional[TokenType] = Field(alias="token-type", default=None)
+    error_code: Optional[DecoderErrorCodes] = Field(alias="error-code", default=None)
+    redirect_url: Optional[str] = Field(alias="redirect-url", default=None)
+
+
+class UserProfileRequest(BaseModel):
+    token_key: Optional[str] = Field(alias="token-key", default=None)
+    token_value: Optional[str] = Field(alias="token-value", default=None)
+    server_id: Optional[str] = Field(alias="server-id", default=None)
+
+
+class UserPermissions(str, Enum):
+    ALL = 'all'
+    VIEW = 'view'
+    DOWNLOAD = 'download'
+    DELETE = 'delete'
+    SEND = 'send'
+    MODIFY = 'modify'
+    ANONYMIZE = 'anonymize'
+    UPLOAD = 'upload'
+    Q_R_REMOTE_MODALITIES = 'q-r-remote-modalities'
+    SETTINGS = 'settings'
+    API_VIEW = 'api-view'
+    EDIT_LABELS = 'edit-labels'
+
+    SHARE = 'share'
+
+
+class UserProfileResponse(BaseModel):
+    name: str
+    authorized_labels: List[str] = Field(alias="authorized-labels", default_factory=list)
+    # authorized_labels: List[str] = Field(default_factory=list)
+    permissions: List[UserPermissions] = Field(default_factory=list)
+    validity: int
+
+    class Config:
+        use_enum_values = True
+        allow_population_by_field_name = True
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/NewTests/Authorization/test_authorization.py	Wed Sep 06 17:04:36 2023 +0200
@@ -0,0 +1,204 @@
+import unittest
+import time
+import pprint
+import subprocess
+from helpers import OrthancTestCase, Helpers
+
+from orthanc_api_client import OrthancApiClient, generate_test_dicom_file
+from orthanc_api_client import exceptions as orthanc_exceptions
+
+import logging
+import pathlib
+here = pathlib.Path(__file__).parent.resolve()
+
+
+
+class TestAuthorization(OrthancTestCase):
+
+    label_a_study_id = None
+    label_b_study_id = None
+    no_label_study_id = None
+    auth_service_process = None
+
+    @classmethod
+    def _terminate(cls):
+        cls.auth_service_process.terminate()
+
+    @classmethod
+    def prepare(cls):
+        test_name = "Authorization"
+        storage_name = "authorization"
+
+        print(f'-------------- preparing {test_name} tests')
+
+        cls.clear_storage(storage_name=storage_name)
+
+        config = {
+                "AuthenticationEnabled": False,
+                "Authorization": {
+                    "WebServiceRootUrl": "http://localhost:8020/",
+                    "StandardConfigurations": [
+                        "orthanc-explorer-2",
+                        "stone-webviewer"
+                    ],
+                    "CheckedLevel": "studies",
+                    "TokenHttpHeaders": ["user-token-key"],
+                    "TokenGetArguments": ["resource-token-key"]
+                }
+            }
+
+        config_path = cls.generate_configuration(
+            config_name=f"{test_name}",
+            storage_name=storage_name,
+            config=config,
+            plugins=Helpers.plugins
+        )
+
+        # Start the auth-service application as a subprocess and wait for it to start
+        cls.auth_service_process = subprocess.Popen(["uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"], cwd=here)
+        time.sleep(2)
+
+        if Helpers.break_before_preparation:
+            print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
+            input("Press Enter to continue")
+        else:
+            cls.launch_orthanc_under_tests(
+                config_name=f"{test_name}",
+                storage_name=storage_name,
+                config=config,
+                plugins=Helpers.plugins
+            )
+
+        uploader = OrthancApiClient(cls.o._root_url, headers={"user-token-key": "token-uploader"})
+
+        uploader.delete_all_content()
+
+        # upload a few studies and add labels
+        instances_ids = uploader.upload_file(here / "../../Database/Knix/Loc/IM-0001-0001.dcm")
+        cls.label_a_study_id = uploader.instances.get_parent_study_id(instances_ids[0])
+        uploader.studies.add_label(cls.label_a_study_id, "label_a")
+
+        instances_ids = uploader.upload_file(here / "../../Database/Brainix/Epi/IM-0001-0001.dcm")
+        cls.label_b_study_id = uploader.instances.get_parent_study_id(instances_ids[0])
+        uploader.studies.add_label(cls.label_b_study_id, "label_b")
+
+        instances_ids = uploader.upload_file(here / "../../Database/Comunix/Pet/IM-0001-0001.dcm")
+        cls.no_label_study_id = uploader.instances.get_parent_study_id(instances_ids[0])
+
+
+    def test_admin_user(self):
+        
+        o = OrthancApiClient(self.o._root_url, headers={"user-token-key": "token-admin"})
+
+        # make sure we can access all these urls (they would throw if not)
+        system = o.get_system()
+
+        # make sure we can access all studies
+        o.studies.get_tags(self.no_label_study_id)
+        o.studies.get_tags(self.label_a_study_id)
+        o.studies.get_tags(self.label_b_study_id)
+
+        # make sure we can access series and instances of these studies
+        series_ids = o.studies.get_series_ids(self.label_a_study_id)
+        instances_ids = o.series.get_instances_ids(series_ids[0])
+        o.instances.get_tags(instances_ids[0])
+
+        # make sure labels filtering still works
+        self.assertEqual(3, len(o.studies.find(query={},
+                                               labels=[],
+                                               labels_constraint='Any')))
+
+        self.assertEqual(2, len(o.studies.find(query={},
+                                               labels=['label_a', 'label_b'],
+                                               labels_constraint='Any')))
+
+        self.assertEqual(2, len(o.studies.find(query={},
+                                               labels=['label_a'],
+                                               labels_constraint='None')))
+
+        all_labels = o.get_all_labels()
+        self.assertEqual(2, len(all_labels))
+
+    def test_user_a(self):
+        
+        o = OrthancApiClient(self.o._root_url, headers={"user-token-key": "token-user-a"})
+
+        # # make sure we can access all these urls (they would throw if not)
+        # system = o.get_system()
+
+        all_labels = o.get_all_labels()
+        self.assertEqual(1, len(all_labels))
+        self.assertEqual("label_a", all_labels[0])
+
+        # make sure we can access only the label_a studies
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            o.studies.get_tags(self.label_b_study_id)
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            o.studies.get_tags(self.no_label_study_id)
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+        # should not raise
+        o.studies.get_tags(self.label_a_study_id)
+
+        # make sure we can access series and instances of the label_a studies
+        series_ids = o.studies.get_series_ids(self.label_a_study_id)
+        instances_ids = o.series.get_instances_ids(series_ids[0])
+        o.instances.get_tags(instances_ids[0])
+
+        # make sure we can not access series and instances of the label_b studies
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            series_ids = o.studies.get_series_ids(self.label_b_study_id)
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+        # make sure tools/find only returns the label_a studies
+        studies = o.studies.find(query={},
+                                 labels=[],
+                                 labels_constraint='Any')
+        self.assertEqual(1, len(studies))
+        self.assertEqual(self.label_a_study_id, studies[0].orthanc_id)
+
+        # if searching Any of label_a & label_b, return only label_a
+        studies = o.studies.find(query={},
+                                 labels=['label_a', 'label_b'],
+                                 labels_constraint='Any')
+        self.assertEqual(1, len(studies))
+        self.assertEqual(self.label_a_study_id, studies[0].orthanc_id)
+
+        # if searching Any of label_b, expect a Forbidden access
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            studies = o.studies.find(query={},
+                                     labels=['label_b'],
+                                     labels_constraint='Any')
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+        # if searching None of label_b, expect a Forbidden access because we are not able to compute this filter
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            studies = o.studies.find(query={},
+                                     labels=['label_b'],
+                                     labels_constraint='None')
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+        # if searching All of label_b, expect a Forbidden access because we are not able to compute this filter
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            studies = o.studies.find(query={},
+                                     labels=['label_b'],
+                                     labels_constraint='All')
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+        studies = o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
+                                 labels=[],
+                                 labels_constraint='Any')
+        self.assertEqual(1, len(studies))
+
+        studies = o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
+                                 labels=['label_a'],
+                                 labels_constraint='Any')
+        self.assertEqual(1, len(studies))
+
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            studies = o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
+                                     labels=['label_b'],
+                                     labels_constraint='Any')
+        self.assertEqual(403, ctx.exception.http_status_code)
\ No newline at end of file
--- a/NewTests/README	Wed Sep 06 17:03:54 2023 +0200
+++ b/NewTests/README	Wed Sep 06 17:04:36 2023 +0200
@@ -132,3 +132,15 @@
                          --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \
                          --break_after_preparation
 
+
+Authorization:
+--------------
+
+Run the Authorization tests with your locally build version and break before execution to allow you to start your debugger.
+
+python3 NewTests/main.py --pattern=Authorization.test_authorization.TestAuthorization.* \
+                         --orthanc_under_tests_exe=/home/alain/o/build/orthanc/Orthanc \
+                         --orthanc_under_tests_http_port=8043 \
+                         --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \
+                         --plugin=/home/alain/o/build/orthanc-authorization/libOrthancAuthorization.so \
+                         --break_before_preparation
--- a/NewTests/helpers.py	Wed Sep 06 17:03:54 2023 +0200
+++ b/NewTests/helpers.py	Wed Sep 06 17:04:36 2023 +0200
@@ -82,12 +82,17 @@
     def tearDownClass(cls):
         if not Helpers.break_after_preparation:
             cls.kill_orthanc()
+        cls._terminate()
 
     @classmethod
     def prepare(cls):
         pass # to override
 
     @classmethod
+    def terminate(cls):
+        pass # to override
+
+    @classmethod
     def _prepare(cls):
         if not Helpers.skip_preparation:
             cls.prepare()
@@ -243,7 +248,10 @@
     @classmethod
     def kill_orthanc(cls):
         if Helpers.is_exe():
-            cls._orthanc_process.kill()
+            if cls._orthanc_process:
+                cls._orthanc_process.kill()
+            else:
+                return
         else:
             subprocess.run(["docker", "stop", cls._orthanc_container_name])
         output = cls.get_orthanc_process_output()
--- a/NewTests/requirements.txt	Wed Sep 06 17:03:54 2023 +0200
+++ b/NewTests/requirements.txt	Wed Sep 06 17:04:36 2023 +0200
@@ -1,1 +1,3 @@
-orthanc-tools==0.4.9
\ No newline at end of file
+orthanc-api-client>=0.13.6
+orthanc-tools>=0.8.9
+uvicorn
\ No newline at end of file