592
|
1 import unittest
|
|
2 import time
|
|
3 import os
|
|
4 import threading
|
|
5 from helpers import OrthancTestCase, Helpers
|
|
6
|
597
|
7 from orthanc_api_client import OrthancApiClient, generate_test_dicom_file, ChangeType
|
592
|
8 from orthanc_tools import OrthancTestDbPopulator
|
|
9
|
|
10 import pathlib
|
|
11 import subprocess
|
|
12 import glob
|
|
# Directory containing this test file; used to locate the test DICOM database folders.
here = pathlib.Path(__file__).parent.resolve()
|
|
14
|
|
15
|
|
def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int):
    """Worker entry point: upload the whole `folder` to Orthanc `repeat` times.

    Upload errors are ignored so concurrent workers pushing the same files
    do not abort each other.
    """
    client = OrthancApiClient(orthanc_root_url)
    for _ in range(repeat):
        client.upload_folder(folder, ignore_errors=True)
|
|
20
|
|
def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int):
    """Worker entry point: anonymize the same study `repeat` times.

    Each anonymization creates a brand new study (the original is kept),
    which simulates concurrent ingestion of new studies.
    """
    client = OrthancApiClient(orthanc_root_url)

    for _ in range(repeat):
        client.studies.anonymize(orthanc_id=study_id, delete_original=False)
|
|
26
|
597
|
def count_changes(changes, type: ChangeType):
    """Return how many entries in `changes` have the given change type."""
    return sum(1 for c in changes if c.change_type == type)
|
|
29
|
592
|
def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int):
    """Worker entry point: repeatedly upload this worker's share of the DICOM
    files from `folder`, then delete them again.

    Files are partitioned round-robin between workers (file index modulo
    `workers_count`), so several workers may try to delete the last instance
    of the same study at the same time - the race this helper provokes.
    """
    o = OrthancApiClient(orthanc_root_url)

    all_files = glob.glob(os.path.join(folder, '*.dcm'))
    # This worker's share: every workers_count-th file, starting at worker_id.
    # (Equivalent to selecting indices i with i % workers_count == worker_id.)
    my_files = all_files[worker_id::workers_count]

    # NOTE: the original code reused `i` for both the repeat loop and the file
    # loop; distinct names avoid that shadowing.
    for _ in range(repeat):
        instances_ids = []

        for file_path in my_files:
            instances_ids.extend(o.upload_file(file_path))

        for instance_id in instances_ids:
            o.instances.delete(orthanc_id=instance_id, ignore_errors=True)
