import unittest
import time
import os
import threading
from helpers import OrthancTestCase, Helpers

from orthanc_api_client import OrthancApiClient, generate_test_dicom_file
from orthanc_tools import OrthancTestDbPopulator

import pathlib
import subprocess
import glob

here = pathlib.Path(__file__).parent.resolve()

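
# Each worker_* function below is the body of one thread: each worker creates
# its own OrthancApiClient (the client is not shared across threads) and then
# hammers the same Orthanc server concurrently with the other workers.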
def worker_upload_folder(orthanc_root_url: str, folder: str, repeat: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)
    for i in range(0, repeat):
        o.upload_folder(folder, ignore_errors=True)


def worker_anonymize_study(orthanc_root_url: str, study_id: str, repeat: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)

    for i in range(0, repeat):
        o.studies.anonymize(orthanc_id=study_id, delete_original=False)


def worker_upload_delete_study_part(orthanc_root_url: str, folder: str, repeat: int, workers_count: int, worker_id: int):
    o = OrthancApiClient(orthanc_root_url)

    all_files = glob.glob(os.path.join(folder, '*.dcm'))

    for i in range(0, repeat):
        instances_ids = []
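
        # each worker uploads only the files whose index maps onto its worker_id,
        # so that, together, the workers upload the complete folder exactly once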
        for file_index in range(0, len(all_files)):
            if file_index % workers_count == worker_id:
                instances_ids.extend(o.upload_file(all_files[file_index]))

        for instance_id in instances_ids:
            o.instances.delete(orthanc_id=instance_id)

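
# Stress tests for a multi-threaded Orthanc whose index is hosted in PostgreSQL:
# many clients act on the same study at once and the resource counts and
# /statistics must stay consistent.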
class TestConcurrency(OrthancTestCase):

    @classmethod
    def terminate(cls):
        if Helpers.is_docker():
            subprocess.run(["docker", "rm", "-f", "pg-server"])
        else:
            cls.pg_service_process.terminate()

    @classmethod
    def prepare(cls):
        test_name = "Concurrency"
        cls._storage_name = "concurrency"

        print(f'-------------- preparing {test_name} tests')

        cls.clear_storage(storage_name=cls._storage_name)

        pg_hostname = "localhost"
        if Helpers.is_docker():
            pg_hostname = "pg-server"
            cls.create_docker_network("concurrency")

        config = {
            "PostgreSQL": {
                "EnableStorage": False,
                "EnableIndex": True,
                "Host": pg_hostname,
                "Port": 5432,
                "Database": "postgres",
                "Username": "postgres",
                "Password": "postgres",
                "IndexConnectionsCount": 10,
                "MaximumConnectionRetries": 2000,
                "ConnectionRetryInterval": 5,
                "TransactionMode": "READ COMMITTED",
                # "TransactionMode": "SERIALIZABLE",
                "EnableVerboseLogs": True
            },
            "AuthenticationEnabled": False,
            "OverwriteInstances": True,
            "JobsEngineThreadsCount": {
                "ResourceModification": 8
            },
        }

        config_path = cls.generate_configuration(
            config_name=f"{test_name}",
            storage_name=cls._storage_name,
            config=config,
            plugins=Helpers.plugins
        )

        # launch the docker PG server
        print('--------------- launching PostgreSQL server ------------------')

        cls.pg_service_process = subprocess.Popen([
            "docker", "run", "--rm",
            "-p", "5432:5432",
            "--network", "concurrency",
            "--name", "pg-server",
            "--env", "POSTGRES_HOST_AUTH_METHOD=trust",
            "postgres:15"])
        time.sleep(5)

        if Helpers.break_before_preparation:
            print(f"++++ It is now time to start your Orthanc under tests with configuration file '{config_path}' +++++")
            input("Press Enter to continue")
        else:
            cls.launch_orthanc_under_tests(
                config_name=f"{test_name}",
                storage_name=cls._storage_name,
                config=config,
                plugins=Helpers.plugins,
                docker_network="concurrency"
            )

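        # fresh API client against the Orthanc under test: wait until the REST
        # API answers, then make sure each run starts from an empty database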
        cls.o = OrthancApiClient(cls.o._root_url)
        cls.o.wait_started()
        cls.o.delete_all_content()

    def execute_workers(self, worker_func, worker_args, workers_count):
        workers = []
        for i in range(0, workers_count):
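            # the loop index is appended as the last positional argument: it
            # becomes the worker_id that every worker_* function expects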
            t = threading.Thread(target=worker_func, args=worker_args + (i, ))
            workers.append(t)
            t.start()

        for t in workers:
            t.join()

    def test_concurrent_uploads_same_study(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        workers_count = 20
        repeat_count = 1

        # massively re-upload the same study with "OverwriteInstances" set to true
        # and make sure the patient, study, series and instances are created only once
        self.execute_workers(
            worker_func=worker_upload_folder,
            worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

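        # the Knee study contains 2 series and 50 instances; the concurrent
        # overwrites must not have duplicated any resource level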
        self.assertEqual(1, len(self.o.studies.get_all_ids()))
        self.assertEqual(2, len(self.o.series.get_all_ids()))
        self.assertEqual(50, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(1, stats.get("CountPatients"))
        self.assertEqual(1, stats.get("CountStudies"))
        self.assertEqual(2, stats.get("CountSeries"))
        self.assertEqual(50, stats.get("CountInstances"))
        self.assertEqual(4118738, int(stats.get("TotalDiskSize")))

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        self.assertEqual(0, len(self.o.studies.get_all_ids()))
        self.assertEqual(0, len(self.o.series.get_all_ids()))
        self.assertEqual(0, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(0, stats.get("CountPatients"))
        self.assertEqual(0, stats.get("CountStudies"))
        self.assertEqual(0, stats.get("CountSeries"))
        self.assertEqual(0, stats.get("CountInstances"))
        self.assertEqual(0, int(stats.get("TotalDiskSize")))
        self.assertTrue(self.is_storage_empty(self._storage_name))

    def test_concurrent_anonymize_same_study(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        self.o.upload_folder(here / "../../Database/Knee")
        study_id = self.o.studies.get_all_ids()[0]

        start_time = time.time()
        workers_count = 4
        repeat_count = 10

        # massively anonymize the same study. This generates new studies and is a
        # good way to simulate ingestion of new studies
        self.execute_workers(
            worker_func=worker_anonymize_study,
            worker_args=(self.o._root_url, study_id, repeat_count,),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

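        # 1 original study + 1 anonymized copy per anonymization call
        # (workers_count * repeat_count copies in total)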
        self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids()))
        self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids()))
        self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients"))
        self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies"))
        self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries"))
        self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances"))

        self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())

        self.assertEqual(0, len(self.o.studies.get_all_ids()))
        self.assertEqual(0, len(self.o.series.get_all_ids()))
        self.assertEqual(0, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(0, stats.get("CountPatients"))
        self.assertEqual(0, stats.get("CountStudies"))
        self.assertEqual(0, stats.get("CountSeries"))
        self.assertEqual(0, stats.get("CountInstances"))
        self.assertEqual(0, int(stats.get("TotalDiskSize")))
        self.assertTrue(self.is_storage_empty(self._storage_name))

    def test_upload_delete_same_study_from_multiple_threads(self):
        self.o.delete_all_content()
        self.clear_storage(storage_name=self._storage_name)

        start_time = time.time()
        workers_count = 5
        repeat_count = 30

        # massively upload and delete the same study: each worker uploads its own
        # part of the instances and then deletes them. The goal is to get several
        # workers deleting the last instance of a study at the same time.
        self.execute_workers(
            worker_func=worker_upload_delete_study_part,
            worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
            workers_count=workers_count)

        elapsed = time.time() - start_time
        print(f"test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")

        self.assertTrue(self.o.is_alive())

        self.assertEqual(0, len(self.o.studies.get_all_ids()))
        self.assertEqual(0, len(self.o.series.get_all_ids()))
        self.assertEqual(0, len(self.o.instances.get_all_ids()))

        stats = self.o.get_json("/statistics")
        self.assertEqual(0, stats.get("CountPatients"))
        self.assertEqual(0, stats.get("CountStudies"))
        self.assertEqual(0, stats.get("CountSeries"))
        self.assertEqual(0, stats.get("CountInstances"))
        self.assertEqual(0, int(stats.get("TotalDiskSize")))
        self.assertTrue(self.is_storage_empty(self._storage_name))


# transfers + dicomweb