diff Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp @ 2666:2540ac79ab6c jobs

SequenceOfOperationsJob serialization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 15:05:32 +0200
parents 389d050a2e66
children 7cfc8d266f41
line wrap: on
line diff
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp	Fri Jun 08 13:51:31 2018 +0200
+++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp	Fri Jun 08 15:05:32 2018 +0200
@@ -36,24 +36,40 @@
 
 #include "../../Logging.h"
 #include "../../OrthancException.h"
+#include "../../SerializationToolbox.h"
+#include "../IJobUnserializer.h"
 
 namespace Orthanc
 {
+  static const char* CURRENT = "Current";
+  static const char* DESCRIPTION = "Description";
+  static const char* DICOM_TIMEOUT = "DicomTimeout";
+  static const char* NEXT_OPERATIONS = "Next";
+  static const char* OPERATION = "Operation";
+  static const char* OPERATIONS = "Operations";
+  static const char* ORIGINAL_INPUTS = "OriginalInputs";
+  static const char* TRAILING_TIMEOUT = "TrailingTimeout";
+  static const char* TYPE = "Type";
+  static const char* WORK_INPUTS = "WorkInputs";
+
+  
   class SequenceOfOperationsJob::Operation : public boost::noncopyable
   {
   private:
-    size_t                        index_;
-    JobOperationValues            originalInputs_;
-    JobOperationValues            workInputs_;
-    std::auto_ptr<IJobOperation>  operation_;
-    std::list<Operation*>         nextOperations_;
-    size_t                        currentInput_;
+    size_t                             index_;
+    std::auto_ptr<IJobOperation>       operation_;
+    std::auto_ptr<JobOperationValues>  originalInputs_;
+    std::auto_ptr<JobOperationValues>  workInputs_;
+    std::list<Operation*>              nextOperations_;
+    size_t                             currentInput_;
 
   public:
     Operation(size_t index,
               IJobOperation* operation) :
       index_(index),
       operation_(operation),
+      originalInputs_(new JobOperationValues),
+      workInputs_(new JobOperationValues),
       currentInput_(0)
     {
       if (operation == NULL)
@@ -71,29 +87,31 @@
       }
       else
       {
-        originalInputs_.Append(value.Clone());
+        originalInputs_->Append(value.Clone());
       }
     }
 
     const JobOperationValues& GetOriginalInputs() const
     {
-      return originalInputs_;
+      return *originalInputs_;
     }
 
     void Reset()
     {
-      workInputs_.Clear();
+      workInputs_->Clear();
       currentInput_ = 0;
     }
 
-    void AddNextOperation(Operation& other)
+    void AddNextOperation(Operation& other,
+                          bool unserializing)
     {
       if (other.index_ <= index_)
       {
         throw OrthancException(ErrorCode_InternalError);
       }
 
-      if (currentInput_ != 0)
+      if (!unserializing &&
+          currentInput_ != 0)
       {
         // Cannot add input after processing has started
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
@@ -106,7 +124,7 @@
 
     bool IsDone() const
     {
-      return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
+      return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
     }
 
     void Step(IDicomConnectionManager& connectionManager)
@@ -118,13 +136,13 @@
 
       const JobOperationValue* input;
 
-      if (currentInput_ < originalInputs_.GetSize())
+      if (currentInput_ < originalInputs_->GetSize())
       {
-        input = &originalInputs_.GetValue(currentInput_);
+        input = &originalInputs_->GetValue(currentInput_);
       }
       else
       {
-        input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
+        input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
       }
 
       JobOperationValues outputs;
@@ -133,14 +151,14 @@
       if (!nextOperations_.empty())
       {
         std::list<Operation*>::iterator first = nextOperations_.begin();
-        outputs.Move((*first)->workInputs_);
+        outputs.Move(*(*first)->workInputs_);
 
         std::list<Operation*>::iterator current = first;
         ++current;
 
         while (current != nextOperations_.end())
         {
-          (*first)->workInputs_.Copy((*current)->workInputs_);
+          (*first)->workInputs_->Copy(*(*current)->workInputs_);
           ++current;
         }
       }
@@ -151,9 +169,10 @@
     void Serialize(Json::Value& target) const
     {
       target = Json::objectValue;
-      operation_->Serialize(target["Operation"]);
-      originalInputs_.Serialize(target["OriginalInputs"]);
-      workInputs_.Serialize(target["WorkInputs"]);      
+      target[CURRENT] = static_cast<unsigned int>(currentInput_);
+      operation_->Serialize(target[OPERATION]);
+      originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
+      workInputs_->Serialize(target[WORK_INPUTS]);      
 
       Json::Value tmp = Json::arrayValue;
       for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
@@ -162,7 +181,28 @@
         tmp.append(static_cast<int>((*it)->index_));
       }
 
-      target["NextOperations"] = tmp;
+      target[NEXT_OPERATIONS] = tmp;
+    }
+
+    Operation(IJobUnserializer& unserializer,
+              Json::Value::ArrayIndex index,
+              const Json::Value& serialized) :
+      index_(index)
+    {
+      if (serialized.type() != Json::objectValue ||
+          !serialized.isMember(OPERATION) ||
+          !serialized.isMember(ORIGINAL_INPUTS) ||
+          !serialized.isMember(WORK_INPUTS))
+      {
+        throw OrthancException(ErrorCode_BadFileFormat);
+      }
+
+      currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
+      operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
+      originalInputs_.reset(JobOperationValues::Unserialize
+                            (unserializer, serialized[ORIGINAL_INPUTS]));
+      workInputs_.reset(JobOperationValues::Unserialize
+                        (unserializer, serialized[WORK_INPUTS]));
     }
   };
 
@@ -194,6 +234,13 @@
   }
 
 
