changeset 2666:2540ac79ab6c jobs

SequenceOfOperationsJob serialization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 15:05:32 +0200
parents 389d050a2e66
children 5fa2f2ce74f0
files Core/DicomParsing/DicomModification.cpp Core/JobsEngine/GenericJobUnserializer.cpp Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Core/JobsEngine/Operations/SequenceOfOperationsJob.h UnitTestsSources/MultiThreadingTests.cpp
diffstat 5 files changed, 298 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/Core/DicomParsing/DicomModification.cpp	Fri Jun 08 13:51:31 2018 +0200
+++ b/Core/DicomParsing/DicomModification.cpp	Fri Jun 08 15:05:32 2018 +0200
@@ -1358,7 +1358,8 @@
   }
 
   
-  DicomModification::DicomModification(const Json::Value& serialized)
+  DicomModification::DicomModification(const Json::Value& serialized) :
+    identifierGenerator_(NULL)
   {
     removePrivateTags_ = SerializationToolbox::ReadBoolean(serialized, REMOVE_PRIVATE_TAGS);
     level_ = StringToResourceType(SerializationToolbox::ReadString(serialized, LEVEL).c_str());
--- a/Core/JobsEngine/GenericJobUnserializer.cpp	Fri Jun 08 13:51:31 2018 +0200
+++ b/Core/JobsEngine/GenericJobUnserializer.cpp	Fri Jun 08 15:05:32 2018 +0200
@@ -40,6 +40,7 @@
 
 #include "Operations/LogJobOperation.h"
 #include "Operations/NullOperationValue.h"
+#include "Operations/SequenceOfOperationsJob.h"
 #include "Operations/StringOperationValue.h"
 
 namespace Orthanc
@@ -48,8 +49,15 @@
   {
     const std::string type = SerializationToolbox::ReadString(source, "Type");
 
-    LOG(ERROR) << "Cannot unserialize job of type: " << type;
-    throw OrthancException(ErrorCode_BadFileFormat);
+    if (type == "SequenceOfOperations")
+    {
+      return new SequenceOfOperationsJob(*this, source);
+    }
+    else
+    {
+      LOG(ERROR) << "Cannot unserialize job of type: " << type;
+      throw OrthancException(ErrorCode_BadFileFormat);
+    }
   }
 
 
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp	Fri Jun 08 13:51:31 2018 +0200
+++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp	Fri Jun 08 15:05:32 2018 +0200
@@ -36,24 +36,40 @@
 
 #include "../../Logging.h"
 #include "../../OrthancException.h"
+#include "../../SerializationToolbox.h"
+#include "../IJobUnserializer.h"
 
 namespace Orthanc
 {
+  static const char* CURRENT = "Current";
+  static const char* DESCRIPTION = "Description";
+  static const char* DICOM_TIMEOUT = "DicomTimeout";
+  static const char* NEXT_OPERATIONS = "Next";
+  static const char* OPERATION = "Operation";
+  static const char* OPERATIONS = "Operations";
+  static const char* ORIGINAL_INPUTS = "OriginalInputs";
+  static const char* TRAILING_TIMEOUT = "TrailingTimeout";
+  static const char* TYPE = "Type";
+  static const char* WORK_INPUTS = "WorkInputs";
+
+  
   class SequenceOfOperationsJob::Operation : public boost::noncopyable
   {
   private:
-    size_t                        index_;
-    JobOperationValues            originalInputs_;
-    JobOperationValues            workInputs_;
-    std::auto_ptr<IJobOperation>  operation_;
-    std::list<Operation*>         nextOperations_;
-    size_t                        currentInput_;
+    size_t                             index_;
+    std::auto_ptr<IJobOperation>       operation_;
+    std::auto_ptr<JobOperationValues>  originalInputs_;
+    std::auto_ptr<JobOperationValues>  workInputs_;
+    std::list<Operation*>              nextOperations_;
+    size_t                             currentInput_;
 
   public:
     Operation(size_t index,
               IJobOperation* operation) :
       index_(index),
       operation_(operation),
+      originalInputs_(new JobOperationValues),
+      workInputs_(new JobOperationValues),
       currentInput_(0)
     {
       if (operation == NULL)
@@ -71,29 +87,31 @@
       }
       else
       {
-        originalInputs_.Append(value.Clone());
+        originalInputs_->Append(value.Clone());
       }
     }
 
     const JobOperationValues& GetOriginalInputs() const
     {
-      return originalInputs_;
+      return *originalInputs_;
     }
 
     void Reset()
     {
-      workInputs_.Clear();
+      workInputs_->Clear();
       currentInput_ = 0;
     }
 
-    void AddNextOperation(Operation& other)
+    void AddNextOperation(Operation& other,
+                          bool unserializing)
     {
       if (other.index_ <= index_)
       {
         throw OrthancException(ErrorCode_InternalError);
       }
 
-      if (currentInput_ != 0)
+      if (!unserializing &&
+          currentInput_ != 0)
       {
         // Cannot add input after processing has started
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
@@ -106,7 +124,7 @@
 
     bool IsDone() const
     {
-      return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
+      return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
     }
 
     void Step(IDicomConnectionManager& connectionManager)
@@ -118,13 +136,13 @@
 
       const JobOperationValue* input;
 
-      if (currentInput_ < originalInputs_.GetSize())
+      if (currentInput_ < originalInputs_->GetSize())
       {
-        input = &originalInputs_.GetValue(currentInput_);
+        input = &originalInputs_->GetValue(currentInput_);
       }
       else
       {
-        input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
+        input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
       }
 
       JobOperationValues outputs;
@@ -133,14 +151,14 @@
       if (!nextOperations_.empty())
       {
         std::list<Operation*>::iterator first = nextOperations_.begin();
-        outputs.Move((*first)->workInputs_);
+        outputs.Move(*(*first)->workInputs_);
 
         std::list<Operation*>::iterator current = first;
         ++current;
 
         while (current != nextOperations_.end())
         {
-          (*first)->workInputs_.Copy((*current)->workInputs_);
+          (*first)->workInputs_->Copy(*(*current)->workInputs_);
           ++current;
         }
       }
@@ -151,9 +169,10 @@
     void Serialize(Json::Value& target) const
     {
       target = Json::objectValue;
-      operation_->Serialize(target["Operation"]);
-      originalInputs_.Serialize(target["OriginalInputs"]);
-      workInputs_.Serialize(target["WorkInputs"]);      
+      target[CURRENT] = static_cast<unsigned int>(currentInput_);
+      operation_->Serialize(target[OPERATION]);
+      originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
+      workInputs_->Serialize(target[WORK_INPUTS]);      
 
       Json::Value tmp = Json::arrayValue;
       for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
@@ -162,7 +181,28 @@
         tmp.append(static_cast<int>((*it)->index_));
       }
 
-      target["NextOperations"] = tmp;
+      target[NEXT_OPERATIONS] = tmp;
+    }
+
+    Operation(IJobUnserializer& unserializer,
+              Json::Value::ArrayIndex index,
+              const Json::Value& serialized) :
+      index_(index)
+    {
+      if (serialized.type() != Json::objectValue ||
+          !serialized.isMember(OPERATION) ||
+          !serialized.isMember(ORIGINAL_INPUTS) ||
+          !serialized.isMember(WORK_INPUTS))
+      {
+        throw OrthancException(ErrorCode_BadFileFormat);
+      }
+
+      currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
+      operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
+      originalInputs_.reset(JobOperationValues::Unserialize
+                            (unserializer, serialized[ORIGINAL_INPUTS]));
+      workInputs_.reset(JobOperationValues::Unserialize
+                        (unserializer, serialized[WORK_INPUTS]));
     }
   };
 
@@ -194,6 +234,13 @@
   }
 
 
+  void SequenceOfOperationsJob::GetDescription(std::string& description)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    description = description_;    
+  }
+
+
   void SequenceOfOperationsJob::Register(IObserver& observer)
   {
     boost::mutex::scoped_lock lock(mutex_);
@@ -267,7 +314,7 @@
     {
       Operation& a = *that_.operations_[input];
       Operation& b = *that_.operations_[output];
-      a.AddNextOperation(b);
+      a.AddNextOperation(b, false /* not unserializing */);
     }
   }
 
