comparison NewTests/Concurrency/test_concurrency.py @ 592:6753d96dd71f

new concurrency tests
author Alain Mazy <am@osimis.io>
date Tue, 12 Dec 2023 15:31:45 +0100
parents
children f4f0e2d24a51
comparison
equal deleted inserted replaced
591:3cb7c6162c77 592:6753d96dd71f
1 import unittest
2 import time
3 import os
4 import threading
5 from helpers import OrthancTestCase, Helpers
6
7 from orthanc_api_client import OrthancApiClient, generate_test_dicom_file
8 from orthanc_tools import OrthancTestDbPopulator
9
10 import pathlib
11 import subprocess
12 import glob
13 here = pathlib.Path(__file__).parent.resolve()
14
15
16 def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int):
17 o = OrthancApiClient(orthanc_root_url)
18 for i in range(0, repeat):
19 o.upload_folder(folder, ignore_errors=True)
20
21 def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int):
22 o = OrthancApiClient(orthanc_root_url)
23
24 for i in range(0, repeat):
25 o.studies.anonymize(orthanc_id=study_id, delete_original=False)
26
27 def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int):
28 o = OrthancApiClient(orthanc_root_url)
29
30 all_files = glob.glob(os.path.join(folder, '*.dcm'))
31
32 for i in range(0, repeat):
33 instances_ids = []
34
35 for i in range(0, len(all_files)):
36 if i % workers_count == worker_id:
37 instances_ids.extend(o.upload_file(all_files[i]))
38
39 for instance_id in instances_ids:
40 o.instances.delete(orthanc_id=instance_id)
41
42
43 class TestConcurrency(OrthancTestCase):
44
45 @classmethod
46 def terminate(cls):
47
48 if Helpers.is_docker():
49 subprocess.run(["docker", "rm", "-f", "pg-server"])
50 else:
51 cls.pg_service_process.terminate()
52
53
54 @classmethod
55 def prepare(cls):
56 test_name = "Concurrency"
57 cls._storage_name = "concurrency"
58
59 print(f'-------------- preparing {test_name} tests')
60
61 cls.clear_storage(storage_name=cls._storage_name)
62
63 pg_hostname = "localhost"
64 if Helpers.is_docker():
65 pg_hostname = "pg-server"
66 cls.create_docker_network("concurrency")
67
68 config = {
69 "PostgreSQL" : {
70 "EnableStorage": False,
71 "EnableIndex": True,
72 "Host": pg_hostname,
73 "Port": 5432,
74 "Database": "postgres",
75 "Username": "postgres",
76 "Password": "postgres",
77 "IndexConnectionsCount": 10,
78 "MaximumConnectionRetries" : 2000,
79 "ConnectionRetryInterval" : 5,
80 "TransactionMode": "READ COMMITTED",
81 #"TransactionMode": "SERIALIZABLE",
82 "EnableVerboseLogs": True
83 },
84 "AuthenticationEnabled": False,
85 "OverwriteInstances": True,
86 "JobsEngineThreadsCount" : {
87 "ResourceModification": 8
88 },
89 }
90
91 config_path = cls.generate_configuration(
92 config_name=f"{test_name}",
93 storage_name=cls._storage_name,
94 config=config,
95 plugins=Helpers.plugins
96 )
97
98 # launch the docker PG server
99 print('--------------- launching PostgreSQL server ------------------')
100
101 cls.pg_service_process = subprocess.Popen([
102 "docker", "run", "--rm",
103 "-p", "5432:5432",
104 "--network", "concurrency",
105 "--name", "pg-server",
106 "--env", "POSTGRES_HOST_AUTH_METHOD=trust",
107 "postgres:15"])
108 time.sleep(5)
109
110 if Helpers.break_before_preparation:
111 print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
112 input("Press Enter to continue")
113 else:
114 cls.launch_orthanc_under_tests(
115 config_name=f"{test_name}",
116 storage_name=cls._storage_name,
117 config=config,
118 plugins=Helpers.plugins,
119 docker_network="concurrency"
120 )
121
122 cls.o = OrthancApiClient(cls.o._root_url)
123 cls.o.wait_started()
124 cls.o.delete_all_content()
125
126
127 def execute_workers(self, worker_func, worker_args, workers_count):
128 workers = []
129 for i in range(0, workers_count):
130 t = threading.Thread(target=worker_func, args=worker_args + (i, ))
131 workers.append(t)
132 t.start()
133
134 for t in workers:
135 t.join()
136
137 def test_concurrent_uploads_same_study(self):
138 self.o.delete_all_content()
139 self.clear_storage(storage_name=self._storage_name)
140
141 start_time = time.time()
142 workers_count = 20
143 repeat_count = 1
144
145 # massively reupload the same study multiple times with OverwriteInstances set to true
146 # Make sure the studies, series and instances are created only once
147 self.execute_workers(
148 worker_func=worker_upload_folder,
149 worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,),
150 workers_count=workers_count)
151
152 elapsed = time.time() - start_time
153 print(f"test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
154
155 self.assertTrue(self.o.is_alive())
156
157 self.assertEqual(1, len(self.o.studies.get_all_ids()))
158 self.assertEqual(2, len(self.o.series.get_all_ids()))
159 self.assertEqual(50, len(self.o.instances.get_all_ids()))
160
161 stats = self.o.get_json("/statistics")
162 self.assertEqual(1, stats.get("CountPatients"))
163 self.assertEqual(1, stats.get("CountStudies"))
164 self.assertEqual(2, stats.get("CountSeries"))
165 self.assertEqual(50, stats.get("CountInstances"))
166 self.assertEqual(4118738, int(stats.get("TotalDiskSize")))
167
168 self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())
169
170 self.assertEqual(0, len(self.o.studies.get_all_ids()))
171 self.assertEqual(0, len(self.o.series.get_all_ids()))
172 self.assertEqual(0, len(self.o.instances.get_all_ids()))
173
174 stats = self.o.get_json("/statistics")
175 self.assertEqual(0, stats.get("CountPatients"))
176 self.assertEqual(0, stats.get("CountStudies"))
177 self.assertEqual(0, stats.get("CountSeries"))
178 self.assertEqual(0, stats.get("CountInstances"))
179 self.assertEqual(0, int(stats.get("TotalDiskSize")))
180 self.assertTrue(self.is_storage_empty(self._storage_name))
181
182
183 def test_concurrent_anonymize_same_study(self):
184 self.o.delete_all_content()
185 self.clear_storage(storage_name=self._storage_name)
186
187 self.o.upload_folder(here / "../../Database/Knee")
188 study_id = self.o.studies.get_all_ids()[0]
189
190 start_time = time.time()
191 workers_count = 4
192 repeat_count = 10
193
194 # massively anonymize the same study. This generates new studies and is a
195 # good way to simulate ingestion of new studies
196 self.execute_workers(
197 worker_func=worker_anonymize_study,
198 worker_args=(self.o._root_url, study_id, repeat_count,),
199 workers_count=workers_count)
200
201 elapsed = time.time() - start_time
202 print(f"test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
203
204 self.assertTrue(self.o.is_alive())
205
206 self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids()))
207 self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids()))
208 self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids()))
209
210 stats = self.o.get_json("/statistics")
211 self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients"))
212 self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies"))
213 self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries"))
214 self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances"))
215
216 self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())
217
218 self.assertEqual(0, len(self.o.studies.get_all_ids()))
219 self.assertEqual(0, len(self.o.series.get_all_ids()))
220 self.assertEqual(0, len(self.o.instances.get_all_ids()))
221
222 stats = self.o.get_json("/statistics")
223 self.assertEqual(0, stats.get("CountPatients"))
224 self.assertEqual(0, stats.get("CountStudies"))
225 self.assertEqual(0, stats.get("CountSeries"))
226 self.assertEqual(0, stats.get("CountInstances"))
227 self.assertEqual(0, int(stats.get("TotalDiskSize")))
228 self.assertTrue(self.is_storage_empty(self._storage_name))
229
230
231 def test_upload_delete_same_study_from_multiple_threads(self):
232 self.o.delete_all_content()
233 self.clear_storage(storage_name=self._storage_name)
234
235 start_time = time.time()
236 workers_count = 5
237 repeat_count = 30
238
239 # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them.
240 # We are trying to have multiple workers deleting the last instance of a study at the same time.
241 self.execute_workers(
242 worker_func=worker_upload_delete_study_part,
243 worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
244 workers_count=workers_count)
245
246 elapsed = time.time() - start_time
247 print(f"test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
248
249 self.assertTrue(self.o.is_alive())
250
251 self.assertEqual(0, len(self.o.studies.get_all_ids()))
252 self.assertEqual(0, len(self.o.series.get_all_ids()))
253 self.assertEqual(0, len(self.o.instances.get_all_ids()))
254
255 stats = self.o.get_json("/statistics")
256 self.assertEqual(0, stats.get("CountPatients"))
257 self.assertEqual(0, stats.get("CountStudies"))
258 self.assertEqual(0, stats.get("CountSeries"))
259 self.assertEqual(0, stats.get("CountInstances"))
260 self.assertEqual(0, int(stats.get("TotalDiskSize")))
261 self.assertTrue(self.is_storage_empty(self._storage_name))
262
263 # transfers + dicomweb