comparison Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp @ 2666:2540ac79ab6c jobs

SequenceOfOperationsJob serialization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 15:05:32 +0200
parents 389d050a2e66
children 7cfc8d266f41
comparison
equal deleted inserted replaced
2665:389d050a2e66 2666:2540ac79ab6c
34 #include "../../PrecompiledHeaders.h" 34 #include "../../PrecompiledHeaders.h"
35 #include "SequenceOfOperationsJob.h" 35 #include "SequenceOfOperationsJob.h"
36 36
37 #include "../../Logging.h" 37 #include "../../Logging.h"
38 #include "../../OrthancException.h" 38 #include "../../OrthancException.h"
39 #include "../../SerializationToolbox.h"
40 #include "../IJobUnserializer.h"
39 41
40 namespace Orthanc 42 namespace Orthanc
41 { 43 {
44 static const char* CURRENT = "Current";
45 static const char* DESCRIPTION = "Description";
46 static const char* DICOM_TIMEOUT = "DicomTimeout";
47 static const char* NEXT_OPERATIONS = "Next";
48 static const char* OPERATION = "Operation";
49 static const char* OPERATIONS = "Operations";
50 static const char* ORIGINAL_INPUTS = "OriginalInputs";
51 static const char* TRAILING_TIMEOUT = "TrailingTimeout";
52 static const char* TYPE = "Type";
53 static const char* WORK_INPUTS = "WorkInputs";
54
55
42 class SequenceOfOperationsJob::Operation : public boost::noncopyable 56 class SequenceOfOperationsJob::Operation : public boost::noncopyable
43 { 57 {
44 private: 58 private:
45 size_t index_; 59 size_t index_;
46 JobOperationValues originalInputs_; 60 std::auto_ptr<IJobOperation> operation_;
47 JobOperationValues workInputs_; 61 std::auto_ptr<JobOperationValues> originalInputs_;
48 std::auto_ptr<IJobOperation> operation_; 62 std::auto_ptr<JobOperationValues> workInputs_;
49 std::list<Operation*> nextOperations_; 63 std::list<Operation*> nextOperations_;
50 size_t currentInput_; 64 size_t currentInput_;
51 65
52 public: 66 public:
53 Operation(size_t index, 67 Operation(size_t index,
54 IJobOperation* operation) : 68 IJobOperation* operation) :
55 index_(index), 69 index_(index),
56 operation_(operation), 70 operation_(operation),
71 originalInputs_(new JobOperationValues),
72 workInputs_(new JobOperationValues),
57 currentInput_(0) 73 currentInput_(0)
58 { 74 {
59 if (operation == NULL) 75 if (operation == NULL)
60 { 76 {
61 throw OrthancException(ErrorCode_NullPointer); 77 throw OrthancException(ErrorCode_NullPointer);
69 // Cannot add input after processing has started 85 // Cannot add input after processing has started
70 throw OrthancException(ErrorCode_BadSequenceOfCalls); 86 throw OrthancException(ErrorCode_BadSequenceOfCalls);
71 } 87 }
72 else 88 else
73 { 89 {
74 originalInputs_.Append(value.Clone()); 90 originalInputs_->Append(value.Clone());
75 } 91 }
76 } 92 }
77 93
78 const JobOperationValues& GetOriginalInputs() const 94 const JobOperationValues& GetOriginalInputs() const
79 { 95 {
80 return originalInputs_; 96 return *originalInputs_;
81 } 97 }
82 98
83 void Reset() 99 void Reset()
84 { 100 {
85 workInputs_.Clear(); 101 workInputs_->Clear();
86 currentInput_ = 0; 102 currentInput_ = 0;
87 } 103 }
88 104
89 void AddNextOperation(Operation& other) 105 void AddNextOperation(Operation& other,
106 bool unserializing)
90 { 107 {
91 if (other.index_ <= index_) 108 if (other.index_ <= index_)
92 { 109 {
93 throw OrthancException(ErrorCode_InternalError); 110 throw OrthancException(ErrorCode_InternalError);
94 } 111 }
95 112
96 if (currentInput_ != 0) 113 if (!unserializing &&
114 currentInput_ != 0)
97 { 115 {
98 // Cannot add input after processing has started 116 // Cannot add input after processing has started
99 throw OrthancException(ErrorCode_BadSequenceOfCalls); 117 throw OrthancException(ErrorCode_BadSequenceOfCalls);
100 } 118 }
101 else 119 else
104 } 122 }
105 } 123 }
106 124
107 bool IsDone() const 125 bool IsDone() const
108 { 126 {
109 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); 127 return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
110 } 128 }
111 129
112 void Step(IDicomConnectionManager& connectionManager) 130 void Step(IDicomConnectionManager& connectionManager)
113 { 131 {
114 if (IsDone()) 132 if (IsDone())
116 throw OrthancException(ErrorCode_BadSequenceOfCalls); 134 throw OrthancException(ErrorCode_BadSequenceOfCalls);
117 } 135 }
118 136
119 const JobOperationValue* input; 137 const JobOperationValue* input;
120 138
121 if (currentInput_ < originalInputs_.GetSize()) 139 if (currentInput_ < originalInputs_->GetSize())
122 { 140 {
123 input = &originalInputs_.GetValue(currentInput_); 141 input = &originalInputs_->GetValue(currentInput_);
124 } 142 }
125 else 143 else
126 { 144 {
127 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); 145 input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
128 } 146 }
129 147
130 JobOperationValues outputs; 148 JobOperationValues outputs;
131 operation_->Apply(outputs, *input, connectionManager); 149 operation_->Apply(outputs, *input, connectionManager);
132 150
133 if (!nextOperations_.empty()) 151 if (!nextOperations_.empty())
134 { 152 {
135 std::list<Operation*>::iterator first = nextOperations_.begin(); 153 std::list<Operation*>::iterator first = nextOperations_.begin();
136 outputs.Move((*first)->workInputs_); 154 outputs.Move(*(*first)->workInputs_);
137 155
138 std::list<Operation*>::iterator current = first; 156 std::list<Operation*>::iterator current = first;
139 ++current; 157 ++current;
140 158
141 while (current != nextOperations_.end()) 159 while (current != nextOperations_.end())
142 { 160 {
143 (*first)->workInputs_.Copy((*current)->workInputs_); 161 (*first)->workInputs_->Copy(*(*current)->workInputs_);
144 ++current; 162 ++current;
145 } 163 }
146 } 164 }
147 165
148 currentInput_ += 1; 166 currentInput_ += 1;
149 } 167 }
150 168
151 void Serialize(Json::Value& target) const 169 void Serialize(Json::Value& target) const
152 { 170 {
153 target = Json::objectValue; 171 target = Json::objectValue;
154 operation_->Serialize(target["Operation"]); 172 target[CURRENT] = static_cast<unsigned int>(currentInput_);
155 originalInputs_.Serialize(target["OriginalInputs"]); 173 operation_->Serialize(target[OPERATION]);
156 workInputs_.Serialize(target["WorkInputs"]); 174 originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
175 workInputs_->Serialize(target[WORK_INPUTS]);
157 176
158 Json::Value tmp = Json::arrayValue; 177 Json::Value tmp = Json::arrayValue;
159 for (std::list<Operation*>::const_iterator it = nextOperations_.begin(); 178 for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
160 it != nextOperations_.end(); ++it) 179 it != nextOperations_.end(); ++it)
161 { 180 {
162 tmp.append(static_cast<int>((*it)->index_)); 181 tmp.append(static_cast<int>((*it)->index_));
163 } 182 }
164 183
165 target["NextOperations"] = tmp; 184 target[NEXT_OPERATIONS] = tmp;
185 }
186
187 Operation(IJobUnserializer& unserializer,
188 Json::Value::ArrayIndex index,
189 const Json::Value& serialized) :
190 index_(index)
191 {
192 if (serialized.type() != Json::objectValue ||
193 !serialized.isMember(OPERATION) ||
194 !serialized.isMember(ORIGINAL_INPUTS) ||
195 !serialized.isMember(WORK_INPUTS))
196 {
197 throw OrthancException(ErrorCode_BadFileFormat);
198 }
199
200 currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
201 operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
202 originalInputs_.reset(JobOperationValues::Unserialize
203 (unserializer, serialized[ORIGINAL_INPUTS]));
204 workInputs_.reset(JobOperationValues::Unserialize
205 (unserializer, serialized[WORK_INPUTS]));
166 } 206 }
167 }; 207 };
168 208
169 209
170 SequenceOfOperationsJob::SequenceOfOperationsJob() : 210 SequenceOfOperationsJob::SequenceOfOperationsJob() :
189 229
190 void SequenceOfOperationsJob::SetDescription(const std::string& description) 230 void SequenceOfOperationsJob::SetDescription(const std::string& description)
191 { 231 {
192 boost::mutex::scoped_lock lock(mutex_); 232 boost::mutex::scoped_lock lock(mutex_);
193 description_ = description; 233 description_ = description;
234 }
235
236
237 void SequenceOfOperationsJob::GetDescription(std::string& description)
238 {
239 boost::mutex::scoped_lock lock(mutex_);
240 description = description_;
194 } 241 }
195 242
196 243
197 void SequenceOfOperationsJob::Register(IObserver& observer) 244 void SequenceOfOperationsJob::Register(IObserver& observer)
198 { 245 {
265 } 312 }
266 else 313 else
267 { 314 {
268 Operation& a = *that_.operations_[input]; 315 Operation& a = *that_.operations_[input];
269 Operation& b = *that_.operations_[output]; 316 Operation& b = *that_.operations_[output];
270 a.AddNextOperation(b); 317 a.AddNextOperation(b, false /* not unserializing */);
271 } 318 }
272 } 319 }
273 320
274 321
275 JobStepResult SequenceOfOperationsJob::ExecuteStep() 322 JobStepResult SequenceOfOperationsJob::ExecuteStep()
297 connectionManager_.Close(); 344 connectionManager_.Close();
298 return JobStepResult::Success(); 345 return JobStepResult::Success();
299 } 346 }
300 else 347 else
301 { 348 {
302 LOG(INFO) << "New operation added to the sequence of operations"; 349 LOG(INFO) << "New operation were added to the sequence of operations";
303 } 350 }
304 } 351 }
305 352
306 assert(current_ < operations_.size()); 353 assert(current_ < operations_.size());
307 354
364 bool SequenceOfOperationsJob::Serialize(Json::Value& value) 411 bool SequenceOfOperationsJob::Serialize(Json::Value& value)
365 { 412 {
366 boost::mutex::scoped_lock lock(mutex_); 413 boost::mutex::scoped_lock lock(mutex_);
367 414
368 value = Json::objectValue; 415 value = Json::objectValue;
416
417 std::string jobType;
418 GetJobType(jobType);
419 value[TYPE] = jobType;
420
421 value[DESCRIPTION] = description_;
422 value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
423 value[DICOM_TIMEOUT] = connectionManager_.GetTimeout();
424 value[CURRENT] = static_cast<unsigned int>(current_);
369 425
370 Json::Value tmp = Json::arrayValue; 426 Json::Value tmp = Json::arrayValue;
371 for (size_t i = 0; i < operations_.size(); i++) 427 for (size_t i = 0; i < operations_.size(); i++)
372 { 428 {
373 Json::Value operation = Json::objectValue; 429 Json::Value operation = Json::objectValue;
374 operations_[i]->Serialize(operation); 430 operations_[i]->Serialize(operation);
375 tmp.append(operation); 431 tmp.append(operation);
376 } 432 }
377 433
378 value["Operations"] = tmp; 434 value[OPERATIONS] = tmp;
379 value["TrailingTimeout"] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
380 value["DicomTimeout"] = connectionManager_.GetTimeout();
381 value["Current"] = static_cast<unsigned int>(current_);
382 435
383 return true; 436 return true;
384 } 437 }
438
439
440 SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
441 const Json::Value& serialized) :
442 done_(false)
443 {
444 std::string jobType;
445 GetJobType(jobType);
446
447 if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
448 !serialized.isMember(OPERATIONS) ||
449 serialized[OPERATIONS].type() != Json::arrayValue)
450 {
451 throw OrthancException(ErrorCode_BadFileFormat);
452 }
453
454 description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
455 trailingTimeout_ = boost::posix_time::milliseconds
456 (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
457 connectionManager_.SetTimeout
458 (SerializationToolbox::ReadUnsignedInteger(serialized, DICOM_TIMEOUT));
459 current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
460
461 const Json::Value& ops = serialized[OPERATIONS];
462
463 // Unserialize the individual operations
464 operations_.reserve(ops.size());
465 for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
466 {
467 operations_.push_back(new Operation(unserializer, i, ops[i]));
468 }
469
470 // Connect the next operations
471 for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
472 {
473 if (!ops[i].isMember(NEXT_OPERATIONS) ||
474 ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
475 {
476 throw OrthancException(ErrorCode_BadFileFormat);
477 }
478
479 const Json::Value& next = ops[i][NEXT_OPERATIONS];
480 for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
481 {
482 if (next[j].type() != Json::intValue ||
483 next[j].asInt() < 0 ||
484 next[j].asUInt() >= operations_.size())
485 {
486 throw OrthancException(ErrorCode_BadFileFormat);
487 }
488 else
489 {
490 operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
491 }
492 }
493 }
494 }
385 } 495 }