748
|
1 /**
|
|
2 * Stone of Orthanc
|
|
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
|
|
4 * Department, University Hospital of Liege, Belgium
|
|
5 * Copyright (C) 2017-2019 Osimis S.A., Belgium
|
|
6 *
|
|
7 * This program is free software: you can redistribute it and/or
|
|
8 * modify it under the terms of the GNU Affero General Public License
|
|
9 * as published by the Free Software Foundation, either version 3 of
|
|
10 * the License, or (at your option) any later version.
|
|
11 *
|
|
12 * This program is distributed in the hope that it will be useful, but
|
|
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
15 * Affero General Public License for more details.
|
|
16 *
|
|
17 * You should have received a copy of the GNU Affero General Public License
|
|
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
19 **/
|
|
20
|
|
21
|
|
22 #include "ThreadedOracle.h"
|
|
23
|
|
24 #include "GetOrthancImageCommand.h"
|
|
25 #include "GetOrthancWebViewerJpegCommand.h"
|
|
26 #include "OrthancRestApiCommand.h"
|
|
27 #include "SleepOracleCommand.h"
|
|
28 #include "OracleCommandExceptionMessage.h"
|
|
29
|
|
30 #include <Core/Compression/GzipCompressor.h>
|
|
31 #include <Core/HttpClient.h>
|
|
32 #include <Core/OrthancException.h>
|
|
33 #include <Core/Toolbox.h>
|
|
34
|
|
35
|
|
36 namespace OrthancStone
|
|
37 {
|
|
38 class ThreadedOracle::Item : public Orthanc::IDynamicObject
|
|
39 {
|
|
40 private:
|
|
41 const IObserver& receiver_;
|
|
42 std::auto_ptr<IOracleCommand> command_;
|
|
43
|
|
44 public:
|
|
45 Item(const IObserver& receiver,
|
|
46 IOracleCommand* command) :
|
|
47 receiver_(receiver),
|
|
48 command_(command)
|
|
49 {
|
|
50 if (command == NULL)
|
|
51 {
|
|
52 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
|
|
53 }
|
|
54 }
|
|
55
|
|
56 const IObserver& GetReceiver() const
|
|
57 {
|
|
58 return receiver_;
|
|
59 }
|
|
60
|
|
61 IOracleCommand& GetCommand()
|
|
62 {
|
|
63 assert(command_.get() != NULL);
|
|
64 return *command_;
|
|
65 }
|
|
66 };
|
|
67
|
|
68
|
|
69 class ThreadedOracle::SleepingCommands : public boost::noncopyable
|
|
70 {
|
|
71 private:
|
|
72 class Item
|
|
73 {
|
|
74 private:
|
|
75 const IObserver& receiver_;
|
|
76 std::auto_ptr<SleepOracleCommand> command_;
|
|
77 boost::posix_time::ptime expiration_;
|
|
78
|
|
79 public:
|
|
80 Item(const IObserver& receiver,
|
|
81 SleepOracleCommand* command) :
|
|
82 receiver_(receiver),
|
|
83 command_(command)
|
|
84 {
|
|
85 if (command == NULL)
|
|
86 {
|
|
87 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
|
|
88 }
|
|
89
|
|
90 expiration_ = (boost::posix_time::second_clock::local_time() +
|
|
91 boost::posix_time::milliseconds(command_->GetDelay()));
|
|
92 }
|
|
93
|
|
94 const boost::posix_time::ptime& GetExpirationTime() const
|
|
95 {
|
|
96 return expiration_;
|
|
97 }
|
|
98
|
|
99 void Awake(IMessageEmitter& emitter)
|
|
100 {
|
|
101 assert(command_.get() != NULL);
|
|
102
|
|
103 SleepOracleCommand::TimeoutMessage message(*command_);
|
|
104 emitter.EmitMessage(receiver_, message);
|
|
105 }
|
|
106 };
|
|
107
|
|
108 typedef std::list<Item*> Content;
|
|
109
|
|
110 boost::mutex mutex_;
|
|
111 Content content_;
|
|
112
|
|
113 public:
|
|
114 ~SleepingCommands()
|
|
115 {
|
|
116 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
|
|
117 {
|
|
118 if (*it != NULL)
|
|
119 {
|
|
120 delete *it;
|
|
121 }
|
|
122 }
|
|
123 }
|
|
124
|
|
125 void Add(const IObserver& receiver,
|
|
126 SleepOracleCommand* command) // Takes ownership
|
|
127 {
|
|
128 boost::mutex::scoped_lock lock(mutex_);
|
|
129
|
|
130 content_.push_back(new Item(receiver, command));
|
|
131 }
|
|
132
|
|
133 void AwakeExpired(IMessageEmitter& emitter)
|
|
134 {
|
|
135 boost::mutex::scoped_lock lock(mutex_);
|
|
136
|
|
137 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
|
|
138
|
|
139 Content stillSleeping;
|
|
140
|
|
141 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
|
|
142 {
|
|
143 if (*it != NULL &&
|
|
144 (*it)->GetExpirationTime() <= now)
|
|
145 {
|
|
146 (*it)->Awake(emitter);
|
|
147 delete *it;
|
|
148 *it = NULL;
|
|
149 }
|
|
150 else
|
|
151 {
|
|
152 stillSleeping.push_back(*it);
|
|
153 }
|
|
154 }
|
|
155
|
|
156 // Compact the still-sleeping commands
|
|
157 content_ = stillSleeping;
|
|
158 }
|
|
159 };
|
|
160
|
|
161
|
|
162 static void CopyHttpHeaders(Orthanc::HttpClient& client,
|
|
163 const Orthanc::HttpClient::HttpHeaders& headers)
|
|
164 {
|
|
165 for (Orthanc::HttpClient::HttpHeaders::const_iterator
|
|
166 it = headers.begin(); it != headers.end(); it++ )
|
|
167 {
|
|
168 client.AddHeader(it->first, it->second);
|
|
169 }
|
|
170 }
|
|
171
|
|
172
|
|
173 static void DecodeAnswer(std::string& answer,
|
|
174 const Orthanc::HttpClient::HttpHeaders& headers)
|
|
175 {
|
|
176 Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None;
|
|
177
|
|
178 for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin();
|
|
179 it != headers.end(); ++it)
|
|
180 {
|
|
181 std::string s;
|
|
182 Orthanc::Toolbox::ToLowerCase(s, it->first);
|
|
183
|
|
184 if (s == "content-encoding")
|
|
185 {
|
|
186 if (it->second == "gzip")
|
|
187 {
|
|
188 contentEncoding = Orthanc::HttpCompression_Gzip;
|
|
189 }
|
|
190 else
|
|
191 {
|
|
192 throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol,
|
|
193 "Unsupported HTTP Content-Encoding: " + it->second);
|
|
194 }
|
|
195
|
|
196 break;
|
|
197 }
|
|
198 }
|
|
199
|
|
200 if (contentEncoding == Orthanc::HttpCompression_Gzip)
|
|
201 {
|
|
202 std::string compressed;
|
|
203 answer.swap(compressed);
|
|
204
|
|
205 Orthanc::GzipCompressor compressor;
|
|
206 compressor.Uncompress(answer, compressed.c_str(), compressed.size());
|
|
207 }
|
|
208 }
|
|
209
|
|
210
|
|
211 static void Execute(IMessageEmitter& emitter,
|
|
212 const Orthanc::WebServiceParameters& orthanc,
|
|
213 const IObserver& receiver,
|
|
214 const OrthancRestApiCommand& command)
|
|
215 {
|
|
216 Orthanc::HttpClient client(orthanc, command.GetUri());
|
|
217 client.SetMethod(command.GetMethod());
|
|
218 client.SetTimeout(command.GetTimeout());
|
|
219
|
|
220 CopyHttpHeaders(client, command.GetHttpHeaders());
|
|
221
|
|
222 if (command.GetMethod() == Orthanc::HttpMethod_Post ||
|
|
223 command.GetMethod() == Orthanc::HttpMethod_Put)
|
|
224 {
|
|
225 client.SetBody(command.GetBody());
|
|
226 }
|
|
227
|
|
228 std::string answer;
|
|
229 Orthanc::HttpClient::HttpHeaders answerHeaders;
|
|
230 client.ApplyAndThrowException(answer, answerHeaders);
|
|
231
|
|
232 DecodeAnswer(answer, answerHeaders);
|
|
233
|
|
234 OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer);
|
|
235 emitter.EmitMessage(receiver, message);
|
|
236 }
|
|
237
|
|
238
|
|
239 static void Execute(IMessageEmitter& emitter,
|
|
240 const Orthanc::WebServiceParameters& orthanc,
|
|
241 const IObserver& receiver,
|
|
242 const GetOrthancImageCommand& command)
|
|
243 {
|
|
244 Orthanc::HttpClient client(orthanc, command.GetUri());
|
|
245 client.SetTimeout(command.GetTimeout());
|
|
246
|
|
247 CopyHttpHeaders(client, command.GetHttpHeaders());
|
|
248
|
|
249 std::string answer;
|
|
250 Orthanc::HttpClient::HttpHeaders answerHeaders;
|
|
251 client.ApplyAndThrowException(answer, answerHeaders);
|
|
252
|
|
253 DecodeAnswer(answer, answerHeaders);
|
|
254
|
|
255 command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders);
|
|
256 }
|
|
257
|
|
258
|
|
259 static void Execute(IMessageEmitter& emitter,
|
|
260 const Orthanc::WebServiceParameters& orthanc,
|
|
261 const IObserver& receiver,
|
|
262 const GetOrthancWebViewerJpegCommand& command)
|
|
263 {
|
|
264 Orthanc::HttpClient client(orthanc, command.GetUri());
|
|
265 client.SetTimeout(command.GetTimeout());
|
|
266
|
|
267 CopyHttpHeaders(client, command.GetHttpHeaders());
|
|
268
|
|
269 std::string answer;
|
|
270 Orthanc::HttpClient::HttpHeaders answerHeaders;
|
|
271 client.ApplyAndThrowException(answer, answerHeaders);
|
|
272
|
|
273 DecodeAnswer(answer, answerHeaders);
|
|
274
|
|
275 command.ProcessHttpAnswer(emitter, receiver, answer);
|
|
276 }
|
|
277
|
|
278
|
|
279 void ThreadedOracle::Step()
|
|
280 {
|
|
281 std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100));
|
|
282
|
|
283 if (object.get() != NULL)
|
|
284 {
|
|
285 Item& item = dynamic_cast<Item&>(*object);
|
|
286
|
|
287 try
|
|
288 {
|
|
289 switch (item.GetCommand().GetType())
|
|
290 {
|
|
291 case IOracleCommand::Type_Sleep:
|
|
292 {
|
|
293 SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand());
|
|
294
|
|
295 std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay()));
|
|
296
|
|
297 if (command.HasPayload())
|
|
298 {
|
|
299 copy->SetPayload(command.ReleasePayload());
|
|
300 }
|
|
301
|
|
302 sleepingCommands_->Add(item.GetReceiver(), copy.release());
|
|
303
|
|
304 break;
|
|
305 }
|
|
306
|
|
307 case IOracleCommand::Type_OrthancRestApi:
|
|
308 Execute(emitter_, orthanc_, item.GetReceiver(),
|
|
309 dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand()));
|
|
310 break;
|
|
311
|
|
312 case IOracleCommand::Type_GetOrthancImage:
|
|
313 Execute(emitter_, orthanc_, item.GetReceiver(),
|
|
314 dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand()));
|
|
315 break;
|
|
316
|
|
317 case IOracleCommand::Type_GetOrthancWebViewerJpeg:
|
|
318 Execute(emitter_, orthanc_, item.GetReceiver(),
|
|
319 dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand()));
|
|
320 break;
|
|
321
|
|
322 default:
|
|
323 throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented);
|
|
324 }
|
|
325 }
|
|
326 catch (Orthanc::OrthancException& e)
|
|
327 {
|
|
328 LOG(ERROR) << "Exception within the oracle: " << e.What();
|
|
329 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e));
|
|
330 }
|
|
331 catch (...)
|
|
332 {
|
|
333 LOG(ERROR) << "Threaded exception within the oracle";
|
|
334 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage
|
|
335 (item.GetCommand(), Orthanc::ErrorCode_InternalError));
|
|
336 }
|
|
337 }
|
|
338 }
|
|
339
|
|
340
|
|
341 void ThreadedOracle::Worker(ThreadedOracle* that)
|
|
342 {
|
|
343 assert(that != NULL);
|
|
344
|
|
345 for (;;)
|
|
346 {
|
|
347 {
|
|
348 boost::mutex::scoped_lock lock(that->mutex_);
|
|
349 if (that->state_ != State_Running)
|
|
350 {
|
|
351 return;
|
|
352 }
|
|
353 }
|
|
354
|
|
355 that->Step();
|
|
356 }
|
|
357 }
|
|
358
|
|
359
|
|
360 void ThreadedOracle::SleepingWorker(ThreadedOracle* that)
|
|
361 {
|
|
362 assert(that != NULL);
|
|
363
|
|
364 for (;;)
|
|
365 {
|
|
366 {
|
|
367 boost::mutex::scoped_lock lock(that->mutex_);
|
|
368 if (that->state_ != State_Running)
|
|
369 {
|
|
370 return;
|
|
371 }
|
|
372 }
|
|
373
|
|
374 that->sleepingCommands_->AwakeExpired(that->emitter_);
|
|
375
|
|
376 boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_));
|
|
377 }
|
|
378 }
|
|
379
|
|
380
|
|
381 void ThreadedOracle::StopInternal()
|
|
382 {
|
|
383 {
|
|
384 boost::mutex::scoped_lock lock(mutex_);
|
|
385
|
|
386 if (state_ == State_Setup ||
|
|
387 state_ == State_Stopped)
|
|
388 {
|
|
389 return;
|
|
390 }
|
|
391 else
|
|
392 {
|
|
393 state_ = State_Stopped;
|
|
394 }
|
|
395 }
|
|
396
|
|
397 if (sleepingWorker_.joinable())
|
|
398 {
|
|
399 sleepingWorker_.join();
|
|
400 }
|
|
401
|
|
402 for (size_t i = 0; i < workers_.size(); i++)
|
|
403 {
|
|
404 if (workers_[i] != NULL)
|
|
405 {
|
|
406 if (workers_[i]->joinable())
|
|
407 {
|
|
408 workers_[i]->join();
|
|
409 }
|
|
410
|
|
411 delete workers_[i];
|
|
412 }
|
|
413 }
|
|
414 }
|
|
415
|
|
416
|
|
417 ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) :
|
|
418 emitter_(emitter),
|
|
419 state_(State_Setup),
|
|
420 workers_(4),
|
|
421 sleepingCommands_(new SleepingCommands),
|
|
422 sleepingTimeResolution_(50) // By default, time resolution of 50ms
|
|
423 {
|
|
424 }
|
|
425
|
|
426
|
|
427 void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc)
|
|
428 {
|
|
429 boost::mutex::scoped_lock lock(mutex_);
|
|
430
|
|
431 if (state_ != State_Setup)
|
|
432 {
|
|
433 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
|
|
434 }
|
|
435 else
|
|
436 {
|
|
437 orthanc_ = orthanc;
|
|
438 }
|
|
439 }
|
|
440
|
|
441
|
|
442 void ThreadedOracle::SetWorkersCount(unsigned int count)
|
|
443 {
|
|
444 boost::mutex::scoped_lock lock(mutex_);
|
|
445
|
|
446 if (count <= 0)
|
|
447 {
|
|
448 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
|
|
449 }
|
|
450 else if (state_ != State_Setup)
|
|
451 {
|
|
452 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
|
|
453 }
|
|
454 else
|
|
455 {
|
|
456 workers_.resize(count);
|
|
457 }
|
|
458 }
|
|
459
|
|
460
|
|
461 void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds)
|
|
462 {
|
|
463 boost::mutex::scoped_lock lock(mutex_);
|
|
464
|
|
465 if (milliseconds <= 0)
|
|
466 {
|
|
467 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
|
|
468 }
|
|
469 else if (state_ != State_Setup)
|
|
470 {
|
|
471 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
|
|
472 }
|
|
473 else
|
|
474 {
|
|
475 sleepingTimeResolution_ = milliseconds;
|
|
476 }
|
|
477 }
|
|
478
|
|
479
|
|
480 void ThreadedOracle::Start()
|
|
481 {
|
|
482 boost::mutex::scoped_lock lock(mutex_);
|
|
483
|
|
484 if (state_ != State_Setup)
|
|
485 {
|
|
486 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
|
|
487 }
|
|
488 else
|
|
489 {
|
|
490 state_ = State_Running;
|
|
491
|
|
492 for (unsigned int i = 0; i < workers_.size(); i++)
|
|
493 {
|
|
494 workers_[i] = new boost::thread(Worker, this);
|
|
495 }
|
|
496
|
|
497 sleepingWorker_ = boost::thread(SleepingWorker, this);
|
|
498 }
|
|
499 }
|
|
500
|
|
501
|
|
502 void ThreadedOracle::Schedule(const IObserver& receiver,
|
|
503 IOracleCommand* command)
|
|
504 {
|
|
505 queue_.Enqueue(new Item(receiver, command));
|
|
506 }
|
|
507 }
|