changeset 467:322c1b497036

cancel and listener for commands
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 12 Jul 2013 14:42:27 +0200
parents 9da3596069b8
children 456b9d2e9af4
files Core/MultiThreading/ThreadedCommandProcessor.cpp Core/MultiThreading/ThreadedCommandProcessor.h
diffstat 2 files changed, 95 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/Core/MultiThreading/ThreadedCommandProcessor.cpp	Fri Jul 12 12:11:45 2013 +0200
+++ b/Core/MultiThreading/ThreadedCommandProcessor.cpp	Fri Jul 12 14:42:27 2013 +0200
@@ -54,7 +54,17 @@
           if (that->success_)
           {
             // No command has failed so far
-            success = dynamic_cast<ICommand&>(*command).Execute();
+
+            if (that->cancel_)
+            {
+              // The commands have been canceled. Skip the execution
+              // of this command, yet mark it as succeeded.
+              success = true;
+            }
+            else
+            {
+              success = dynamic_cast<ICommand&>(*command).Execute();
+            }
           }
           else
           {
@@ -71,7 +81,30 @@
           that->remainingCommands_--;
 
           if (!success)
+          {
+            if (!that->cancel_ && that->listener_ && that->success_)
+            {
+              // This is the first command that fails
+              that->listener_->SignalFailure();
+            }
+
             that->success_ = false;
+          }
+          else
+          {
+            if (!that->cancel_ && that->listener_)
+            {
+              if (that->remainingCommands_ == 0)
+              {
+                that->listener_->SignalSuccess(that->totalCommands_);
+              }
+              else
+              {
+                that->listener_->SignalProgress(that->totalCommands_ - that->remainingCommands_,
+                                                that->totalCommands_);
+              }
+            }
+          }
 
           that->processedCommand_.notify_all();
         }
@@ -86,11 +119,14 @@
     {
       throw OrthancException(ErrorCode_ParameterOutOfRange);
     }
-    
+
+    listener_ = NULL;
     success_ = true;
     done_ = false;
+    cancel_ = false;
     threads_.resize(numThreads);
     remainingCommands_ = 0;
+    totalCommands_ = 0;
 
     for (unsigned int i = 0; i < numThreads; i++)
     {
@@ -122,11 +158,10 @@
 
   void ThreadedCommandProcessor::Post(ICommand* command)
   {
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      queue_.Enqueue(command);
-      remainingCommands_++;
-    }
+    boost::mutex::scoped_lock lock(mutex_);
+    queue_.Enqueue(command);
+    remainingCommands_++;
+    totalCommands_++;
   }
 
 
@@ -139,10 +174,32 @@
       processedCommand_.wait(lock);
     }
 
-    // Reset the "success" flag for subsequent commands
+    if (cancel_ && listener_)
+    {
+      listener_->SignalCancel();
+    }
+
+    // Reset the sequence counters for subsequent commands
     bool hasSucceeded = success_;
     success_ = true;
+    totalCommands_ = 0;
+    cancel_ = false;
 
     return hasSucceeded;
   }
+
+
+  void ThreadedCommandProcessor::Cancel()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    cancel_ = true;
+  }
+
+
+  void ThreadedCommandProcessor::SetListener(IListener& listener)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    listener_ = &listener;
+  }
 }
--- a/Core/MultiThreading/ThreadedCommandProcessor.h	Fri Jul 12 12:11:45 2013 +0200
+++ b/Core/MultiThreading/ThreadedCommandProcessor.h	Fri Jul 12 14:42:27 2013 +0200
@@ -40,14 +40,34 @@
 {
   class ThreadedCommandProcessor
   {
+  public:
+    class IListener
+    {
+    public:
+      virtual ~IListener()
+      {
+      }
+
+      virtual void SignalProgress(unsigned int current,
+                                  unsigned int total) = 0;
+
+      virtual void SignalSuccess(unsigned int total) = 0;
+
+      virtual void SignalFailure() = 0;
+
+      virtual void SignalCancel() = 0;
+    };
+
   private:
     SharedMessageQueue  queue_;
     bool done_;
+    bool cancel_;
     std::vector<boost::thread*>  threads_;
+    IListener* listener_;
 
     boost::mutex mutex_;
     bool success_;
-    unsigned int remainingCommands_;
+    unsigned int remainingCommands_, totalCommands_;
     boost::condition_variable processedCommand_;
 
     static void Processor(ThreadedCommandProcessor* that);
@@ -61,5 +81,14 @@
     void Post(ICommand* command);
 
     bool Join();
+
+    void Cancel();
+
+    void SetListener(IListener& listener);
+
+    IListener& GetListener() const
+    {
+      return *listener_;
+    }
   };
 }