@@ -299,7 +346,7 @@
       }
       else
       {
-        LOG(INFO) << "New operation added to the sequence of operations";
+        LOG(INFO) << "New operation were added to the sequence of operations";
       }
     }
 
@@ -366,6 +413,15 @@
     boost::mutex::scoped_lock lock(mutex_);
 
     value = Json::objectValue;
+
+    std::string jobType;
+    GetJobType(jobType);
+    value[TYPE] = jobType;
+    
+    value[DESCRIPTION] = description_;
+    value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
+    value[DICOM_TIMEOUT] = connectionManager_.GetTimeout();
+    value[CURRENT] = static_cast<unsigned int>(current_);
     
     Json::Value tmp = Json::arrayValue;
     for (size_t i = 0; i < operations_.size(); i++)
@@ -375,11 +431,65 @@
       tmp.append(operation);
     }
 
-    value["Operations"] = tmp;
-    value["TrailingTimeout"] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
-    value["DicomTimeout"] = connectionManager_.GetTimeout();
-    value["Current"] = static_cast<unsigned int>(current_);
+    value[OPERATIONS] = tmp;
 
     return true;
   }
+
+
+  SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
+                                                   const Json::Value& serialized) :
+    done_(false)
+  {
+    std::string jobType;
+    GetJobType(jobType);
+    
+    if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
+        !serialized.isMember(OPERATIONS) ||
+        serialized[OPERATIONS].type() != Json::arrayValue)
+    {
+      throw OrthancException(ErrorCode_BadFileFormat);
+    }
+
+    description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
+    trailingTimeout_ = boost::posix_time::milliseconds
+      (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
+    connectionManager_.SetTimeout
+      (SerializationToolbox::ReadUnsignedInteger(serialized, DICOM_TIMEOUT));
+    current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
+
+    const Json::Value& ops = serialized[OPERATIONS];
+
+    // Unserialize the individual operations
+    operations_.reserve(ops.size());
+    for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
+    {
+      operations_.push_back(new Operation(unserializer, i, ops[i]));
+    }
+
+    // Connect the next operations
+    for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
+    {
+      if (!ops[i].isMember(NEXT_OPERATIONS) ||
+          ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
+      {
+        throw OrthancException(ErrorCode_BadFileFormat);
+      }
+
+      const Json::Value& next = ops[i][NEXT_OPERATIONS];
+      for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
+      {
+        if (next[j].type() != Json::intValue ||
+            next[j].asInt() < 0 ||
+            next[j].asUInt() >= operations_.size())
+        {
+          throw OrthancException(ErrorCode_BadFileFormat);
+        }
+        else
+        {
+          operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
+        }
+      }
+    }  
+  }
 }
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.h	Fri Jun 08 13:51:31 2018 +0200
+++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.h	Fri Jun 08 15:05:32 2018 +0200
@@ -74,10 +74,15 @@
   public:
     SequenceOfOperationsJob();
 
