# HG changeset patch # User Alain Mazy # Date 1694167430 -7200 # Node ID 0649a19df194fa816ff564dd8725dc70291af6b0 # Parent 80ba6f1d521ca50aa17ffd526287c0588cfb8a4e new tests for auth-service diff -r 80ba6f1d521c -r 0649a19df194 NewTests/Authorization/Dockerfile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/NewTests/Authorization/Dockerfile Fri Sep 08 12:03:50 2023 +0200 @@ -0,0 +1,11 @@ +FROM python:3.11 + +RUN pip install uvicorn +RUN pip install fastapi==0.103.0 + +RUN mkdir /auth-service +COPY *.py /auth-service + +WORKDIR /auth-service +ENTRYPOINT [ "uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"] + diff -r 80ba6f1d521c -r 0649a19df194 NewTests/Authorization/auth_service.py --- a/NewTests/Authorization/auth_service.py Wed Sep 06 17:04:36 2023 +0200 +++ b/NewTests/Authorization/auth_service.py Fri Sep 08 12:03:50 2023 +0200 @@ -6,6 +6,10 @@ # Sample Authorization service that is started when the test starts. # It does not check token validity and simply implements a set of basic users + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + app = FastAPI() @@ -14,6 +18,13 @@ def get_user_profile(user_profile_request: UserProfileRequest): logging.info("get user profile: " + user_profile_request.json()) + p = UserProfileResponse( + name="anonymous", + permissions=[], + authorized_labels=[], + validity=60 + ) + if user_profile_request.token_key == "user-token-key": if user_profile_request.token_value == "token-uploader": p = UserProfileResponse( @@ -22,7 +33,6 @@ authorized_labels=["*"], validity=60 ) - return p elif user_profile_request.token_value == "token-admin": p = UserProfileResponse( name="admin", @@ -30,7 +40,6 @@ authorized_labels=["*"], validity=60 ) - return p elif user_profile_request.token_value == "token-user-a": p = UserProfileResponse( name="user-a", @@ -38,6 +47,23 @@ authorized_labels=["label_a"], validity=60 ) - return p - + + return p + + +@app.post("/tokens/validate") +def validate_authorization(request: TokenValidationRequest): + + logging.info("validating token: " + request.json()) + granted = False + if request.token_value == "token-knix-study": + granted = request.orthanc_id == "b9c08539-26f93bde-c81ab0d7-bffaf2cb-a4d0bdd0" + + response = TokenValidationResponse( + granted=granted, + validity=60 + ) + + logging.info("validate token: " + response.json()) + return response diff -r 80ba6f1d521c -r 0649a19df194 NewTests/Authorization/models.py --- a/NewTests/Authorization/models.py Wed Sep 06 17:04:36 2023 +0200 +++ b/NewTests/Authorization/models.py Fri Sep 08 12:03:50 2023 +0200 @@ -1,25 +1,9 @@ -# SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL -# -# SPDX-License-Identifier: GPL-3.0-or-later - from typing import Optional, List from pydantic import BaseModel, Field -from pydantic.datetime_parse import parse_datetime from enum import Enum from datetime import datetime -class StringDateTime(datetime): - @classmethod - def __get_validators__(cls): - yield parse_datetime - yield cls.validate - - @classmethod - def validate(cls, v: datetime): - return v.isoformat() - - class Levels(str, Enum): PATIENT = 'patient' STUDY = 'study' @@ -67,18 +51,18 @@ level: Levels class Config: # allow creating object from dict (used when deserializing the JWT) - allow_population_by_field_name = True + populate_by_name = True class TokenCreationRequest(BaseModel): id: Optional[str] = None resources: List[OrthancResource] type: TokenType = Field(default=TokenType.INVALID) - expiration_date: Optional[StringDateTime] = Field(alias="expiration-date", default=None) + expiration_date: Optional[datetime] = Field(alias="expiration-date", default=None) validity_duration: Optional[int] = Field(alias='validity-duration', default=None) # alternate way to provide an expiration_date, more convenient for instant-links since the duration is relative to the server time, not the client time ! class Config: # allow creating object from dict (used when deserializing the JWT) - allow_population_by_field_name = True + populate_by_name = True class TokenCreationResponse(BaseModel): @@ -95,7 +79,7 @@ server_id: Optional[str] = Field(alias="server-id", default=None) level: Optional[Levels] method: Methods - uri: Optional[str] + uri: Optional[str] = None # labels: Optional[List[str]] @@ -141,10 +125,9 @@ class UserProfileResponse(BaseModel): name: str authorized_labels: List[str] = Field(alias="authorized-labels", default_factory=list) - # authorized_labels: List[str] = Field(default_factory=list) permissions: List[UserPermissions] = Field(default_factory=list) validity: int class Config: use_enum_values = True - allow_population_by_field_name = True \ No newline at end of file + populate_by_name = True \ No newline at end of file diff -r 80ba6f1d521c -r 0649a19df194 NewTests/Authorization/test_authorization.py --- a/NewTests/Authorization/test_authorization.py Wed Sep 06 17:04:36 2023 +0200 +++ b/NewTests/Authorization/test_authorization.py Fri Sep 08 12:03:50 2023 +0200 @@ -21,7 +21,7 @@ auth_service_process = None @classmethod - def _terminate(cls): + def terminate(cls): cls.auth_service_process.terminate() @classmethod @@ -33,16 +33,21 @@ cls.clear_storage(storage_name=storage_name) + auth_service_hostname = "localhost" + if Helpers.is_docker(): + auth_service_hostname = "auth-service" + cls.create_docker_network("auth-test-network") + config = { "AuthenticationEnabled": False, "Authorization": { - "WebServiceRootUrl": "http://localhost:8020/", + "WebServiceRootUrl": f"http://{auth_service_hostname}:8020/", "StandardConfigurations": [ "orthanc-explorer-2", "stone-webviewer" ], "CheckedLevel": "studies", - "TokenHttpHeaders": ["user-token-key"], + "TokenHttpHeaders": ["user-token-key", "resource-token-key"], "TokenGetArguments": ["resource-token-key"] } } @@ -54,9 +59,16 @@ plugins=Helpers.plugins ) - # Start the auth-service application as a subprocess and wait for it to start - cls.auth_service_process = subprocess.Popen(["uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"], cwd=here) - time.sleep(2) + if Helpers.is_exe(): + # Start the auth-service application as a subprocess and wait for it to start + cls.auth_service_process = subprocess.Popen(["uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"], cwd=here) + time.sleep(2) + else: + # first build the docker image for the auth-service + subprocess.run(["docker", "build", "-t", "auth-service", "."], cwd=here) + cls.auth_service_process = subprocess.Popen(["docker", "run", "-p", "8020:8020", "--network", "auth-test-network", "--name", "auth-service", "auth-service"]) + pass + if Helpers.break_before_preparation: print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++") @@ -66,7 +78,8 @@ config_name=f"{test_name}", storage_name=storage_name, config=config, - plugins=Helpers.plugins + plugins=Helpers.plugins, + docker_network="auth-test-network" ) uploader = OrthancApiClient(cls.o._root_url, headers={"user-token-key": "token-uploader"}) @@ -86,6 +99,12 @@ cls.no_label_study_id = uploader.instances.get_parent_study_id(instances_ids[0]) + def assert_is_forbidden(self, api_call): + with self.assertRaises(orthanc_exceptions.HttpError) as ctx: + api_call() + self.assertEqual(403, ctx.exception.http_status_code) + + def test_admin_user(self): o = OrthancApiClient(self.o._root_url, headers={"user-token-key": "token-admin"}) @@ -131,13 +150,8 @@ self.assertEqual("label_a", all_labels[0]) # make sure we can access only the label_a studies - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - o.studies.get_tags(self.label_b_study_id) - self.assertEqual(403, ctx.exception.http_status_code) - - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - o.studies.get_tags(self.no_label_study_id) - self.assertEqual(403, ctx.exception.http_status_code) + self.assert_is_forbidden(lambda: o.studies.get_tags(self.label_b_study_id)) + self.assert_is_forbidden(lambda: o.studies.get_tags(self.no_label_study_id)) # should not raise o.studies.get_tags(self.label_a_study_id) @@ -148,9 +162,7 @@ o.instances.get_tags(instances_ids[0]) # make sure we can not access series and instances of the label_b studies - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - series_ids = o.studies.get_series_ids(self.label_b_study_id) - self.assertEqual(403, ctx.exception.http_status_code) + self.assert_is_forbidden(lambda: o.studies.get_series_ids(self.label_b_study_id)) # make sure tools/find only returns the label_a studies studies = o.studies.find(query={}, @@ -167,25 +179,19 @@ self.assertEqual(self.label_a_study_id, studies[0].orthanc_id) # if searching Any of label_b, expect a Forbidden access - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - studies = o.studies.find(query={}, - labels=['label_b'], - labels_constraint='Any') - self.assertEqual(403, ctx.exception.http_status_code) + self.assert_is_forbidden(lambda: o.studies.find(query={}, + labels=['label_b'], + labels_constraint='Any')) # if searching None of label_b, expect a Forbidden access because we are not able to compute this filter - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - studies = o.studies.find(query={}, - labels=['label_b'], - labels_constraint='None') - self.assertEqual(403, ctx.exception.http_status_code) + self.assert_is_forbidden(lambda: o.studies.find(query={}, + labels=['label_b'], + labels_constraint='None')) # if searching All of label_b, expect a Forbidden access because we are not able to compute this filter - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - studies = o.studies.find(query={}, - labels=['label_b'], - labels_constraint='All') - self.assertEqual(403, ctx.exception.http_status_code) + self.assert_is_forbidden(lambda: o.studies.find(query={}, + labels=['label_b'], + labels_constraint='All')) studies = o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a labels=[], @@ -197,8 +203,48 @@ labels_constraint='Any') self.assertEqual(1, len(studies)) - with self.assertRaises(orthanc_exceptions.HttpError) as ctx: - studies = o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a - labels=['label_b'], - labels_constraint='Any') - self.assertEqual(403, ctx.exception.http_status_code) \ No newline at end of file + self.assert_is_forbidden(lambda: o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a + labels=['label_b'], + labels_constraint='Any')) + + # make sure some generic routes are not accessible + self.assert_is_forbidden(lambda: o.get_json('patients?expand')) + self.assert_is_forbidden(lambda: o.get_json('studies?expand')) + self.assert_is_forbidden(lambda: o.get_json('series?expand')) + self.assert_is_forbidden(lambda: o.get_json('instances?expand')) + self.assert_is_forbidden(lambda: o.get_json('studies')) + self.assert_is_forbidden(lambda: o.get_json('studies/')) + + + + def test_resource_token(self): + + o = OrthancApiClient(self.o._root_url, headers={"resource-token-key": "token-knix-study"}) + + # with a resource token, we can access only the given resource, not generic resources or resources from other studies + + # generic resources are forbidden + self.assert_is_forbidden(lambda: o.studies.find(query={"PatientName": "KNIX"}, # KNIX is label_a + labels=['label_b'], + labels_constraint='Any')) + self.assert_is_forbidden(lambda: o.get_all_labels()) + self.assert_is_forbidden(lambda: o.studies.get_all_ids()) + self.assert_is_forbidden(lambda: o.patients.get_all_ids()) + self.assert_is_forbidden(lambda: o.series.get_all_ids()) + self.assert_is_forbidden(lambda: o.instances.get_all_ids()) + self.assert_is_forbidden(lambda: o.get_json('patients?expand')) + self.assert_is_forbidden(lambda: o.get_json('studies?expand')) + self.assert_is_forbidden(lambda: o.get_json('series?expand')) + self.assert_is_forbidden(lambda: o.get_json('instances?expand')) + + # some resources are still accessible to the 'anonymous' user -> does not throw + o.get_system() + o.lookup("1.2.3") # this route is still explicitely authorized because it is used by Stone + + # other studies are forbidden + self.assert_is_forbidden(lambda: o.studies.get_series_ids(self.label_b_study_id)) + + # the label_a study is allowed + o.studies.get_series_ids(self.label_a_study_id) + + # TODO: test with DicomWEB routes + sub-routes \ No newline at end of file diff -r 80ba6f1d521c -r 0649a19df194 NewTests/README --- a/NewTests/README Wed Sep 06 17:04:36 2023 +0200 +++ b/NewTests/README Fri Sep 08 12:03:50 2023 +0200 @@ -144,3 +144,10 @@ --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \ --plugin=/home/alain/o/build/orthanc-authorization/libOrthancAuthorization.so \ --break_before_preparation + +with Docker: + +python3 NewTests/main.py --pattern=Authorization.test_authorization.TestAuthorization. \ + --orthanc_under_tests_docker_image=osimis/orthanc:current \ + --orthanc_previous_version_docker_image=osimis/orthanc:22.4.0 \ + --orthanc_under_tests_http_port=8043 diff -r 80ba6f1d521c -r 0649a19df194 NewTests/helpers.py --- a/NewTests/helpers.py Wed Sep 06 17:04:36 2023 +0200 +++ b/NewTests/helpers.py Fri Sep 08 12:03:50 2023 +0200 @@ -82,7 +82,7 @@ def tearDownClass(cls): if not Helpers.break_after_preparation: cls.kill_orthanc() - cls._terminate() + cls.terminate() @classmethod def prepare(cls): @@ -141,6 +141,12 @@ subprocess.run(["docker", "volume", "rm", "-f", storage_name]) @classmethod + def create_docker_network(cls, network: str): + if Helpers.is_docker(): + subprocess.run(["docker", "network", "rm", network]) # ignore error + subprocess.run(["docker", "network", "create", network]) + + @classmethod def launch_orthanc_to_prepare_db(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = []): if config_name and storage_name and config: # generate the configuration file @@ -170,7 +176,7 @@ raise RuntimeError("Invalid configuration, can not launch Orthanc") @classmethod - def launch_orthanc_under_tests(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = []): + def launch_orthanc_under_tests(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = [], docker_network: str = None): if config_name and storage_name and config: # generate the configuration file config_path = cls.generate_configuration( @@ -193,7 +199,8 @@ docker_image=Helpers.orthanc_under_tests_docker_image, storage_name=storage_name, config_name=config_name, - config_path=config_path + config_path=config_path, + network=docker_network ) else: raise RuntimeError("Invalid configuration, can not launch Orthanc") @@ -214,7 +221,7 @@ raise RuntimeError(f"Orthanc failed to start '{exe_path}', conf = '{config_path}'. Check output above") @classmethod - def launch_orthanc_docker(cls, docker_image: str, storage_name: str, config_path: str, config_name: str): + def launch_orthanc_docker(cls, docker_image: str, storage_name: str, config_path: str, config_name: str, network: str = None): storage_path = cls.get_storage_path(storage_name=storage_name) cmd = [ @@ -225,9 +232,12 @@ "-v", f"{storage_path}:/var/lib/orthanc/db/", "--name", config_name, "-p", f"{Helpers.orthanc_under_tests_http_port}:{Helpers.orthanc_under_tests_http_port}", - "-p", f"{Helpers.orthanc_under_tests_dicom_port}:{Helpers.orthanc_under_tests_dicom_port}", - docker_image + "-p", f"{Helpers.orthanc_under_tests_dicom_port}:{Helpers.orthanc_under_tests_dicom_port}" ] + if network: + cmd.extend(["--network", network]) + cmd.append(docker_image) + cls._orthanc_container_name = config_name print("docker cmd line: " + " ".join(cmd))