Mercurial > hg > orthanc-tests
annotate Tests/Toolbox.py @ 83:3f2170efa8d2
patches for python3
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 23 Jun 2016 18:04:59 +0200 |
parents | 97acfdf0dbce |
children | 2af6c0fb850d |
rev | line source |
---|---|
0 | 1 #!/usr/bin/python |
2 | |
1 | 3 # Orthanc - A Lightweight, RESTful DICOM Store |
73 | 4 # Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics |
1 | 5 # Department, University Hospital of Liege, Belgium |
6 # | |
7 # This program is free software: you can redistribute it and/or | |
8 # modify it under the terms of the GNU General Public License as | |
9 # published by the Free Software Foundation, either version 3 of the | |
10 # License, or (at your option) any later version. | |
11 # | |
12 # This program is distributed in the hope that it will be useful, but | |
13 # WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
15 # General Public License for more details. | |
16 # | |
17 # You should have received a copy of the GNU General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
0 | 19 |
20 | |
21 import hashlib | |
22 import httplib2 | |
23 import json | |
4 | 24 import os |
1 | 25 import re |
4 | 26 import signal |
1 | 27 import subprocess |
4 | 28 import threading |
83 | 29 import sys |
0 | 30 import time |
1 | 31 import zipfile |
0 | 32 |
3 | 33 from PIL import Image |
83 | 34 |
35 if (sys.version_info >= (3, 0)): | |
36 from urllib.parse import urlencode | |
37 from io import StringIO | |
38 from io import BytesIO | |
39 | |
40 else: | |
41 from urllib import urlencode | |
42 | |
43 # http://stackoverflow.com/a/1313868/881731 | |
44 try: | |
45 from cStringIO import StringIO | |
46 except: | |
47 from StringIO import StringIO | |
3 | 48 |
0 | 49 |
def _DecodeJson(s):
    """Best-effort conversion of an HTTP response body into a Python value.

    On Python 3, "s" is first decoded to text if possible. The result is
    then parsed as JSON. If the body is not valid JSON, the (possibly
    decoded) body is returned unchanged, so that plain-text and binary
    answers can flow through the same helper.
    """
    t = s

    if (sys.version_info >= (3, 0)):
        try:
            t = s.decode()
        except (AttributeError, UnicodeDecodeError):
            # "s" is already text (no "decode()" method), or it is a
            # binary payload that is not valid text: keep it as is
            pass

    try:
        return json.loads(t)
    except (TypeError, ValueError):
        # Not JSON (ValueError also covers JSON and Unicode decoding
        # errors), or not a type that "json.loads()" accepts
        return t
0 | 63 |
64 | |
def DefineOrthanc(server = 'localhost',
                  restPort = 8042,
                  username = None,
                  password = None,
                  aet = 'ORTHANC',
                  dicomPort = 4242):
    """Build the dictionary describing how to reach one Orthanc server.

    The returned dictionary has the keys "Server", "Url" (built as
    "http://<server>:<restPort>/"), "Username", "Password", "DicomAet"
    and "DicomPort". It is the "orthanc" argument expected by the other
    helpers of this module. "username" and "password" may be left to
    None if no HTTP credentials are needed.
    """
    return {
        'Server' : server,
        'Url' : 'http://%s:%d/' % (server, restPort),
        'Username' : username,
        'Password' : password,
        'DicomAet' : aet,
        'DicomPort' : dicomPort
        }
0 | 88 |
89 | |
def _SetupCredentials(orthanc, http):
    """Register the HTTP credentials of "orthanc" on the "http" client.

    Nothing is done unless both "Username" and "Password" are set in the
    "orthanc" dictionary.
    """
    if (orthanc['Username'] is not None and
        orthanc['Password'] is not None):
        http.add_credentials(orthanc['Username'], orthanc['Password'])
0 | 94 |
def DoGetRaw(orthanc, uri, data = None, body = None, headers = None):
    """Issue a GET request and return the raw "(resp, content)" pair.

    "data" is an optional dictionary of GET arguments that is URL-encoded
    into the query string. "body" and "headers" are forwarded to the
    HTTP client. Redirections are not followed, and the HTTP status is
    not checked: this is up to the caller.
    """
    # Build the query string from the GET arguments, if any
    query = ''
    if data:
        query = '?' + urlencode(data)

    http = httplib2.Http()
    http.follow_redirects = False
    _SetupCredentials(orthanc, http)

    resp, content = http.request(orthanc['Url'] + uri + query, 'GET', body = body,
                                 headers = {} if headers is None else headers)
    return (resp, content)