+  void SequenceOfOperationsJob::GetDescription(std::string& description)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    description = description_;    
+  }
+
+
   void SequenceOfOperationsJob::Register(IObserver& observer)
   {
     boost::mutex::scoped_lock lock(mutex_);
@@ -267,7 +314,7 @@
     {
       Operation& a = *that_.operations_[input];
       Operation& b = *that_.operations_[output];
-      a.AddNextOperation(b);
+      a.AddNextOperation(b, false /* not unserializing */);
     }
   }
 
@@ -299,7 +346,7 @@
       }
       else
       {
-        LOG(INFO) << "New operation added to the sequence of operations";
+        LOG(INFO) << "New operation were added to the sequence of operations";
       }
     }
 
@@ -366,6 +413,15 @@
     boost::mutex::scoped_lock lock(mutex_);
 
     value = Json::objectValue;
+
+    std::string jobType;
+    GetJobType(jobType);
+    value[TYPE] = jobType;
+    
+    value[DESCRIPTION] = description_;
+    value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
+    value[DICOM_TIMEOUT] = connectionManager_.GetTimeout();
+    value[CURRENT] = static_cast<unsigned int>(current_);
     
     Json::Value tmp = Json::arrayValue;
     for (size_t i = 0; i < operations_.size(); i++)
@@ -375,11 +431,65 @@
       tmp.append(operation);
     }
 
-    value["Operations"] = tmp;
-    value["TrailingTimeout"] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
-    value["DicomTimeout"] = connectionManager_.GetTimeout();
-    value["Current"] = static_cast<unsigned int>(current_);
+    value[OPERATIONS] = tmp;
 
     return true;
   }
+
+
+  SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
+                                                   const Json::Value& serialized) :
+    done_(false)
+  {
+    std::string jobType;
+    GetJobType(jobType);
+    
+    if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
+        !serialized.isMember(OPERATIONS) ||
+        serialized[OPERATIONS].type() != Json::arrayValue)
+    {
+      throw OrthancException(ErrorCode_BadFileFormat);
+    }
+
+    description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
+    trailingTimeout_ = boost::posix_time::milliseconds
+      (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
+    connectionManager_.SetTimeout
+      (SerializationToolbox::ReadUnsignedInteger(serialized, DICOM_TIMEOUT));
+    current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
+
+    const Json::Value& ops = serialized[OPERATIONS];
+
+    // Unserialize the individual operations
+    operations_.reserve(ops.size());
+    for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
+    {
+      operations_.push_back(new Operation(unserializer, i, ops[i]));
+    }
+
+    // Connect the next operations
+    for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
+    {
+      if (!ops[i].isMember(NEXT_OPERATIONS) ||
+          ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
+      {
+        throw OrthancException(ErrorCode_BadFileFormat);
+      }
+
+      const Json::Value& next = ops[i][NEXT_OPERATIONS];
+      for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
+      {
+        if (next[j].type() != Json::intValue ||
+            next[j].asInt() < 0 ||
+            next[j].asUInt() >= operations_.size())
+        {
+          throw OrthancException(ErrorCode_BadFileFormat);
+        }
+        else
+        {
+          operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
+        }
+      }
+    }  
+  }
 }