comparison Framework/Oracle/ThreadedOracle.cpp @ 748:ab236bb5dbc7

ThreadedOracle
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 22 May 2019 14:46:26 +0200
parents
children 1181e1ad98ec
comparison
equal deleted inserted replaced
746:d716bfb3e07c 748:ab236bb5dbc7
1 /**
2 * Stone of Orthanc
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2019 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #include "ThreadedOracle.h"
23
24 #include "GetOrthancImageCommand.h"
25 #include "GetOrthancWebViewerJpegCommand.h"
26 #include "OrthancRestApiCommand.h"
27 #include "SleepOracleCommand.h"
28 #include "OracleCommandExceptionMessage.h"
29
30 #include <Core/Compression/GzipCompressor.h>
31 #include <Core/HttpClient.h>
32 #include <Core/OrthancException.h>
33 #include <Core/Toolbox.h>
34
35
36 namespace OrthancStone
37 {
38 class ThreadedOracle::Item : public Orthanc::IDynamicObject
39 {
40 private:
41 const IObserver& receiver_;
42 std::auto_ptr<IOracleCommand> command_;
43
44 public:
45 Item(const IObserver& receiver,
46 IOracleCommand* command) :
47 receiver_(receiver),
48 command_(command)
49 {
50 if (command == NULL)
51 {
52 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
53 }
54 }
55
56 const IObserver& GetReceiver() const
57 {
58 return receiver_;
59 }
60
61 IOracleCommand& GetCommand()
62 {
63 assert(command_.get() != NULL);
64 return *command_;
65 }
66 };
67
68
69 class ThreadedOracle::SleepingCommands : public boost::noncopyable
70 {
71 private:
72 class Item
73 {
74 private:
75 const IObserver& receiver_;
76 std::auto_ptr<SleepOracleCommand> command_;
77 boost::posix_time::ptime expiration_;
78
79 public:
80 Item(const IObserver& receiver,
81 SleepOracleCommand* command) :
82 receiver_(receiver),
83 command_(command)
84 {
85 if (command == NULL)
86 {
87 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
88 }
89
90 expiration_ = (boost::posix_time::second_clock::local_time() +
91 boost::posix_time::milliseconds(command_->GetDelay()));
92 }
93
94 const boost::posix_time::ptime& GetExpirationTime() const
95 {
96 return expiration_;
97 }
98
99 void Awake(IMessageEmitter& emitter)
100 {
101 assert(command_.get() != NULL);
102
103 SleepOracleCommand::TimeoutMessage message(*command_);
104 emitter.EmitMessage(receiver_, message);
105 }
106 };
107
108 typedef std::list<Item*> Content;
109
110 boost::mutex mutex_;
111 Content content_;
112
113 public:
114 ~SleepingCommands()
115 {
116 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
117 {
118 if (*it != NULL)
119 {
120 delete *it;
121 }
122 }
123 }
124
125 void Add(const IObserver& receiver,
126 SleepOracleCommand* command) // Takes ownership
127 {
128 boost::mutex::scoped_lock lock(mutex_);
129
130 content_.push_back(new Item(receiver, command));
131 }
132
133 void AwakeExpired(IMessageEmitter& emitter)
134 {
135 boost::mutex::scoped_lock lock(mutex_);
136
137 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
138
139 Content stillSleeping;
140
141 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
142 {
143 if (*it != NULL &&
144 (*it)->GetExpirationTime() <= now)
145 {
146 (*it)->Awake(emitter);
147 delete *it;
148 *it = NULL;
149 }
150 else
151 {
152 stillSleeping.push_back(*it);
153 }
154 }
155
156 // Compact the still-sleeping commands
157 content_ = stillSleeping;
158 }
159 };
160
161
162 static void CopyHttpHeaders(Orthanc::HttpClient& client,
163 const Orthanc::HttpClient::HttpHeaders& headers)
164 {
165 for (Orthanc::HttpClient::HttpHeaders::const_iterator
166 it = headers.begin(); it != headers.end(); it++ )
167 {
168 client.AddHeader(it->first, it->second);
169 }
170 }
171
172
173 static void DecodeAnswer(std::string& answer,
174 const Orthanc::HttpClient::HttpHeaders& headers)
175 {
176 Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None;
177
178 for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin();
179 it != headers.end(); ++it)
180 {
181 std::string s;
182 Orthanc::Toolbox::ToLowerCase(s, it->first);
183
184 if (s == "content-encoding")
185 {
186 if (it->second == "gzip")
187 {
188 contentEncoding = Orthanc::HttpCompression_Gzip;
189 }
190 else
191 {
192 throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol,
193 "Unsupported HTTP Content-Encoding: " + it->second);
194 }
195
196 break;
197 }
198 }
199
200 if (contentEncoding == Orthanc::HttpCompression_Gzip)
201 {
202 std::string compressed;
203 answer.swap(compressed);
204
205 Orthanc::GzipCompressor compressor;
206 compressor.Uncompress(answer, compressed.c_str(), compressed.size());
207 }
208 }
209
210
211 static void Execute(IMessageEmitter& emitter,
212 const Orthanc::WebServiceParameters& orthanc,
213 const IObserver& receiver,
214 const OrthancRestApiCommand& command)
215 {
216 Orthanc::HttpClient client(orthanc, command.GetUri());
217 client.SetMethod(command.GetMethod());
218 client.SetTimeout(command.GetTimeout());
219
220 CopyHttpHeaders(client, command.GetHttpHeaders());
221
222 if (command.GetMethod() == Orthanc::HttpMethod_Post ||
223 command.GetMethod() == Orthanc::HttpMethod_Put)
224 {
225 client.SetBody(command.GetBody());
226 }
227
228 std::string answer;
229 Orthanc::HttpClient::HttpHeaders answerHeaders;
230 client.ApplyAndThrowException(answer, answerHeaders);
231
232 DecodeAnswer(answer, answerHeaders);
233
234 OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer);
235 emitter.EmitMessage(receiver, message);
236 }
237
238
239 static void Execute(IMessageEmitter& emitter,
240 const Orthanc::WebServiceParameters& orthanc,
241 const IObserver& receiver,
242 const GetOrthancImageCommand& command)
243 {
244 Orthanc::HttpClient client(orthanc, command.GetUri());
245 client.SetTimeout(command.GetTimeout());
246
247 CopyHttpHeaders(client, command.GetHttpHeaders());
248
249 std::string answer;
250 Orthanc::HttpClient::HttpHeaders answerHeaders;
251 client.ApplyAndThrowException(answer, answerHeaders);
252
253 DecodeAnswer(answer, answerHeaders);
254
255 command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders);
256 }
257
258
259 static void Execute(IMessageEmitter& emitter,
260 const Orthanc::WebServiceParameters& orthanc,
261 const IObserver& receiver,
262 const GetOrthancWebViewerJpegCommand& command)
263 {
264 Orthanc::HttpClient client(orthanc, command.GetUri());
265 client.SetTimeout(command.GetTimeout());
266
267 CopyHttpHeaders(client, command.GetHttpHeaders());
268
269 std::string answer;
270 Orthanc::HttpClient::HttpHeaders answerHeaders;
271 client.ApplyAndThrowException(answer, answerHeaders);
272
273 DecodeAnswer(answer, answerHeaders);
274
275 command.ProcessHttpAnswer(emitter, receiver, answer);
276 }
277
278
279 void ThreadedOracle::Step()
280 {
281 std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100));
282
283 if (object.get() != NULL)
284 {
285 Item& item = dynamic_cast<Item&>(*object);
286
287 try
288 {
289 switch (item.GetCommand().GetType())
290 {
291 case IOracleCommand::Type_Sleep:
292 {
293 SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand());
294
295 std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay()));
296
297 if (command.HasPayload())
298 {
299 copy->SetPayload(command.ReleasePayload());
300 }
301
302 sleepingCommands_->Add(item.GetReceiver(), copy.release());
303
304 break;
305 }
306
307 case IOracleCommand::Type_OrthancRestApi:
308 Execute(emitter_, orthanc_, item.GetReceiver(),
309 dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand()));
310 break;
311
312 case IOracleCommand::Type_GetOrthancImage:
313 Execute(emitter_, orthanc_, item.GetReceiver(),
314 dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand()));
315 break;
316
317 case IOracleCommand::Type_GetOrthancWebViewerJpeg:
318 Execute(emitter_, orthanc_, item.GetReceiver(),
319 dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand()));
320 break;
321
322 default:
323 throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented);
324 }
325 }
326 catch (Orthanc::OrthancException& e)
327 {
328 LOG(ERROR) << "Exception within the oracle: " << e.What();
329 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e));
330 }
331 catch (...)
332 {
333 LOG(ERROR) << "Threaded exception within the oracle";
334 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage
335 (item.GetCommand(), Orthanc::ErrorCode_InternalError));
336 }
337 }
338 }
339
340
341 void ThreadedOracle::Worker(ThreadedOracle* that)
342 {
343 assert(that != NULL);
344
345 for (;;)
346 {
347 {
348 boost::mutex::scoped_lock lock(that->mutex_);
349 if (that->state_ != State_Running)
350 {
351 return;
352 }
353 }
354
355 that->Step();
356 }
357 }
358
359
360 void ThreadedOracle::SleepingWorker(ThreadedOracle* that)
361 {
362 assert(that != NULL);
363
364 for (;;)
365 {
366 {
367 boost::mutex::scoped_lock lock(that->mutex_);
368 if (that->state_ != State_Running)
369 {
370 return;
371 }
372 }
373
374 that->sleepingCommands_->AwakeExpired(that->emitter_);
375
376 boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_));
377 }
378 }
379
380
381 void ThreadedOracle::StopInternal()
382 {
383 {
384 boost::mutex::scoped_lock lock(mutex_);
385
386 if (state_ == State_Setup ||
387 state_ == State_Stopped)
388 {
389 return;
390 }
391 else
392 {
393 state_ = State_Stopped;
394 }
395 }
396
397 if (sleepingWorker_.joinable())
398 {
399 sleepingWorker_.join();
400 }
401
402 for (size_t i = 0; i < workers_.size(); i++)
403 {
404 if (workers_[i] != NULL)
405 {
406 if (workers_[i]->joinable())
407 {
408 workers_[i]->join();
409 }
410
411 delete workers_[i];
412 }
413 }
414 }
415
416
417 ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) :
418 emitter_(emitter),
419 state_(State_Setup),
420 workers_(4),
421 sleepingCommands_(new SleepingCommands),
422 sleepingTimeResolution_(50) // By default, time resolution of 50ms
423 {
424 }
425
426
427 void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc)
428 {
429 boost::mutex::scoped_lock lock(mutex_);
430
431 if (state_ != State_Setup)
432 {
433 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
434 }
435 else
436 {
437 orthanc_ = orthanc;
438 }
439 }
440
441
442 void ThreadedOracle::SetWorkersCount(unsigned int count)
443 {
444 boost::mutex::scoped_lock lock(mutex_);
445
446 if (count <= 0)
447 {
448 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
449 }
450 else if (state_ != State_Setup)
451 {
452 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
453 }
454 else
455 {
456 workers_.resize(count);
457 }
458 }
459
460
461 void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds)
462 {
463 boost::mutex::scoped_lock lock(mutex_);
464
465 if (milliseconds <= 0)
466 {
467 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
468 }
469 else if (state_ != State_Setup)
470 {
471 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
472 }
473 else
474 {
475 sleepingTimeResolution_ = milliseconds;
476 }
477 }
478
479
480 void ThreadedOracle::Start()
481 {
482 boost::mutex::scoped_lock lock(mutex_);
483
484 if (state_ != State_Setup)
485 {
486 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
487 }
488 else
489 {
490 state_ = State_Running;
491
492 for (unsigned int i = 0; i < workers_.size(); i++)
493 {
494 workers_[i] = new boost::thread(Worker, this);
495 }
496
497 sleepingWorker_ = boost::thread(SleepingWorker, this);
498 }
499 }
500
501
502 void ThreadedOracle::Schedule(const IObserver& receiver,
503 IOracleCommand* command)
504 {
505 queue_.Enqueue(new Item(receiver, command));
506 }
507 }