Mercurial > hg > orthanc-tests
diff NewTests/Concurrency/test_concurrency.py @ 592:6753d96dd71f
new concurrency tests
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 12 Dec 2023 15:31:45 +0100 |
parents | |
children | f4f0e2d24a51 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/NewTests/Concurrency/test_concurrency.py Tue Dec 12 15:31:45 2023 +0100 @@ -0,0 +1,263 @@ +import unittest +import time +import os +import threading +from helpers import OrthancTestCase, Helpers + +from orthanc_api_client import OrthancApiClient, generate_test_dicom_file +from orthanc_tools import OrthancTestDbPopulator + +import pathlib +import subprocess +import glob +here = pathlib.Path(__file__).parent.resolve() + + +def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int): + o = OrthancApiClient(orthanc_root_url) + for i in range(0, repeat): + o.upload_folder(folder, ignore_errors=True) + +def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int): + o = OrthancApiClient(orthanc_root_url) + + for i in range(0, repeat): + o.studies.anonymize(orthanc_id=study_id, delete_original=False) + +def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int): + o = OrthancApiClient(orthanc_root_url) + + all_files = glob.glob(os.path.join(folder, '*.dcm')) + + for i in range(0, repeat): + instances_ids = [] + + for i in range(0, len(all_files)): + if i % workers_count == worker_id: + instances_ids.extend(o.upload_file(all_files[i])) + + for instance_id in instances_ids: + o.instances.delete(orthanc_id=instance_id) + + +class TestConcurrency(OrthancTestCase): + + @classmethod + def terminate(cls): + + if Helpers.is_docker(): + subprocess.run(["docker", "rm", "-f", "pg-server"]) + else: + cls.pg_service_process.terminate() + + + @classmethod + def prepare(cls): + test_name = "Concurrency" + cls._storage_name = "concurrency" + + print(f'-------------- preparing {test_name} tests') + + cls.clear_storage(storage_name=cls._storage_name) + + pg_hostname = "localhost" + if Helpers.is_docker(): + pg_hostname = "pg-server" + cls.create_docker_network("concurrency") + + config = { + "PostgreSQL" : { + "EnableStorage": False, + "EnableIndex": True, + "Host": pg_hostname, + "Port": 5432, + "Database": "postgres", + "Username": "postgres", + "Password": "postgres", + "IndexConnectionsCount": 10, + "MaximumConnectionRetries" : 2000, + "ConnectionRetryInterval" : 5, + "TransactionMode": "READ COMMITTED", + #"TransactionMode": "SERIALIZABLE", + "EnableVerboseLogs": True + }, + "AuthenticationEnabled": False, + "OverwriteInstances": True, + "JobsEngineThreadsCount" : { + "ResourceModification": 8 + }, + } + + config_path = cls.generate_configuration( + config_name=f"{test_name}", + storage_name=cls._storage_name, + config=config, + plugins=Helpers.plugins + ) + + # launch the docker PG server + print('--------------- launching PostgreSQL server ------------------') + + cls.pg_service_process = subprocess.Popen([ + "docker", "run", "--rm", + "-p", "5432:5432", + "--network", "concurrency", + "--name", "pg-server", + "--env", "POSTGRES_HOST_AUTH_METHOD=trust", + "postgres:15"]) + time.sleep(5) + + if Helpers.break_before_preparation: + print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++") + input("Press Enter to continue") + else: + cls.launch_orthanc_under_tests( + config_name=f"{test_name}", + storage_name=cls._storage_name, + config=config, + plugins=Helpers.plugins, + docker_network="concurrency" + ) + + cls.o = OrthancApiClient(cls.o._root_url) + cls.o.wait_started() + cls.o.delete_all_content() + + + def execute_workers(self, worker_func, worker_args, workers_count): + workers = [] + for i in range(0, workers_count): + t = threading.Thread(target=worker_func, args=worker_args + (i, )) + workers.append(t) + t.start() + + for t in workers: + t.join() + + def test_concurrent_uploads_same_study(self): + self.o.delete_all_content() + self.clear_storage(storage_name=self._storage_name) + + start_time = time.time() + workers_count = 20 + repeat_count = 1 + + # massively reupload the same study multiple times with OverwriteInstances set to true + # Make sure the studies, series and instances are created only once + self.execute_workers( + worker_func=worker_upload_folder, + worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,), + workers_count=workers_count) + + elapsed = time.time() - start_time + print(f"test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") + + self.assertTrue(self.o.is_alive()) + + self.assertEqual(1, len(self.o.studies.get_all_ids())) + self.assertEqual(2, len(self.o.series.get_all_ids())) + self.assertEqual(50, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(1, stats.get("CountPatients")) + self.assertEqual(1, stats.get("CountStudies")) + self.assertEqual(2, stats.get("CountSeries")) + self.assertEqual(50, stats.get("CountInstances")) + self.assertEqual(4118738, int(stats.get("TotalDiskSize"))) + + self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids()) + + self.assertEqual(0, len(self.o.studies.get_all_ids())) + self.assertEqual(0, len(self.o.series.get_all_ids())) + self.assertEqual(0, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(0, stats.get("CountPatients")) + self.assertEqual(0, stats.get("CountStudies")) + self.assertEqual(0, stats.get("CountSeries")) + self.assertEqual(0, stats.get("CountInstances")) + self.assertEqual(0, int(stats.get("TotalDiskSize"))) + self.assertTrue(self.is_storage_empty(self._storage_name)) + + + def test_concurrent_anonymize_same_study(self): + self.o.delete_all_content() + self.clear_storage(storage_name=self._storage_name) + + self.o.upload_folder(here / "../../Database/Knee") + study_id = self.o.studies.get_all_ids()[0] + + start_time = time.time() + workers_count = 4 + repeat_count = 10 + + # massively anonymize the same study. This generates new studies and is a + # good way to simulate ingestion of new studies + self.execute_workers( + worker_func=worker_anonymize_study, + worker_args=(self.o._root_url, study_id, repeat_count,), + workers_count=workers_count) + + elapsed = time.time() - start_time + print(f"test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") + + self.assertTrue(self.o.is_alive()) + + self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids())) + self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids())) + self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients")) + self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies")) + self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries")) + self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances")) + + self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids()) + + self.assertEqual(0, len(self.o.studies.get_all_ids())) + self.assertEqual(0, len(self.o.series.get_all_ids())) + self.assertEqual(0, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(0, stats.get("CountPatients")) + self.assertEqual(0, stats.get("CountStudies")) + self.assertEqual(0, stats.get("CountSeries")) + self.assertEqual(0, stats.get("CountInstances")) + self.assertEqual(0, int(stats.get("TotalDiskSize"))) + self.assertTrue(self.is_storage_empty(self._storage_name)) + + + def test_upload_delete_same_study_from_multiple_threads(self): + self.o.delete_all_content() + self.clear_storage(storage_name=self._storage_name) + + start_time = time.time() + workers_count = 5 + repeat_count = 30 + + # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them. + # We are trying to have multiple workers deleting the last instance of a study at the same time. + self.execute_workers( + worker_func=worker_upload_delete_study_part, + worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ), + workers_count=workers_count) + + elapsed = time.time() - start_time + print(f"test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") + + self.assertTrue(self.o.is_alive()) + + self.assertEqual(0, len(self.o.studies.get_all_ids())) + self.assertEqual(0, len(self.o.series.get_all_ids())) + self.assertEqual(0, len(self.o.instances.get_all_ids())) + + stats = self.o.get_json("/statistics") + self.assertEqual(0, stats.get("CountPatients")) + self.assertEqual(0, stats.get("CountStudies")) + self.assertEqual(0, stats.get("CountSeries")) + self.assertEqual(0, stats.get("CountInstances")) + self.assertEqual(0, int(stats.get("TotalDiskSize"))) + self.assertTrue(self.is_storage_empty(self._storage_name)) + + # transfers + dicomweb \ No newline at end of file