+    SequenceOfOperationsJob(IJobUnserializer& unserializer,
+                            const Json::Value& serialized);
+
     virtual ~SequenceOfOperationsJob();
 
     void SetDescription(const std::string& description);
 
+    void GetDescription(std::string& description);
+
     void Register(IObserver& observer);
 
     // This lock allows adding new operations to the end of the job,
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri Jun 08 13:51:31 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri Jun 08 15:05:32 2018 +0200
@@ -36,6 +36,7 @@
 
 #include "../Core/FileStorage/MemoryStorageArea.h"
 #include "../Core/JobsEngine/JobsEngine.h"
+#include "../Core/Logging.h"
 #include "../Core/MultiThreading/SharedMessageQueue.h"
 #include "../Core/OrthancException.h"
 #include "../Core/SerializationToolbox.h"
@@ -744,6 +745,81 @@
 }
 
 
+static bool CheckSameJson(const Json::Value& a,
+                          const Json::Value& b)
+{
+  std::string s = a.toStyledString();
+  std::string t = b.toStyledString();
+
+  if (s == t)
+  {
+    return true;
+  }
+  else
+  {
+    LOG(ERROR) << "Expected serialization: " << s;
+    LOG(ERROR) << "Actual serialization: " << t;
+    return false;
+  }
+}
+
+
+static bool CheckIdempotentSerialization(IJobUnserializer& unserializer,
+                                         IJob& job)
+{
+  Json::Value a = 42;
+  
+  if (!job.Serialize(a))
+  {
+    return false;
+  }
+  else
+  {
+    std::auto_ptr<IJob> unserialized(unserializer.UnserializeJob(a));
+  
+    Json::Value b = 43;
+    if (unserialized->Serialize(b))
+    {
+      return CheckSameJson(a, b);
+    }
+    else
+    {
+      return false;
+    }
+  }
+}
+
+
+static bool CheckIdempotentSerialization(IJobUnserializer& unserializer,
+                                         IJobOperation& operation)
+{
+  Json::Value a = 42;
+  operation.Serialize(a);
+  
+  std::auto_ptr<IJobOperation> unserialized(unserializer.UnserializeOperation(a));
+  
+  Json::Value b = 43;
+  unserialized->Serialize(b);
+
+  return CheckSameJson(a, b);
+}
+
+
+static bool CheckIdempotentSerialization(IJobUnserializer& unserializer,
+                                         JobOperationValue& value)
+{
+  Json::Value a = 42;
+  value.Serialize(a);
+  
+  std::auto_ptr<JobOperationValue> unserialized(unserializer.UnserializeValue(a));
+  
+  Json::Value b = 43;
+  unserialized->Serialize(b);
+
+  return CheckSameJson(a, b);
+}
+
+
 TEST(JobsSerialization, BadFileFormat)
 {
   GenericJobUnserializer unserializer;
@@ -802,16 +878,16 @@
 
 TEST(JobsSerialization, GenericValues)
 {
+  GenericJobUnserializer unserializer;
   Json::Value s;
 
   {
     NullOperationValue null;
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, null));
     null.Serialize(s);
   }
 
