view OrthancFramework/Sources/JobsEngine/Operations/SequenceOfOperationsJob.cpp @ 4478:5248be65146a

todo
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 27 Jan 2021 17:28:34 +0100
parents d9473bd5ed43
children 0a38000b086d
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2021 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 **/


#include "../../PrecompiledHeaders.h"
#include "SequenceOfOperationsJob.h"

#include "../../Logging.h"
#include "../../OrthancException.h"
#include "../../SerializationToolbox.h"
#include "../IJobUnserializer.h"

namespace Orthanc
{
  static const char* CURRENT = "Current";
  static const char* DESCRIPTION = "Description";
  static const char* NEXT_OPERATIONS = "Next";
  static const char* OPERATION = "Operation";
  static const char* OPERATIONS = "Operations";
  static const char* ORIGINAL_INPUTS = "OriginalInputs";
  static const char* TRAILING_TIMEOUT = "TrailingTimeout";
  static const char* TYPE = "Type";
  static const char* WORK_INPUTS = "WorkInputs";

  
  class SequenceOfOperationsJob::Operation : public boost::noncopyable
  {
  private:
    size_t                               index_;
    std::unique_ptr<IJobOperation>       operation_;
    std::unique_ptr<JobOperationValues>  originalInputs_;
    std::unique_ptr<JobOperationValues>  workInputs_;
    std::list<Operation*>                nextOperations_;
    size_t                               currentInput_;

  public:
    Operation(size_t index,
              IJobOperation* operation) :
      index_(index),
      operation_(operation),
      originalInputs_(new JobOperationValues),
      workInputs_(new JobOperationValues),
      currentInput_(0)
    {
      if (operation == NULL)
      {
        throw OrthancException(ErrorCode_NullPointer);
      }
    }

    void AddOriginalInput(const IJobOperationValue& value)
    {
      if (currentInput_ != 0)
      {
        // Cannot add input after processing has started
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }
      else
      {
        originalInputs_->Append(value.Clone());
      }
    }

    const JobOperationValues& GetOriginalInputs() const
    {
      return *originalInputs_;
    }

    void Reset()
    {
      workInputs_->Clear();
      currentInput_ = 0;
    }

    void AddNextOperation(Operation& other,
                          bool unserializing)
    {
      if (other.index_ <= index_)
      {
        throw OrthancException(ErrorCode_InternalError);
      }

      if (!unserializing &&
          currentInput_ != 0)
      {
        // Cannot add input after processing has started
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }
      else
      {
        nextOperations_.push_back(&other);
      }
    }

    bool IsDone() const
    {
      return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
    }

    void Step()
    {
      if (IsDone())
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }

      const IJobOperationValue* input;

      if (currentInput_ < originalInputs_->GetSize())
      {
        input = &originalInputs_->GetValue(currentInput_);
      }
      else
      {
        input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
      }

      JobOperationValues outputs;
      operation_->Apply(outputs, *input);

      if (!nextOperations_.empty())
      {
        std::list<Operation*>::iterator first = nextOperations_.begin();
        outputs.Move(*(*first)->workInputs_);

        std::list<Operation*>::iterator current = first;
        ++current;

        while (current != nextOperations_.end())
        {
          (*first)->workInputs_->Copy(*(*current)->workInputs_);
          ++current;
        }
      }

      currentInput_ += 1;
    }

    void Serialize(Json::Value& target) const
    {
      target = Json::objectValue;
      target[CURRENT] = static_cast<unsigned int>(currentInput_);
      operation_->Serialize(target[OPERATION]);
      originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
      workInputs_->Serialize(target[WORK_INPUTS]);      

      Json::Value tmp = Json::arrayValue;
      for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
           it != nextOperations_.end(); ++it)
      {
        tmp.append(static_cast<int>((*it)->index_));
      }

