# HG changeset patch # User Alain Mazy # Date 1702391505 -3600 # Node ID 6753d96dd71f826d5495de910ca4e4058773208c # Parent 3cb7c6162c77f7eec03de32ffd37bb7c8fea190f new concurrency tests diff -r 3cb7c6162c77 -r 6753d96dd71f NewTests/Concurrency/__init__.py diff -r 3cb7c6162c77 -r 6753d96dd71f NewTests/Concurrency/test_concurrency.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/NewTests/Concurrency/test_concurrency.py Tue Dec 12 15:31:45 2023 +0100 @@ -0,0 +1,263 @@ +import unittest +import time +import os +import threading +from helpers import OrthancTestCase, Helpers + +from orthanc_api_client import OrthancApiClient, generate_test_dicom_file +from orthanc_tools import OrthancTestDbPopulator + +import pathlib +import subprocess +import glob +here = pathlib.Path(__file__).parent.resolve() + + +def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int): + o = OrthancApiClient(orthanc_root_url) + for i in range(0, repeat): + o.upload_folder(folder, ignore_errors=True) + +def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int): + o = OrthancApiClient(orthanc_root_url) + + for i in range(0, repeat): + o.studies.anonymize(orthanc_id=study_id, delete_original=False) + +def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int): + o = OrthancApiClient(orthanc_root_url) + + all_files = glob.glob(os.path.join(folder, '*.dcm')) + + for i in range(0, repeat): + instances_ids = [] + + for i in range(0, len(all_files)): + if i % workers_count == worker_id: + instances_ids.extend(o.upload_file(all_files[i])) + + for instance_id in instances_ids: + o.instances.delete(orthanc_id=instance_id) + + +class TestConcurrency(OrthancTestCase): + + @classmethod + def terminate(cls): + + if Helpers.is_docker(): + subprocess.run(["docker", "rm", "-f", "pg-server"]) + else: + cls.pg_service_process.terminate() + + + @classmethod + def prepare(cls): + test_name = "Concurrency" + cls._storage_name = "concurrency" + + print(f'-------------- preparing {test_name} tests') + + cls.clear_storage(storage_name=cls._storage_name) + + pg_hostname = "localhost" + if Helpers.is_docker(): + pg_hostname = "pg-server" + cls.create_docker_network("concurrency") + + config = { + "PostgreSQL" : { + "EnableStorage": False, + "EnableIndex": True, + "Host": pg_hostname, + "Port": 5432, + "Database": "postgres", + "Username": "postgres", + "Password": "postgres", + "IndexConnectionsCount": 10, + "MaximumConnectionRetries" : 2000, + "ConnectionRetryInterval" : 5, + "TransactionMode": "READ COMMITTED", + #"TransactionMode": "SERIALIZABLE", + "EnableVerboseLogs": True + }, + "AuthenticationEnabled": False, + "OverwriteInstances": True, + "JobsEngineThreadsCount" : { + "ResourceModification": 8 + }, + } + + config_path = cls.generate_configuration( + config_name=f"{test_name}", + storage_name=cls._storage_name, + config=config, + plugins=Helpers.plugins + ) + + # launch the docker PG server + print('--------------- launching PostgreSQL server ------------------') + + cls.pg_service_process = subprocess.Popen([ + "docker", "run", "--rm", + "-p", "5432:5432", + "--network", "concurrency", + "--name", "pg-server", + "--env", "POSTGRES_HOST_AUTH_METHOD=trust", + "postgres:15"]) + time.sleep(5) + + if Helpers.break_before_preparation: + print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++") + input("Press Enter to continue") + else: + cls.launch_orthanc_under_tests( + config_name=f"{test_name}", + storage_name=cls._storage_name, + config=config, + plugins=Helpers.plugins, + docker_network="concurrency" + ) + + cls.o = OrthancApiClient(cls.o._root_url) + cls.o.wait_started() + cls.o.delete_all_content() + + + def execute_workers(self, worker_func, worker_args, workers_count): + workers = [] + for i in range(0, workers_count): + t = threading.Thread(target=worker_func, args=worker_args + (i, )) + workers.append(t) + t.start() + + for t in workers: + t.join() + + def test_concurrent_uploads_same_study(self): + self.o.delete_all_content() + self.clear_storage(storage_name=self._storage_name) + + start_time = time.time() + workers_count = 20 + repeat_count = 1 + + # massively reupload the same study multiple times with OverwriteInstances set to true + # Make sure the studies, series and instances are created only once + self.execute_workers( + worker_func=worker_upload_folder, + worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,), + workers_count=workers_count) + + elapsed = time.time() - start_time + print(f"test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") + + self.assertTrue(self.o.is_alive()) + + self.assertEqual(1, len(self.o.studies.get_all_ids())) + self.assertEqual(2, len(self.o.series.get_all_ids())) + self.assertEqual(50, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(1, stats.get("CountPatients")) + self.assertEqual(1, stats.get("CountStudies")) + self.assertEqual(2, stats.get("CountSeries")) + self.assertEqual(50, stats.get("CountInstances")) + self.assertEqual(4118738, int(stats.get("TotalDiskSize"))) + + self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids()) + + self.assertEqual(0, len(self.o.studies.get_all_ids())) + self.assertEqual(0, len(self.o.series.get_all_ids())) + self.assertEqual(0, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(0, stats.get("CountPatients")) + self.assertEqual(0, stats.get("CountStudies")) + self.assertEqual(0, stats.get("CountSeries")) + self.assertEqual(0, stats.get("CountInstances")) + self.assertEqual(0, int(stats.get("TotalDiskSize"))) + self.assertTrue(self.is_storage_empty(self._storage_name)) + + + def test_concurrent_anonymize_same_study(self): + self.o.delete_all_content() + self.clear_storage(storage_name=self._storage_name) + + self.o.upload_folder(here / "../../Database/Knee") + study_id = self.o.studies.get_all_ids()[0] + + start_time = time.time() + workers_count = 4 + repeat_count = 10 + + # massively anonymize the same study. This generates new studies and is a + # good way to simulate ingestion of new studies + self.execute_workers( + worker_func=worker_anonymize_study, + worker_args=(self.o._root_url, study_id, repeat_count,), + workers_count=workers_count) + + elapsed = time.time() - start_time + print(f"test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") + + self.assertTrue(self.o.is_alive()) + + self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids())) + self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids())) + self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients")) + self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies")) + self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries")) + self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances")) + + self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids()) + + self.assertEqual(0, len(self.o.studies.get_all_ids())) + self.assertEqual(0, len(self.o.series.get_all_ids())) + self.assertEqual(0, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(0, stats.get("CountPatients")) + self.assertEqual(0, stats.get("CountStudies")) + self.assertEqual(0, stats.get("CountSeries")) + self.assertEqual(0, stats.get("CountInstances")) + self.assertEqual(0, int(stats.get("TotalDiskSize"))) + self.assertTrue(self.is_storage_empty(self._storage_name)) + + + def test_upload_delete_same_study_from_multiple_threads(self): + self.o.delete_all_content() + self.clear_storage(storage_name=self._storage_name) + + start_time = time.time() + workers_count = 5 + repeat_count = 30 + + # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them. + # We are trying to have multiple workers deleting the last instance of a study at the same time. + self.execute_workers( + worker_func=worker_upload_delete_study_part, + worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ), + workers_count=workers_count) + + elapsed = time.time() - start_time + print(f"test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") + + self.assertTrue(self.o.is_alive()) + + self.assertEqual(0, len(self.o.studies.get_all_ids())) + self.assertEqual(0, len(self.o.series.get_all_ids())) + self.assertEqual(0, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(0, stats.get("CountPatients")) + self.assertEqual(0, stats.get("CountStudies")) + self.assertEqual(0, stats.get("CountSeries")) + self.assertEqual(0, stats.get("CountInstances")) + self.assertEqual(0, int(stats.get("TotalDiskSize"))) + self.assertTrue(self.is_storage_empty(self._storage_name)) + + # transfers + dicomweb \ No newline at end of file diff -r 3cb7c6162c77 -r 6753d96dd71f NewTests/README --- a/NewTests/README Tue Dec 12 10:14:37 2023 +0100 +++ b/NewTests/README Tue Dec 12 15:31:45 2023 +0100 @@ -151,3 +151,22 @@ --orthanc_under_tests_docker_image=osimis/orthanc:current \ --orthanc_previous_version_docker_image=osimis/orthanc:22.4.0 \ --orthanc_under_tests_http_port=8043 + + +Concurrency: +----------- + +Run the Concurrency tests with your locally build version and break before execution to allow you to start your debugger. + +python3 NewTests/main.py --pattern=Concurrency.test_concurrency.TestConcurrency.* \ + --orthanc_under_tests_exe=/home/alain/o/build/orthanc/Orthanc \ + --orthanc_under_tests_http_port=8043 \ + --plugin=/home/alain/o/build/orthanc-dicomweb/libOrthancDicomWeb.so \ + --plugin=/home/alain/o/build/pg/libOrthancPostgreSQLIndex.so \ + --break_before_preparation + +with Docker: + +python3 NewTests/main.py --pattern=Concurrency.test_concurrency.TestConcurrency.* \ + --orthanc_under_tests_docker_image=osimis/orthanc:current \ + --orthanc_under_tests_http_port=8043 diff -r 3cb7c6162c77 -r 6753d96dd71f NewTests/helpers.py --- a/NewTests/helpers.py Tue Dec 12 10:14:37 2023 +0100 +++ b/NewTests/helpers.py Tue Dec 12 15:31:45 2023 +0100 @@ -5,6 +5,7 @@ import os import typing import shutil +import glob from threading import Thread @@ -71,6 +72,7 @@ _orthanc_container_name = None _orthanc_is_running = False _orthanc_logger_thread = None + _show_orthanc_output = False @classmethod def setUpClass(cls): @@ -134,11 +136,13 @@ @classmethod def clear_storage(cls, storage_name: str): - if Helpers.is_exe(): - storage_path = cls.get_storage_path(storage_name=storage_name) - shutil.rmtree(storage_path, ignore_errors=True) - elif Helpers.is_docker(): - subprocess.run(["docker", "volume", "rm", "-f", storage_name]) + storage_path = cls.get_storage_path(storage_name=storage_name) + shutil.rmtree(storage_path, ignore_errors=True) + + @classmethod + def is_storage_empty(cls, storage_name: str): + storage_path = cls.get_storage_path(storage_name=storage_name) + return len(glob.glob(os.path.join(storage_path, '*'))) == 0 @classmethod def create_docker_network(cls, network: str): @@ -176,7 +180,9 @@ raise RuntimeError("Invalid configuration, can not launch Orthanc") @classmethod - def launch_orthanc_under_tests(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = [], docker_network: str = None): + def launch_orthanc_under_tests(cls, config_name: str = None, config: object = None, config_path: str = None, storage_name: str = None, plugins = [], docker_network: str = None, enable_verbose: bool = False, show_orthanc_output: bool = False): + cls._show_orthanc_output = show_orthanc_output + if config_name and storage_name and config: # generate the configuration file config_path = cls.generate_configuration( @@ -192,7 +198,8 @@ if Helpers.orthanc_under_tests_exe: cls.launch_orthanc_exe( exe_path=Helpers.orthanc_under_tests_exe, - config_path=config_path + config_path=config_path, + enable_verbose=enable_verbose ) elif Helpers.orthanc_under_tests_docker_image: cls.launch_orthanc_docker( @@ -200,15 +207,21 @@ storage_name=storage_name, config_name=config_name, config_path=config_path, - network=docker_network + network=docker_network, + enable_verbose=enable_verbose ) else: raise RuntimeError("Invalid configuration, can not launch Orthanc") @classmethod - def launch_orthanc_exe(cls, exe_path: str, config_path: str): + def launch_orthanc_exe(cls, exe_path: str, config_path: str, enable_verbose: bool = False, show_orthanc_output: bool = False): + if enable_verbose: + cmd = [exe_path, "--verbose", config_path] + else: + cmd = [exe_path, config_path] + cls._orthanc_process = subprocess.Popen( - [exe_path, "--verbose", config_path], + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) @@ -221,13 +234,13 @@ raise RuntimeError(f"Orthanc failed to start '{exe_path}', conf = '{config_path}'. Check output above") @classmethod - def launch_orthanc_docker(cls, docker_image: str, storage_name: str, config_path: str, config_name: str, network: str = None): + def launch_orthanc_docker(cls, docker_image: str, storage_name: str, config_path: str, config_name: str, network: str = None, enable_verbose: bool = False): storage_path = cls.get_storage_path(storage_name=storage_name) cmd = [ "docker", "run", "--rm", - "-e", "VERBOSE_ENABLED=true", - "-e", "VERBOSE_STARTUP=true", + "-e", "VERBOSE_ENABLED=true" if enable_verbose else "VERBOSE_ENABLED=false", + "-e", "VERBOSE_STARTUP=true" if enable_verbose else "VERBOSE_STARTUP=false", "-v", f"{config_path}:/etc/orthanc/orthanc.json", "-v", f"{storage_path}:/var/lib/orthanc/db/", "--name", config_name, @@ -264,8 +277,11 @@ return else: subprocess.run(["docker", "stop", cls._orthanc_container_name]) - output = cls.get_orthanc_process_output() - print("Orthanc output\n" + output) + + if cls._show_orthanc_output: + output = cls.get_orthanc_process_output() + print("Orthanc output\n" + output) + cls._orthanc_process = None @classmethod