comparison Framework/Oracle/ThreadedOracle.cpp @ 1098:17660df24c36 broker

simplification of IOracleRunner
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 25 Oct 2019 13:01:24 +0200
parents 4383382db01d
children 66e21ef2d657
comparison
equal deleted inserted replaced
1097:4383382db01d 1098:17660df24c36
44 { 44 {
45 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); 45 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
46 } 46 }
47 } 47 }
48 48
49 boost::weak_ptr<IObserver>& GetReceiver() 49 boost::weak_ptr<IObserver> GetReceiver()
50 { 50 {
51 return receiver_; 51 return receiver_;
52 } 52 }
53 53
54 IOracleCommand& GetCommand() 54 IOracleCommand& GetCommand()
68 boost::weak_ptr<IObserver> receiver_; 68 boost::weak_ptr<IObserver> receiver_;
69 std::auto_ptr<SleepOracleCommand> command_; 69 std::auto_ptr<SleepOracleCommand> command_;
70 boost::posix_time::ptime expiration_; 70 boost::posix_time::ptime expiration_;
71 71
72 public: 72 public:
73 Item(boost::weak_ptr<IObserver>& receiver, 73 Item(boost::weak_ptr<IObserver> receiver,
74 SleepOracleCommand* command) : 74 SleepOracleCommand* command) :
75 receiver_(receiver), 75 receiver_(receiver),
76 command_(command) 76 command_(command)
77 { 77 {
78 if (command == NULL) 78 if (command == NULL)
113 delete *it; 113 delete *it;
114 } 114 }
115 } 115 }
116 } 116 }
117 117
118 void Add(boost::weak_ptr<IObserver>& receiver, 118 void Add(boost::weak_ptr<IObserver> receiver,
119 SleepOracleCommand* command) // Takes ownership 119 SleepOracleCommand* command) // Takes ownership
120 { 120 {
121 boost::mutex::scoped_lock lock(mutex_); 121 boost::mutex::scoped_lock lock(mutex_);
122 122
123 content_.push_back(new Item(receiver, command)); 123 content_.push_back(new Item(receiver, command));
173 173
174 sleepingCommands_->Add(item.GetReceiver(), copy.release()); 174 sleepingCommands_->Add(item.GetReceiver(), copy.release());
175 } 175 }
176 else 176 else
177 { 177 {
178 GenericOracleRunner runner(emitter_, orthanc_); 178 GenericOracleRunner runner(orthanc_);
179 runner.Run(item.GetReceiver(), item.GetCommand()); 179 std::auto_ptr<IMessage> message(runner.Run(item.GetCommand()));
180
181 emitter_.EmitMessage(item.GetReceiver(), *message);
180 } 182 }
181 } 183 }
182 } 184 }
183 185
184 186
253 } 255 }
254 256
255 delete workers_[i]; 257 delete workers_[i];
256 } 258 }
257 } 259 }
258
259 queue_.Clear();
260 } 260 }
261 261
262 262
263 ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) : 263 ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) :
264 emitter_(emitter), 264 emitter_(emitter),
370 sleepingWorker_ = boost::thread(SleepingWorker, this); 370 sleepingWorker_ = boost::thread(SleepingWorker, this);
371 } 371 }
372 } 372 }
373 373
374 374
375 void ThreadedOracle::Schedule(boost::shared_ptr<IObserver>& receiver, 375 bool ThreadedOracle::Schedule(boost::shared_ptr<IObserver> receiver,
376 IOracleCommand* command) 376 IOracleCommand* command)
377 { 377 {
378 queue_.Enqueue(new Item(receiver, command)); 378 std::auto_ptr<Item> item(new Item(receiver, command));
379
380 {
381 boost::mutex::scoped_lock lock(mutex_);
382
383 if (state_ == State_Running)
384 {
385 //LOG(INFO) << "New oracle command queued";
386 queue_.Enqueue(item.release());
387 return true;
388 }
389 else
390 {
391 LOG(INFO) << "Command not enqueued, as the oracle is stopped";
392
393 /**
394 * Answering "true" below results in a memory leak within
395 * "OracleScheduler", as the scheduler believes that the
396 * command is still active (i.e. pending to be executed by the
397 * oracle), thereby stalling the scheduler during its
398 * destruction (check out
399 * "sjo-playground/WebViewer/Backend/Leak")
400 **/
401 return false;
402 }
403 }
379 } 404 }
380 } 405 }