      target[NEXT_OPERATIONS] = tmp;
    }

    Operation(IJobUnserializer& unserializer,
              Json::Value::ArrayIndex index,
              const Json::Value& serialized) :
      index_(index)
    {
      if (serialized.type() != Json::objectValue ||
          !serialized.isMember(OPERATION) ||
          !serialized.isMember(ORIGINAL_INPUTS) ||
          !serialized.isMember(WORK_INPUTS))
      {
        throw OrthancException(ErrorCode_BadFileFormat);
      }

      currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
      operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
      originalInputs_.reset(JobOperationValues::Unserialize
                            (unserializer, serialized[ORIGINAL_INPUTS]));
      workInputs_.reset(JobOperationValues::Unserialize
                        (unserializer, serialized[WORK_INPUTS]));
    }
  };


  SequenceOfOperationsJob::SequenceOfOperationsJob() :
    done_(false),
    current_(0),
    trailingTimeout_(boost::posix_time::milliseconds(1000))
  {
  }


  SequenceOfOperationsJob::~SequenceOfOperationsJob()
  {
    for (size_t i = 0; i < operations_.size(); i++)
    {
      if (operations_[i] != NULL)
      {
        delete operations_[i];
      }
    }
  }


  void SequenceOfOperationsJob::SetDescription(const std::string& description)
  {
    boost::mutex::scoped_lock lock(mutex_);
    description_ = description;
  }


  void SequenceOfOperationsJob::GetDescription(std::string& description)
  {
    boost::mutex::scoped_lock lock(mutex_);
    description = description_;    
  }


  void SequenceOfOperationsJob::Register(IObserver& observer)
  {
    boost::mutex::scoped_lock lock(mutex_);
    observers_.push_back(&observer);
  }


#if ORTHANC_BUILDING_FRAMEWORK_LIBRARY == 1
  void SequenceOfOperationsJob::Lock::AddInput(size_t index,
                                               const JobOperationValue& value)
  {
    throw OrthancException(ErrorCode_DiscontinuedAbi, "Removed in 1.8.1");
  }
