changeset 577:0649a19df194

new tests for auth-service
author Alain Mazy <am@osimis.io>
date Fri, 08 Sep 2023 12:03:50 +0200
parents 80ba6f1d521c
children c474f0f815b6
files NewTests/Authorization/Dockerfile NewTests/Authorization/auth_service.py NewTests/Authorization/models.py NewTests/Authorization/test_authorization.py NewTests/README NewTests/helpers.py
diffstat 6 files changed, 152 insertions(+), 69 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/NewTests/Authorization/Dockerfile	Fri Sep 08 12:03:50 2023 +0200
@@ -0,0 +1,11 @@
+FROM python:3.11
+
+RUN pip install uvicorn
+RUN pip install fastapi==0.103.0
+
+RUN mkdir /auth-service
+COPY *.py /auth-service
+
+WORKDIR /auth-service
+ENTRYPOINT [ "uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"]
+
--- a/NewTests/Authorization/auth_service.py	Wed Sep 06 17:04:36 2023 +0200
+++ b/NewTests/Authorization/auth_service.py	Fri Sep 08 12:03:50 2023 +0200
@@ -6,6 +6,10 @@
 # Sample Authorization service that is started when the test starts.
 # It does not check token validity and simply implements a set of basic users
 
+
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+
 app = FastAPI()
 
 
@@ -14,6 +18,13 @@
 def get_user_profile(user_profile_request: UserProfileRequest):
     logging.info("get user profile: " + user_profile_request.json())
 
+    p = UserProfileResponse(
+        name="anonymous",
+        permissions=[],
+        authorized_labels=[],
+        validity=60
+    )
+
     if user_profile_request.token_key == "user-token-key":
         if user_profile_request.token_value == "token-uploader":
             p = UserProfileResponse(
@@ -22,7 +33,6 @@
                 authorized_labels=["*"],
                 validity=60
             )
-            return p
         elif user_profile_request.token_value == "token-admin":
             p = UserProfileResponse(
                 name="admin",
@@ -30,7 +40,6 @@
                 authorized_labels=["*"],
                 validity=60
             )
-            return p
         elif user_profile_request.token_value == "token-user-a":
             p = UserProfileResponse(
                 name="user-a",
@@ -38,6 +47,23 @@
                 authorized_labels=["label_a"],
                 validity=60
             )
-            return p
-            
+
+    return p        
+
+
+@app.post("/tokens/validate")
+def validate_authorization(request: TokenValidationRequest):
+
+    logging.info("validating token: " + request.json())
 
+    granted = False
+    if request.token_value == "token-knix-study":
+        granted = request.orthanc_id == "b9c08539-26f93bde-c81ab0d7-bffaf2cb-a4d0bdd0"
+
+    response = TokenValidationResponse(
+        granted=granted,
+        validity=60
+    )
+
+    logging.info("validate token: " + response.json())
+    return response
--- a/NewTests/Authorization/models.py	Wed Sep 06 17:04:36 2023 +0200
+++ b/NewTests/Authorization/models.py	Fri Sep 08 12:03:50 2023 +0200
@@ -1,25 +1,9 @@
-# SPDX-FileCopyrightText: 2022 - 2023 Orthanc Team SRL <info@orthanc.team>
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-
 from typing import Optional, List
 from pydantic import BaseModel, Field
-from pydantic.datetime_parse import parse_datetime
 from enum import Enum
 from datetime import datetime
 
 
-class StringDateTime(datetime):
-    @classmethod
-    def __get_validators__(cls):
-        yield parse_datetime
-        yield cls.validate
-
-    @classmethod
-    def validate(cls, v: datetime):
-        return v.isoformat()
-
-
 class Levels(str, Enum):
     PATIENT = 'patient'
     STUDY = 'study'
@@ -67,18 +51,18 @@
     level: Levels
 
     class Config:  # allow creating object from dict (used when deserializing the JWT)
-        allow_population_by_field_name = True
+        populate_by_name = True
 
 
 class TokenCreationRequest(BaseModel):
     id: Optional[str] = None
     resources: List[OrthancResource]
     type: TokenType = Field(default=TokenType.INVALID)