|
592
|
44
|
|
45
|
|
class TestConcurrency(OrthancTestCase):
    """Stress tests running many Orthanc operations concurrently against a
    PostgreSQL index plugin, checking that resource counts, /statistics,
    /changes and the storage area stay consistent under concurrency.
    """

    @classmethod
    def terminate(cls):
        # Tear down the PostgreSQL server started in prepare().
        if Helpers.is_docker():
            subprocess.run(["docker", "rm", "-f", "pg-server"])
        else:
            cls.pg_service_process.terminate()


    @classmethod
    def prepare(cls):
        """Start a PostgreSQL server and launch the Orthanc under test against it."""
        test_name = "Concurrency"
        cls._storage_name = "concurrency"

        print(f'-------------- preparing {test_name} tests')

        cls.clear_storage(storage_name=cls._storage_name)

        pg_hostname = "localhost"
        if Helpers.is_docker():
            pg_hostname = "pg-server"
            cls.create_docker_network("concurrency")

        config = {
            "PostgreSQL" : {
                "EnableStorage": False,
                "EnableIndex": True,
                "Host": pg_hostname,
                "Port": 5432,
                "Database": "postgres",
                "Username": "postgres",
                "Password": "postgres",
                "IndexConnectionsCount": 10,
                "MaximumConnectionRetries" : 2000,
                "ConnectionRetryInterval" : 5,
                "TransactionMode": "READ COMMITTED",
                #"TransactionMode": "SERIALIZABLE",
                "EnableVerboseLogs": True
            },
            "AuthenticationEnabled": False,
            "OverwriteInstances": True,
            "JobsEngineThreadsCount" : {
                "ResourceModification": 8
            },
        }

        config_path = cls.generate_configuration(
            config_name=f"{test_name}",
            storage_name=cls._storage_name,
            config=config,
            plugins=Helpers.plugins
        )

        # launch the docker PG server
        print('--------------- launching PostgreSQL server ------------------')

        cls.pg_service_process = subprocess.Popen([
            "docker", "run", "--rm",
            "-p", "5432:5432",
            "--network", "concurrency",
            "--name", "pg-server",
            "--env", "POSTGRES_HOST_AUTH_METHOD=trust",
            "postgres:15"])
        # give the PG server some time to accept connections
        time.sleep(5)

        if Helpers.break_before_preparation:
            print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
            input("Press Enter to continue")
        else:
            cls.launch_orthanc_under_tests(
                config_name=f"{test_name}",
                storage_name=cls._storage_name,
                config=config,
                plugins=Helpers.plugins,
                docker_network="concurrency"
            )

        # re-create the client on the same root url and wait until Orthanc answers
        cls.o = OrthancApiClient(cls.o._root_url)
        cls.o.wait_started()
        cls.o.delete_all_content()

    def check_is_empty(self):
        """Assert that Orthanc holds no resources at all: no ids, zeroed
        statistics, an empty storage area and an empty changes log."""
        self.assertEqual(0, len(self.o.studies.get_all_ids()))
        self.assertEqual(0, len(self.o.series.get_all_ids()))
        self.assertEqual(0, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(0, stats.get("CountPatients"))
        self.assertEqual(0, stats.get("CountStudies"))
        self.assertEqual(0, stats.get("CountSeries"))
        self.assertEqual(0, stats.get("CountInstances"))
        self.assertEqual(0, int(stats.get("TotalDiskSize")))
        self.assertTrue(self.is_storage_empty(self._storage_name))

        # all changes shall have been deleted as well
        changes, last_change, done = self.o.get_changes(since=0, limit=100000)
        self.assertTrue(done)
        self.assertEqual(0, len(changes))


    def execute_workers(self, worker_func, worker_args, workers_count):
        """Run `worker_func` in `workers_count` threads, appending the worker id
        to `worker_args`, and wait for all of them to complete."""
        workers = []
        for i in range(0, workers_count):
            t = threading.Thread(target=worker_func, args=worker_args + (i, ))
            workers.append(t)
            t.start()

        for t in workers:
            t.join()

    def test_concurrent_uploads_same_study(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        workers_count = 20
        repeat_count = 1

        # massively reupload the same study multiple times with OverwriteInstances set to true
        # Make sure the studies, series and instances are created only once
        self.execute_workers(
            worker_func=worker_upload_folder,
            worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"TIMING test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        # the Knee study contains 2 series / 50 instances; despite the
        # concurrent overwrites, each resource must exist exactly once
        self.assertEqual(1, len(self.o.studies.get_all_ids()))
        self.assertEqual(2, len(self.o.series.get_all_ids()))
        self.assertEqual(50, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(1, stats.get("CountPatients"))
        self.assertEqual(1, stats.get("CountStudies"))
        self.assertEqual(2, stats.get("CountSeries"))
        self.assertEqual(50, stats.get("CountInstances"))
        self.assertEqual(4118738, int(stats.get("TotalDiskSize")))

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        self.check_is_empty()

    def test_concurrent_anonymize_same_study(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        self.o.upload_folder(here / "../../Database/Knee")
        study_id = self.o.studies.get_all_ids()[0]

        start_time = time.time()
        workers_count = 4
        repeat_count = 10

        # massively anonymize the same study. This generates new studies and is a
        # good way to simulate ingestion of new studies
        self.execute_workers(
            worker_func=worker_anonymize_study,
            worker_args=(self.o._root_url, study_id, repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"TIMING test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        # original study + one new study per anonymization
        self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids()))
        self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids()))
        self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients"))
        self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies"))
        self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries"))
        self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances"))
        changes, last_change, done = self.o.get_changes(since=0, limit=100000)
        self.assertTrue(done)

        self.assertEqual(1 + workers_count * repeat_count, count_changes(changes, ChangeType.NEW_PATIENT))
        self.assertEqual(1 + workers_count * repeat_count, count_changes(changes, ChangeType.NEW_STUDY))
        self.assertEqual(2 * (1 + workers_count * repeat_count), count_changes(changes, ChangeType.NEW_SERIES))
        self.assertEqual(50 * (1 + workers_count * repeat_count), count_changes(changes, ChangeType.NEW_INSTANCE))

        start_time = time.time()

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        elapsed = time.time() - start_time
        print(f"TIMING test_concurrent_anonymize_same_study deletion took: {elapsed:.3f} s")

        self.check_is_empty()


    def test_upload_delete_same_study_from_multiple_threads(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        overall_repeat = 10
        # hoisted out of the loop: constant per run, and read by the timing
        # print after the loop
        workers_count = 5
        repeat_count = 3

        # fix: iterate overall_repeat times instead of a duplicated literal 10
        for i in range(0, overall_repeat):
            # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them.
            # We are trying to have multiple workers deleting the last instance of a study at the same time.
            self.execute_workers(
                worker_func=worker_upload_delete_study_part,
                worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
                workers_count=workers_count)

            self.check_is_empty()

        elapsed = time.time() - start_time
        print(f"TIMING test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat ({overall_repeat}x): {elapsed:.3f} s")
|
592
|
266
|
|
267
|
|
268 # transfers + dicomweb |