Mercurial > hg > orthanc-tests
comparison Plugins/Transfers/Run.py @ 232:f2af7bdc9bf8
tests for transfers accelerator
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Tue, 05 Mar 2019 07:02:42 +0100 |
parents | |
children | 943166deebcb |
comparison
equal
deleted
inserted
replaced
231:4087505ddfe3 | 232:f2af7bdc9bf8 |
---|---|
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 | |
5 # Orthanc - A Lightweight, RESTful DICOM Store | |
6 # Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics | |
7 # Department, University Hospital of Liege, Belgium | |
8 # Copyright (C) 2017-2019 Osimis S.A., Belgium | |
9 # | |
10 # This program is free software: you can redistribute it and/or | |
11 # modify it under the terms of the GNU General Public License as | |
12 # published by the Free Software Foundation, either version 3 of the | |
13 # License, or (at your option) any later version. | |
14 # | |
15 # This program is distributed in the hope that it will be useful, but | |
16 # WITHOUT ANY WARRANTY; without even the implied warranty of | |
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
18 # General Public License for more details. | |
19 # | |
20 # You should have received a copy of the GNU General Public License | |
21 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
22 | |
23 | |
24 | |
25 import os | |
26 import pprint | |
27 import sys | |
28 import argparse | |
29 import unittest | |
30 import re | |
31 | |
32 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'Tests')) | |
33 from Toolbox import * | |
34 | |
35 | |
36 ## | |
37 ## Parse the command-line arguments | |
38 ## | |
39 | |
# Command-line interface of this test runner: connection settings of the
# Orthanc server under test, a flag to skip the confirmation prompt, and
# free positional arguments that are later forwarded to "unittest.main()".
parser = argparse.ArgumentParser(description = 'Run the integration tests for the transfers accelerator plugin.')

parser.add_argument('--server',
                    default = 'localhost',
                    help = 'Address of the Orthanc server to test')
parser.add_argument('--rest',
                    type = int,
                    default = 8042,
                    help = 'Port to the REST API')
parser.add_argument('--username',
                    default = 'alice',
                    help = 'Username to the REST API')
parser.add_argument('--password',
                    default = 'orthanctest',
                    help = 'Password to the REST API')
parser.add_argument('--force', help = 'Do not warn the user',
                    action = 'store_true')
parser.add_argument('options', metavar = 'N', nargs = '*',
                    help='Arguments to Python unittest')

# Parses "sys.argv" as soon as the script is loaded
args = parser.parse_args()
61 | |
62 | |
63 ## | |
64 ## Configure the testing context | |
65 ## | |
66 | |
# Unless "--force" was given, the user must type "yes" on the standard
# input before the tests are allowed to run.
if not args.force:
    print("""
WARNING: This test will remove all the content of your
Orthanc instance running on %s!

Are you sure ["yes" to go on]?""" % args.server)

    if sys.stdin.readline().strip() != 'yes':
        print('Aborting...')
        # "sys.exit()" rather than the "exit()" helper: the latter is
        # injected by the "site" module for interactive use, and is not
        # available if Python is started with "-S"
        sys.exit(0)


# Description of the Orthanc server under test, built from the
# command-line arguments and used by every request of the test suite
ORTHANC = DefineOrthanc(server = args.server,
                        username = args.username,
                        password = args.password,
                        restPort = args.rest)
83 | |
84 | |
85 ## | |
86 ## The tests | |
87 ## | |
88 | |
class Orthanc(unittest.TestCase):
    """Integration tests for the "/transfers/..." routes of the REST API.

    All the requests are sent to the server described by the
    module-level "ORTHANC" object.
    """

    def setUp(self):
        """Call "DropOrthanc()" on the tested server before each test."""
        if (sys.version_info >= (3, 0)):
            # Remove annoying warnings about unclosed socket in Python 3
            import warnings
            warnings.simplefilter("ignore", ResourceWarning)

        DropOrthanc(ORTHANC)


    def _StartGzipTransfer(self, uri, peer, priority):
        """Upload "DummyCT.dcm", then POST a transfer request for it.

        The request targets the single uploaded instance, with "gzip"
        compression, and is sent to "uri" (either "/transfers/pull" or
        "/transfers/send" in this test suite).

        Returns the answer of "DoPost()".
        """
        instance = UploadInstance(ORTHANC, 'DummyCT.dcm')['ID']

        return DoPost(ORTHANC, uri, {
            'Compression' : 'gzip',
            'Peer' : peer,
            'Resources' : [
                {
                    'Level' : 'Instance',
                    'ID' : instance
                },
            ],
            'Priority' : priority,
        })


    def _CheckSuccessfulJob(self, job, jobType, priority, peer):
        """Wait for job "job", then check what "/jobs/{job}" reports.

        The job is expected to have succeeded, to be of type "jobType",
        to have the given priority, and to describe the transfer of one
        single instance with "peer" using gzip compression through one
        single HTTP query.
        """
        WaitJobDone(ORTHANC, job)

        info = DoGet(ORTHANC, '/jobs/%s' % job)
        self.assertEqual(jobType, info['Type'])
        self.assertEqual('Success', info['State'])
        self.assertEqual(job, info['ID'])
        self.assertEqual(priority, info['Priority'])

        self.assertEqual('gzip', info['Content']['Compression'])
        self.assertEqual(1, info['Content']['CompletedHttpQueries'])
        self.assertEqual(peer, info['Content']['Peer'])
        self.assertEqual(1, info['Content']['TotalInstances'])


    def test_list_peers(self):
        """Check the status reported for each of the three expected peers."""
        peers = DoGet(ORTHANC, '/transfers/peers')
        self.assertEqual(3, len(peers))
        self.assertEqual('disabled', peers['peer'])
        self.assertEqual('installed', peers['transfers-simple'])
        self.assertEqual('bidirectional', peers['transfers-bidirectional'])


    def test_pull(self):
        """Pull one instance from peer "transfers-simple"."""
        a = self._StartGzipTransfer('/transfers/pull', 'transfers-simple', 10)
        self._CheckSuccessfulJob(a['ID'], 'PullTransfer', 10, 'transfers-simple')


    def test_send_push(self):
        """Send one instance to "transfers-simple", expecting a push job."""
        a = self._StartGzipTransfer('/transfers/send', 'transfers-simple', -10)
        self._CheckSuccessfulJob(a['ID'], 'PushTransfer', -10, 'transfers-simple')


    def test_send_bidirectional(self):
        """Send one instance to "transfers-bidirectional".

        Here, the answer refers to a job on the remote peer, and that
        job is expected to be a pull transfer.
        """
        a = self._StartGzipTransfer('/transfers/send', 'transfers-bidirectional', 42)

        self.assertEqual(3, len(a))
        self.assertEqual('transfers-bidirectional', a['Peer'])
        # "assertIn()" reports the actual content of "a" on failure
        self.assertIn('RemoteJob', a)
        self.assertIn('URL', a)

        # In this integration test, the remote peer is the same as the
        # local peer, so the remote job can be monitored through ORTHANC.
        # The expected priority is 0, not 42: Priority is chosen by the
        # remote peer
        self._CheckSuccessfulJob(a['RemoteJob'], 'PullTransfer', 0, 'transfers-bidirectional')
198 | |
# Launch unittest, forwarding the positional command-line arguments to
# it. The closing message sits in a "finally" clause, so it is printed
# no matter how "unittest.main()" terminates.
try:
    print('\nStarting the tests...')
    unittestArguments = [ sys.argv[0] ]
    unittestArguments.extend(args.options)
    unittest.main(argv = unittestArguments)
finally:
    print('\nDone')