-    expiration_date: Optional[StringDateTime] = Field(alias="expiration-date", default=None)
+    expiration_date: Optional[datetime] = Field(alias="expiration-date", default=None)
     validity_duration: Optional[int] = Field(alias='validity-duration', default=None)            # alternate way to provide an expiration_date, more convenient for instant-links since the duration is relative to the server time, not the client time !
 
     class Config:  # allow creating object from dict (used when deserializing the JWT)
-        allow_population_by_field_name = True
+        populate_by_name = True
 
 
 class TokenCreationResponse(BaseModel):
@@ -95,7 +79,7 @@
     server_id: Optional[str] = Field(alias="server-id", default=None)
     level: Optional[Levels]
     method: Methods
-    uri: Optional[str]
+    uri: Optional[str] = None
 #     labels: Optional[List[str]]
 
 
@@ -141,10 +125,9 @@
 class UserProfileResponse(BaseModel):
     name: str
     authorized_labels: List[str] = Field(alias="authorized-labels", default_factory=list)
-    # authorized_labels: List[str] = Field(default_factory=list)
     permissions: List[UserPermissions] = Field(default_factory=list)
     validity: int
 
     class Config:
         use_enum_values = True
-        allow_population_by_field_name = True
\ No newline at end of file
+        populate_by_name = True
\ No newline at end of file
--- a/NewTests/Authorization/test_authorization.py	Wed Sep 06 17:04:36 2023 +0200
+++ b/NewTests/Authorization/test_authorization.py	Fri Sep 08 12:03:50 2023 +0200
@@ -21,7 +21,7 @@
     auth_service_process = None
 
     @classmethod
-    def _terminate(cls):
+    def terminate(cls):
         cls.auth_service_process.terminate()
 
     @classmethod
@@ -33,16 +33,21 @@
 
         cls.clear_storage(storage_name=storage_name)
 
+        auth_service_hostname = "localhost"
+        if Helpers.is_docker():
+            auth_service_hostname = "auth-service"
+            cls.create_docker_network("auth-test-network")
+
         config = {
                 "AuthenticationEnabled": False,
                 "Authorization": {
-                    "WebServiceRootUrl": "http://localhost:8020/",
+                    "WebServiceRootUrl": f"http://{auth_service_hostname}:8020/",
                     "StandardConfigurations": [
                         "orthanc-explorer-2",
                         "stone-webviewer"
                     ],
                     "CheckedLevel": "studies",
-                    "TokenHttpHeaders": ["user-token-key"],
+                    "TokenHttpHeaders": ["user-token-key", "resource-token-key"],
                     "TokenGetArguments": ["resource-token-key"]
                 }
             }
@@ -54,9 +59,16 @@
             plugins=Helpers.plugins
         )
 
-        # Start the auth-service application as a subprocess and wait for it to start
-        cls.auth_service_process = subprocess.Popen(["uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"], cwd=here)
-        time.sleep(2)
+        if Helpers.is_exe():
+            # Start the auth-service application as a subprocess and wait for it to start
+            cls.auth_service_process = subprocess.Popen(["uvicorn", "auth_service:app", "--host", "0.0.0.0", "--port", "8020"], cwd=here)
+            time.sleep(2)
+        else:
+            # first build the docker image for the auth-service
+            subprocess.run(["docker", "build", "-t", "auth-service", "."], cwd=here)
+            cls.auth_service_process = subprocess.Popen(["docker", "run", "-p", "8020:8020", "--network", "auth-test-network", "--name", "auth-service", "auth-service"])
+            pass
+
 
         if Helpers.break_before_preparation:
             print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
@@ -66,7 +78,8 @@
                 config_name=f"{test_name}",
                 storage_name=storage_name,
                 config=config,
-                plugins=Helpers.plugins
+                plugins=Helpers.plugins,
+                docker_network="auth-test-network"
             )
 
         uploader = OrthancApiClient(cls.o._root_url, headers={"user-token-key": "token-uploader"})
@@ -86,6 +99,12 @@
         cls.no_label_study_id = uploader.instances.get_parent_study_id(instances_ids[0])
 
 
+    def assert_is_forbidden(self, api_call):
+        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
+            api_call()
+        self.assertEqual(403, ctx.exception.http_status_code)
+
+
     def test_admin_user(self):
         
         o = OrthancApiClient(self.o._root_url, headers={"user-token-key": "token-admin"})
