Mercurial > hg > orthanc
changeset 2667:5fa2f2ce74f0 jobs
serialization of JobsRegistry
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 08 Jun 2018 15:48:35 +0200 |
parents | 2540ac79ab6c |
children | d26dd081df97 |
files | Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 3 files changed, 113 insertions(+), 41 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/JobsEngine/JobsRegistry.cpp Fri Jun 08 15:05:32 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Fri Jun 08 15:48:35 2018 +0200 @@ -41,6 +41,18 @@ namespace Orthanc { + static const char* ID = "ID"; + static const char* STATE = "State"; + static const char* TYPE = "Type"; + static const char* PRIORITY = "Priority"; + static const char* JOB = "Job"; + static const char* JOBS = "Jobs"; + static const char* JOBS_REGISTRY = "JobsRegistry"; + static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs"; + static const char* CREATION_TIME = "CreationTime"; + static const char* RUNTIME = "Runtime"; + + class JobsRegistry::JobHandler : public boost::noncopyable { private: @@ -97,7 +109,6 @@ } job->GetJobType(jobType_); - job->Start(); lastStatus_ = JobStatus(ErrorCode_Success, *job_); } @@ -251,7 +262,7 @@ if (lastStatus_.HasSerialized()) { - target["Job"] = lastStatus_.GetSerialized(); + target[JOB] = lastStatus_.GetSerialized(); ok = true; } else @@ -261,17 +272,16 @@ } else { - ok = job_->Serialize(target["Job"]); + ok = job_->Serialize(target[JOB]); } if (ok) { - target["ID"] = id_; - target["State"] = EnumerationToString(state_); - target["JobType"] = jobType_; - target["Priority"] = priority_; - target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_); - target["Runtime"] = static_cast<unsigned int>(runtime_.total_milliseconds()); + target[ID] = id_; + target[STATE] = EnumerationToString(state_); + target[PRIORITY] = priority_; + target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_); + target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds()); return true; } else @@ -287,12 +297,13 @@ pauseScheduled_(false), cancelScheduled_(false) { - id_ = StringToJobState(SerializationToolbox::ReadString(serialized, "ID")); - state_ = StringToJobState(SerializationToolbox::ReadString(serialized, "State")); - priority_ = SerializationToolbox::ReadInteger(serialized, "Priority"); + id_ = SerializationToolbox::ReadString(serialized, ID); + state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE)); + priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY); creationTime_ = boost::posix_time::from_iso_string - (SerializationToolbox::ReadString(serialized, "CreationTime")); - runtime_ = boost::posix_time::milliseconds(SerializationToolbox::ReadInteger(serialized, "Runtime")); + (SerializationToolbox::ReadString(serialized, CREATION_TIME)); + runtime_ = boost::posix_time::milliseconds + (SerializationToolbox::ReadInteger(serialized, RUNTIME)); retryTime_ = creationTime_; @@ -302,7 +313,7 @@ state_ = JobState_Pending; } - job_.reset(unserializer.UnserializeJob(serialized["Job"])); + job_.reset(unserializer.UnserializeJob(serialized[JOB])); job_->GetJobType(jobType_); job_->Start(); @@ -578,35 +589,16 @@ } - void JobsRegistry::Serialize(Json::Value& target) + void JobsRegistry::SubmitInternal(std::string& id, + JobHandler* handlerRaw) { - boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - target = Json::arrayValue; - - for (JobsIndex::const_iterator it = jobsIndex_.begin(); - it != jobsIndex_.end(); ++it) - { - Json::Value v; - if (it->second->Serialize(v)) - { - target.append(v); - } - } - } - - - void JobsRegistry::Submit(std::string& id, - IJob* job, // Takes ownership - int priority) - { - std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); + std::auto_ptr<JobHandler> handler(handlerRaw); boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); id = handler->GetId(); + int priority = handler->GetPriority(); pendingJobs_.push(handler.get()); pendingJobAvailable_.notify_one(); @@ -619,11 +611,19 @@ } + void JobsRegistry::Submit(std::string& id, + IJob* job, // Takes ownership + int priority) + { + SubmitInternal(id, new JobHandler(job, priority)); + } + + void JobsRegistry::Submit(IJob* job, // Takes ownership int priority) { std::string id; - Submit(id, job, priority); + SubmitInternal(id, new JobHandler(job, priority)); } @@ -867,7 +867,6 @@ } else if (found->second->GetState() != JobState_Failure) { - printf("%s\n", EnumerationToString(found->second->GetState())); LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; return false; } @@ -1181,4 +1180,49 @@ handler_->SetLastStatus(status); } } + + + + void JobsRegistry::Serialize(Json::Value& target) + { + boost::mutex::scoped_lock lock(mutex_); + CheckInvariants(); + + target = Json::objectValue; + target[TYPE] = JOBS_REGISTRY; + target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_); + target[JOBS] = Json::arrayValue; + + for (JobsIndex::const_iterator it = jobsIndex_.begin(); + it != jobsIndex_.end(); ++it) + { + Json::Value v; + if (it->second->Serialize(v)) + { + target[JOBS].append(v); + } + } + } + + + JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, + const Json::Value& s) + { + if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || + !s.isMember(JOBS) || + s[JOBS].type() != Json::arrayValue) + { + throw OrthancException(ErrorCode_BadFileFormat); + } + + maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS); + + for (Json::Value::ArrayIndex i = 0; i < s[JOBS].size(); i++) + { + std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][i])); + + std::string id; + SubmitInternal(id, job.release()); + } + } }
--- a/Core/JobsEngine/JobsRegistry.h Fri Jun 08 15:05:32 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.h Fri Jun 08 15:48:35 2018 +0200 @@ -112,12 +112,17 @@ void RemoveRetryJob(JobHandler* handler); + void SubmitInternal(std::string& id, + JobHandler* handler); + public: JobsRegistry() : maxCompletedJobs_(10) { } + JobsRegistry(IJobUnserializer& unserializer, + const Json::Value& s); ~JobsRegistry();
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri Jun 08 15:05:32 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri Jun 08 15:48:35 2018 +0200 @@ -132,6 +132,8 @@ virtual bool Serialize(Json::Value& value) { + value = Json::objectValue; + value["Type"] = "DummyJob"; return true; } @@ -180,6 +182,10 @@ { return new DummyInstancesJob(value); } + else if (SerializationToolbox::ReadString(value, "Type") == "DummyJob") + { + return new DummyJob; + } else { return GenericJobUnserializer::UnserializeJob(value); @@ -1465,5 +1471,22 @@ TEST(JobsSerialization, Registry) { - // TODO : Test serialization of JobsRegistry + Json::Value s; + std::string i1, i2; + + { + JobsRegistry registry; + registry.Submit(i1, new DummyJob(), 10); + registry.Submit(i2, new SequenceOfOperationsJob(), 30); + registry.Serialize(s); + } + + { + DummyUnserializer unserializer; + JobsRegistry registry(unserializer, s); + + Json::Value t; + registry.Serialize(t); + ASSERT_TRUE(CheckSameJson(s, t)); + } }