# HG changeset patch # User Sebastien Jodogne # Date 1373632947 -7200 # Node ID 322c1b497036687ee34a0c036b9d385cafa96076 # Parent 9da3596069b87c33373655af63d3a25ca98702d1 cancel and listener for commands diff -r 9da3596069b8 -r 322c1b497036 Core/MultiThreading/ThreadedCommandProcessor.cpp --- a/Core/MultiThreading/ThreadedCommandProcessor.cpp Fri Jul 12 12:11:45 2013 +0200 +++ b/Core/MultiThreading/ThreadedCommandProcessor.cpp Fri Jul 12 14:42:27 2013 +0200 @@ -54,7 +54,17 @@ if (that->success_) { // No command has failed so far - success = dynamic_cast(*command).Execute(); + + if (that->cancel_) + { + // The commands have been canceled. Skip the execution + // of this command, yet mark it as succeeded. + success = true; + } + else + { + success = dynamic_cast(*command).Execute(); + } } else { @@ -71,7 +81,30 @@ that->remainingCommands_--; if (!success) + { + if (!that->cancel_ && that->listener_ && that->success_) + { + // This is the first command that fails + that->listener_->SignalFailure(); + } + that->success_ = false; + } + else + { + if (!that->cancel_ && that->listener_) + { + if (that->remainingCommands_ == 0) + { + that->listener_->SignalSuccess(that->totalCommands_); + } + else + { + that->listener_->SignalProgress(that->totalCommands_ - that->remainingCommands_, + that->totalCommands_); + } + } + } that->processedCommand_.notify_all(); } @@ -86,11 +119,14 @@ { throw OrthancException(ErrorCode_ParameterOutOfRange); } - + + listener_ = NULL; success_ = true; done_ = false; + cancel_ = false; threads_.resize(numThreads); remainingCommands_ = 0; + totalCommands_ = 0; for (unsigned int i = 0; i < numThreads; i++) { @@ -122,11 +158,10 @@ void ThreadedCommandProcessor::Post(ICommand* command) { - { - boost::mutex::scoped_lock lock(mutex_); - queue_.Enqueue(command); - remainingCommands_++; - } + boost::mutex::scoped_lock lock(mutex_); + queue_.Enqueue(command); + remainingCommands_++; + totalCommands_++; } @@ -139,10 +174,32 @@ processedCommand_.wait(lock); } - // Reset the "success" flag for subsequent commands + if (cancel_ && listener_) + { + listener_->SignalCancel(); + } + + // Reset the sequence counters for subsequent commands bool hasSucceeded = success_; success_ = true; + totalCommands_ = 0; + cancel_ = false; return hasSucceeded; } + + + void ThreadedCommandProcessor::Cancel() + { + boost::mutex::scoped_lock lock(mutex_); + + cancel_ = true; + } + + + void ThreadedCommandProcessor::SetListener(IListener& listener) + { + boost::mutex::scoped_lock lock(mutex_); + listener_ = &listener; + } } diff -r 9da3596069b8 -r 322c1b497036 Core/MultiThreading/ThreadedCommandProcessor.h --- a/Core/MultiThreading/ThreadedCommandProcessor.h Fri Jul 12 12:11:45 2013 +0200 +++ b/Core/MultiThreading/ThreadedCommandProcessor.h Fri Jul 12 14:42:27 2013 +0200 @@ -40,14 +40,34 @@ { class ThreadedCommandProcessor { + public: + class IListener + { + public: + virtual ~IListener() + { + } + + virtual void SignalProgress(unsigned int current, + unsigned int total) = 0; + + virtual void SignalSuccess(unsigned int total) = 0; + + virtual void SignalFailure() = 0; + + virtual void SignalCancel() = 0; + }; + private: SharedMessageQueue queue_; bool done_; + bool cancel_; std::vector threads_; + IListener* listener_; boost::mutex mutex_; bool success_; - unsigned int remainingCommands_; + unsigned int remainingCommands_, totalCommands_; boost::condition_variable processedCommand_; static void Processor(ThreadedCommandProcessor* that); @@ -61,5 +81,14 @@ void Post(ICommand* command); bool Join(); + + void Cancel(); + + void SetListener(IListener& listener); + + IListener& GetListener() const + { + return *listener_; + } }; }