-  GenericJobUnserializer unserializer;
   ASSERT_THROW(unserializer.UnserializeJob(s), OrthancException);
   ASSERT_THROW(unserializer.UnserializeOperation(s), OrthancException);
 
@@ -823,7 +899,7 @@
   {
     StringOperationValue str("Hello");
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, str));
     str.Serialize(s);
   }
 
@@ -838,16 +914,16 @@
 
 TEST(JobsSerialization, GenericOperations)
 {   
+  DummyUnserializer unserializer;
   Json::Value s;
 
   {
     LogJobOperation operation;
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation));
     operation.Serialize(s);
   }
 
-  DummyUnserializer unserializer;
   ASSERT_THROW(unserializer.UnserializeJob(s), OrthancException);
   ASSERT_THROW(unserializer.UnserializeValue(s), OrthancException);
 
@@ -880,7 +956,11 @@
     job.ExecuteStep();
     job.ExecuteStep();
 
-    s = 42;
+    {
+      DummyUnserializer unserializer;
+      ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job));
+    }
+    
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -904,6 +984,46 @@
     ASSERT_EQ("world", tmp.GetInstance(2));
     ASSERT_TRUE(tmp.IsFailedInstance("nope"));
   }
+
+  // SequenceOfOperationsJob
+
+  {
+    SequenceOfOperationsJob job;
+    job.SetDescription("hello");
+
+    {
+      SequenceOfOperationsJob::Lock lock(job);
+      size_t a = lock.AddOperation(new LogJobOperation);
+      size_t b = lock.AddOperation(new LogJobOperation);
+      lock.Connect(a, b);
+      lock.AddInput(a, StringOperationValue("hello"));
+      lock.AddInput(a, StringOperationValue("world"));
+      lock.SetDicomAssociationTimeout(200);
+      lock.SetTrailingOperationTimeout(300);
+    }
+
+    ASSERT_EQ(JobStepCode_Continue, job.ExecuteStep().GetCode());
+
+    {
+      GenericJobUnserializer unserializer;
+      ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job));
+    }
+    
+    ASSERT_TRUE(job.Serialize(s));
+  }
+
+  {
+    GenericJobUnserializer unserializer;
+    ASSERT_THROW(unserializer.UnserializeValue(s), OrthancException);
+    ASSERT_THROW(unserializer.UnserializeOperation(s), OrthancException);
+
+    std::auto_ptr<IJob> job;
+    job.reset(unserializer.UnserializeJob(s));
+
+    std::string tmp;
+    dynamic_cast<SequenceOfOperationsJob&>(*job).GetDescription(tmp);
+    ASSERT_EQ("hello", tmp);
+  }  
 }
 
 
