Mercurial > hg > orthanc-tests
comparison NewTests/Concurrency/test_concurrency.py @ 592:6753d96dd71f
new concurrency tests
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 12 Dec 2023 15:31:45 +0100 |
parents | |
children | f4f0e2d24a51 |
comparison
equal
deleted
inserted
replaced
591:3cb7c6162c77 | 592:6753d96dd71f |
---|---|
1 import unittest | |
2 import time | |
3 import os | |
4 import threading | |
5 from helpers import OrthancTestCase, Helpers | |
6 | |
7 from orthanc_api_client import OrthancApiClient, generate_test_dicom_file | |
8 from orthanc_tools import OrthancTestDbPopulator | |
9 | |
10 import pathlib | |
11 import subprocess | |
12 import glob | |
13 here = pathlib.Path(__file__).parent.resolve() | |
14 | |
15 | |
def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int):
    """Upload `folder` to Orthanc `repeat` times in a row.

    Each call builds its own client on `orthanc_root_url`. Uploads are issued
    with ``ignore_errors=True``. `worker_id` is part of the common worker
    signature (``execute_workers`` appends it) but is not used here.
    """
    client = OrthancApiClient(orthanc_root_url)
    for _ in range(repeat):
        client.upload_folder(folder, ignore_errors=True)
20 | |
def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int):
    """Anonymize the study `study_id` `repeat` times, keeping the original.

    Each call builds its own client on `orthanc_root_url`. The original study
    is kept (``delete_original=False``). `worker_id` is part of the common
    worker signature (``execute_workers`` appends it) but is not used here.
    """
    client = OrthancApiClient(orthanc_root_url)

    for _ in range(repeat):
        client.studies.anonymize(orthanc_id=study_id, delete_original=False)
26 | |
def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int):
    """Repeatedly upload, then delete, this worker's share of a folder.

    The ``*.dcm`` files found in `folder` are split between the workers: the
    file at index ``i`` belongs to the worker for which
    ``i % workers_count == worker_id``. On each of the `repeat` rounds the
    worker uploads its files, collects the instance ids returned by the
    uploads, and then deletes those instances one by one.

    Args:
        orthanc_root_url: base URL used to build this worker's own client.
        folder: directory that is searched (non-recursively) for ``*.dcm``.
        repeat: number of upload/delete rounds.
        workers_count: total number of workers sharing the folder.
        worker_id: index of this worker, expected in ``[0, workers_count)``.
    """
    o = OrthancApiClient(orthanc_root_url)

    # glob does not guarantee any ordering; sorting gives every worker the
    # same view of the folder so the modulo partition is disjoint and complete.
    all_files = sorted(glob.glob(os.path.join(folder, '*.dcm')))

    # The share of this worker does not change between rounds: compute it once.
    my_files = [
        path
        for index, path in enumerate(all_files)
        if index % workers_count == worker_id
    ]

    for _ in range(repeat):
        instances_ids = []

        for path in my_files:
            instances_ids.extend(o.upload_file(path))

        for instance_id in instances_ids:
            o.instances.delete(orthanc_id=instance_id)
