comparison OrthancServer/Sources/ServerContext.cpp @ 5410:16cbfefa15e9

Solved a deadlock related to the Job Engine events and plugins
author Alain Mazy <am@osimis.io>
date Tue, 07 Nov 2023 12:52:37 +0100
parents 566e8d32bd3a
children d37dff2c0028
comparison
equal deleted inserted replaced
5409:68231ca4363a 5410:16cbfefa15e9
162 { 162 {
163 LOG(ERROR) << "Not enough memory while signaling a change"; 163 LOG(ERROR) << "Not enough memory while signaling a change";
164 } 164 }
165 catch (...) 165 catch (...)
166 { 166 {
167 throw OrthancException(ErrorCode_InternalError); 167 throw OrthancException(ErrorCode_InternalError, "Error while signaling a change");
168 } 168 }
169 } 169 }
170 catch (OrthancException& e) 170 catch (OrthancException& e)
171 { 171 {
172 LOG(ERROR) << "Error in the " << it->GetDescription() 172 LOG(ERROR) << "Error in the " << it->GetDescription()
177 } 177 }
178 } 178 }
179 } 179 }
180 180
181 181
182 void ServerContext::JobEventsThread(ServerContext* that,
183 unsigned int sleepDelay)
184 {
185 while (!that->done_)
186 {
187 std::unique_ptr<IDynamicObject> obj(that->pendingJobEvents_.Dequeue(sleepDelay));
188
189 if (obj.get() != NULL)
190 {
191 const JobEvent& event = dynamic_cast<const JobEvent&>(*obj.get());
192
193 boost::shared_lock<boost::shared_mutex> lock(that->listenersMutex_);
194 for (ServerListeners::iterator it = that->listeners_.begin();
195 it != that->listeners_.end(); ++it)
196 {
197 try
198 {
199 try
200 {
201 it->GetListener().SignalJobEvent(event);
202 }
203 catch (std::bad_alloc&)
204 {
205 LOG(ERROR) << "Not enough memory while signaling a job event";
206 }
207 catch (...)
208 {
209 throw OrthancException(ErrorCode_InternalError, "Error while signaling a job event");
210 }
211 }
212 catch (OrthancException& e)
213 {
214 LOG(ERROR) << "Error in the " << it->GetDescription()
215 << " callback while signaling a job event: " << e.What()
216 << " (code " << e.GetErrorCode() << ")";
217 }
218 }
219 }
220 }
221 }
222
223
182 void ServerContext::SaveJobsThread(ServerContext* that, 224 void ServerContext::SaveJobsThread(ServerContext* that,
183 unsigned int sleepDelay) 225 unsigned int sleepDelay)
184 { 226 {
185 static const boost::posix_time::time_duration PERIODICITY = 227 static const boost::posix_time::time_duration PERIODICITY =
186 boost::posix_time::seconds(10); 228 boost::posix_time::seconds(10);
204 246
205 247
206 void ServerContext::SignalJobSubmitted(const std::string& jobId) 248 void ServerContext::SignalJobSubmitted(const std::string& jobId)
207 { 249 {
208 haveJobsChanged_ = true; 250 haveJobsChanged_ = true;
209 mainLua_.SignalJobSubmitted(jobId); 251 pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Submitted, jobId));
210
211 #if ORTHANC_ENABLE_PLUGINS == 1
212 if (HasPlugins())
213 {
214 GetPlugins().SignalJobSubmitted(jobId);
215 }
216 #endif
217 } 252 }
218 253
219 254
220 void ServerContext::SignalJobSuccess(const std::string& jobId) 255 void ServerContext::SignalJobSuccess(const std::string& jobId)
221 { 256 {
222 haveJobsChanged_ = true; 257 haveJobsChanged_ = true;
223 mainLua_.SignalJobSuccess(jobId); 258 pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Success, jobId));
224
225 #if ORTHANC_ENABLE_PLUGINS == 1
226 if (HasPlugins())
227 {
228 GetPlugins().SignalJobSuccess(jobId);
229 }
230 #endif
231 } 259 }
232 260
233 261
234 void ServerContext::SignalJobFailure(const std::string& jobId) 262 void ServerContext::SignalJobFailure(const std::string& jobId)
235 { 263 {
236 haveJobsChanged_ = true; 264 haveJobsChanged_ = true;
237 mainLua_.SignalJobFailure(jobId); 265 pendingJobEvents_.Enqueue(new JobEvent(JobEventType_Failure, jobId));
238
239 #if ORTHANC_ENABLE_PLUGINS == 1
240 if (HasPlugins())
241 {
242 GetPlugins().SignalJobFailure(jobId);
243 }
244 #endif
245 } 266 }
246 267
247 268
248 void ServerContext::SetupJobsEngine(bool unitTesting, 269 void ServerContext::SetupJobsEngine(bool unitTesting,
249 bool loadJobsFromDatabase) 270 bool loadJobsFromDatabase)
447 468
448 jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); 469 jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
449 470
450 listeners_.push_back(ServerListener(luaListener_, "Lua")); 471 listeners_.push_back(ServerListener(luaListener_, "Lua"));
451 changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100)); 472 changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100));
473 jobEventsThread_ = boost::thread(JobEventsThread, this, (unitTesting ? 20 : 100));
452 474
453 #if HAVE_MALLOC_TRIM == 1 475 #if HAVE_MALLOC_TRIM == 1
454 LOG(INFO) << "Starting memory trimming thread at 30 seconds interval"; 476 LOG(INFO) << "Starting memory trimming thread at 30 seconds interval";
455 memoryTrimmingThread_ = boost::thread(MemoryTrimmingThread, this, 30); 477 memoryTrimmingThread_ = boost::thread(MemoryTrimmingThread, this, 30);
456 #else 478 #else
490 done_ = true; 512 done_ = true;
491 513
492 if (changeThread_.joinable()) 514 if (changeThread_.joinable())
493 { 515 {
494 changeThread_.join(); 516 changeThread_.join();
517 }
518
519 if (jobEventsThread_.joinable())
520 {
521 jobEventsThread_.join();
495 } 522 }
496 523
497 if (saveJobsThread_.joinable()) 524 if (saveJobsThread_.joinable())
498 { 525 {
499 saveJobsThread_.join(); 526 saveJobsThread_.join();