comparison NewTests/Concurrency/test_concurrency.py @ 593:f4f0e2d24a51

more pg-transactions tests
author Alain Mazy <am@osimis.io>
date Thu, 14 Dec 2023 10:41:09 +0100
parents 6753d96dd71f
children 27be80b4b1a9
comparison
equal deleted inserted replaced
592:6753d96dd71f 593:f4f0e2d24a51
148 worker_func=worker_upload_folder, 148 worker_func=worker_upload_folder,
149 worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,), 149 worker_args=(self.o._root_url, here / "../../Database/Knee", repeat_count,),
150 workers_count=workers_count) 150 workers_count=workers_count)
151 151
152 elapsed = time.time() - start_time 152 elapsed = time.time() - start_time
153 print(f"test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") 153 print(f"TIMING test_concurrent_uploads_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
154 154
155 self.assertTrue(self.o.is_alive()) 155 self.assertTrue(self.o.is_alive())
156 156
157 self.assertEqual(1, len(self.o.studies.get_all_ids())) 157 self.assertEqual(1, len(self.o.studies.get_all_ids()))
158 self.assertEqual(2, len(self.o.series.get_all_ids())) 158 self.assertEqual(2, len(self.o.series.get_all_ids()))
164 self.assertEqual(2, stats.get("CountSeries")) 164 self.assertEqual(2, stats.get("CountSeries"))
165 self.assertEqual(50, stats.get("CountInstances")) 165 self.assertEqual(50, stats.get("CountInstances"))
166 self.assertEqual(4118738, int(stats.get("TotalDiskSize"))) 166 self.assertEqual(4118738, int(stats.get("TotalDiskSize")))
167 167
168 self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids()) 168 self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())
169
170 self.assertEqual(0, len(self.o.studies.get_all_ids()))
171 self.assertEqual(0, len(self.o.series.get_all_ids()))
172 self.assertEqual(0, len(self.o.instances.get_all_ids()))
173
174 stats = self.o.get_json("/statistics")
175 self.assertEqual(0, stats.get("CountPatients"))
176 self.assertEqual(0, stats.get("CountStudies"))
177 self.assertEqual(0, stats.get("CountSeries"))
178 self.assertEqual(0, stats.get("CountInstances"))
179 self.assertEqual(0, int(stats.get("TotalDiskSize")))
180 # time.sleep(10000)
181 self.assertTrue(self.is_storage_empty(self._storage_name))
182
183 def test_concurrent_anonymize_same_study(self):
184 self.o.delete_all_content()
185 self.clear_storage(storage_name=self._storage_name)
186
187 self.o.upload_folder(here / "../../Database/Knee")
188 study_id = self.o.studies.get_all_ids()[0]
189
190 start_time = time.time()
191 workers_count = 4
192 repeat_count = 10
193
194 # massively anonymize the same study. This generates new studies and is a
195 # good way to simulate ingestion of new studies
196 self.execute_workers(
197 worker_func=worker_anonymize_study,
198 worker_args=(self.o._root_url, study_id, repeat_count,),
199 workers_count=workers_count)
200
201 elapsed = time.time() - start_time
202 print(f"TIMING test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
203
204 self.assertTrue(self.o.is_alive())
205
206 self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids()))
207 self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids()))
208 self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids()))
209
210 stats = self.o.get_json("/statistics")
211 self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients"))
212 self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies"))
213 self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries"))
214 self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances"))
215
216 start_time = time.time()
217
218 self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())
219
220 elapsed = time.time() - start_time
221 print(f"TIMING test_concurrent_anonymize_same_study deletion took: {elapsed:.3f} s")
169 222
170 self.assertEqual(0, len(self.o.studies.get_all_ids())) 223 self.assertEqual(0, len(self.o.studies.get_all_ids()))
171 self.assertEqual(0, len(self.o.series.get_all_ids())) 224 self.assertEqual(0, len(self.o.series.get_all_ids()))
172 self.assertEqual(0, len(self.o.instances.get_all_ids())) 225 self.assertEqual(0, len(self.o.instances.get_all_ids()))
173 226
178 self.assertEqual(0, stats.get("CountInstances")) 231 self.assertEqual(0, stats.get("CountInstances"))
179 self.assertEqual(0, int(stats.get("TotalDiskSize"))) 232 self.assertEqual(0, int(stats.get("TotalDiskSize")))
180 self.assertTrue(self.is_storage_empty(self._storage_name)) 233 self.assertTrue(self.is_storage_empty(self._storage_name))
181 234
182 235
183 def test_concurrent_anonymize_same_study(self): 236 def test_upload_delete_same_study_from_multiple_threads(self):
184 self.o.delete_all_content() 237 self.o.delete_all_content()
185 self.clear_storage(storage_name=self._storage_name) 238 self.clear_storage(storage_name=self._storage_name)
186 239
187 self.o.upload_folder(here / "../../Database/Knee") 240 start_time = time.time()
188 study_id = self.o.studies.get_all_ids()[0] 241 workers_count = 5
189 242 repeat_count = 30
190 start_time = time.time() 243
191 workers_count = 4 244 # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them.
192 repeat_count = 10 245 # We are trying to have multiple workers deleting the last instance of a study at the same time.
193
194 # massively anonymize the same study. This generates new studies and is a
195 # good way to simulate ingestion of new studies
196 self.execute_workers( 246 self.execute_workers(
197 worker_func=worker_anonymize_study, 247 worker_func=worker_upload_delete_study_part,
198 worker_args=(self.o._root_url, study_id, repeat_count,), 248 worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
199 workers_count=workers_count) 249 workers_count=workers_count)
200 250
201 elapsed = time.time() - start_time 251 elapsed = time.time() - start_time
202 print(f"test_concurrent_anonymize_same_study with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s") 252 print(f"TIMING test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
203 253
204 self.assertTrue(self.o.is_alive()) 254 self.assertTrue(self.o.is_alive())
205
206 self.assertEqual(1 + workers_count * repeat_count, len(self.o.studies.get_all_ids()))
207 self.assertEqual(2 * (1 + workers_count * repeat_count), len(self.o.series.get_all_ids()))
208 self.assertEqual(50 * (1 + workers_count * repeat_count), len(self.o.instances.get_all_ids()))
209
210 stats = self.o.get_json("/statistics")
211 self.assertEqual(1 + workers_count * repeat_count, stats.get("CountPatients"))
212 self.assertEqual(1 + workers_count * repeat_count, stats.get("CountStudies"))
213 self.assertEqual(2 * (1 + workers_count * repeat_count), stats.get("CountSeries"))
214 self.assertEqual(50 * (1 + workers_count * repeat_count), stats.get("CountInstances"))
215
216 self.o.instances.delete(orthanc_ids=self.o.instances.get_all_ids())
217 255
218 self.assertEqual(0, len(self.o.studies.get_all_ids())) 256 self.assertEqual(0, len(self.o.studies.get_all_ids()))
219 self.assertEqual(0, len(self.o.series.get_all_ids())) 257 self.assertEqual(0, len(self.o.series.get_all_ids()))
220 self.assertEqual(0, len(self.o.instances.get_all_ids())) 258 self.assertEqual(0, len(self.o.instances.get_all_ids()))
221 259
225 self.assertEqual(0, stats.get("CountSeries")) 263 self.assertEqual(0, stats.get("CountSeries"))
226 self.assertEqual(0, stats.get("CountInstances")) 264 self.assertEqual(0, stats.get("CountInstances"))
227 self.assertEqual(0, int(stats.get("TotalDiskSize"))) 265 self.assertEqual(0, int(stats.get("TotalDiskSize")))
228 self.assertTrue(self.is_storage_empty(self._storage_name)) 266 self.assertTrue(self.is_storage_empty(self._storage_name))
229 267
230
231 def test_upload_delete_same_study_from_multiple_threads(self):
232 self.o.delete_all_content()
233 self.clear_storage(storage_name=self._storage_name)
234
235 start_time = time.time()
236 workers_count = 5
237 repeat_count = 30
238
239 # massively upload and delete the same study. Each worker is writing a part of the instances and deleting them.
240 # We are trying to have multiple workers deleting the last instance of a study at the same time.
241 self.execute_workers(
242 worker_func=worker_upload_delete_study_part,
243 worker_args=(self.o._root_url, here / "../../Database/Knee/T1", repeat_count, workers_count, ),
244 workers_count=workers_count)
245
246 elapsed = time.time() - start_time
247 print(f"test_upload_delete_same_study_from_multiple_threads with {workers_count} workers and {repeat_count}x repeat: {elapsed:.3f} s")
248
249 self.assertTrue(self.o.is_alive())
250
251 self.assertEqual(0, len(self.o.studies.get_all_ids()))
252 self.assertEqual(0, len(self.o.series.get_all_ids()))
253 self.assertEqual(0, len(self.o.instances.get_all_ids()))
254
255 stats = self.o.get_json("/statistics")
256 self.assertEqual(0, stats.get("CountPatients"))
257 self.assertEqual(0, stats.get("CountStudies"))
258 self.assertEqual(0, stats.get("CountSeries"))
259 self.assertEqual(0, stats.get("CountInstances"))
260 self.assertEqual(0, int(stats.get("TotalDiskSize")))
261 self.assertTrue(self.is_storage_empty(self._storage_name))
262
263 # transfers + dicomweb 268 # transfers + dicomweb