41 | |
42 | |
class TestConcurrency(OrthancTestCase):
    """Concurrency tests against an Orthanc that uses a PostgreSQL index.

    ``prepare`` starts a ``postgres:15`` container and an Orthanc configured
    to use it as its index. Each test then runs several worker threads
    against that Orthanc and checks the resource counts, the statistics and
    the storage afterwards.
    """

    @classmethod
    def terminate(cls):
        """Stop the PostgreSQL server started by ``prepare``."""
        if Helpers.is_docker():
            subprocess.run(["docker", "rm", "-f", "pg-server"])
        else:
            cls.pg_service_process.terminate()

    @classmethod
    def prepare(cls):
        """Start PostgreSQL and the Orthanc under test, then empty Orthanc.

        When ``Helpers.break_before_preparation`` is set, Orthanc is not
        launched; the method prints the generated configuration path and
        waits for the user to start Orthanc manually.
        """
        test_name = "Concurrency"
        cls._storage_name = "concurrency"

        print(f'-------------- preparing {test_name} tests')

        cls.clear_storage(storage_name=cls._storage_name)

        pg_hostname = "localhost"
        if Helpers.is_docker():
            pg_hostname = "pg-server"
            cls.create_docker_network("concurrency")

        config = {
            "PostgreSQL": {
                "EnableStorage": False,
                "EnableIndex": True,
                "Host": pg_hostname,
                "Port": 5432,
                "Database": "postgres",
                "Username": "postgres",
                "Password": "postgres",
                "IndexConnectionsCount": 10,
                "MaximumConnectionRetries": 2000,
                "ConnectionRetryInterval": 5,
                "TransactionMode": "READ COMMITTED",
                #"TransactionMode": "SERIALIZABLE",
                "EnableVerboseLogs": True
            },
            "AuthenticationEnabled": False,
            "OverwriteInstances": True,
            "JobsEngineThreadsCount": {
                "ResourceModification": 8
            },
        }

        config_path = cls.generate_configuration(
            config_name=f"{test_name}",
            storage_name=cls._storage_name,
            config=config,
            plugins=Helpers.plugins
        )

        # launch the docker PG server
        print('--------------- launching PostgreSQL server ------------------')

        pg_command = [
            "docker", "run", "--rm",
            "-p", "5432:5432",
        ]
        if Helpers.is_docker():
            # The "concurrency" network is only created in docker mode (see
            # above); joining it unconditionally makes "docker run" fail when
            # the tests run against a native Orthanc.
            pg_command += ["--network", "concurrency"]
        pg_command += [
            "--name", "pg-server",
            "--env", "POSTGRES_HOST_AUTH_METHOD=trust",
            "postgres:15",
        ]

        cls.pg_service_process = subprocess.Popen(pg_command)
        # fixed delay before going on, to leave PostgreSQL some time to start
        time.sleep(5)

        if Helpers.break_before_preparation:
            print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
            input("Press Enter to continue")
        else:
            cls.launch_orthanc_under_tests(
                config_name=f"{test_name}",
                storage_name=cls._storage_name,
                config=config,
                plugins=Helpers.plugins,
                docker_network="concurrency"
            )

        cls.o = OrthancApiClient(cls.o._root_url)
        cls.o.wait_started()
        cls.o.delete_all_content()

    def execute_workers(self, worker_func, worker_args, workers_count):
        """Run `worker_func` in `workers_count` threads and wait for them all.

        Each thread calls ``worker_func(*worker_args, worker_index)`` where
        ``worker_index`` goes from 0 to ``workers_count - 1``; `worker_args`
        must therefore be a tuple.
        """
        workers = []
        for i in range(0, workers_count):
            t = threading.Thread(target=worker_func, args=worker_args + (i, ))
            workers.append(t)
            t.start()

        for t in workers:
            t.join()

    def _assert_orthanc_is_empty(self):
        """Check that Orthanc holds no resource and that its storage is empty."""
        self.assertEqual(0, len(self.o.studies.get_all_ids()))
        self.assertEqual(0, len(self.o.series.get_all_ids()))
        self.assertEqual(0, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(0, stats.get("CountPatients"))
        self.assertEqual(0, stats.get("CountStudies"))
        self.assertEqual(0, stats.get("CountSeries"))
        self.assertEqual(0, stats.get("CountInstances"))
        self.assertEqual(0, int(stats.get("TotalDiskSize")))
        self.assertTrue(self.is_storage_empty(self._storage_name))

    def test_concurrent_uploads_same_study(self):
        """Upload the same study from many threads; it must be stored once."""
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        workers_count = 20
        repeat_count = 1

        # massively reupload the same study multiple times with OverwriteInstances set to true
        # Make sure the studies, series and instances are created only once
        self.execute_workers(
            worker_func=worker_upload_folder,
            worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        self.assertEqual(1, len(self.o.studies.get_all_ids()))
        self.assertEqual(2, len(self.o.series.get_all_ids()))
        self.assertEqual(50, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(1, stats.get("CountPatients"))
        self.assertEqual(1, stats.get("CountStudies"))
        self.assertEqual(2, stats.get("CountSeries"))
        self.assertEqual(50, stats.get("CountInstances"))
        # disk size expected when the study is stored exactly once
        self.assertEqual(4118738, int(stats.get("TotalDiskSize")))

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        self._assert_orthanc_is_empty()

    def test_concurrent_anonymize_same_study(self):
        """Anonymize one study from several threads; each run adds a study."""
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        self.o.upload_folder(here / "../../Database/Knee")
        study_id = self.o.studies.get_all_ids()[0]

        start_time = time.time()
        workers_count = 4
        repeat_count = 10

        # massively anonymize the same study. This generates new studies and is a
        # good way to simulate ingestion of new studies
        self.execute_workers(
            worker_func=worker_anonymize_study,
            worker_args=(self.o._root_url, study_id, repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        # the original study plus one new study per anonymization
        expected_studies = 1 + workers_count * repeat_count

        self.assertEqual(expected_studies, len(self.o.studies.get_all_ids()))
        self.assertEqual(2 * expected_studies, len(self.o.series.get_all_ids()))
        self.assertEqual(50 * expected_studies, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(expected_studies, stats.get("CountPatients"))
        self.assertEqual(expected_studies, stats.get("CountStudies"))
        self.assertEqual(2 * expected_studies, stats.get("CountSeries"))
        self.assertEqual(50 * expected_studies, stats.get("CountInstances"))

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        self._assert_orthanc_is_empty()

    def test_upload_delete_same_study_from_multiple_threads(self):
        """Upload and delete parts of one study concurrently; nothing may remain."""
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        workers_count = 5
        repeat_count = 30

        # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them.
        # We are trying to have multiple workers deleting the last instance of a study at the same time.
        self.execute_workers(
            worker_func=worker_upload_delete_study_part,
            worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        self._assert_orthanc_is_empty()
262 | |
263 # transfers + dicomweb |