107 | |
108 | |
def DoGet(orthanc, uri, data = None, body = None, headers = None):
    """Issue a GET request and return its decoded answer.

    The arguments are those of "DoGetRaw()". An "Exception" carrying the
    HTTP status is raised if the status is not 200. Otherwise, the body
    is returned after going through "_DecodeJson()".
    """
    (resp, content) = DoGetRaw(orthanc, uri,
                               data = {} if data is None else data,
                               body = body,
                               headers = {} if headers is None else headers)

    if resp.status != 200:
        raise Exception(resp.status)

    return _DecodeJson(content)
0 | 116 |
def _DoPutOrPost(orthanc, uri, method, data, contentType, headers):
    """Issue a PUT or POST request ("method") and return its decoded answer.

    If "data" is a string or a bytes-like object, it is sent as is, with
    "contentType" as its content type if this argument is non-empty.
    Any other value is serialized to JSON. An "Exception" carrying the
    HTTP status is raised if the status is neither 200 nor 302.
    """
    http = httplib2.Http()
    http.follow_redirects = False
    _SetupCredentials(orthanc, http)

    # Work on a private copy: the headers are modified below, and the
    # dictionary of the caller (possibly a shared default argument) must
    # not keep a content type from one request to the next
    headers = dict(headers) if headers else {}

    if isinstance(data, (str, bytearray, bytes)):
        body = data
        if len(contentType) != 0:
            headers['content-type'] = contentType
    else:
        body = json.dumps(data)
        headers['content-type'] = 'application/json'

    headers['expect'] = ''

    resp, content = http.request(orthanc['Url'] + uri, method,
                                 body = body,
                                 headers = headers)
    if resp.status not in (200, 302):
        raise Exception(resp.status)

    return _DecodeJson(content)
0 | 139 |
def DoDelete(orthanc, uri):
    """Issue a DELETE request on "uri" and return its decoded answer.

    Redirections are not followed. An "Exception" carrying the HTTP
    status is raised if the status is not 200.
    """
    client = httplib2.Http()
    client.follow_redirects = False
    _SetupCredentials(orthanc, client)

    target = orthanc['Url'] + uri
    response, payload = client.request(target, 'DELETE')

    if response.status != 200:
        raise Exception(response.status)

    return _DecodeJson(payload)
0 | 150 |
def DoPut(orthanc, uri, data = {}, contentType = ''):
    """Issue a PUT request on "uri", without any extra HTTP header.

    See "_DoPutOrPost()" for the handling of "data" and "contentType".
    """
    noExtraHeaders = {}
    return _DoPutOrPost(orthanc, uri, 'PUT', data, contentType, noExtraHeaders)
0 | 153 |
def DoPost(orthanc, uri, data = {}, contentType = '', headers = None):
    """Issue a POST request on "uri" and return its decoded answer.

    See "_DoPutOrPost()" for the handling of "data" and "contentType".
    "headers" is an optional dictionary of extra HTTP headers.
    """
    # "_DoPutOrPost()" adds entries to the headers it receives: give it
    # a fresh dictionary on each call, so that a content type set by one
    # request cannot leak into the next one. The default value of "data"
    # is only read, never modified.
    return _DoPutOrPost(orthanc, uri, 'POST', data, contentType,
                        dict(headers) if headers else {})
156 | |
def GetDatabasePath(filename):
    """Return the path to "filename" within the "Database" folder.

    The "Database" folder is looked up in the parent of the directory
    that contains this module.
    """
    toolboxFolder = os.path.dirname(__file__)
    return os.path.join(toolboxFolder, '..', 'Database', filename)
159 | |
def UploadInstance(orthanc, filename):
    """POST one file of the "Database" folder to "/instances".

    The file is read in binary mode and sent with the content type
    "application/dicom". The answer of "DoPost()" is returned.
    """
    source = GetDatabasePath(filename)

    with open(source, 'rb') as dicomFile:
        content = dicomFile.read()

    return DoPost(orthanc, '/instances', content, 'application/dicom')
165 | |
def UploadFolder(orthanc, path):
    """Upload every entry of the folder "path" of the "Database" folder.

    This is a best-effort operation: entries that cannot be uploaded are
    silently skipped.
    """
    for name in os.listdir(GetDatabasePath(path)):
        try:
            UploadInstance(orthanc, os.path.join(path, name))
        except Exception:
            # Skip the entries that cannot be uploaded, but let
            # KeyboardInterrupt and SystemExit go through
            pass
0 | 172 |
def DropOrthanc(orthanc):
    """Reset the Lua callbacks, then delete "/exports" and all the patients."""
    # Reset the Lua callbacks
    emptyCallback = 'function OnStoredInstance(instanceId, tags, metadata) end'
    DoPost(orthanc, '/tools/execute-script', emptyCallback, 'application/lua')

    DoDelete(orthanc, '/exports')

    patients = DoGet(orthanc, '/patients')
    for patient in patients:
        DoDelete(orthanc, '/patients/%s' % patient)