#endif


  SequenceOfOperationsJob::Lock::Lock(SequenceOfOperationsJob& that) :
    that_(that),
    lock_(that.mutex_)
  {
  }

  bool SequenceOfOperationsJob::Lock::IsDone() const
  {
    return that_.done_;
  }

  void SequenceOfOperationsJob::Lock::SetTrailingOperationTimeout(unsigned int timeout)
  {
    that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
  }

  
  size_t SequenceOfOperationsJob::Lock::AddOperation(IJobOperation* operation)
  {
    if (IsDone())
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls);
    }

    size_t index = that_.operations_.size();

    that_.operations_.push_back(new Operation(index, operation));
    that_.operationAdded_.notify_one();

    return index;
  }

  size_t SequenceOfOperationsJob::Lock::GetOperationsCount() const
  {
    return that_.operations_.size();
  }


  void SequenceOfOperationsJob::Lock::AddInput(size_t index,
                                               const IJobOperationValue& value)
  {
    if (IsDone())
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls);
    }
    else if (index >= that_.operations_.size() ||
             index < that_.current_)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }
    else
    {
      that_.operations_[index]->AddOriginalInput(value);
    }
  }
      

  void SequenceOfOperationsJob::Lock::Connect(size_t input,
                                              size_t output)
  {
    if (IsDone())
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls);
    }
    else if (input >= output ||
             input >= that_.operations_.size() ||
             output >= that_.operations_.size() ||
             input < that_.current_ ||
             output < that_.current_)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }
    else
    {
      Operation& a = *that_.operations_[input];
      Operation& b = *that_.operations_[output];
      a.AddNextOperation(b, false /* not unserializing */);
    }
  }


  void SequenceOfOperationsJob::Start()
  {
  }


  JobStepResult SequenceOfOperationsJob::Step(const std::string& jobId)
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (current_ == operations_.size())
    {
      LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
      operationAdded_.timed_wait(lock, trailingTimeout_);
            
      if (current_ == operations_.size())
      {
        // No operation was added during the trailing timeout: The
        // job is over
        LOG(INFO) << "The sequence of operations is over";
        done_ = true;

        for (std::list<IObserver*>::iterator it = observers_.begin(); 
             it != observers_.end(); ++it)
        {
          (*it)->SignalDone(*this);
        }

        return JobStepResult::Success();
      }
      else
      {
        LOG(INFO) << "New operation were added to the sequence of operations";
      }
    }

    assert(current_ < operations_.size());

    while (current_ < operations_.size() &&
           operations_[current_]->IsDone())
    {
      current_++;
    }

    if (current_ < operations_.size())
    {
      operations_[current_]->Step();
    }

    return JobStepResult::Continue();
  }


  void SequenceOfOperationsJob::Reset()
  {
    boost::mutex::scoped_lock lock(mutex_);
      
    current_ = 0;
    done_ = false;

    for (size_t i = 0; i < operations_.size(); i++)
    {
      operations_[i]->Reset();
    }
  }

  void SequenceOfOperationsJob::Stop(JobStopReason reason)
  {
  }


  float SequenceOfOperationsJob::GetProgress()
  {
    boost::mutex::scoped_lock lock(mutex_);
      
    return (static_cast<float>(current_) / 
            static_cast<float>(operations_.size() + 1));
  }

  void SequenceOfOperationsJob::GetJobType(std::string& target)
  {
    target = "SequenceOfOperations";
  }


  void SequenceOfOperationsJob::GetPublicContent(Json::Value& value)
  {
    boost::mutex::scoped_lock lock(mutex_);

    value["CountOperations"] = static_cast<unsigned int>(operations_.size());
    value["Description"] = description_;
  }


  bool SequenceOfOperationsJob::Serialize(Json::Value& value)
  {
    boost::mutex::scoped_lock lock(mutex_);

    value = Json::objectValue;

    std::string jobType;
    GetJobType(jobType);
    value[TYPE] = jobType;
    
    value[DESCRIPTION] = description_;
    value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
    value[CURRENT] = static_cast<unsigned int>(current_);
    
    Json::Value tmp = Json::arrayValue;
    for (size_t i = 0; i < operations_.size(); i++)
    {
      Json::Value operation = Json::objectValue;
      operations_[i]->Serialize(operation);
      tmp.append(operation);
    }

    value[OPERATIONS] = tmp;

    return true;
  }

  bool SequenceOfOperationsJob::GetOutput(std::string& output,
                                          MimeType& mime,
                                          const std::string& key)
  {
    return false;
  }

  void SequenceOfOperationsJob::AwakeTrailingSleep()
  {
    operationAdded_.notify_one();
  }


  SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
                                                   const Json::Value& serialized) :
    done_(false)
  {
    std::string jobType;
    GetJobType(jobType);
    
    if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
        !serialized.isMember(OPERATIONS) ||
        serialized[OPERATIONS].type() != Json::arrayValue)
    {
      throw OrthancException(ErrorCode_BadFileFormat);
    }

    description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
    trailingTimeout_ = boost::posix_time::milliseconds
      (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
    current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);

    const Json::Value& ops = serialized[OPERATIONS];

    // Unserialize the individual operations
    operations_.reserve(ops.size());
    for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
    {
      operations_.push_back(new Operation(unserializer, i, ops[i]));
    }

    // Connect the next operations
    for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
    {
      if (!ops[i].isMember(NEXT_OPERATIONS) ||
          ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
      {
        throw OrthancException(ErrorCode_BadFileFormat);
      }

      const Json::Value& next = ops[i][NEXT_OPERATIONS];
      for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
      {
        if (next[j].type() != Json::intValue ||
            next[j].asInt() < 0 ||
            next[j].asUInt() >= operations_.size())
        {
          throw OrthancException(ErrorCode_BadFileFormat);
        }
        else
        {
          operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
        }
      }
    }  
  }
}