Mercurial > hg > orthanc
comparison Core/MultiThreading/ThreadedCommandProcessor.cpp @ 467:322c1b497036
cancel and listener for commands
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 12 Jul 2013 14:42:27 +0200 |
parents | 9da3596069b8 |
children | 2d0a347e8cfc |
comparison
equal
deleted
inserted
replaced
466:9da3596069b8 | 467:322c1b497036 |
---|---|
52 try | 52 try |
53 { | 53 { |
54 if (that->success_) | 54 if (that->success_) |
55 { | 55 { |
56 // No command has failed so far | 56 // No command has failed so far |
57 success = dynamic_cast<ICommand&>(*command).Execute(); | 57 |
58 if (that->cancel_) | |
59 { | |
60 // The commands have been canceled. Skip the execution | |
61 // of this command, yet mark it as succeeded. | |
62 success = true; | |
63 } | |
64 else | |
65 { | |
66 success = dynamic_cast<ICommand&>(*command).Execute(); | |
67 } | |
58 } | 68 } |
59 else | 69 else |
60 { | 70 { |
61 // A command has already failed. Skip the execution of this command. | 71 // A command has already failed. Skip the execution of this command. |
62 } | 72 } |
69 boost::mutex::scoped_lock lock(that->mutex_); | 79 boost::mutex::scoped_lock lock(that->mutex_); |
70 assert(that->remainingCommands_ > 0); | 80 assert(that->remainingCommands_ > 0); |
71 that->remainingCommands_--; | 81 that->remainingCommands_--; |
72 | 82 |
73 if (!success) | 83 if (!success) |
84 { | |
85 if (!that->cancel_ && that->listener_ && that->success_) | |
86 { | |
87 // This is the first command that fails | |
88 that->listener_->SignalFailure(); | |
89 } | |
90 | |
74 that->success_ = false; | 91 that->success_ = false; |
92 } | |
93 else | |
94 { | |
95 if (!that->cancel_ && that->listener_) | |
96 { | |
97 if (that->remainingCommands_ == 0) | |
98 { | |
99 that->listener_->SignalSuccess(that->totalCommands_); | |
100 } | |
101 else | |
102 { | |
103 that->listener_->SignalProgress(that->totalCommands_ - that->remainingCommands_, | |
104 that->totalCommands_); | |
105 } | |
106 } | |
107 } | |
75 | 108 |
76 that->processedCommand_.notify_all(); | 109 that->processedCommand_.notify_all(); |
77 } | 110 } |
78 } | 111 } |
79 } | 112 } |
84 { | 117 { |
85 if (numThreads < 1) | 118 if (numThreads < 1) |
86 { | 119 { |
87 throw OrthancException(ErrorCode_ParameterOutOfRange); | 120 throw OrthancException(ErrorCode_ParameterOutOfRange); |
88 } | 121 } |
89 | 122 |
123 listener_ = NULL; | |
90 success_ = true; | 124 success_ = true; |
91 done_ = false; | 125 done_ = false; |
126 cancel_ = false; | |
92 threads_.resize(numThreads); | 127 threads_.resize(numThreads); |
93 remainingCommands_ = 0; | 128 remainingCommands_ = 0; |
129 totalCommands_ = 0; | |
94 | 130 |
95 for (unsigned int i = 0; i < numThreads; i++) | 131 for (unsigned int i = 0; i < numThreads; i++) |
96 { | 132 { |
97 threads_[i] = new boost::thread(Processor, this); | 133 threads_[i] = new boost::thread(Processor, this); |
98 } | 134 } |
120 } | 156 } |
121 | 157 |
122 | 158 |
123 void ThreadedCommandProcessor::Post(ICommand* command) | 159 void ThreadedCommandProcessor::Post(ICommand* command) |
124 { | 160 { |
125 { | 161 boost::mutex::scoped_lock lock(mutex_); |
126 boost::mutex::scoped_lock lock(mutex_); | 162 queue_.Enqueue(command); |
127 queue_.Enqueue(command); | 163 remainingCommands_++; |
128 remainingCommands_++; | 164 totalCommands_++; |
129 } | |
130 } | 165 } |
131 | 166 |
132 | 167 |
133 bool ThreadedCommandProcessor::Join() | 168 bool ThreadedCommandProcessor::Join() |
134 { | 169 { |
137 while (!remainingCommands_ == 0) | 172 while (!remainingCommands_ == 0) |
138 { | 173 { |
139 processedCommand_.wait(lock); | 174 processedCommand_.wait(lock); |
140 } | 175 } |
141 | 176 |
142 // Reset the "success" flag for subsequent commands | 177 if (cancel_ && listener_) |
178 { | |
179 listener_->SignalCancel(); | |
180 } | |
181 | |
182 // Reset the sequence counters for subsequent commands | |
143 bool hasSucceeded = success_; | 183 bool hasSucceeded = success_; |
144 success_ = true; | 184 success_ = true; |
185 totalCommands_ = 0; | |
186 cancel_ = false; | |
145 | 187 |
146 return hasSucceeded; | 188 return hasSucceeded; |
147 } | 189 } |
190 | |
191 | |
192 void ThreadedCommandProcessor::Cancel() | |
193 { | |
194 boost::mutex::scoped_lock lock(mutex_); | |
195 | |
196 cancel_ = true; | |
197 } | |
198 | |
199 | |
200 void ThreadedCommandProcessor::SetListener(IListener& listener) | |
201 { | |
202 boost::mutex::scoped_lock lock(mutex_); | |
203 listener_ = &listener; | |
204 } | |
148 } | 205 } |