@@ -131,13 +150,8 @@
         self.assertEqual("label_a", all_labels[0])
 
         # make sure we can access only the label_a studies
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            o.studies.get_tags(self.label_b_study_id)
-        self.assertEqual(403, ctx.exception.http_status_code)
-
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            o.studies.get_tags(self.no_label_study_id)
-        self.assertEqual(403, ctx.exception.http_status_code)
+        self.assert_is_forbidden(lambda: o.studies.get_tags(self.label_b_study_id))
+        self.assert_is_forbidden(lambda: o.studies.get_tags(self.no_label_study_id))
 
         # should not raise
         o.studies.get_tags(self.label_a_study_id)
@@ -148,9 +162,7 @@
         o.instances.get_tags(instances_ids[0])
 
         # make sure we can not access series and instances of the label_b studies
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            series_ids = o.studies.get_series_ids(self.label_b_study_id)
-        self.assertEqual(403, ctx.exception.http_status_code)
+        self.assert_is_forbidden(lambda: o.studies.get_series_ids(self.label_b_study_id))
 
         # make sure tools/find only returns the label_a studies
         studies = o.studies.find(query={},
@@ -167,25 +179,19 @@
         self.assertEqual(self.label_a_study_id, studies[0].orthanc_id)
 
         # if searching Any of label_b, expect a Forbidden access
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            studies = o.studies.find(query={},
-                                     labels=['label_b'],
-                                     labels_constraint='Any')
-        self.assertEqual(403, ctx.exception.http_status_code)
+        self.assert_is_forbidden(lambda: o.studies.find(query={},
+                                                        labels=['label_b'],
+                                                        labels_constraint='Any'))
 
         # if searching None of label_b, expect a Forbidden access because we are not able to compute this filter
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            studies = o.studies.find(query={},
-                                     labels=['label_b'],
-                                     labels_constraint='None')
-        self.assertEqual(403, ctx.exception.http_status_code)
+        self.assert_is_forbidden(lambda: o.studies.find(query={},
+                                                        labels=['label_b'],
+                                                        labels_constraint='None'))
 
         # if searching All of label_b, expect a Forbidden access because we are not able to compute this filter
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            studies = o.studies.find(query={},
-                                     labels=['label_b'],
-                                     labels_constraint='All')
-        self.assertEqual(403, ctx.exception.http_status_code)
+        self.assert_is_forbidden(lambda: o.studies.find(query={},
+                                                        labels=['label_b'],
+                                                        labels_constraint='All'))
 
         studies = o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
                                  labels=[],
@@ -197,8 +203,48 @@
                                  labels_constraint='Any')
         self.assertEqual(1, len(studies))
 
