comparison Core/JobsEngine/JobsRegistry.cpp @ 4024:1d2b31fc782f more-changes

new 'changes': JobSubmitted, JobSuccess, JobFailure
author Alain Mazy <alain@mazy.be>
date Tue, 09 Jun 2020 12:20:42 +0200
parents a2e4edc7b9aa
children
comparison
equal deleted inserted replaced
4023:cbdf62468d77 4024:1d2b31fc782f
18 * modify file(s) with this exception, you may extend this exception to 18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If 19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your 20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files 21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here. 22 * in the program, then also delete it here.
23 * 23 *
24 * This program is distributed in the hope that it will be useful, but 24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of 25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details. 27 * General Public License for more details.
28 * 28 *
48 static const char* JOBS = "Jobs"; 48 static const char* JOBS = "Jobs";
49 static const char* JOBS_REGISTRY = "JobsRegistry"; 49 static const char* JOBS_REGISTRY = "JobsRegistry";
50 static const char* CREATION_TIME = "CreationTime"; 50 static const char* CREATION_TIME = "CreationTime";
51 static const char* LAST_CHANGE_TIME = "LastChangeTime"; 51 static const char* LAST_CHANGE_TIME = "LastChangeTime";
52 static const char* RUNTIME = "Runtime"; 52 static const char* RUNTIME = "Runtime";
53 53
54 54
55 class JobsRegistry::JobHandler : public boost::noncopyable 55 class JobsRegistry::JobHandler : public boost::noncopyable
56 { 56 {
57 private: 57 private:
58 std::string id_; 58 std::string id_;
59 JobState state_; 59 JobState state_;
60 std::string jobType_; 60 std::string jobType_;
61 std::unique_ptr<IJob> job_; 61 std::unique_ptr<IJob> job_;
78 } 78 }
79 79
80 lastStateChangeTime_ = now; 80 lastStateChangeTime_ = now;
81 } 81 }
82 82
83 void SetStateInternal(JobState state) 83 void SetStateInternal(JobState state)
84 { 84 {
85 state_ = state; 85 state_ = state;
86 pauseScheduled_ = false; 86 pauseScheduled_ = false;
87 cancelScheduled_ = false; 87 cancelScheduled_ = false;
88 Touch(); 88 Touch();
137 JobState GetState() const 137 JobState GetState() const
138 { 138 {
139 return state_; 139 return state_;
140 } 140 }
141 141
142 void SetState(JobState state) 142 void SetState(JobState state)
143 { 143 {
144 if (state == JobState_Retry) 144 if (state == JobState_Retry)
145 { 145 {
146 // Use "SetRetryState()" 146 // Use "SetRetryState()"
147 throw OrthancException(ErrorCode_BadSequenceOfCalls); 147 throw OrthancException(ErrorCode_BadSequenceOfCalls);
155 void SetRetryState(unsigned int timeout) 155 void SetRetryState(unsigned int timeout)
156 { 156 {
157 if (state_ == JobState_Running) 157 if (state_ == JobState_Running)
158 { 158 {
159 SetStateInternal(JobState_Retry); 159 SetStateInternal(JobState_Retry);
160 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + 160 retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
161 boost::posix_time::milliseconds(timeout)); 161 boost::posix_time::milliseconds(timeout));
162 } 162 }
163 else 163 else
164 { 164 {
165 // Only valid for running jobs 165 // Only valid for running jobs
273 else 273 else
274 { 274 {
275 ok = false; 275 ok = false;
276 } 276 }
277 } 277 }
278 else 278 else
279 { 279 {
280 ok = job_->Serialize(target[JOB]); 280 ok = job_->Serialize(target[JOB]);
281 } 281 }
282 282
283 if (ok) 283 if (ok)
325 325
326 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a, 326 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a,
327 JobHandler*& b) const 327 JobHandler*& b) const
328 { 328 {
329 return a->GetPriority() < b->GetPriority(); 329 return a->GetPriority() < b->GetPriority();
330 } 330 }
331 331
332 332
333 #if defined(NDEBUG) 333 #if defined(NDEBUG)
334 void JobsRegistry::CheckInvariants() const 334 void JobsRegistry::CheckInvariants() const
335 { 335 {
336 } 336 }
337 337
338 #else 338 #else
339 bool JobsRegistry::IsPendingJob(const JobHandler& job) const 339 bool JobsRegistry::IsPendingJob(const JobHandler& job) const
340 { 340 {
341 PendingJobs copy = pendingJobs_; 341 PendingJobs copy = pendingJobs_;
342 while (!copy.empty()) 342 while (!copy.empty())
407 switch (job.GetState()) 407 switch (job.GetState())
408 { 408 {
409 case JobState_Pending: 409 case JobState_Pending:
410 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); 410 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
411 break; 411 break;
412 412
413 case JobState_Success: 413 case JobState_Success:
414 case JobState_Failure: 414 case JobState_Failure:
415 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); 415 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
416 break; 416 break;
417 417
418 case JobState_Retry: 418 case JobState_Retry:
419 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); 419 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
420 break; 420 break;
421 421
422 case JobState_Running: 422 case JobState_Running:
423 case JobState_Paused: 423 case JobState_Paused:
424 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); 424 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
425 break; 425 break;
426 426
480 break; 480 break;
481 481
482 default: 482 default:
483 throw OrthancException(ErrorCode_InternalError); 483 throw OrthancException(ErrorCode_InternalError);
484 } 484 }
485 485
486 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId(); 486 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId();
487 487
488 CheckInvariants(); 488 CheckInvariants();
489 489
490 assert(job.GetState() == JobState_Running); 490 assert(job.GetState() == JobState_Running);
493 if (reason == CompletedReason_Canceled) 493 if (reason == CompletedReason_Canceled)
494 { 494 {
495 job.SetLastErrorCode(ErrorCode_CanceledJob); 495 job.SetLastErrorCode(ErrorCode_CanceledJob);
496 } 496 }
497 497
498 { 498 if (observer_ != NULL)
499 boost::shared_lock<boost::shared_mutex> lock(observersMutex_); 499 {
500 500 if (reason == CompletedReason_Success)
501 for (Observers::iterator it = observers_.begin(); it != observers_.end(); ++it) 501 {
502 { 502 observer_->SignalJobSuccess(job.GetId());
503 if (reason == CompletedReason_Success) 503 }
504 { 504 else
505 (*it)->SignalJobSuccess(job.GetId()); 505 {
506 } 506 observer_->SignalJobFailure(job.GetId());
507 else
508 {
509 (*it)->SignalJobFailure(job.GetId());
510 }
511
512 } 507 }
513 } 508 }
514 509
515 // WARNING: The following call might make "job" invalid if the job 510 // WARNING: The following call might make "job" invalid if the job
516 // history size is empty 511 // history size is empty
563 state = it->second->GetState(); 558 state = it->second->GetState();
564 return true; 559 return true;
565 } 560 }
566 } 561 }
567 562
568 563
569 JobsRegistry::~JobsRegistry() 564 JobsRegistry::~JobsRegistry()
570 { 565 {
571 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) 566 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
572 { 567 {
573 assert(it->second != NULL); 568 assert(it->second != NULL);
671 { 666 {
672 if (handler == NULL) 667 if (handler == NULL)
673 { 668 {
674 throw OrthancException(ErrorCode_NullPointer); 669 throw OrthancException(ErrorCode_NullPointer);
675 } 670 }
676 671
677 std::unique_ptr<JobHandler> protection(handler); 672 std::unique_ptr<JobHandler> protection(handler);
678 673
679 { 674 {
680 boost::mutex::scoped_lock lock(mutex_); 675 boost::mutex::scoped_lock lock(mutex_);
681 CheckInvariants(); 676 CheckInvariants();
682 677
683 id = handler->GetId(); 678 id = handler->GetId();
684 int priority = handler->GetPriority(); 679 int priority = handler->GetPriority();
685 680
686 jobsIndex_.insert(std::make_pair(id, protection.release())); 681 jobsIndex_.insert(std::make_pair(id, protection.release()));
687 682
692 case JobState_Running: 687 case JobState_Running:
693 handler->SetState(JobState_Pending); 688 handler->SetState(JobState_Pending);
694 pendingJobs_.push(handler); 689 pendingJobs_.push(handler);
695 pendingJobAvailable_.notify_one(); 690 pendingJobAvailable_.notify_one();
696 break; 691 break;
697 692
698 case JobState_Success: 693 case JobState_Success:
699 SetCompletedJob(*handler, true); 694 SetCompletedJob(*handler, true);
700 break; 695 break;
701 696
702 case JobState_Failure: 697 case JobState_Failure:
703 SetCompletedJob(*handler, false); 698 SetCompletedJob(*handler, false);
704 break; 699 break;
705 700
706 case JobState_Paused: 701 case JobState_Paused:
707 break; 702 break;
708 703
709 default: 704 default:
710 { 705 {
711 std::string details = ("A job should not be loaded from state: " + 706 std::string details = ("A job should not be loaded from state: " +
712 std::string(EnumerationToString(handler->GetState()))); 707 std::string(EnumerationToString(handler->GetState())));
713 throw OrthancException(ErrorCode_InternalError, details); 708 throw OrthancException(ErrorCode_InternalError, details);
714 } 709 }
715 } 710 }
716 711
717 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; 712 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
718 713
719 { 714 if (observer_ != NULL)
720 boost::shared_lock<boost::shared_mutex> lock(observersMutex_); 715 {
721 716 observer_->SignalJobSubmitted(id);
722 for (Observers::iterator it = observers_.begin(); it != observers_.end(); ++it)
723 {
724 (*it)->SignalJobSubmitted(id);
725 }
726 } 717 }
727 718
728 // WARNING: The following call might make "handler" invalid if 719 // WARNING: The following call might make "handler" invalid if
729 // the job history size is empty 720 // the job history size is empty
730 ForgetOldCompletedJobs(); 721 ForgetOldCompletedJobs();
805 else 796 else
806 { 797 {
807 const JobStatus& status = it->second->GetLastStatus(); 798 const JobStatus& status = it->second->GetLastStatus();
808 successContent = status.GetPublicContent(); 799 successContent = status.GetPublicContent();
809 } 800 }
810 801
811 return; 802 return;
812 } 803 }
813 else 804 else
814 { 805 {
815 // This job has not finished yet, wait for new completion 806 // This job has not finished yet, wait for new completion
882 873
883 874
884 void JobsRegistry::RemoveRetryJob(JobHandler* handler) 875 void JobsRegistry::RemoveRetryJob(JobHandler* handler)
885 { 876 {
886 RetryJobs::iterator item = retryJobs_.find(handler); 877 RetryJobs::iterator item = retryJobs_.find(handler);
887 assert(item != retryJobs_.end()); 878 assert(item != retryJobs_.end());
888 retryJobs_.erase(item); 879 retryJobs_.erase(item);
889 } 880 }
890 881
891 882
892 bool JobsRegistry::Pause(const std::string& id) 883 bool JobsRegistry::Pause(const std::string& id)
969 960
970 case JobState_Paused: 961 case JobState_Paused:
971 SetCompletedJob(*found->second, false); 962 SetCompletedJob(*found->second, false);
972 found->second->SetLastErrorCode(ErrorCode_CanceledJob); 963 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
973 break; 964 break;
974 965
975 case JobState_Success: 966 case JobState_Success:
976 case JobState_Failure: 967 case JobState_Failure:
977 // Nothing to be done 968 // Nothing to be done
978 break; 969 break;
979 970
1017 { 1008 {
1018 found->second->SetState(JobState_Pending); 1009 found->second->SetState(JobState_Pending);
1019 pendingJobs_.push(found->second); 1010 pendingJobs_.push(found->second);
1020 pendingJobAvailable_.notify_one(); 1011 pendingJobAvailable_.notify_one();
1021 CheckInvariants(); 1012 CheckInvariants();
1022 return true; 1013 return true;
1023 } 1014 }
1024 } 1015 }
1025 1016
1026 1017
1027 bool JobsRegistry::Resubmit(const std::string& id) 1018 bool JobsRegistry::Resubmit(const std::string& id)
1044 return false; 1035 return false;
1045 } 1036 }
1046 else 1037 else
1047 { 1038 {
1048 found->second->GetJob().Reset(); 1039 found->second->GetJob().Reset();
1049 1040
1050 bool ok = false; 1041 bool ok = false;
1051 for (CompletedJobs::iterator it = completedJobs_.begin(); 1042 for (CompletedJobs::iterator it = completedJobs_.begin();
1052 it != completedJobs_.end(); ++it) 1043 it != completedJobs_.end(); ++it)
1053 { 1044 {
1054 if (*it == found->second) 1045 if (*it == found->second)
1055 { 1046 {
1056 ok = true; 1047 ok = true;
1107 boost::mutex::scoped_lock lock(mutex_); 1098 boost::mutex::scoped_lock lock(mutex_);
1108 return GetStateInternal(state, id); 1099 return GetStateInternal(state, id);
1109 } 1100 }
1110 1101
1111 1102
1112 void JobsRegistry::AddObserver(JobsRegistry::IObserver& observer) 1103 void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer)
1113 { 1104 {
1114 boost::unique_lock<boost::shared_mutex> lock(observersMutex_); 1105 boost::mutex::scoped_lock lock(mutex_);
1115 observers_.insert(&observer); 1106 observer_ = &observer;
1116 } 1107 }
1117 1108
1118 1109
1119 void JobsRegistry::ResetObserver(JobsRegistry::IObserver& observer) 1110 void JobsRegistry::ResetObserver()
1120 { 1111 {
1121 boost::unique_lock<boost::shared_mutex> lock(observersMutex_); 1112 boost::mutex::scoped_lock lock(mutex_);
1122 Observers::iterator it = observers_.find(&observer); 1113 observer_ = NULL;
1123 if (it != observers_.end()) 1114 }
1124 { 1115
1125 observers_.erase(it); 1116
1126 }
1127 }
1128
1129
1130 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry, 1117 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
1131 unsigned int timeout) : 1118 unsigned int timeout) :
1132 registry_(registry), 1119 registry_(registry),
1133 handler_(NULL), 1120 handler_(NULL),
1134 targetState_(JobState_Failure), 1121 targetState_(JobState_Failure),
1167 id_ = handler_->GetId(); 1154 id_ = handler_->GetId();
1168 priority_ = handler_->GetPriority(); 1155 priority_ = handler_->GetPriority();
1169 } 1156 }
1170 } 1157 }
1171 1158
1172 1159
1173 JobsRegistry::RunningJob::~RunningJob() 1160 JobsRegistry::RunningJob::~RunningJob()
1174 { 1161 {
1175 if (IsValid()) 1162 if (IsValid())
1176 { 1163 {
1177 boost::mutex::scoped_lock lock(registry_.mutex_); 1164 boost::mutex::scoped_lock lock(registry_.mutex_);
1187 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success); 1174 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success);
1188 break; 1175 break;
1189 1176
1190 case JobState_Paused: 1177 case JobState_Paused:
1191 registry_.MarkRunningAsPaused(*handler_); 1178 registry_.MarkRunningAsPaused(*handler_);
1192 break; 1179 break;
1193 1180
1194 case JobState_Retry: 1181 case JobState_Retry:
1195 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_); 1182 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
1196 break; 1183 break;
1197 1184
1198 default: 1185 default:
1199 assert(0); 1186 assert(0);
1200 } 1187 }
1201 } 1188 }
1202 } 1189 }
1203 1190
1204 1191
1205 bool JobsRegistry::RunningJob::IsValid() const 1192 bool JobsRegistry::RunningJob::IsValid() const
1206 { 1193 {
1207 return (handler_ != NULL && 1194 return (handler_ != NULL &&
1208 job_ != NULL); 1195 job_ != NULL);
1209 } 1196 }
1210 1197
1211 1198
1212 const std::string& JobsRegistry::RunningJob::GetId() const 1199 const std::string& JobsRegistry::RunningJob::GetId() const
1213 { 1200 {
1214 if (!IsValid()) 1201 if (!IsValid())
1215 { 1202 {
1216 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1203 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1219 { 1206 {
1220 return id_; 1207 return id_;
1221 } 1208 }
1222 } 1209 }
1223 1210
1224 1211
1225 int JobsRegistry::RunningJob::GetPriority() const 1212 int JobsRegistry::RunningJob::GetPriority() const
1226 { 1213 {
1227 if (!IsValid()) 1214 if (!IsValid())
1228 { 1215 {
1229 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1216 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1231 else 1218 else
1232 { 1219 {
1233 return priority_; 1220 return priority_;
1234 } 1221 }
1235 } 1222 }
1236 1223
1237 1224
1238 IJob& JobsRegistry::RunningJob::GetJob() 1225 IJob& JobsRegistry::RunningJob::GetJob()
1239 { 1226 {
1240 if (!IsValid()) 1227 if (!IsValid())
1241 { 1228 {
1245 { 1232 {
1246 return *job_; 1233 return *job_;
1247 } 1234 }
1248 } 1235 }
1249 1236
1250 1237
1251 bool JobsRegistry::RunningJob::IsPauseScheduled() 1238 bool JobsRegistry::RunningJob::IsPauseScheduled()
1252 { 1239 {
1253 if (!IsValid()) 1240 if (!IsValid())
1254 { 1241 {
1255 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1242 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1257 else 1244 else
1258 { 1245 {
1259 boost::mutex::scoped_lock lock(registry_.mutex_); 1246 boost::mutex::scoped_lock lock(registry_.mutex_);
1260 registry_.CheckInvariants(); 1247 registry_.CheckInvariants();
1261 assert(handler_->GetState() == JobState_Running); 1248 assert(handler_->GetState() == JobState_Running);
1262 1249
1263 return handler_->IsPauseScheduled(); 1250 return handler_->IsPauseScheduled();
1264 } 1251 }
1265 } 1252 }
1266 1253
1267 1254
1268 bool JobsRegistry::RunningJob::IsCancelScheduled() 1255 bool JobsRegistry::RunningJob::IsCancelScheduled()
1269 { 1256 {
1270 if (!IsValid()) 1257 if (!IsValid())
1271 { 1258 {
1272 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1259 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1274 else 1261 else
1275 { 1262 {
1276 boost::mutex::scoped_lock lock(registry_.mutex_); 1263 boost::mutex::scoped_lock lock(registry_.mutex_);
1277 registry_.CheckInvariants(); 1264 registry_.CheckInvariants();
1278 assert(handler_->GetState() == JobState_Running); 1265 assert(handler_->GetState() == JobState_Running);
1279 1266
1280 return handler_->IsCancelScheduled(); 1267 return handler_->IsCancelScheduled();
1281 } 1268 }
1282 } 1269 }
1283 1270
1284 1271
1285 void JobsRegistry::RunningJob::MarkSuccess() 1272 void JobsRegistry::RunningJob::MarkSuccess()
1286 { 1273 {
1287 if (!IsValid()) 1274 if (!IsValid())
1288 { 1275 {
1289 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1276 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1292 { 1279 {
1293 targetState_ = JobState_Success; 1280 targetState_ = JobState_Success;
1294 } 1281 }
1295 } 1282 }
1296 1283
1297 1284
1298 void JobsRegistry::RunningJob::MarkFailure() 1285 void JobsRegistry::RunningJob::MarkFailure()
1299 { 1286 {
1300 if (!IsValid()) 1287 if (!IsValid())
1301 { 1288 {
1302 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1289 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1305 { 1292 {
1306 targetState_ = JobState_Failure; 1293 targetState_ = JobState_Failure;
1307 } 1294 }
1308 } 1295 }
1309 1296
1310 1297
1311 void JobsRegistry::RunningJob::MarkCanceled() 1298 void JobsRegistry::RunningJob::MarkCanceled()
1312 { 1299 {
1313 if (!IsValid()) 1300 if (!IsValid())
1314 { 1301 {
1315 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1302 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1319 targetState_ = JobState_Failure; 1306 targetState_ = JobState_Failure;
1320 canceled_ = true; 1307 canceled_ = true;
1321 } 1308 }
1322 } 1309 }
1323 1310
1324 1311
1325 void JobsRegistry::RunningJob::MarkPause() 1312 void JobsRegistry::RunningJob::MarkPause()
1326 { 1313 {
1327 if (!IsValid()) 1314 if (!IsValid())
1328 { 1315 {
1329 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1316 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1332 { 1319 {
1333 targetState_ = JobState_Paused; 1320 targetState_ = JobState_Paused;
1334 } 1321 }
1335 } 1322 }
1336 1323
1337 1324
1338 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout) 1325 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout)
1339 { 1326 {
1340 if (!IsValid()) 1327 if (!IsValid())
1341 { 1328 {
1342 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1329 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1345 { 1332 {
1346 targetState_ = JobState_Retry; 1333 targetState_ = JobState_Retry;
1347 targetRetryTimeout_ = timeout; 1334 targetRetryTimeout_ = timeout;
1348 } 1335 }
1349 } 1336 }
1350 1337
1351 1338
1352 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code, 1339 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code,
1353 const std::string& details) 1340 const std::string& details)
1354 { 1341 {
1355 if (!IsValid()) 1342 if (!IsValid())
1357 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1344 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1358 } 1345 }
1359 else 1346 else
1360 { 1347 {
1361 JobStatus status(code, details, *job_); 1348 JobStatus status(code, details, *job_);
1362 1349
1363 boost::mutex::scoped_lock lock(registry_.mutex_); 1350 boost::mutex::scoped_lock lock(registry_.mutex_);
1364 registry_.CheckInvariants(); 1351 registry_.CheckInvariants();
1365 assert(handler_->GetState() == JobState_Running); 1352 assert(handler_->GetState() == JobState_Running);
1366 1353
1367 handler_->SetLastStatus(status); 1354 handler_->SetLastStatus(status);
1368 } 1355 }
1369 } 1356 }
1370 1357
1371 1358
1376 CheckInvariants(); 1363 CheckInvariants();
1377 1364
1378 target = Json::objectValue; 1365 target = Json::objectValue;
1379 target[TYPE] = JOBS_REGISTRY; 1366 target[TYPE] = JOBS_REGISTRY;
1380 target[JOBS] = Json::objectValue; 1367 target[JOBS] = Json::objectValue;
1381 1368
1382 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 1369 for (JobsIndex::const_iterator it = jobsIndex_.begin();
1383 it != jobsIndex_.end(); ++it) 1370 it != jobsIndex_.end(); ++it)
1384 { 1371 {
1385 Json::Value v; 1372 Json::Value v;
1386 if (it->second->Serialize(v)) 1373 if (it->second->Serialize(v))
1387 { 1374 {
1392 1379
1393 1380
1394 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, 1381 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1395 const Json::Value& s, 1382 const Json::Value& s,
1396 size_t maxCompletedJobs) : 1383 size_t maxCompletedJobs) :
1397 maxCompletedJobs_(maxCompletedJobs) 1384 maxCompletedJobs_(maxCompletedJobs),
1385 observer_(NULL)
1398 { 1386 {
1399 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || 1387 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1400 !s.isMember(JOBS) || 1388 !s.isMember(JOBS) ||
1401 s[JOBS].type() != Json::objectValue) 1389 s[JOBS].type() != Json::objectValue)
1402 { 1390 {
1450 1438
1451 pending = 0; 1439 pending = 0;
1452 running = 0; 1440 running = 0;
1453 success = 0; 1441 success = 0;
1454 failed = 0; 1442 failed = 0;
1455 1443
1456 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 1444 for (JobsIndex::const_iterator it = jobsIndex_.begin();
1457 it != jobsIndex_.end(); ++it) 1445 it != jobsIndex_.end(); ++it)
1458 { 1446 {
1459 JobHandler& job = *it->second; 1447 JobHandler& job = *it->second;
1460 1448
1467 1455
1468 case JobState_Paused: 1456 case JobState_Paused:
1469 case JobState_Running: 1457 case JobState_Running:
1470 running ++; 1458 running ++;
1471 break; 1459 break;
1472 1460
1473 case JobState_Success: 1461 case JobState_Success:
1474 success ++; 1462 success ++;
1475 break; 1463 break;
1476 1464
1477 case JobState_Failure: 1465 case JobState_Failure:
1479 break; 1467 break;
1480 1468
1481 default: 1469 default:
1482 throw OrthancException(ErrorCode_InternalError); 1470 throw OrthancException(ErrorCode_InternalError);
1483 } 1471 }
1484 } 1472 }
1485 } 1473 }
1486 } 1474 }