Mercurial > hg > orthanc
changeset 2666:2540ac79ab6c jobs
SequenceOfOperationsJob serialization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 08 Jun 2018 15:05:32 +0200 |
parents | 389d050a2e66 |
children | 5fa2f2ce74f0 |
files | Core/DicomParsing/DicomModification.cpp Core/JobsEngine/GenericJobUnserializer.cpp Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Core/JobsEngine/Operations/SequenceOfOperationsJob.h UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 5 files changed, 298 insertions(+), 66 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/DicomParsing/DicomModification.cpp Fri Jun 08 13:51:31 2018 +0200 +++ b/Core/DicomParsing/DicomModification.cpp Fri Jun 08 15:05:32 2018 +0200 @@ -1358,7 +1358,8 @@ } - DicomModification::DicomModification(const Json::Value& serialized) + DicomModification::DicomModification(const Json::Value& serialized) : + identifierGenerator_(NULL) { removePrivateTags_ = SerializationToolbox::ReadBoolean(serialized, REMOVE_PRIVATE_TAGS); level_ = StringToResourceType(SerializationToolbox::ReadString(serialized, LEVEL).c_str());
--- a/Core/JobsEngine/GenericJobUnserializer.cpp Fri Jun 08 13:51:31 2018 +0200 +++ b/Core/JobsEngine/GenericJobUnserializer.cpp Fri Jun 08 15:05:32 2018 +0200 @@ -40,6 +40,7 @@ #include "Operations/LogJobOperation.h" #include "Operations/NullOperationValue.h" +#include "Operations/SequenceOfOperationsJob.h" #include "Operations/StringOperationValue.h" namespace Orthanc @@ -48,8 +49,15 @@ { const std::string type = SerializationToolbox::ReadString(source, "Type"); - LOG(ERROR) << "Cannot unserialize job of type: " << type; - throw OrthancException(ErrorCode_BadFileFormat); + if (type == "SequenceOfOperations") + { + return new SequenceOfOperationsJob(*this, source); + } + else + { + LOG(ERROR) << "Cannot unserialize job of type: " << type; + throw OrthancException(ErrorCode_BadFileFormat); + } }
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Fri Jun 08 13:51:31 2018 +0200 +++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Fri Jun 08 15:05:32 2018 +0200 @@ -36,24 +36,40 @@ #include "../../Logging.h" #include "../../OrthancException.h" +#include "../../SerializationToolbox.h" +#include "../IJobUnserializer.h" namespace Orthanc { + static const char* CURRENT = "Current"; + static const char* DESCRIPTION = "Description"; + static const char* DICOM_TIMEOUT = "DicomTimeout"; + static const char* NEXT_OPERATIONS = "Next"; + static const char* OPERATION = "Operation"; + static const char* OPERATIONS = "Operations"; + static const char* ORIGINAL_INPUTS = "OriginalInputs"; + static const char* TRAILING_TIMEOUT = "TrailingTimeout"; + static const char* TYPE = "Type"; + static const char* WORK_INPUTS = "WorkInputs"; + + class SequenceOfOperationsJob::Operation : public boost::noncopyable { private: - size_t index_; - JobOperationValues originalInputs_; - JobOperationValues workInputs_; - std::auto_ptr<IJobOperation> operation_; - std::list<Operation*> nextOperations_; - size_t currentInput_; + size_t index_; + std::auto_ptr<IJobOperation> operation_; + std::auto_ptr<JobOperationValues> originalInputs_; + std::auto_ptr<JobOperationValues> workInputs_; + std::list<Operation*> nextOperations_; + size_t currentInput_; public: Operation(size_t index, IJobOperation* operation) : index_(index), operation_(operation), + originalInputs_(new JobOperationValues), + workInputs_(new JobOperationValues), currentInput_(0) { if (operation == NULL) @@ -71,29 +87,31 @@ } else { - originalInputs_.Append(value.Clone()); + originalInputs_->Append(value.Clone()); } } const JobOperationValues& GetOriginalInputs() const { - return originalInputs_; + return *originalInputs_; } void Reset() { - workInputs_.Clear(); + workInputs_->Clear(); currentInput_ = 0; } - void AddNextOperation(Operation& other) + void AddNextOperation(Operation& other, + bool unserializing) { if (other.index_ <= index_) { throw OrthancException(ErrorCode_InternalError); } - if (currentInput_ != 0) + if (!unserializing && + currentInput_ != 0) { // Cannot add input after processing has started throw OrthancException(ErrorCode_BadSequenceOfCalls); @@ -106,7 +124,7 @@ bool IsDone() const { - return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); + return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize(); } void Step(IDicomConnectionManager& connectionManager) @@ -118,13 +136,13 @@ const JobOperationValue* input; - if (currentInput_ < originalInputs_.GetSize()) + if (currentInput_ < originalInputs_->GetSize()) { - input = &originalInputs_.GetValue(currentInput_); + input = &originalInputs_->GetValue(currentInput_); } else { - input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); + input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize()); } JobOperationValues outputs; @@ -133,14 +151,14 @@ if (!nextOperations_.empty()) { std::list<Operation*>::iterator first = nextOperations_.begin(); - outputs.Move((*first)->workInputs_); + outputs.Move(*(*first)->workInputs_); std::list<Operation*>::iterator current = first; ++current; while (current != nextOperations_.end()) { - (*first)->workInputs_.Copy((*current)->workInputs_); + (*first)->workInputs_->Copy(*(*current)->workInputs_); ++current; } } @@ -151,9 +169,10 @@ void Serialize(Json::Value& target) const { target = Json::objectValue; - operation_->Serialize(target["Operation"]); - originalInputs_.Serialize(target["OriginalInputs"]); - workInputs_.Serialize(target["WorkInputs"]); + target[CURRENT] = static_cast<unsigned int>(currentInput_); + operation_->Serialize(target[OPERATION]); + originalInputs_->Serialize(target[ORIGINAL_INPUTS]); + workInputs_->Serialize(target[WORK_INPUTS]); Json::Value tmp = Json::arrayValue; for (std::list<Operation*>::const_iterator it = nextOperations_.begin(); @@ -162,7 +181,28 @@ tmp.append(static_cast<int>((*it)->index_)); } - target["NextOperations"] = tmp; + target[NEXT_OPERATIONS] = tmp; + } + + Operation(IJobUnserializer& unserializer, + Json::Value::ArrayIndex index, + const Json::Value& serialized) : + index_(index) + { + if (serialized.type() != Json::objectValue || + !serialized.isMember(OPERATION) || + !serialized.isMember(ORIGINAL_INPUTS) || + !serialized.isMember(WORK_INPUTS)) + { + throw OrthancException(ErrorCode_BadFileFormat); + } + + currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT); + operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION])); + originalInputs_.reset(JobOperationValues::Unserialize + (unserializer, serialized[ORIGINAL_INPUTS])); + workInputs_.reset(JobOperationValues::Unserialize + (unserializer, serialized[WORK_INPUTS])); } }; @@ -194,6 +234,13 @@ } + void SequenceOfOperationsJob::GetDescription(std::string& description) + { + boost::mutex::scoped_lock lock(mutex_); + description = description_; + } + + void SequenceOfOperationsJob::Register(IObserver& observer) { boost::mutex::scoped_lock lock(mutex_); @@ -267,7 +314,7 @@ { Operation& a = *that_.operations_[input]; Operation& b = *that_.operations_[output]; - a.AddNextOperation(b); + a.AddNextOperation(b, false /* not unserializing */); } } @@ -299,7 +346,7 @@ } else { - LOG(INFO) << "New operation added to the sequence of operations"; + LOG(INFO) << "New operation were added to the sequence of operations"; } } @@ -366,6 +413,15 @@ boost::mutex::scoped_lock lock(mutex_); value = Json::objectValue; + + std::string jobType; + GetJobType(jobType); + value[TYPE] = jobType; + + value[DESCRIPTION] = description_; + value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds()); + value[DICOM_TIMEOUT] = connectionManager_.GetTimeout(); + value[CURRENT] = static_cast<unsigned int>(current_); Json::Value tmp = Json::arrayValue; for (size_t i = 0; i < operations_.size(); i++) @@ -375,11 +431,65 @@ tmp.append(operation); } - value["Operations"] = tmp; - value["TrailingTimeout"] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds()); - value["DicomTimeout"] = connectionManager_.GetTimeout(); - value["Current"] = static_cast<unsigned int>(current_); + value[OPERATIONS] = tmp; return true; } + + + SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer, + const Json::Value& serialized) : + done_(false) + { + std::string jobType; + GetJobType(jobType); + + if (SerializationToolbox::ReadString(serialized, TYPE) != jobType || + !serialized.isMember(OPERATIONS) || + serialized[OPERATIONS].type() != Json::arrayValue) + { + throw OrthancException(ErrorCode_BadFileFormat); + } + + description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION); + trailingTimeout_ = boost::posix_time::milliseconds + (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT)); + connectionManager_.SetTimeout + (SerializationToolbox::ReadUnsignedInteger(serialized, DICOM_TIMEOUT)); + current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT); + + const Json::Value& ops = serialized[OPERATIONS]; + + // Unserialize the individual operations + operations_.reserve(ops.size()); + for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++) + { + operations_.push_back(new Operation(unserializer, i, ops[i])); + } + + // Connect the next operations + for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++) + { + if (!ops[i].isMember(NEXT_OPERATIONS) || + ops[i][NEXT_OPERATIONS].type() != Json::arrayValue) + { + throw OrthancException(ErrorCode_BadFileFormat); + } + + const Json::Value& next = ops[i][NEXT_OPERATIONS]; + for (Json::Value::ArrayIndex j = 0; j < next.size(); j++) + { + if (next[j].type() != Json::intValue || + next[j].asInt() < 0 || + next[j].asUInt() >= operations_.size()) + { + throw OrthancException(ErrorCode_BadFileFormat); + } + else + { + operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true); + } + } + } + } }
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.h Fri Jun 08 13:51:31 2018 +0200 +++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.h Fri Jun 08 15:05:32 2018 +0200 @@ -74,10 +74,15 @@ public: SequenceOfOperationsJob(); + SequenceOfOperationsJob(IJobUnserializer& unserializer, + const Json::Value& serialized); + virtual ~SequenceOfOperationsJob(); void SetDescription(const std::string& description); + void GetDescription(std::string& description); + void Register(IObserver& observer); // This lock allows adding new operations to the end of the job,
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri Jun 08 13:51:31 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri Jun 08 15:05:32 2018 +0200 @@ -36,6 +36,7 @@ #include "../Core/FileStorage/MemoryStorageArea.h" #include "../Core/JobsEngine/JobsEngine.h" +#include "../Core/Logging.h" #include "../Core/MultiThreading/SharedMessageQueue.h" #include "../Core/OrthancException.h" #include "../Core/SerializationToolbox.h" @@ -744,6 +745,81 @@ } +static bool CheckSameJson(const Json::Value& a, + const Json::Value& b) +{ + std::string s = a.toStyledString(); + std::string t = b.toStyledString(); + + if (s == t) + { + return true; + } + else + { + LOG(ERROR) << "Expected serialization: " << s; + LOG(ERROR) << "Actual serialization: " << t; + return false; + } +} + + +static bool CheckIdempotentSerialization(IJobUnserializer& unserializer, + IJob& job) +{ + Json::Value a = 42; + + if (!job.Serialize(a)) + { + return false; + } + else + { + std::auto_ptr<IJob> unserialized(unserializer.UnserializeJob(a)); + + Json::Value b = 43; + if (unserialized->Serialize(b)) + { + return CheckSameJson(a, b); + } + else + { + return false; + } + } +} + + +static bool CheckIdempotentSerialization(IJobUnserializer& unserializer, + IJobOperation& operation) +{ + Json::Value a = 42; + operation.Serialize(a); + + std::auto_ptr<IJobOperation> unserialized(unserializer.UnserializeOperation(a)); + + Json::Value b = 43; + unserialized->Serialize(b); + + return CheckSameJson(a, b); +} + + +static bool CheckIdempotentSerialization(IJobUnserializer& unserializer, + JobOperationValue& value) +{ + Json::Value a = 42; + value.Serialize(a); + + std::auto_ptr<JobOperationValue> unserialized(unserializer.UnserializeValue(a)); + + Json::Value b = 43; + unserialized->Serialize(b); + + return CheckSameJson(a, b); +} + + TEST(JobsSerialization, BadFileFormat) { GenericJobUnserializer unserializer; @@ -802,16 +878,16 @@ TEST(JobsSerialization, GenericValues) { + GenericJobUnserializer unserializer; Json::Value s; { NullOperationValue null; - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, null)); null.Serialize(s); } - GenericJobUnserializer unserializer; ASSERT_THROW(unserializer.UnserializeJob(s), OrthancException); ASSERT_THROW(unserializer.UnserializeOperation(s), OrthancException); @@ -823,7 +899,7 @@ { StringOperationValue str("Hello"); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, str)); str.Serialize(s); } @@ -838,16 +914,16 @@ TEST(JobsSerialization, GenericOperations) { + DummyUnserializer unserializer; Json::Value s; { LogJobOperation operation; - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation)); operation.Serialize(s); } - DummyUnserializer unserializer; ASSERT_THROW(unserializer.UnserializeJob(s), OrthancException); ASSERT_THROW(unserializer.UnserializeValue(s), OrthancException); @@ -880,7 +956,11 @@ job.ExecuteStep(); job.ExecuteStep(); - s = 42; + { + DummyUnserializer unserializer; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job)); + } + ASSERT_TRUE(job.Serialize(s)); } @@ -904,6 +984,46 @@ ASSERT_EQ("world", tmp.GetInstance(2)); ASSERT_TRUE(tmp.IsFailedInstance("nope")); } + + // SequenceOfOperationsJob + + { + SequenceOfOperationsJob job; + job.SetDescription("hello"); + + { + SequenceOfOperationsJob::Lock lock(job); + size_t a = lock.AddOperation(new LogJobOperation); + size_t b = lock.AddOperation(new LogJobOperation); + lock.Connect(a, b); + lock.AddInput(a, StringOperationValue("hello")); + lock.AddInput(a, StringOperationValue("world")); + lock.SetDicomAssociationTimeout(200); + lock.SetTrailingOperationTimeout(300); + } + + ASSERT_EQ(JobStepCode_Continue, job.ExecuteStep().GetCode()); + + { + GenericJobUnserializer unserializer; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job)); + } + + ASSERT_TRUE(job.Serialize(s)); + } + + { + GenericJobUnserializer unserializer; + ASSERT_THROW(unserializer.UnserializeValue(s), OrthancException); + ASSERT_THROW(unserializer.UnserializeOperation(s), OrthancException); + + std::auto_ptr<IJob> job; + job.reset(unserializer.UnserializeJob(s)); + + std::string tmp; + dynamic_cast<SequenceOfOperationsJob&>(*job).GetDescription(tmp); + ASSERT_EQ("hello", tmp); + } } @@ -1044,12 +1164,6 @@ } -TEST(JobsSerialization, Registry) -{ - // TODO : Test serialization of JobsRegistry -} - - namespace { class OrthancJobsSerialization : public testing::Test @@ -1101,16 +1215,15 @@ ASSERT_TRUE(CreateInstance(id)); Json::Value s; - + OrthancJobUnserializer unserializer(GetContext()); + { DicomInstanceOperationValue instance(GetContext(), id); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, instance)); instance.Serialize(s); } - OrthancJobUnserializer unserializer(GetContext()); - std::auto_ptr<JobOperationValue> value; value.reset(unserializer.UnserializeValue(s)); ASSERT_EQ(JobOperationValue::Type_DicomInstance, value->GetType()); @@ -1133,17 +1246,17 @@ ASSERT_TRUE(CreateInstance(id)); Json::Value s; + OrthancJobUnserializer unserializer(GetContext()); // DeleteResourceOperation { DeleteResourceOperation operation(GetContext()); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation)); operation.Serialize(s); } - OrthancJobUnserializer unserializer(GetContext()); std::auto_ptr<IJobOperation> operation; { @@ -1164,7 +1277,7 @@ StorePeerOperation operation(peer); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation)); operation.Serialize(s); } @@ -1189,7 +1302,7 @@ StoreScuOperation operation("TEST", modality); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation)); operation.Serialize(s); } @@ -1212,7 +1325,7 @@ operation.AddPreArgument("b"); operation.AddPostArgument("c"); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation)); operation.Serialize(s); } @@ -1236,7 +1349,7 @@ ModifyInstanceOperation operation(GetContext(), RequestOrigin_Lua, modification.release()); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation)); operation.Serialize(s); } @@ -1264,6 +1377,8 @@ // DicomModalityStoreJob + OrthancJobUnserializer unserializer(GetContext()); + { RemoteModalityParameters modality; modality.SetApplicationEntityTitle("REMOTE"); @@ -1276,12 +1391,10 @@ job.SetRemoteModality(modality); job.SetMoveOriginator("MOVESCU", 42); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job)); ASSERT_TRUE(job.Serialize(s)); } - OrthancJobUnserializer unserializer(GetContext()); - { std::auto_ptr<IJob> job; job.reset(unserializer.UnserializeJob(s)); @@ -1309,7 +1422,7 @@ OrthancPeerStoreJob job(GetContext()); job.SetPeer(peer); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job)); ASSERT_TRUE(job.Serialize(s)); } @@ -1334,7 +1447,7 @@ job.SetModification(modification.release(), true); job.SetOrigin(DicomInstanceOrigin::FromLua()); - s = 42; + ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job)); ASSERT_TRUE(job.Serialize(s)); } @@ -1347,15 +1460,10 @@ ASSERT_EQ(RequestOrigin_Lua, tmp.GetOrigin().GetRequestOrigin()); ASSERT_TRUE(tmp.GetModification().IsRemoved(DICOM_TAG_STUDY_DESCRIPTION)); } +} - // SequenceOfOperationsJob.h - { - SequenceOfOperationsJob job; - - s = 42; - ASSERT_TRUE(job.Serialize(s)); - } - - std::cout << s; +TEST(JobsSerialization, Registry) +{ + // TODO : Test serialization of JobsRegistry }