-        with self.assertRaises(orthanc_exceptions.HttpError) as ctx:
-            studies = o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
-                                     labels=['label_b'],
-                                     labels_constraint='Any')
-        self.assertEqual(403, ctx.exception.http_status_code)
\ No newline at end of file
+        self.assert_is_forbidden(lambda: o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
+                                                        labels=['label_b'],
+                                                        labels_constraint='Any'))
+
+        # make sure some generic routes are not accessible
+        self.assert_is_forbidden(lambda: o.get_json('patients?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('studies?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('series?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('instances?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('studies'))
+        self.assert_is_forbidden(lambda: o.get_json('studies/'))
+
+
+
+    def test_resource_token(self):
+
+        o = OrthancApiClient(self.o._root_url, headers={"resource-token-key": "token-knix-study"})
+
+        # with a resource token, we can access only the given resource, not generic resources or resources from other studies
+
+        # generic resources are forbidden
+        self.assert_is_forbidden(lambda: o.studies.find(query={"PatientName": "KNIX"},  # KNIX is label_a
+                                                        labels=['label_b'],
+                                                        labels_constraint='Any'))
+        self.assert_is_forbidden(lambda: o.get_all_labels())
+        self.assert_is_forbidden(lambda: o.studies.get_all_ids())
+        self.assert_is_forbidden(lambda: o.patients.get_all_ids())
+        self.assert_is_forbidden(lambda: o.series.get_all_ids())
+        self.assert_is_forbidden(lambda: o.instances.get_all_ids())
+        self.assert_is_forbidden(lambda: o.get_json('patients?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('studies?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('series?expand'))
+        self.assert_is_forbidden(lambda: o.get_json('instances?expand'))
+        
+        # some resources are still accessible to the 'anonymous' user  -> does not throw
+        o.get_system()
+        o.lookup("1.2.3")   # this route is still explicitely authorized because it is used by Stone
+
+        # other studies are forbidden
+        self.assert_is_forbidden(lambda: o.studies.get_series_ids(self.label_b_study_id))
+
+        # the label_a study is allowed
+        o.studies.get_series_ids(self.label_a_study_id)
+
+        # TODO: test with DicomWEB routes + sub-routes
\ No newline at end of file
--- a/NewTests/README	Wed Sep 06 17:04:36 2023 +0200
+++ b/NewTests/README	Fri Sep 08 12:03:50 2023 +0200
@@ -144,3 +144,10 @@
                          --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \
                          --plugin=/home/alain/o/build/orthanc-authorization/libOrthancAuthorization.so \
                          --break_before_preparation
+
+with Docker:
+
+python3 NewTests/main.py --pattern=Authorization.test_authorization.TestAuthorization. \
+                         --orthanc_under_tests_docker_image=osimis/orthanc:current \
+                         --orthanc_previous_version_docker_image=osimis/orthanc:22.4.0 \
+                         --orthanc_under_tests_http_port=8043
--- a/NewTests/helpers.py	Wed Sep 06 17:04:36 2023 +0200
+++ b/NewTests/helpers.py	Fri Sep 08 12:03:50 2023 +0200
@@ -82,7 +82,7 @@
     def tearDownClass(cls):
         if not Helpers.break_after_preparation:
             cls.kill_orthanc()
-        cls._terminate()
+        cls.terminate()
 
     @classmethod
     def prepare(cls):
@@ -141,6 +141,12 @@
             subprocess.run(["docker", "volume", "rm", "-f", storage_name])
 
     @classmethod
+    def create_docker_network(cls, network: str):
+        if Helpers.is_docker():
+            subprocess.run(["docker", "network", "rm", network])      # ignore error
+            subprocess.run(["docker", "network", "create", network])
+
+    @classmethod
     def launch_orthanc_to_prepare_db(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = []):
         if config_name and storage_name and config:
             # generate the configuration file
@@ -170,7 +176,7 @@
             raise RuntimeError("Invalid configuration, can not launch Orthanc")
 
     @classmethod
-    def launch_orthanc_under_tests(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = []):
+    def launch_orthanc_under_tests(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = [], docker_network: str = None):
         if config_name and storage_name and config:
             # generate the configuration file
             config_path = cls.generate_configuration(
@@ -193,7 +199,8 @@
                 docker_image=Helpers.orthanc_under_tests_docker_image,
                 storage_name=storage_name,
                 config_name=config_name,
-                config_path=config_path
+                config_path=config_path,
+                network=docker_network
             )
         else:
             raise RuntimeError("Invalid configuration, can not launch Orthanc")
@@ -214,7 +221,7 @@
                 raise RuntimeError(f"Orthanc failed to start '{exe_path}', conf = '{config_path}'.  Check output above")
 
     @classmethod
-    def launch_orthanc_docker(cls, docker_image: str, storage_name: str, config_path: str, config_name: str):
+    def launch_orthanc_docker(cls, docker_image: str, storage_name: str, config_path: str, config_name: str, network: str = None):
             storage_path = cls.get_storage_path(storage_name=storage_name)
 
             cmd = [
@@ -225,9 +232,12 @@
                     "-v", f"{storage_path}:/var/lib/orthanc/db/",
                     "--name", config_name,
                     "-p", f"{Helpers.orthanc_under_tests_http_port}:{Helpers.orthanc_under_tests_http_port}",
-                    "-p", f"{Helpers.orthanc_under_tests_dicom_port}:{Helpers.orthanc_under_tests_dicom_port}",
-                    docker_image
+                    "-p", f"{Helpers.orthanc_under_tests_dicom_port}:{Helpers.orthanc_under_tests_dicom_port}"
                 ]
+            if network:
+                cmd.extend(["--network", network])
+            cmd.append(docker_image)
+
             cls._orthanc_container_name = config_name
             print("docker cmd line: " + " ".join(cmd))