Mercurial > hg > orthanc
comparison Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp @ 2666:2540ac79ab6c jobs
SequenceOfOperationsJob serialization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 08 Jun 2018 15:05:32 +0200 |
parents | 389d050a2e66 |
children | 7cfc8d266f41 |
comparison
Legend: equal, deleted, inserted, replaced
2665:389d050a2e66 | 2666:2540ac79ab6c |
---|---|
34 #include "../../PrecompiledHeaders.h" | 34 #include "../../PrecompiledHeaders.h" |
35 #include "SequenceOfOperationsJob.h" | 35 #include "SequenceOfOperationsJob.h" |
36 | 36 |
37 #include "../../Logging.h" | 37 #include "../../Logging.h" |
38 #include "../../OrthancException.h" | 38 #include "../../OrthancException.h" |
39 #include "../../SerializationToolbox.h" | |
40 #include "../IJobUnserializer.h" | |
39 | 41 |
40 namespace Orthanc | 42 namespace Orthanc |
41 { | 43 { |
44 static const char* CURRENT = "Current"; | |
45 static const char* DESCRIPTION = "Description"; | |
46 static const char* DICOM_TIMEOUT = "DicomTimeout"; | |
47 static const char* NEXT_OPERATIONS = "Next"; | |
48 static const char* OPERATION = "Operation"; | |
49 static const char* OPERATIONS = "Operations"; | |
50 static const char* ORIGINAL_INPUTS = "OriginalInputs"; | |
51 static const char* TRAILING_TIMEOUT = "TrailingTimeout"; | |
52 static const char* TYPE = "Type"; | |
53 static const char* WORK_INPUTS = "WorkInputs"; | |
54 | |
55 | |
42 class SequenceOfOperationsJob::Operation : public boost::noncopyable | 56 class SequenceOfOperationsJob::Operation : public boost::noncopyable |
43 { | 57 { |
44 private: | 58 private: |
45 size_t index_; | 59 size_t index_; |
46 JobOperationValues originalInputs_; | 60 std::auto_ptr<IJobOperation> operation_; |
47 JobOperationValues workInputs_; | 61 std::auto_ptr<JobOperationValues> originalInputs_; |
48 std::auto_ptr<IJobOperation> operation_; | 62 std::auto_ptr<JobOperationValues> workInputs_; |
49 std::list<Operation*> nextOperations_; | 63 std::list<Operation*> nextOperations_; |
50 size_t currentInput_; | 64 size_t currentInput_; |
51 | 65 |
52 public: | 66 public: |
53 Operation(size_t index, | 67 Operation(size_t index, |
54 IJobOperation* operation) : | 68 IJobOperation* operation) : |
55 index_(index), | 69 index_(index), |
56 operation_(operation), | 70 operation_(operation), |
71 originalInputs_(new JobOperationValues), | |
72 workInputs_(new JobOperationValues), | |
57 currentInput_(0) | 73 currentInput_(0) |
58 { | 74 { |
59 if (operation == NULL) | 75 if (operation == NULL) |
60 { | 76 { |
61 throw OrthancException(ErrorCode_NullPointer); | 77 throw OrthancException(ErrorCode_NullPointer); |
69 // Cannot add input after processing has started | 85 // Cannot add input after processing has started |
70 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 86 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
71 } | 87 } |
72 else | 88 else |
73 { | 89 { |
74 originalInputs_.Append(value.Clone()); | 90 originalInputs_->Append(value.Clone()); |
75 } | 91 } |
76 } | 92 } |
77 | 93 |
78 const JobOperationValues& GetOriginalInputs() const | 94 const JobOperationValues& GetOriginalInputs() const |
79 { | 95 { |
80 return originalInputs_; | 96 return *originalInputs_; |
81 } | 97 } |
82 | 98 |
83 void Reset() | 99 void Reset() |
84 { | 100 { |
85 workInputs_.Clear(); | 101 workInputs_->Clear(); |
86 currentInput_ = 0; | 102 currentInput_ = 0; |
87 } | 103 } |
88 | 104 |
89 void AddNextOperation(Operation& other) | 105 void AddNextOperation(Operation& other, |
106 bool unserializing) | |
90 { | 107 { |
91 if (other.index_ <= index_) | 108 if (other.index_ <= index_) |
92 { | 109 { |
93 throw OrthancException(ErrorCode_InternalError); | 110 throw OrthancException(ErrorCode_InternalError); |
94 } | 111 } |
95 | 112 |
96 if (currentInput_ != 0) | 113 if (!unserializing && |
114 currentInput_ != 0) | |
97 { | 115 { |
98 // Cannot add input after processing has started | 116 // Cannot add input after processing has started |
99 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 117 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
100 } | 118 } |
101 else | 119 else |
104 } | 122 } |
105 } | 123 } |
106 | 124 |
107 bool IsDone() const | 125 bool IsDone() const |
108 { | 126 { |
109 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); | 127 return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize(); |
110 } | 128 } |
111 | 129 |
112 void Step(IDicomConnectionManager& connectionManager) | 130 void Step(IDicomConnectionManager& connectionManager) |
113 { | 131 { |
114 if (IsDone()) | 132 if (IsDone()) |
116 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 134 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
117 } | 135 } |
118 | 136 |
119 const JobOperationValue* input; | 137 const JobOperationValue* input; |
120 | 138 |
121 if (currentInput_ < originalInputs_.GetSize()) | 139 if (currentInput_ < originalInputs_->GetSize()) |
122 { | 140 { |
123 input = &originalInputs_.GetValue(currentInput_); | 141 input = &originalInputs_->GetValue(currentInput_); |
124 } | 142 } |
125 else | 143 else |
126 { | 144 { |
127 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); | 145 input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize()); |
128 } | 146 } |
129 | 147 |
130 JobOperationValues outputs; | 148 JobOperationValues outputs; |
131 operation_->Apply(outputs, *input, connectionManager); | 149 operation_->Apply(outputs, *input, connectionManager); |
132 | 150 |
133 if (!nextOperations_.empty()) | 151 if (!nextOperations_.empty()) |
134 { | 152 { |
135 std::list<Operation*>::iterator first = nextOperations_.begin(); | 153 std::list<Operation*>::iterator first = nextOperations_.begin(); |
136 outputs.Move((*first)->workInputs_); | 154 outputs.Move(*(*first)->workInputs_); |
137 | 155 |
138 std::list<Operation*>::iterator current = first; | 156 std::list<Operation*>::iterator current = first; |
139 ++current; | 157 ++current; |
140 | 158 |
141 while (current != nextOperations_.end()) | 159 while (current != nextOperations_.end()) |
142 { | 160 { |
143 (*first)->workInputs_.Copy((*current)->workInputs_); | 161 (*first)->workInputs_->Copy(*(*current)->workInputs_); |
144 ++current; | 162 ++current; |
145 } | 163 } |
146 } | 164 } |
147 | 165 |
148 currentInput_ += 1; | 166 currentInput_ += 1; |
149 } | 167 } |
150 | 168 |
151 void Serialize(Json::Value& target) const | 169 void Serialize(Json::Value& target) const |
152 { | 170 { |
153 target = Json::objectValue; | 171 target = Json::objectValue; |
154 operation_->Serialize(target["Operation"]); | 172 target[CURRENT] = static_cast<unsigned int>(currentInput_); |
155 originalInputs_.Serialize(target["OriginalInputs"]); | 173 operation_->Serialize(target[OPERATION]); |
156 workInputs_.Serialize(target["WorkInputs"]); | 174 originalInputs_->Serialize(target[ORIGINAL_INPUTS]); |
175 workInputs_->Serialize(target[WORK_INPUTS]); | |
157 | 176 |
158 Json::Value tmp = Json::arrayValue; | 177 Json::Value tmp = Json::arrayValue; |
159 for (std::list<Operation*>::const_iterator it = nextOperations_.begin(); | 178 for (std::list<Operation*>::const_iterator it = nextOperations_.begin(); |
160 it != nextOperations_.end(); ++it) | 179 it != nextOperations_.end(); ++it) |
161 { | 180 { |
162 tmp.append(static_cast<int>((*it)->index_)); | 181 tmp.append(static_cast<int>((*it)->index_)); |
163 } | 182 } |
164 | 183 |
165 target["NextOperations"] = tmp; | 184 target[NEXT_OPERATIONS] = tmp; |
185 } | |
186 | |
187 Operation(IJobUnserializer& unserializer, | |
188 Json::Value::ArrayIndex index, | |
189 const Json::Value& serialized) : | |
190 index_(index) | |
191 { | |
192 if (serialized.type() != Json::objectValue || | |
193 !serialized.isMember(OPERATION) || | |
194 !serialized.isMember(ORIGINAL_INPUTS) || | |
195 !serialized.isMember(WORK_INPUTS)) | |
196 { | |
197 throw OrthancException(ErrorCode_BadFileFormat); | |
198 } | |
199 | |
200 currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT); | |
201 operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION])); | |
202 originalInputs_.reset(JobOperationValues::Unserialize | |
203 (unserializer, serialized[ORIGINAL_INPUTS])); | |
204 workInputs_.reset(JobOperationValues::Unserialize | |
205 (unserializer, serialized[WORK_INPUTS])); | |
166 } | 206 } |
167 }; | 207 }; |
168 | 208 |
169 | 209 |
170 SequenceOfOperationsJob::SequenceOfOperationsJob() : | 210 SequenceOfOperationsJob::SequenceOfOperationsJob() : |
189 | 229 |
190 void SequenceOfOperationsJob::SetDescription(const std::string& description) | 230 void SequenceOfOperationsJob::SetDescription(const std::string& description) |
191 { | 231 { |
192 boost::mutex::scoped_lock lock(mutex_); | 232 boost::mutex::scoped_lock lock(mutex_); |
193 description_ = description; | 233 description_ = description; |
234 } | |
235 | |
236 | |
237 void SequenceOfOperationsJob::GetDescription(std::string& description) | |
238 { | |
239 boost::mutex::scoped_lock lock(mutex_); | |
240 description = description_; | |
194 } | 241 } |
195 | 242 |
196 | 243 |
197 void SequenceOfOperationsJob::Register(IObserver& observer) | 244 void SequenceOfOperationsJob::Register(IObserver& observer) |
198 { | 245 { |
265 } | 312 } |
266 else | 313 else |
267 { | 314 { |
268 Operation& a = *that_.operations_[input]; | 315 Operation& a = *that_.operations_[input]; |
269 Operation& b = *that_.operations_[output]; | 316 Operation& b = *that_.operations_[output]; |
270 a.AddNextOperation(b); | 317 a.AddNextOperation(b, false /* not unserializing */); |
271 } | 318 } |
272 } | 319 } |
273 | 320 |
274 | 321 |
275 JobStepResult SequenceOfOperationsJob::ExecuteStep() | 322 JobStepResult SequenceOfOperationsJob::ExecuteStep() |
297 connectionManager_.Close(); | 344 connectionManager_.Close(); |
298 return JobStepResult::Success(); | 345 return JobStepResult::Success(); |
299 } | 346 } |
300 else | 347 else |
301 { | 348 { |
302 LOG(INFO) << "New operation added to the sequence of operations"; | 349 LOG(INFO) << "New operation were added to the sequence of operations"; |
303 } | 350 } |
304 } | 351 } |
305 | 352 |
306 assert(current_ < operations_.size()); | 353 assert(current_ < operations_.size()); |
307 | 354 |
364 bool SequenceOfOperationsJob::Serialize(Json::Value& value) | 411 bool SequenceOfOperationsJob::Serialize(Json::Value& value) |
365 { | 412 { |
366 boost::mutex::scoped_lock lock(mutex_); | 413 boost::mutex::scoped_lock lock(mutex_); |
367 | 414 |
368 value = Json::objectValue; | 415 value = Json::objectValue; |
416 | |
417 std::string jobType; | |
418 GetJobType(jobType); | |
419 value[TYPE] = jobType; | |
420 | |
421 value[DESCRIPTION] = description_; | |
422 value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds()); | |
423 value[DICOM_TIMEOUT] = connectionManager_.GetTimeout(); | |
424 value[CURRENT] = static_cast<unsigned int>(current_); | |
369 | 425 |
370 Json::Value tmp = Json::arrayValue; | 426 Json::Value tmp = Json::arrayValue; |
371 for (size_t i = 0; i < operations_.size(); i++) | 427 for (size_t i = 0; i < operations_.size(); i++) |
372 { | 428 { |
373 Json::Value operation = Json::objectValue; | 429 Json::Value operation = Json::objectValue; |
374 operations_[i]->Serialize(operation); | 430 operations_[i]->Serialize(operation); |
375 tmp.append(operation); | 431 tmp.append(operation); |
376 } | 432 } |
377 | 433 |
378 value["Operations"] = tmp; | 434 value[OPERATIONS] = tmp; |
379 value["TrailingTimeout"] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds()); | |
380 value["DicomTimeout"] = connectionManager_.GetTimeout(); | |
381 value["Current"] = static_cast<unsigned int>(current_); | |
382 | 435 |
383 return true; | 436 return true; |
384 } | 437 } |
438 | |
439 | |
440 SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer, | |
441 const Json::Value& serialized) : | |
442 done_(false) | |
443 { | |
444 std::string jobType; | |
445 GetJobType(jobType); | |
446 | |
447 if (SerializationToolbox::ReadString(serialized, TYPE) != jobType || | |
448 !serialized.isMember(OPERATIONS) || | |
449 serialized[OPERATIONS].type() != Json::arrayValue) | |
450 { | |
451 throw OrthancException(ErrorCode_BadFileFormat); | |
452 } | |
453 | |
454 description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION); | |
455 trailingTimeout_ = boost::posix_time::milliseconds | |
456 (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT)); | |
457 connectionManager_.SetTimeout | |
458 (SerializationToolbox::ReadUnsignedInteger(serialized, DICOM_TIMEOUT)); | |
459 current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT); | |
460 | |
461 const Json::Value& ops = serialized[OPERATIONS]; | |
462 | |
463 // Unserialize the individual operations | |
464 operations_.reserve(ops.size()); | |
465 for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++) | |
466 { | |
467 operations_.push_back(new Operation(unserializer, i, ops[i])); | |
468 } | |
469 | |
470 // Connect the next operations | |
471 for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++) | |
472 { | |
473 if (!ops[i].isMember(NEXT_OPERATIONS) || | |
474 ops[i][NEXT_OPERATIONS].type() != Json::arrayValue) | |
475 { | |
476 throw OrthancException(ErrorCode_BadFileFormat); | |
477 } | |
478 | |
479 const Json::Value& next = ops[i][NEXT_OPERATIONS]; | |
480 for (Json::Value::ArrayIndex j = 0; j < next.size(); j++) | |
481 { | |
482 if (next[j].type() != Json::intValue || | |
483 next[j].asInt() < 0 || | |
484 next[j].asUInt() >= operations_.size()) | |
485 { | |
486 throw OrthancException(ErrorCode_BadFileFormat); | |
487 } | |
488 else | |
489 { | |
490 operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true); | |
491 } | |
492 } | |
493 } | |
494 } | |
385 } | 495 } |