comparison Core/MultiThreading/ThreadedCommandProcessor.cpp @ 467:322c1b497036

cancel and listener for commands
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 12 Jul 2013 14:42:27 +0200
parents 9da3596069b8
children 2d0a347e8cfc
comparison
equal deleted inserted replaced
466:9da3596069b8 467:322c1b497036
52 try 52 try
53 { 53 {
54 if (that->success_) 54 if (that->success_)
55 { 55 {
56 // No command has failed so far 56 // No command has failed so far
57 success = dynamic_cast<ICommand&>(*command).Execute(); 57
58 if (that->cancel_)
59 {
60 // The commands have been canceled. Skip the execution
61 // of this command, yet mark it as succeeded.
62 success = true;
63 }
64 else
65 {
66 success = dynamic_cast<ICommand&>(*command).Execute();
67 }
58 } 68 }
59 else 69 else
60 { 70 {
61 // A command has already failed. Skip the execution of this command. 71 // A command has already failed. Skip the execution of this command.
62 } 72 }
69 boost::mutex::scoped_lock lock(that->mutex_); 79 boost::mutex::scoped_lock lock(that->mutex_);
70 assert(that->remainingCommands_ > 0); 80 assert(that->remainingCommands_ > 0);
71 that->remainingCommands_--; 81 that->remainingCommands_--;
72 82
73 if (!success) 83 if (!success)
84 {
85 if (!that->cancel_ && that->listener_ && that->success_)
86 {
87 // This is the first command that fails
88 that->listener_->SignalFailure();
89 }
90
74 that->success_ = false; 91 that->success_ = false;
92 }
93 else
94 {
95 if (!that->cancel_ && that->listener_)
96 {
97 if (that->remainingCommands_ == 0)
98 {
99 that->listener_->SignalSuccess(that->totalCommands_);
100 }
101 else
102 {
103 that->listener_->SignalProgress(that->totalCommands_ - that->remainingCommands_,
104 that->totalCommands_);
105 }
106 }
107 }
75 108
76 that->processedCommand_.notify_all(); 109 that->processedCommand_.notify_all();
77 } 110 }
78 } 111 }
79 } 112 }
84 { 117 {
85 if (numThreads < 1) 118 if (numThreads < 1)
86 { 119 {
87 throw OrthancException(ErrorCode_ParameterOutOfRange); 120 throw OrthancException(ErrorCode_ParameterOutOfRange);
88 } 121 }
89 122
123 listener_ = NULL;
90 success_ = true; 124 success_ = true;
91 done_ = false; 125 done_ = false;
126 cancel_ = false;
92 threads_.resize(numThreads); 127 threads_.resize(numThreads);
93 remainingCommands_ = 0; 128 remainingCommands_ = 0;
129 totalCommands_ = 0;
94 130
95 for (unsigned int i = 0; i < numThreads; i++) 131 for (unsigned int i = 0; i < numThreads; i++)
96 { 132 {
97 threads_[i] = new boost::thread(Processor, this); 133 threads_[i] = new boost::thread(Processor, this);
98 } 134 }
120 } 156 }
121 157
122 158
123 void ThreadedCommandProcessor::Post(ICommand* command) 159 void ThreadedCommandProcessor::Post(ICommand* command)
124 { 160 {
125 { 161 boost::mutex::scoped_lock lock(mutex_);
126 boost::mutex::scoped_lock lock(mutex_); 162 queue_.Enqueue(command);
127 queue_.Enqueue(command); 163 remainingCommands_++;
128 remainingCommands_++; 164 totalCommands_++;
129 }
130 } 165 }
131 166
132 167
133 bool ThreadedCommandProcessor::Join() 168 bool ThreadedCommandProcessor::Join()
134 { 169 {
137 while (!remainingCommands_ == 0) 172 while (!remainingCommands_ == 0)
138 { 173 {
139 processedCommand_.wait(lock); 174 processedCommand_.wait(lock);
140 } 175 }
141 176
142 // Reset the "success" flag for subsequent commands 177 if (cancel_ && listener_)
178 {
179 listener_->SignalCancel();
180 }
181
182 // Reset the sequence counters for subsequent commands
143 bool hasSucceeded = success_; 183 bool hasSucceeded = success_;
144 success_ = true; 184 success_ = true;
185 totalCommands_ = 0;
186 cancel_ = false;
145 187
146 return hasSucceeded; 188 return hasSucceeded;
147 } 189 }
190
191
192 void ThreadedCommandProcessor::Cancel()
193 {
194 boost::mutex::scoped_lock lock(mutex_);
195
196 cancel_ = true;
197 }
198
199
200 void ThreadedCommandProcessor::SetListener(IListener& listener)
201 {
202 boost::mutex::scoped_lock lock(mutex_);
203 listener_ = &listener;
204 }
148 } 205 }