comparison Core/JobsEngine/JobsRegistry.cpp @ 2667:5fa2f2ce74f0 jobs

serialization of JobsRegistry
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 15:48:35 +0200
parents 228e2783ce83
children d26dd081df97
comparison
equal deleted inserted replaced
2666:2540ac79ab6c 2667:5fa2f2ce74f0
39 #include "../Toolbox.h" 39 #include "../Toolbox.h"
40 #include "../SerializationToolbox.h" 40 #include "../SerializationToolbox.h"
41 41
42 namespace Orthanc 42 namespace Orthanc
43 { 43 {
44 static const char* ID = "ID";
45 static const char* STATE = "State";
46 static const char* TYPE = "Type";
47 static const char* PRIORITY = "Priority";
48 static const char* JOB = "Job";
49 static const char* JOBS = "Jobs";
50 static const char* JOBS_REGISTRY = "JobsRegistry";
51 static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs";
52 static const char* CREATION_TIME = "CreationTime";
53 static const char* RUNTIME = "Runtime";
54
55
44 class JobsRegistry::JobHandler : public boost::noncopyable 56 class JobsRegistry::JobHandler : public boost::noncopyable
45 { 57 {
46 private: 58 private:
47 std::string id_; 59 std::string id_;
48 JobState state_; 60 JobState state_;
95 { 107 {
96 throw OrthancException(ErrorCode_NullPointer); 108 throw OrthancException(ErrorCode_NullPointer);
97 } 109 }
98 110
99 job->GetJobType(jobType_); 111 job->GetJobType(jobType_);
100 job->Start();
101 112
102 lastStatus_ = JobStatus(ErrorCode_Success, *job_); 113 lastStatus_ = JobStatus(ErrorCode_Success, *job_);
103 } 114 }
104 115
105 const std::string& GetId() const 116 const std::string& GetId() const
249 // blocked while a step in the job is running. Instead, we 260 // blocked while a step in the job is running. Instead, we
250 // save a snapshot of the serialized job. 261 // save a snapshot of the serialized job.
251 262
252 if (lastStatus_.HasSerialized()) 263 if (lastStatus_.HasSerialized())
253 { 264 {
254 target["Job"] = lastStatus_.GetSerialized(); 265 target[JOB] = lastStatus_.GetSerialized();
255 ok = true; 266 ok = true;
256 } 267 }
257 else 268 else
258 { 269 {
259 ok = false; 270 ok = false;
260 } 271 }
261 } 272 }
262 else 273 else
263 { 274 {
264 ok = job_->Serialize(target["Job"]); 275 ok = job_->Serialize(target[JOB]);
265 } 276 }
266 277
267 if (ok) 278 if (ok)
268 { 279 {
269 target["ID"] = id_; 280 target[ID] = id_;
270 target["State"] = EnumerationToString(state_); 281 target[STATE] = EnumerationToString(state_);
271 target["JobType"] = jobType_; 282 target[PRIORITY] = priority_;
272 target["Priority"] = priority_; 283 target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_);
273 target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); 284 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds());
274 target["Runtime"] = static_cast<unsigned int>(runtime_.total_milliseconds());
275 return true; 285 return true;
276 } 286 }
277 else 287 else
278 { 288 {
279 LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_; 289 LOG(WARNING) << "Job backup is not supported for job of type: " << jobType_;
285 const Json::Value& serialized) : 295 const Json::Value& serialized) :
286 lastStateChangeTime_(boost::posix_time::microsec_clock::universal_time()), 296 lastStateChangeTime_(boost::posix_time::microsec_clock::universal_time()),
287 pauseScheduled_(false), 297 pauseScheduled_(false),
288 cancelScheduled_(false) 298 cancelScheduled_(false)
289 { 299 {
290 id_ = StringToJobState(SerializationToolbox::ReadString(serialized, "ID")); 300 id_ = SerializationToolbox::ReadString(serialized, ID);
291 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, "State")); 301 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE));
292 priority_ = SerializationToolbox::ReadInteger(serialized, "Priority"); 302 priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY);
293 creationTime_ = boost::posix_time::from_iso_string 303 creationTime_ = boost::posix_time::from_iso_string
294 (SerializationToolbox::ReadString(serialized, "CreationTime")); 304 (SerializationToolbox::ReadString(serialized, CREATION_TIME));
295 runtime_ = boost::posix_time::milliseconds(SerializationToolbox::ReadInteger(serialized, "Runtime")); 305 runtime_ = boost::posix_time::milliseconds
306 (SerializationToolbox::ReadInteger(serialized, RUNTIME));
296 307
297 retryTime_ = creationTime_; 308 retryTime_ = creationTime_;
298 309
299 if (state_ == JobState_Retry || 310 if (state_ == JobState_Retry ||
300 state_ == JobState_Running) 311 state_ == JobState_Running)
301 { 312 {
302 state_ = JobState_Pending; 313 state_ = JobState_Pending;
303 } 314 }
304 315
305 job_.reset(unserializer.UnserializeJob(serialized["Job"])); 316 job_.reset(unserializer.UnserializeJob(serialized[JOB]));
306 job_->GetJobType(jobType_); 317 job_->GetJobType(jobType_);
307 job_->Start(); 318 job_->Start();
308 319
309 lastStatus_ = JobStatus(ErrorCode_Success, *job_); 320 lastStatus_ = JobStatus(ErrorCode_Success, *job_);
310 } 321 }
576 return true; 587 return true;
577 } 588 }
578 } 589 }
579 590
580 591
581 void JobsRegistry::Serialize(Json::Value& target) 592 void JobsRegistry::SubmitInternal(std::string& id,
582 { 593 JobHandler* handlerRaw)
594 {
595 std::auto_ptr<JobHandler> handler(handlerRaw);
596
583 boost::mutex::scoped_lock lock(mutex_); 597 boost::mutex::scoped_lock lock(mutex_);
584 CheckInvariants(); 598 CheckInvariants();
585 599
586 target = Json::arrayValue; 600 id = handler->GetId();
587 601 int priority = handler->GetPriority();
588 for (JobsIndex::const_iterator it = jobsIndex_.begin(); 602
589 it != jobsIndex_.end(); ++it) 603 pendingJobs_.push(handler.get());
590 { 604 pendingJobAvailable_.notify_one();
591 Json::Value v; 605
592 if (it->second->Serialize(v)) 606 jobsIndex_.insert(std::make_pair(id, handler.release()));
593 { 607
594 target.append(v); 608 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
595 } 609
596 } 610 CheckInvariants();
597 } 611 }
598 612
599 613
600 void JobsRegistry::Submit(std::string& id, 614 void JobsRegistry::Submit(std::string& id,
601 IJob* job, // Takes ownership 615 IJob* job, // Takes ownership
602 int priority) 616 int priority)
603 { 617 {
604 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); 618 SubmitInternal(id, new JobHandler(job, priority));
605
606 boost::mutex::scoped_lock lock(mutex_);
607 CheckInvariants();
608
609 id = handler->GetId();
610
611 pendingJobs_.push(handler.get());
612 pendingJobAvailable_.notify_one();
613
614 jobsIndex_.insert(std::make_pair(id, handler.release()));
615
616 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
617
618 CheckInvariants();
619 } 619 }
620 620
621 621
622 void JobsRegistry::Submit(IJob* job, // Takes ownership 622 void JobsRegistry::Submit(IJob* job, // Takes ownership
623 int priority) 623 int priority)
624 { 624 {
625 std::string id; 625 std::string id;
626 Submit(id, job, priority); 626 SubmitInternal(id, new JobHandler(job, priority));
627 } 627 }
628 628
629 629
630 bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership 630 bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership
631 int priority) 631 int priority)
865 LOG(WARNING) << "Unknown job: " << id; 865 LOG(WARNING) << "Unknown job: " << id;
866 return false; 866 return false;
867 } 867 }
868 else if (found->second->GetState() != JobState_Failure) 868 else if (found->second->GetState() != JobState_Failure)
869 { 869 {
870 printf("%s\n", EnumerationToString(found->second->GetState()));
871 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; 870 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
872 return false; 871 return false;
873 } 872 }
874 else 873 else
875 { 874 {
1179 assert(handler_->GetState() == JobState_Running); 1178 assert(handler_->GetState() == JobState_Running);
1180 1179
1181 handler_->SetLastStatus(status); 1180 handler_->SetLastStatus(status);
1182 } 1181 }
1183 } 1182 }
1183
1184
1185
1186 void JobsRegistry::Serialize(Json::Value& target)
1187 {
1188 boost::mutex::scoped_lock lock(mutex_);
1189 CheckInvariants();
1190
1191 target = Json::objectValue;
1192 target[TYPE] = JOBS_REGISTRY;
1193 target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_);
1194 target[JOBS] = Json::arrayValue;
1195
1196 for (JobsIndex::const_iterator it = jobsIndex_.begin();
1197 it != jobsIndex_.end(); ++it)
1198 {
1199 Json::Value v;
1200 if (it->second->Serialize(v))
1201 {
1202 target[JOBS].append(v);
1203 }
1204 }
1205 }
1206
1207
1208 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1209 const Json::Value& s)
1210 {
1211 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1212 !s.isMember(JOBS) ||
1213 s[JOBS].type() != Json::arrayValue)
1214 {
1215 throw OrthancException(ErrorCode_BadFileFormat);
1216 }
1217
1218 maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS);
1219
1220 for (Json::Value::ArrayIndex i = 0; i < s[JOBS].size(); i++)
1221 {
1222 std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][i]));
1223
1224 std::string id;
1225 SubmitInternal(id, job.release());
1226 }
1227 }
1184 } 1228 }