comparison OrthancFramework/Sources/JobsEngine/Operations/SequenceOfOperationsJob.cpp @ 4044:d25f4c0fa160 framework

splitting code into OrthancFramework and OrthancServer
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 10 Jun 2020 20:30:34 +0200
parents Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp@6498739a3c3c
children bf7b9edf6b81
comparison
equal deleted inserted replaced
4043:6c6239aec462 4044:d25f4c0fa160
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation, either version 3 of the
10 * License, or (at your option) any later version.
11 *
12 * In addition, as a special exception, the copyright holders of this
13 * program give permission to link the code of its release with the
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it
15 * that use the same license as the "OpenSSL" library), and distribute
16 * the linked executables. You must obey the GNU General Public License
17 * in all respects for all of the code used other than "OpenSSL". If you
18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here.
23 *
24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 **/
32
33
34 #include "../../PrecompiledHeaders.h"
35 #include "SequenceOfOperationsJob.h"
36
37 #include "../../Logging.h"
38 #include "../../OrthancException.h"
39 #include "../../SerializationToolbox.h"
40 #include "../IJobUnserializer.h"
41
42 namespace Orthanc
43 {
// Keys of the JSON object produced by "SequenceOfOperationsJob::Serialize()"
static const char* const CURRENT = "Current";
static const char* const DESCRIPTION = "Description";
static const char* const OPERATIONS = "Operations";
static const char* const TRAILING_TIMEOUT = "TrailingTimeout";
static const char* const TYPE = "Type";

// Keys of the JSON object produced for each individual operation
// ("CURRENT" above is shared by both levels)
static const char* const NEXT_OPERATIONS = "Next";
static const char* const OPERATION = "Operation";
static const char* const ORIGINAL_INPUTS = "OriginalInputs";
static const char* const WORK_INPUTS = "WorkInputs";
53
54
55 class SequenceOfOperationsJob::Operation : public boost::noncopyable
56 {
57 private:
58 size_t index_;
59 std::unique_ptr<IJobOperation> operation_;
60 std::unique_ptr<JobOperationValues> originalInputs_;
61 std::unique_ptr<JobOperationValues> workInputs_;
62 std::list<Operation*> nextOperations_;
63 size_t currentInput_;
64
65 public:
66 Operation(size_t index,
67 IJobOperation* operation) :
68 index_(index),
69 operation_(operation),
70 originalInputs_(new JobOperationValues),
71 workInputs_(new JobOperationValues),
72 currentInput_(0)
73 {
74 if (operation == NULL)
75 {
76 throw OrthancException(ErrorCode_NullPointer);
77 }
78 }
79
80 void AddOriginalInput(const JobOperationValue& value)
81 {
82 if (currentInput_ != 0)
83 {
84 // Cannot add input after processing has started
85 throw OrthancException(ErrorCode_BadSequenceOfCalls);
86 }
87 else
88 {
89 originalInputs_->Append(value.Clone());
90 }
91 }
92
93 const JobOperationValues& GetOriginalInputs() const
94 {
95 return *originalInputs_;
96 }
97
98 void Reset()
99 {
100 workInputs_->Clear();
101 currentInput_ = 0;
102 }
103
104 void AddNextOperation(Operation& other,
105 bool unserializing)
106 {
107 if (other.index_ <= index_)
108 {
109 throw OrthancException(ErrorCode_InternalError);
110 }
111
112 if (!unserializing &&
113 currentInput_ != 0)
114 {
115 // Cannot add input after processing has started
116 throw OrthancException(ErrorCode_BadSequenceOfCalls);
117 }
118 else
119 {
120 nextOperations_.push_back(&other);
121 }
122 }
123
124 bool IsDone() const
125 {
126 return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
127 }
128
129 void Step()
130 {
131 if (IsDone())
132 {
133 throw OrthancException(ErrorCode_BadSequenceOfCalls);
134 }
135
136 const JobOperationValue* input;
137
138 if (currentInput_ < originalInputs_->GetSize())
139 {
140 input = &originalInputs_->GetValue(currentInput_);
141 }
142 else
143 {
144 input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
145 }
146
147 JobOperationValues outputs;
148 operation_->Apply(outputs, *input);
149
150 if (!nextOperations_.empty())
151 {
152 std::list<Operation*>::iterator first = nextOperations_.begin();
153 outputs.Move(*(*first)->workInputs_);
154
155 std::list<Operation*>::iterator current = first;
156 ++current;
157
158 while (current != nextOperations_.end())
159 {
160 (*first)->workInputs_->Copy(*(*current)->workInputs_);
161 ++current;
162 }
163 }
164
165 currentInput_ += 1;
166 }
167
168 void Serialize(Json::Value& target) const
169 {
170 target = Json::objectValue;
171 target[CURRENT] = static_cast<unsigned int>(currentInput_);
172 operation_->Serialize(target[OPERATION]);
173 originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
174 workInputs_->Serialize(target[WORK_INPUTS]);
175
176 Json::Value tmp = Json::arrayValue;
177 for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
178 it != nextOperations_.end(); ++it)
179 {
180 tmp.append(static_cast<int>((*it)->index_));
181 }
182
183 target[NEXT_OPERATIONS] = tmp;
184 }
185
186 Operation(IJobUnserializer& unserializer,
187 Json::Value::ArrayIndex index,
188 const Json::Value& serialized) :
189 index_(index)
190 {
191 if (serialized.type() != Json::objectValue ||
192 !serialized.isMember(OPERATION) ||
193 !serialized.isMember(ORIGINAL_INPUTS) ||
194 !serialized.isMember(WORK_INPUTS))
195 {
196 throw OrthancException(ErrorCode_BadFileFormat);
197 }
198
199 currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
200 operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
201 originalInputs_.reset(JobOperationValues::Unserialize
202 (unserializer, serialized[ORIGINAL_INPUTS]));
203 workInputs_.reset(JobOperationValues::Unserialize
204 (unserializer, serialized[WORK_INPUTS]));
205 }
206 };
207
208
209 SequenceOfOperationsJob::SequenceOfOperationsJob() :
210 done_(false),
211 current_(0),
212 trailingTimeout_(boost::posix_time::milliseconds(1000))
213 {
214 }
215
216
217 SequenceOfOperationsJob::~SequenceOfOperationsJob()
218 {
219 for (size_t i = 0; i < operations_.size(); i++)
220 {
221 if (operations_[i] != NULL)
222 {
223 delete operations_[i];
224 }
225 }
226 }
227
228
229 void SequenceOfOperationsJob::SetDescription(const std::string& description)
230 {
231 boost::mutex::scoped_lock lock(mutex_);
232 description_ = description;
233 }
234
235
236 void SequenceOfOperationsJob::GetDescription(std::string& description)
237 {
238 boost::mutex::scoped_lock lock(mutex_);
239 description = description_;
240 }
241
242
243 void SequenceOfOperationsJob::Register(IObserver& observer)
244 {
245 boost::mutex::scoped_lock lock(mutex_);
246 observers_.push_back(&observer);
247 }
248
249
250 void SequenceOfOperationsJob::Lock::SetTrailingOperationTimeout(unsigned int timeout)
251 {
252 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
253 }
254
255
256 size_t SequenceOfOperationsJob::Lock::AddOperation(IJobOperation* operation)
257 {
258 if (IsDone())
259 {
260 throw OrthancException(ErrorCode_BadSequenceOfCalls);
261 }
262
263 size_t index = that_.operations_.size();
264
265 that_.operations_.push_back(new Operation(index, operation));
266 that_.operationAdded_.notify_one();
267
268 return index;
269 }
270
271
272 void SequenceOfOperationsJob::Lock::AddInput(size_t index,
273 const JobOperationValue& value)
274 {
275 if (IsDone())
276 {
277 throw OrthancException(ErrorCode_BadSequenceOfCalls);
278 }
279 else if (index >= that_.operations_.size() ||
280 index < that_.current_)
281 {
282 throw OrthancException(ErrorCode_ParameterOutOfRange);
283 }
284 else
285 {
286 that_.operations_[index]->AddOriginalInput(value);
287 }
288 }
289
290
291 void SequenceOfOperationsJob::Lock::Connect(size_t input,
292 size_t output)
293 {
294 if (IsDone())
295 {
296 throw OrthancException(ErrorCode_BadSequenceOfCalls);
297 }
298 else if (input >= output ||
299 input >= that_.operations_.size() ||
300 output >= that_.operations_.size() ||
301 input < that_.current_ ||
302 output < that_.current_)
303 {
304 throw OrthancException(ErrorCode_ParameterOutOfRange);
305 }
306 else
307 {
308 Operation& a = *that_.operations_[input];
309 Operation& b = *that_.operations_[output];
310 a.AddNextOperation(b, false /* not unserializing */);
311 }
312 }
313
314
315 JobStepResult SequenceOfOperationsJob::Step(const std::string& jobId)
316 {
317 boost::mutex::scoped_lock lock(mutex_);
318
319 if (current_ == operations_.size())
320 {
321 LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
322 operationAdded_.timed_wait(lock, trailingTimeout_);
323
324 if (current_ == operations_.size())
325 {
326 // No operation was added during the trailing timeout: The
327 // job is over
328 LOG(INFO) << "The sequence of operations is over";
329 done_ = true;
330
331 for (std::list<IObserver*>::iterator it = observers_.begin();
332 it != observers_.end(); ++it)
333 {
334 (*it)->SignalDone(*this);
335 }
336
337 return JobStepResult::Success();
338 }
339 else
340 {
341 LOG(INFO) << "New operation were added to the sequence of operations";
342 }
343 }
344
345 assert(current_ < operations_.size());
346
347 while (current_ < operations_.size() &&
348 operations_[current_]->IsDone())
349 {
350 current_++;
351 }
352
353 if (current_ < operations_.size())
354 {
355 operations_[current_]->Step();
356 }
357
358 return JobStepResult::Continue();
359 }
360
361
362 void SequenceOfOperationsJob::Reset()
363 {
364 boost::mutex::scoped_lock lock(mutex_);
365
366 current_ = 0;
367 done_ = false;
368
369 for (size_t i = 0; i < operations_.size(); i++)
370 {
371 operations_[i]->Reset();
372 }
373 }
374
375
376 float SequenceOfOperationsJob::GetProgress()
377 {
378 boost::mutex::scoped_lock lock(mutex_);
379
380 return (static_cast<float>(current_) /
381 static_cast<float>(operations_.size() + 1));
382 }
383
384
385 void SequenceOfOperationsJob::GetPublicContent(Json::Value& value)
386 {
387 boost::mutex::scoped_lock lock(mutex_);
388
389 value["CountOperations"] = static_cast<unsigned int>(operations_.size());
390 value["Description"] = description_;
391 }
392
393
394 bool SequenceOfOperationsJob::Serialize(Json::Value& value)
395 {
396 boost::mutex::scoped_lock lock(mutex_);
397
398 value = Json::objectValue;
399
400 std::string jobType;
401 GetJobType(jobType);
402 value[TYPE] = jobType;
403
404 value[DESCRIPTION] = description_;
405 value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
406 value[CURRENT] = static_cast<unsigned int>(current_);
407
408 Json::Value tmp = Json::arrayValue;
409 for (size_t i = 0; i < operations_.size(); i++)
410 {
411 Json::Value operation = Json::objectValue;
412 operations_[i]->Serialize(operation);
413 tmp.append(operation);
414 }
415
416 value[OPERATIONS] = tmp;
417
418 return true;
419 }
420
421
422 SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
423 const Json::Value& serialized) :
424 done_(false)
425 {
426 std::string jobType;
427 GetJobType(jobType);
428
429 if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
430 !serialized.isMember(OPERATIONS) ||
431 serialized[OPERATIONS].type() != Json::arrayValue)
432 {
433 throw OrthancException(ErrorCode_BadFileFormat);
434 }
435
436 description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
437 trailingTimeout_ = boost::posix_time::milliseconds
438 (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
439 current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
440
441 const Json::Value& ops = serialized[OPERATIONS];
442
443 // Unserialize the individual operations
444 operations_.reserve(ops.size());
445 for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
446 {
447 operations_.push_back(new Operation(unserializer, i, ops[i]));
448 }
449
450 // Connect the next operations
451 for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
452 {
453 if (!ops[i].isMember(NEXT_OPERATIONS) ||
454 ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
455 {
456 throw OrthancException(ErrorCode_BadFileFormat);
457 }
458
459 const Json::Value& next = ops[i][NEXT_OPERATIONS];
460 for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
461 {
462 if (next[j].type() != Json::intValue ||
463 next[j].asInt() < 0 ||
464 next[j].asUInt() >= operations_.size())
465 {
466 throw OrthancException(ErrorCode_BadFileFormat);
467 }
468 else
469 {
470 operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
471 }
472 }
473 }
474 }
475 }