181 | |
def ComputeMD5(data):
    """Return the MD5 checksum of "data", as a hexadecimal string."""
    return hashlib.md5(data).hexdigest()
186 | |
59
84378ada15ab
test_decode_brainix_as_jpeg
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
44
diff
changeset
|
def GetImage(orthanc, uri, headers = None):
    """Download "uri" and open the answer as a PIL image.

    "headers" is an optional dictionary of extra HTTP headers. An
    "Exception" carrying the HTTP status is raised if the status is
    not 200.
    """
    # http://www.pythonware.com/library/pil/handbook/introduction.htm

    # The raw body is used on purpose: an image is binary, and must not
    # go through the text/JSON decoding that "DoGet()" applies (a body
    # that happens to be valid text would be turned into a string)
    (resp, data) = DoGetRaw(orthanc, uri,
                            headers = {} if headers is None else headers)
    if resp.status != 200:
        raise Exception(resp.status)

    if (sys.version_info >= (3, 0)):
        return Image.open(BytesIO(data))
    else:
        return Image.open(StringIO(data))
0 | 194 |
def GetArchive(orthanc, uri):
    """Download "uri" and open the answer as a read-only ZIP archive.

    An "Exception" carrying the HTTP status is raised if the status is
    not 200.
    """
    # http://stackoverflow.com/a/1313868/881731

    # The raw body is used on purpose: an archive is binary, and must
    # not go through the text/JSON decoding that "DoGet()" applies
    (resp, s) = DoGetRaw(orthanc, uri)
    if resp.status != 200:
        raise Exception(resp.status)

    if (sys.version_info >= (3, 0)):
        return zipfile.ZipFile(BytesIO(s), "r")
    else:
        return zipfile.ZipFile(StringIO(s), "r")
0 | 203 |
def IsDefinedInLua(orthanc, name):
    """Tell whether the Lua type of "name", as printed by Orthanc, is not "nil"."""
    script = 'print(type(%s))' % name
    luaType = DoPost(orthanc, '/tools/execute-script', script, 'application/lua')
    return luaType.strip() != 'nil'
207 | |
def WaitEmpty(orthanc):
    """Block until "/instances" is empty, polling it every 100ms."""
    while len(DoGet(orthanc, '/instances')) != 0:
        time.sleep(0.1)
213 | |
def GetDockerHostAddress():
    """Return the address of the default gateway reported by "/sbin/ip route".

    'localhost' is returned if the output contains no default route.
    """
    # Ask for text output: with Python 3, "check_output()" returns bytes
    # by default, which cannot be searched with a text pattern
    route = subprocess.check_output([ '/sbin/ip', 'route' ],
                                    universal_newlines = True)
    m = re.search(r'default via ([0-9.]+)', route)
    if m is None:
        return 'localhost'
    else:
        return m.groups()[0]
4 | 221 |
44
ffa542cce638
Toolbox.FindExecutable()
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
28
diff
changeset
|
def FindExecutable(name):
    """Locate the executable "name" in "/usr/local/bin" or "/usr/local/sbin".

    The first existing file is returned, the folders being tested in
    that order. If there is none, "name" is returned unchanged.
    """
    for folder in ('/usr/local/bin', '/usr/local/sbin'):
        candidate = os.path.join(folder, name)
        if os.path.isfile(candidate):
            return candidate

    return name
4 | 232 |
233 | |
234 | |
class ExternalCommandThread:
    """Run an external command in the background, until "stop()" is called.

    The command is started from a dedicated thread as soon as the object
    is constructed. If the command exits by itself before "stop()" is
    called, the whole Python process is terminated through
    "os._exit(-1)".
    """

    @staticmethod
    def ExternalCommandFunction(arg, stop_event, command, env):
        """Body of the thread: start "command", then watch it.

        The command is polled every 100ms until "stop_event" is set,
        then it is terminated. "arg" is not used.
        """
        external = subprocess.Popen(command, env = env)

        while (not stop_event.is_set()):
            error = external.poll()
            if error is not None:
                # The command has exited by itself: abort the process
                # http://stackoverflow.com/a/1489838/881731
                os._exit(-1)
            stop_event.wait(0.1)

        print('Stopping the external command')
        external.terminate()
        external.communicate()  # Wait for the command to stop

    def __init__(self, command, env = None):
        """Start "command" (an argument list for "subprocess.Popen").

        "env" is the optional environment of the command.
        """
        self.thread_stop = threading.Event()
        self.thread = threading.Thread(target = self.ExternalCommandFunction,
                                       args = (10, self.thread_stop, command, env))
        self.thread.start()

    def stop(self):
        """Terminate the command, and wait for the thread to finish."""
        self.thread_stop.set()
        self.thread.join()