@@ -1044,12 +1164,6 @@
 }
 
 
-TEST(JobsSerialization, Registry)
-{   
-  // TODO : Test serialization of JobsRegistry
-}
-
-
 namespace
 {
   class OrthancJobsSerialization : public testing::Test
@@ -1101,16 +1215,15 @@
   ASSERT_TRUE(CreateInstance(id));
 
   Json::Value s;
-
+  OrthancJobUnserializer unserializer(GetContext());
+    
   {
     DicomInstanceOperationValue instance(GetContext(), id);
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, instance));
     instance.Serialize(s);
   }
 
-  OrthancJobUnserializer unserializer(GetContext());
-    
   std::auto_ptr<JobOperationValue> value;
   value.reset(unserializer.UnserializeValue(s));
   ASSERT_EQ(JobOperationValue::Type_DicomInstance, value->GetType());
@@ -1133,17 +1246,17 @@
   ASSERT_TRUE(CreateInstance(id));
 
   Json::Value s;
+  OrthancJobUnserializer unserializer(GetContext()); 
 
   // DeleteResourceOperation
   
   {
     DeleteResourceOperation operation(GetContext());
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation));
     operation.Serialize(s);
   }
 
-  OrthancJobUnserializer unserializer(GetContext()); 
   std::auto_ptr<IJobOperation> operation;
 
   {
@@ -1164,7 +1277,7 @@
 
     StorePeerOperation operation(peer);
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation));
     operation.Serialize(s);
   }
 
@@ -1189,7 +1302,7 @@
 
     StoreScuOperation operation("TEST", modality);
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation));
     operation.Serialize(s);
   }
 
@@ -1212,7 +1325,7 @@
     operation.AddPreArgument("b");
     operation.AddPostArgument("c");
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation));
     operation.Serialize(s);
   }
 
@@ -1236,7 +1349,7 @@
     
     ModifyInstanceOperation operation(GetContext(), RequestOrigin_Lua, modification.release());
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, operation));
     operation.Serialize(s);
   }
 
@@ -1264,6 +1377,8 @@
 
   // DicomModalityStoreJob
 
+  OrthancJobUnserializer unserializer(GetContext()); 
+
   {
     RemoteModalityParameters modality;
     modality.SetApplicationEntityTitle("REMOTE");
@@ -1276,12 +1391,10 @@
     job.SetRemoteModality(modality);
     job.SetMoveOriginator("MOVESCU", 42);
 
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job));
     ASSERT_TRUE(job.Serialize(s));
   }
 
-  OrthancJobUnserializer unserializer(GetContext()); 
-
   {
     std::auto_ptr<IJob> job;
     job.reset(unserializer.UnserializeJob(s));
@@ -1309,7 +1422,7 @@
     OrthancPeerStoreJob job(GetContext());
     job.SetPeer(peer);
     
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job));
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -1334,7 +1447,7 @@
     job.SetModification(modification.release(), true);
     job.SetOrigin(DicomInstanceOrigin::FromLua());
     
-    s = 42;
+    ASSERT_TRUE(CheckIdempotentSerialization(unserializer, job));
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -1347,15 +1460,10 @@
     ASSERT_EQ(RequestOrigin_Lua, tmp.GetOrigin().GetRequestOrigin());
     ASSERT_TRUE(tmp.GetModification().IsRemoved(DICOM_TAG_STUDY_DESCRIPTION));
   }
+}
 
-  // SequenceOfOperationsJob.h
 
-  {
-    SequenceOfOperationsJob job;
-    
-    s = 42;
-    ASSERT_TRUE(job.Serialize(s));
-  }
-
-  std::cout << s;
+TEST(JobsSerialization, Registry)
+{   
+  // TODO : Test serialization of JobsRegistry
 }