import unittest
import time
import os
import threading
from helpers import OrthancTestCase, Helpers

from orthanc_api_client import OrthancApiClient, ChangeType
from orthanc_api_client import helpers as OrthancHelpers

import pathlib
import subprocess
import glob

here = pathlib.Path(__file__).parent.resolve()

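# Worker functions run by TestConcurrency.execute_workers(): each one executes in its own
# thread against the same Orthanc server, with its own OrthancApiClient instance; the worker
# index is always appended as the last argument.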
def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)
    for i in range(0, repeat):
        o.upload_folder(folder, ignore_errors=True)

def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)

    for i in range(0, repeat):
        o.studies.anonymize(orthanc_id=study_id, delete_original=False)

def count_changes(changes, type: ChangeType):
    return len([c.change_type for c in changes if c.change_type == type])

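# Each worker uploads only its share of the files (file index modulo workers_count) and then
# deletes its own instances: this makes it likely that several workers try to delete the last
# instance of a series/study at the same time.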
def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)

    all_files = glob.glob(os.path.join(folder, '*.dcm'))

    for i in range(0, repeat):
        instances_ids = []

        for file_index in range(0, len(all_files)):
            if file_index % workers_count == worker_id:  # each thread takes a part
                instances_ids.extend(o.upload_file(all_files[file_index]))

        for instance_id in instances_ids:
            o.instances.delete(orthanc_id=instance_id, ignore_errors=True)

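# Each worker generates its own synthetic study (4x4 pixel DICOM files, spread over 10 series
# via counter % 10), uploads the instances one by one and finally deletes the whole study.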
def worker_upload_delete_test_dicoms(orthanc_root_url: str, files_count: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)

    instances_ids = []
    counter = 0

    for i in range(0, files_count):
        counter += 1
        dicom_file = OrthancHelpers.generate_test_dicom_file(width=4, height=4,
                                                             tags={
                                                                 "PatientID": f"{worker_id}",
                                                                 "StudyInstanceUID": f"{worker_id}",
                                                                 "SeriesInstanceUID": f"{worker_id}.{counter % 10}"
                                                             })
        instances_ids.extend(o.upload(dicom_file))

    study_id = o.instances.get_parent_study_id(instances_ids[0])
    o.studies.delete(orthanc_id=study_id)

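# Concurrency tests for Orthanc running against a PostgreSQL index (ReadCommitted transaction
# mode).  prepare() launches a throw-away PostgreSQL 15 container and the Orthanc under test;
# terminate() tears the container down.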
class TestConcurrency(OrthancTestCase):

    @classmethod
    def terminate(cls):

        if Helpers.is_docker():
            subprocess.run(["docker", "rm", "-f", "pg-server"])
        else:
            cls.pg_service_process.terminate()

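    # Generates the Orthanc configuration, starts a disposable PostgreSQL 15 server in Docker
    # (trust authentication, no password check) and launches the Orthanc under test on an
    # empty database.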
    @classmethod
    def prepare(cls):
        test_name = "Concurrency"
        cls._storage_name = "concurrency"

        print(f'-------------- preparing {test_name} tests')

        cls.clear_storage(storage_name=cls._storage_name)

        pg_hostname = "localhost"
        if Helpers.is_docker():
            pg_hostname = "pg-server"
            cls.create_docker_network("concurrency")

        config = {
            "PostgreSQL": {
                "EnableStorage": False,
                "EnableIndex": True,
                "Host": pg_hostname,
                "Port": 5432,
                "Database": "postgres",
                "Username": "postgres",
                "Password": "postgres",
                "IndexConnectionsCount": 10,
                "MaximumConnectionRetries": 2000,
                "ConnectionRetryInterval": 5,
                "TransactionMode": "ReadCommitted",
                #"TransactionMode": "Serializable",
                "EnableVerboseLogs": True
            },
            "AuthenticationEnabled": False,
            "OverwriteInstances": True,
            "JobsEngineThreadsCount": {
                "ResourceModification": 8
            },
        }

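        # write the configuration file to disk; its path is also needed when Orthanc is
        # started manually (see Helpers.break_before_preparation below)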
        config_path = cls.generate_configuration(
            config_name=f"{test_name}",
            storage_name=cls._storage_name,
            config=config,
            plugins=Helpers.plugins
        )

        # launch the docker PG server
        print('--------------- launching PostgreSQL server ------------------')

        cls.pg_service_process = subprocess.Popen([
            "docker", "run", "--rm",
            "-p", "5432:5432",
            "--network", "concurrency",
            "--name", "pg-server",
            "--env", "POSTGRES_HOST_AUTH_METHOD=trust",
            "postgres:15"])
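        # give the container a few seconds to accept connections; Orthanc will also retry by
        # itself (MaximumConnectionRetries / ConnectionRetryInterval above)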
        time.sleep(5)

        if Helpers.break_before_preparation:
            print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
            input("Press Enter to continue")
        else:
            cls.launch_orthanc_under_tests(
                config_name=f"{test_name}",
                storage_name=cls._storage_name,
                config=config,
                plugins=Helpers.plugins,
                docker_network="concurrency"
            )

        cls.o = OrthancApiClient(cls.o._root_url)
        cls.o.wait_started()
        cls.o.delete_all_content()

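    # Asserts that the Orthanc database, the /statistics counters, the storage area and the
    # changes log are all empty.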
    def check_is_empty(self):
        self.assertEqual(0, len(self.o.studies.get_all_ids()))
        self.assertEqual(0, len(self.o.series.get_all_ids()))
        self.assertEqual(0, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("statistics")
        self.assertEqual(0, stats.get("CountPatients"))
        self.assertEqual(0, stats.get("CountStudies"))
        self.assertEqual(0, stats.get("CountSeries"))
        self.assertEqual(0, stats.get("CountInstances"))
        self.assertEqual(0, int(stats.get("TotalDiskSize")))
        # time.sleep(10000)
        self.assertTrue(self.is_storage_empty(self._storage_name))

        # all changes shall have been deleted as well
        changes, last_change, done = self.o.get_changes(since=0, limit=100000)
        self.assertTrue(done)
        self.assertEqual(0, len(changes))

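    # Runs worker_func in workers_count parallel threads; worker_args is extended with the
    # worker index (always the last argument of a worker function) and all threads are joined
    # before returning.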
    def execute_workers(self, worker_func, worker_args, workers_count):
        workers = []
        for i in range(0, workers_count):
            t = threading.Thread(target=worker_func, args=worker_args + (i, ))
            workers.append(t)
            t.start()

        for t in workers:
            t.join()

    # TODO: reactivate once 1.12.4 is released. It needs this fix: https://orthanc.uclouvain.be/hg/orthanc/rev/acdb8d78bf99
    # def test_concurrent_uploads_same_study(self):
    #     if self.o.is_orthanc_version_at_least(1, 12, 4):

    #         self.o.delete_all_content()
    #         self.clear_storage(storage_name=self._storage_name)

    #         start_time = time.time()
    #         workers_count = 20
    #         repeat_count = 10

    #         # massively reupload the same study multiple times with OverwriteInstances set to true
    #         # Make sure the studies, series and instances are created only once
    #         self.execute_workers(
    #             worker_func=worker_upload_folder,
    #             worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,),
    #             workers_count=workers_count)

    #         elapsed = time.time() - start_time
    #         print(f"TIMING test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

    #         self.assertTrue(self.o.is_alive())

    #         self.assertEqual(1, len(self.o.studies.get_all_ids()))
    #         self.assertEqual(2, len(self.o.series.get_all_ids()))
    #         self.assertEqual(50, len(self.o.instances.get_all_ids()))

    #         stats = self.o.get_json("statistics")
    #         self.assertEqual(1, stats.get("CountPatients"))
    #         self.assertEqual(1, stats.get("CountStudies"))
    #         self.assertEqual(2, stats.get("CountSeries"))
    #         self.assertEqual(50, stats.get("CountInstances"))
    #         self.assertEqual(4118738, int(stats.get("TotalDiskSize")))

    #         self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

    #         self.check_is_empty()

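    # The Knee test study contains 1 patient, 2 series and 50 instances; every anonymization
    # creates a brand new patient/study, so with e.g. 4 workers x 10 repeats the assertions
    # below expect 41 studies, 82 series and 2050 instances.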
    def test_concurrent_anonymize_same_study(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        self.o.upload_folder(here / "../../Database/Knee")
        study_id = self.o.studies.get_all_ids()[0]

        start_time = time.time()
        workers_count = 4
        repeat_count = 10

        # massively anonymize the same study.  This generates new studies and is a
        # good way to simulate ingestion of new studies
        self.execute_workers(
            worker_func=worker_anonymize_study,
            worker_args=(self.o._root_url, study_id, repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"TIMING test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids()))
        self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids()))
        self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("statistics")
        self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients"))
        self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies"))
        self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries"))
        self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances"))

        changes, last_change, done = self.o.get_changes(since=0, limit=100000)
        self.assertTrue(done)

        self.assertEqual(1 + workers_count * repeat_count, count_changes(changes, ChangeType.NEW_PATIENT))
        self.assertEqual(1 + workers_count * repeat_count, count_changes(changes, ChangeType.NEW_STUDY))
        self.assertEqual(2 * (1 + workers_count * repeat_count), count_changes(changes, ChangeType.NEW_SERIES))
        self.assertEqual(50 * (1 + workers_count * repeat_count), count_changes(changes, ChangeType.NEW_INSTANCE))

        start_time = time.time()

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        elapsed = time.time() - start_time
        print(f"TIMING test_concurrent_anonymize_same_study deletion took: {elapsed:.3f} s")

        self.check_is_empty()

    def test_upload_delete_same_study_from_multiple_threads(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        overall_repeat = 10

        for i in range(0, overall_repeat):
            workers_count = 5
            repeat_count = 3

            # massively upload and delete the same study.  Each worker is writing a part of the instances and deleting them.
            # We are trying to have multiple workers deleting the last instance of a study at the same time.
            self.execute_workers(
                worker_func=worker_upload_delete_study_part,
                worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
                workers_count=workers_count)

            self.check_is_empty()

        elapsed = time.time() - start_time
        print(f"TIMING test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat ({overall_repeat}x): {elapsed:.3f} s")

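    # Pure throughput test: every worker creates and deletes its own synthetic study, so there
    # is no contention on shared resources apart from the database itself.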
    def test_upload_multiple_studies_from_multiple_threads(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        overall_repeat = 3

        for i in range(0, overall_repeat):
            files_count = 25
            workers_count = 10

            # massively upload and delete studies built from generated test DICOM files.
            # Each worker writes all the instances of its own study and then deletes that study.
            # This test only measures performance.
            self.execute_workers(
                worker_func=worker_upload_delete_test_dicoms,
                worker_args=(self.o._root_url, files_count, ),
                workers_count=workers_count)

            self.check_is_empty()

        elapsed = time.time() - start_time
        print(f"TIMING test_upload_multiple_studies_from_multiple_threads with {workers_count} workers and {files_count} files and repeat {overall_repeat}x: {elapsed:.3f} s")


    # transfers + dicomweb