Mercurial > hg > orthanc
comparison Core/JobsEngine/JobsRegistry.cpp @ 2667:5fa2f2ce74f0 jobs
serialization of JobsRegistry
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 08 Jun 2018 15:48:35 +0200 |
parents | 228e2783ce83 |
children | d26dd081df97 |
comparison
equal
deleted
inserted
replaced
2666:2540ac79ab6c | 2667:5fa2f2ce74f0 |
---|---|
39 #include "../Toolbox.h" | 39 #include "../Toolbox.h" |
40 #include "../SerializationToolbox.h" | 40 #include "../SerializationToolbox.h" |
41 | 41 |
42 namespace Orthanc | 42 namespace Orthanc |
43 { | 43 { |
44 static const char* ID = "ID"; | |
45 static const char* STATE = "State"; | |
46 static const char* TYPE = "Type"; | |
47 static const char* PRIORITY = "Priority"; | |
48 static const char* JOB = "Job"; | |
49 static const char* JOBS = "Jobs"; | |
50 static const char* JOBS_REGISTRY = "JobsRegistry"; | |
51 static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs"; | |
52 static const char* CREATION_TIME = "CreationTime"; | |
53 static const char* RUNTIME = "Runtime"; | |
54 | |
55 | |
44 class JobsRegistry::JobHandler : public boost::noncopyable | 56 class JobsRegistry::JobHandler : public boost::noncopyable |
45 { | 57 { |
46 private: | 58 private: |
47 std::string id_; | 59 std::string id_; |
48 JobState state_; | 60 JobState state_; |
95 { | 107 { |
96 throw OrthancException(ErrorCode_NullPointer); | 108 throw OrthancException(ErrorCode_NullPointer); |
97 } | 109 } |
98 | 110 |
99 job->GetJobType(jobType_); | 111 job->GetJobType(jobType_); |
100 job->Start(); | |
101 | 112 |
102 lastStatus_ = JobStatus(ErrorCode_Success, *job_); | 113 lastStatus_ = JobStatus(ErrorCode_Success, *job_); |
103 } | 114 } |
104 | 115 |
105 const std::string& GetId() const | 116 const std::string& GetId() const |
249 // blocked while a step in the job is running. Instead, we | 260 // blocked while a step in the job is running. Instead, we |
250 // save a snapshot of the serialized job. | 261 // save a snapshot of the serialized job. |
251 | 262 |
252 if (lastStatus_.HasSerialized()) | 263 if (lastStatus_.HasSerialized()) |
253 { | 264 { |
254 target["Job"] = lastStatus_.GetSerialized(); | 265 target[JOB] = lastStatus_.GetSerialized(); |
255 ok = true; | 266 ok = true; |
256 } | 267 } |
257 else | 268 else |
258 { | 269 { |
259 ok = false; | 270 ok = false; |
260 } | 271 } |
261 } | 272 } |
262 else | 273 else |
263 { | 274 { |
264 ok = job_->Serialize(target["Job"]); | 275 ok = job_->Serialize(target[JOB]); |
265 } | 276 } |
266 | 277 |
267 if (ok) | 278 if (ok) |
268 { | 279 { |
269 target["ID"] = id_; | 280 target[ID] = id_; |
270 target["State"] = EnumerationToString(state_); | 281 target[STATE] = EnumerationToString(state_); |
271 target["JobType"] = jobType_; | 282 target[PRIORITY] = priority_; |
272 target["Priority"] = priority_; | 283 target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_); |
273 target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); | 284 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds()); |
274 target["Runtime"] = static_cast<unsigned int>(runtime_.total_milliseconds()); | |
275 return true; | 285 return true; |
276 } | 286 } |
277 else | 287 else |
278 { | 288 { |
279 LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_; | 289 LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_; |
285 const Json::Value& serialized) : | 295 const Json::Value& serialized) : |
286 lastStateChangeTime_(boost::posix_time::microsec_clock::universal_time()), | 296 lastStateChangeTime_(boost::posix_time::microsec_clock::universal_time()), |
287 pauseScheduled_(false), | 297 pauseScheduled_(false), |
288 cancelScheduled_(false) | 298 cancelScheduled_(false) |
289 { | 299 { |
290 id_ = StringToJobState(SerializationToolbox::ReadString(serialized, "ID")); | 300 id_ = SerializationToolbox::ReadString(serialized, ID); |
291 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, "State")); | 301 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE)); |
292 priority_ = SerializationToolbox::ReadInteger(serialized, "Priority"); | 302 priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY); |
293 creationTime_ = boost::posix_time::from_iso_string | 303 creationTime_ = boost::posix_time::from_iso_string |
294 (SerializationToolbox::ReadString(serialized, "CreationTime")); | 304 (SerializationToolbox::ReadString(serialized, CREATION_TIME)); |
295 runtime_ = boost::posix_time::milliseconds(SerializationToolbox::ReadInteger(serialized, "Runtime")); | 305 runtime_ = boost::posix_time::milliseconds |
306 (SerializationToolbox::ReadInteger(serialized, RUNTIME)); | |
296 | 307 |
297 retryTime_ = creationTime_; | 308 retryTime_ = creationTime_; |
298 | 309 |
299 if (state_ == JobState_Retry || | 310 if (state_ == JobState_Retry || |
300 state_ == JobState_Running) | 311 state_ == JobState_Running) |
301 { | 312 { |
302 state_ = JobState_Pending; | 313 state_ = JobState_Pending; |
303 } | 314 } |
304 | 315 |
305 job_.reset(unserializer.UnserializeJob(serialized["Job"])); | 316 job_.reset(unserializer.UnserializeJob(serialized[JOB])); |
306 job_->GetJobType(jobType_); | 317 job_->GetJobType(jobType_); |
307 job_->Start(); | 318 job_->Start(); |
308 | 319 |
309 lastStatus_ = JobStatus(ErrorCode_Success, *job_); | 320 lastStatus_ = JobStatus(ErrorCode_Success, *job_); |
310 } | 321 } |
576 return true; | 587 return true; |
577 } | 588 } |
578 } | 589 } |
579 | 590 |
580 | 591 |
581 void JobsRegistry::Serialize(Json::Value& target) | 592 void JobsRegistry::SubmitInternal(std::string& id, |
582 { | 593 JobHandler* handlerRaw) |
594 { | |
595 std::auto_ptr<JobHandler> handler(handlerRaw); | |
596 | |
583 boost::mutex::scoped_lock lock(mutex_); | 597 boost::mutex::scoped_lock lock(mutex_); |
584 CheckInvariants(); | 598 CheckInvariants(); |
585 | 599 |
586 target = Json::arrayValue; | 600 id = handler->GetId(); |
587 | 601 int priority = handler->GetPriority(); |
588 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | 602 |
589 it != jobsIndex_.end(); ++it) | 603 pendingJobs_.push(handler.get()); |
590 { | 604 pendingJobAvailable_.notify_one(); |
591 Json::Value v; | 605 |
592 if (it->second->Serialize(v)) | 606 jobsIndex_.insert(std::make_pair(id, handler.release())); |
593 { | 607 |
594 target.append(v); | 608 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; |
595 } | 609 |
596 } | 610 CheckInvariants(); |
597 } | 611 } |
598 | 612 |
599 | 613 |
600 void JobsRegistry::Submit(std::string& id, | 614 void JobsRegistry::Submit(std::string& id, |
601 IJob* job, // Takes ownership | 615 IJob* job, // Takes ownership |
602 int priority) | 616 int priority) |
603 { | 617 { |
604 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | 618 SubmitInternal(id, new JobHandler(job, priority)); |
605 | |
606 boost::mutex::scoped_lock lock(mutex_); | |
607 CheckInvariants(); | |
608 | |
609 id = handler->GetId(); | |
610 | |
611 pendingJobs_.push(handler.get()); | |
612 pendingJobAvailable_.notify_one(); | |
613 | |
614 jobsIndex_.insert(std::make_pair(id, handler.release())); | |
615 | |
616 LOG(INFO) << "New job submitted with priority " << priority << ": " << id; | |
617 | |
618 CheckInvariants(); | |
619 } | 619 } |
620 | 620 |
621 | 621 |
622 void JobsRegistry::Submit(IJob* job, // Takes ownership | 622 void JobsRegistry::Submit(IJob* job, // Takes ownership |
623 int priority) | 623 int priority) |
624 { | 624 { |
625 std::string id; | 625 std::string id; |
626 Submit(id, job, priority); | 626 SubmitInternal(id, new JobHandler(job, priority)); |
627 } | 627 } |
628 | 628 |
629 | 629 |
630 bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership | 630 bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership |
631 int priority) | 631 int priority) |
865 LOG(WARNING) << "Unknown job: " << id; | 865 LOG(WARNING) << "Unknown job: " << id; |
866 return false; | 866 return false; |
867 } | 867 } |
868 else if (found->second->GetState() != JobState_Failure) | 868 else if (found->second->GetState() != JobState_Failure) |
869 { | 869 { |
870 printf("%s\n", EnumerationToString(found->second->GetState())); | |
871 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | 870 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; |
872 return false; | 871 return false; |
873 } | 872 } |
874 else | 873 else |
875 { | 874 { |
1179 assert(handler_->GetState() == JobState_Running); | 1178 assert(handler_->GetState() == JobState_Running); |
1180 | 1179 |
1181 handler_->SetLastStatus(status); | 1180 handler_->SetLastStatus(status); |
1182 } | 1181 } |
1183 } | 1182 } |
1183 | |
1184 | |
1185 | |
1186 void JobsRegistry::Serialize(Json::Value& target) | |
1187 { | |
1188 boost::mutex::scoped_lock lock(mutex_); | |
1189 CheckInvariants(); | |
1190 | |
1191 target = Json::objectValue; | |
1192 target[TYPE] = JOBS_REGISTRY; | |
1193 target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_); | |
1194 target[JOBS] = Json::arrayValue; | |
1195 | |
1196 for (JobsIndex::const_iterator it = jobsIndex_.begin(); | |
1197 it != jobsIndex_.end(); ++it) | |
1198 { | |
1199 Json::Value v; | |
1200 if (it->second->Serialize(v)) | |
1201 { | |
1202 target[JOBS].append(v); | |
1203 } | |
1204 } | |
1205 } | |
1206 | |
1207 | |
1208 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, | |
1209 const Json::Value& s) | |
1210 { | |
1211 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || | |
1212 !s.isMember(JOBS) || | |
1213 s[JOBS].type() != Json::arrayValue) | |
1214 { | |
1215 throw OrthancException(ErrorCode_BadFileFormat); | |
1216 } | |
1217 | |
1218 maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS); | |
1219 | |
1220 for (Json::Value::ArrayIndex i = 0; i < s[JOBS].size(); i++) | |
1221 { | |
1222 std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][i])); | |
1223 | |
1224 std::string id; | |
1225 SubmitInternal(id, job.release()); | |
1226 } | |
1227 } | |
1184 } | 1228 } |