comparison OrthancStone/Sources/Oracle/ThreadedOracle.cpp @ 1512:244ad1e4e76a

reorganization of folders
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 07 Jul 2020 16:21:02 +0200
parents Framework/Oracle/ThreadedOracle.cpp@30deba7bc8e2
children 85e117739eca
comparison
equal deleted inserted replaced
1511:9dfeee74c1e6 1512:244ad1e4e76a
1 /**
2 * Stone of Orthanc
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #include "ThreadedOracle.h"
23
24 #include "SleepOracleCommand.h"
25
26 #include <Logging.h>
27 #include <OrthancException.h>
28
29 namespace OrthancStone
30 {
31 class ThreadedOracle::Item : public Orthanc::IDynamicObject
32 {
33 private:
34 boost::weak_ptr<IObserver> receiver_;
35 std::unique_ptr<IOracleCommand> command_;
36
37 public:
38 Item(boost::weak_ptr<IObserver> receiver,
39 IOracleCommand* command) :
40 receiver_(receiver),
41 command_(command)
42 {
43 if (command == NULL)
44 {
45 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
46 }
47 }
48
49 boost::weak_ptr<IObserver> GetReceiver()
50 {
51 return receiver_;
52 }
53
54 IOracleCommand& GetCommand()
55 {
56 assert(command_.get() != NULL);
57 return *command_;
58 }
59 };
60
61
62 class ThreadedOracle::SleepingCommands : public boost::noncopyable
63 {
64 private:
65 class Item
66 {
67 private:
68 boost::weak_ptr<IObserver> receiver_;
69 std::unique_ptr<SleepOracleCommand> command_;
70 boost::posix_time::ptime expiration_;
71
72 public:
73 Item(boost::weak_ptr<IObserver> receiver,
74 SleepOracleCommand* command) :
75 receiver_(receiver),
76 command_(command)
77 {
78 if (command == NULL)
79 {
80 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
81 }
82
83 expiration_ = (boost::posix_time::microsec_clock::local_time() +
84 boost::posix_time::milliseconds(command_->GetDelay()));
85 }
86
87 const boost::posix_time::ptime& GetExpirationTime() const
88 {
89 return expiration_;
90 }
91
92 void Awake(IMessageEmitter& emitter)
93 {
94 assert(command_.get() != NULL);
95
96 SleepOracleCommand::TimeoutMessage message(*command_);
97 emitter.EmitMessage(receiver_, message);
98 }
99 };
100
101 typedef std::list<Item*> Content;
102
103 boost::mutex mutex_;
104 Content content_;
105
106 public:
107 ~SleepingCommands()
108 {
109 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
110 {
111 if (*it != NULL)
112 {
113 delete *it;
114 }
115 }
116 }
117
118 void Add(boost::weak_ptr<IObserver> receiver,
119 SleepOracleCommand* command) // Takes ownership
120 {
121 boost::mutex::scoped_lock lock(mutex_);
122
123 content_.push_back(new Item(receiver, command));
124 }
125
126 void AwakeExpired(IMessageEmitter& emitter)
127 {
128 boost::mutex::scoped_lock lock(mutex_);
129
130 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
131
132 Content stillSleeping;
133
134 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
135 {
136 if (*it != NULL &&
137 (*it)->GetExpirationTime() <= now)
138 {
139 (*it)->Awake(emitter);
140 delete *it;
141 *it = NULL;
142 }
143 else
144 {
145 stillSleeping.push_back(*it);
146 }
147 }
148
149 // Compact the still-sleeping commands
150 content_ = stillSleeping;
151 }
152 };
153
154
155 void ThreadedOracle::Step()
156 {
157 std::unique_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100));
158
159 if (object.get() != NULL)
160 {
161 Item& item = dynamic_cast<Item&>(*object);
162
163 if (item.GetCommand().GetType() == IOracleCommand::Type_Sleep)
164 {
165 SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand());
166
167 std::unique_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay()));
168
169 if (command.HasPayload())
170 {
171 copy->AcquirePayload(command.ReleasePayload());
172 }
173
174 sleepingCommands_->Add(item.GetReceiver(), copy.release());
175 }
176 else
177 {
178 GenericOracleRunner runner;
179
180 {
181 boost::mutex::scoped_lock lock(mutex_);
182 runner.SetOrthanc(orthanc_);
183 runner.SetRootDirectory(rootDirectory_);
184
185 #if ORTHANC_ENABLE_DCMTK == 1
186 if (dicomCache_)
187 {
188 runner.SetDicomCache(dicomCache_);
189 }
190 #endif
191 }
192
193 runner.Run(item.GetReceiver(), emitter_, item.GetCommand());
194 }
195 }
196 }
197
198
199 void ThreadedOracle::Worker(ThreadedOracle* that)
200 {
201 assert(that != NULL);
202
203 for (;;)
204 {
205 {
206 boost::mutex::scoped_lock lock(that->mutex_);
207 if (that->state_ != State_Running)
208 {
209 return;
210 }
211 }
212
213 that->Step();
214 }
215 }
216
217
218 void ThreadedOracle::SleepingWorker(ThreadedOracle* that)
219 {
220 assert(that != NULL);
221
222 for (;;)
223 {
224 {
225 boost::mutex::scoped_lock lock(that->mutex_);
226 if (that->state_ != State_Running)
227 {
228 return;
229 }
230 }
231
232 that->sleepingCommands_->AwakeExpired(that->emitter_);
233
234 boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_));
235 }
236 }
237
238
239 void ThreadedOracle::StopInternal()
240 {
241 {
242 boost::mutex::scoped_lock lock(mutex_);
243
244 if (state_ == State_Setup ||
245 state_ == State_Stopped)
246 {
247 return;
248 }
249 else
250 {
251 state_ = State_Stopped;
252 }
253 }
254
255 if (sleepingWorker_.joinable())
256 {
257 sleepingWorker_.join();
258 }
259
260 for (size_t i = 0; i < workers_.size(); i++)
261 {
262 if (workers_[i] != NULL)
263 {
264 if (workers_[i]->joinable())
265 {
266 workers_[i]->join();
267 }
268
269 delete workers_[i];
270 }
271 }
272 }
273
274
275 ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) :
276 emitter_(emitter),
277 rootDirectory_("."),
278 state_(State_Setup),
279 workers_(4),
280 sleepingCommands_(new SleepingCommands),
281 sleepingTimeResolution_(50) // By default, time resolution of 50ms
282 {
283 }
284
285
286 ThreadedOracle::~ThreadedOracle()
287 {
288 if (state_ == State_Running)
289 {
290 LOG(ERROR) << "The threaded oracle is still running, explicit call to "
291 << "Stop() is mandatory to avoid crashes";
292 }
293
294 try
295 {
296 StopInternal();
297 }
298 catch (Orthanc::OrthancException& e)
299 {
300 LOG(ERROR) << "Exception while stopping the threaded oracle: " << e.What();
301 }
302 catch (...)
303 {
304 LOG(ERROR) << "Native exception while stopping the threaded oracle";
305 }
306 }
307
308
309 void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc)
310 {
311 boost::mutex::scoped_lock lock(mutex_);
312 orthanc_ = orthanc;
313 }
314
315
316 void ThreadedOracle::SetRootDirectory(const std::string& rootDirectory)
317 {
318 boost::mutex::scoped_lock lock(mutex_);
319 rootDirectory_ = rootDirectory;
320 }
321
322
323 void ThreadedOracle::SetThreadsCount(unsigned int count)
324 {
325 boost::mutex::scoped_lock lock(mutex_);
326
327 if (count <= 0)
328 {
329 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
330 }
331 else if (state_ != State_Setup)
332 {
333 LOG(ERROR) << "ThreadedOracle::SetThreadsCount(): (state_ != State_Setup)";
334 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
335 }
336 else
337 {
338 workers_.resize(count);
339 }
340 }
341
342
343 void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds)
344 {
345 boost::mutex::scoped_lock lock(mutex_);
346
347 if (milliseconds <= 0)
348 {
349 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
350 }
351 else if (state_ != State_Setup)
352 {
353 LOG(ERROR) << "ThreadedOracle::SetSleepingTimeResolution(): (state_ != State_Setup)";
354 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
355 }
356 else
357 {
358 sleepingTimeResolution_ = milliseconds;
359 }
360 }
361
362
363 void ThreadedOracle::SetDicomCacheSize(size_t size)
364 {
365 #if ORTHANC_ENABLE_DCMTK == 1
366 boost::mutex::scoped_lock lock(mutex_);
367
368 if (state_ != State_Setup)
369 {
370 LOG(ERROR) << "ThreadedOracle::SetDicomCacheSize(): (state_ != State_Setup)";
371 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
372 }
373 else
374 {
375 if (size == 0)
376 {
377 dicomCache_.reset();
378 }
379 else
380 {
381 dicomCache_.reset(new ParsedDicomCache(size));
382 }
383 }
384 #endif
385 }
386
387
388 void ThreadedOracle::Start()
389 {
390 boost::mutex::scoped_lock lock(mutex_);
391
392 if (state_ != State_Setup)
393 {
394 LOG(ERROR) << "ThreadedOracle::Start(): (state_ != State_Setup)";
395 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
396 }
397 else
398 {
399 LOG(INFO) << "Starting oracle with " << workers_.size() << " worker threads";
400 state_ = State_Running;
401
402 for (unsigned int i = 0; i < workers_.size(); i++)
403 {
404 workers_[i] = new boost::thread(Worker, this);
405 }
406
407 sleepingWorker_ = boost::thread(SleepingWorker, this);
408 }
409 }
410
411
412 bool ThreadedOracle::Schedule(boost::shared_ptr<IObserver> receiver,
413 IOracleCommand* command)
414 {
415 std::unique_ptr<Item> item(new Item(receiver, command));
416
417 {
418 boost::mutex::scoped_lock lock(mutex_);
419
420 if (state_ == State_Running)
421 {
422 //LOG(INFO) << "New oracle command queued";
423 queue_.Enqueue(item.release());
424 return true;
425 }
426 else
427 {
428 LOG(TRACE) << "Command not enqueued, as the oracle has stopped";
429 return false;
430 }
431 }
432 }
433 }