view OrthancFramework/Sources/JobsEngine/Operations/SequenceOfOperationsJob.cpp @ 5911:bfae0fc2ea1b get-scu-test

Started to work on handling errors as warnings when trying to store instances whose SOPClassUID has not been accepted during the negotiation. Work to be finalized later
author Alain Mazy <am@orthanc.team>
date Mon, 09 Dec 2024 10:07:19 +0100
parents 8329d28611ad
children
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2023 Osimis S.A., Belgium
 * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium
 * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 **/


#include "../../PrecompiledHeaders.h"
#include "SequenceOfOperationsJob.h"

#include "../../Logging.h"
#include "../../OrthancException.h"
#include "../../SerializationToolbox.h"
#include "../IJobUnserializer.h"

namespace Orthanc
{
  // Names of the JSON fields written by the "Serialize()" methods of
  // this file, and read back by the unserialization constructors.
  // "CURRENT" is used both at the job level and at the operation level.
  static const char* CURRENT = "Current";
  static const char* DESCRIPTION = "Description";
  static const char* NEXT_OPERATIONS = "Next";
  static const char* OPERATION = "Operation";
  static const char* OPERATIONS = "Operations";
  static const char* ORIGINAL_INPUTS = "OriginalInputs";
  static const char* TRAILING_TIMEOUT = "TrailingTimeout";
  static const char* TYPE = "Type";
  static const char* WORK_INPUTS = "WorkInputs";

  
  /**
   * One node of the sequence of operations. It wraps an
   * "IJobOperation" together with the values it has to process (the
   * "original" inputs provided through "AddOriginalInput()", followed
   * by the "work" inputs that are produced by preceding operations),
   * and the list of the operations that receive its outputs.
   **/
  class SequenceOfOperationsJob::Operation : public boost::noncopyable
  {
  private:
    size_t                               index_;           // Position of this operation in the sequence
    std::unique_ptr<IJobOperation>       operation_;       // The wrapped operation (owned, never NULL)
    std::unique_ptr<JobOperationValues>  originalInputs_;  // Inputs provided by "AddOriginalInput()"
    std::unique_ptr<JobOperationValues>  workInputs_;      // Inputs produced by preceding operations
    std::list<Operation*>                nextOperations_;  // Successors (not owned)
    size_t                               currentInput_;    // Index of the next input to process

  public:
    /**
     * Creates an operation without any input. The "operation" pointer
     * is stored in a smart pointer: ownership is taken, even if the
     * constructor throws because the pointer is NULL.
     **/
    Operation(size_t index,
              IJobOperation* operation) :
      index_(index),
      operation_(operation),
      originalInputs_(new JobOperationValues),
      workInputs_(new JobOperationValues),
      currentInput_(0)
    {
      if (operation == NULL)
      {
        throw OrthancException(ErrorCode_NullPointer);
      }
    }

    /**
     * Appends a clone of "value" to the original inputs. Throws
     * "ErrorCode_BadSequenceOfCalls" if at least one input has
     * already been processed.
     **/
    void AddOriginalInput(const IJobOperationValue& value)
    {
      if (currentInput_ != 0)
      {
        // Cannot add input after processing has started
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }
      else
      {
        originalInputs_->Append(value.Clone());
      }
    }

    const JobOperationValues& GetOriginalInputs() const
    {
      return *originalInputs_;
    }

    /**
     * Rewinds the operation: the work inputs are discarded, whereas
     * the original inputs are kept so that they can be replayed.
     **/
    void Reset()
    {
      workInputs_->Clear();
      currentInput_ = 0;
    }

    /**
     * Registers "other" as a successor of this operation. The
     * successor must be located after this operation in the sequence
     * (otherwise "ErrorCode_InternalError" is thrown). Unless
     * "unserializing" is true, the connection is refused with
     * "ErrorCode_BadSequenceOfCalls" once processing has started.
     **/
    void AddNextOperation(Operation& other,
                          bool unserializing)
    {
      if (other.index_ <= index_)
      {
        throw OrthancException(ErrorCode_InternalError);
      }

      if (!unserializing &&
          currentInput_ != 0)
      {
        // Cannot connect a successor after processing has started
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }
      else
      {
        nextOperations_.push_back(&other);
      }
    }

    /**
     * Returns true once all the original inputs and all the work
     * inputs have been processed.
     **/
    bool IsDone() const
    {
      return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
    }

    /**
     * Applies the wrapped operation to the next pending input (the
     * original inputs are consumed first, then the work inputs), and
     * dispatches the outputs to the work inputs of every successor.
     * Throws "ErrorCode_BadSequenceOfCalls" if no input is pending.
     **/
    void Step()
    {
      if (IsDone())
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }

      const IJobOperationValue* input;

      if (currentInput_ < originalInputs_->GetSize())
      {
        input = &originalInputs_->GetValue(currentInput_);
      }
      else
      {
        input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
      }

      JobOperationValues outputs;
      operation_->Apply(outputs, *input);

      if (!nextOperations_.empty())
      {
        std::list<Operation*>::iterator first = nextOperations_.begin();

        // Give the fresh outputs to every successor but the first
        // one. The source of the copy must be "outputs" itself, not
        // the work inputs of the first successor: the latter may
        // already contain values from former steps or from other
        // operations, which must not be duplicated into the siblings.
        // NOTE(review): this assumes that "Copy()" appends clones to
        // its target and leaves the source untouched - confirm
        // against "JobOperationValues"
        std::list<Operation*>::iterator current = first;
        ++current;

        while (current != nextOperations_.end())
        {
          outputs.Copy(*(*current)->workInputs_);
          ++current;
        }

        // The first successor receives the outputs themselves, which
        // must be done last.
        // NOTE(review): assumes "Move()" transfers the values and
        // empties the source - confirm
        outputs.Move(*(*first)->workInputs_);
      }

      currentInput_ += 1;
    }

    /**
     * Writes the state of the operation into "target" as a JSON
     * object. The successors are stored as the array of their indexes
     * in the sequence.
     **/
    void Serialize(Json::Value& target) const
    {
      target = Json::objectValue;
      target[CURRENT] = static_cast<unsigned int>(currentInput_);
      operation_->Serialize(target[OPERATION]);
      originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
      workInputs_->Serialize(target[WORK_INPUTS]);

      Json::Value tmp = Json::arrayValue;
      for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
           it != nextOperations_.end(); ++it)
      {
        tmp.append(static_cast<int>((*it)->index_));
      }

      target[NEXT_OPERATIONS] = tmp;
    }

    /**
     * Restores an operation from the JSON object created by
     * "Serialize()". The successors are NOT restored by this
     * constructor: the caller has to invoke "AddNextOperation()" once
     * all the operations of the sequence have been created. Throws
     * "ErrorCode_BadFileFormat" if a mandatory field is missing.
     **/
    Operation(IJobUnserializer& unserializer,
              Json::Value::ArrayIndex index,
              const Json::Value& serialized) :
      index_(index)
    {
      if (serialized.type() != Json::objectValue ||
          !serialized.isMember(OPERATION) ||
          !serialized.isMember(ORIGINAL_INPUTS) ||
          !serialized.isMember(WORK_INPUTS))
      {
        throw OrthancException(ErrorCode_BadFileFormat);
      }

      currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
      operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));

      if (operation_.get() == NULL)
      {
        // Same invariant as in the other constructor: the wrapped
        // operation is dereferenced without any check afterwards
        throw OrthancException(ErrorCode_NullPointer);
      }

      originalInputs_.reset(JobOperationValues::Unserialize
                            (unserializer, serialized[ORIGINAL_INPUTS]));
      workInputs_.reset(JobOperationValues::Unserialize
                        (unserializer, serialized[WORK_INPUTS]));
    }
  };


  // Creates an empty job that is not done, whose current operation is
  // the first one, and whose trailing timeout is 1000 milliseconds
  SequenceOfOperationsJob::SequenceOfOperationsJob() :
    done_(false),
    current_(0),
    trailingTimeout_(boost::posix_time::milliseconds(1000))
  {
  }


  // Releases the operations, which are stored as raw pointers that
  // are owned by the job
  SequenceOfOperationsJob::~SequenceOfOperationsJob()
  {
    size_t position = 0;

    while (position < operations_.size())
    {
      // Deleting a NULL pointer is a no-op, so no explicit test is needed
      delete operations_[position];
      ++position;
    }
  }


  // Replaces the description of the job, under the protection of the
  // mutex of the job
  void SequenceOfOperationsJob::SetDescription(const std::string& description)
  {
    boost::mutex::scoped_lock guard(mutex_);
    description_.assign(description);
  }


  // Copies the description of the job into "description", under the
  // protection of the mutex of the job
  void SequenceOfOperationsJob::GetDescription(std::string& description)
  {
    boost::mutex::scoped_lock guard(mutex_);
    description.assign(description_);
  }


  // Adds an observer that will be notified through "SignalDone()"
  // once the sequence is over. Only the address of the observer is
  // stored, so the observer must stay alive as long as it may be
  // notified.
  void SequenceOfOperationsJob::Register(IObserver& observer)
  {
    boost::mutex::scoped_lock guard(mutex_);

    IObserver* const listener = &observer;
    observers_.push_back(listener);
  }


#if ORTHANC_BUILDING_FRAMEWORK_LIBRARY == 1
  void SequenceOfOperationsJob::Lock::AddInput(size_t index,
                                               const JobOperationValue& value)
  {
    throw OrthancException(ErrorCode_DiscontinuedAbi, "Removed in 1.8.1");
  }
#endif


  // Binds the accessor to the job "that", and constructs "lock_" from
  // the mutex of the job (presumably a scoped lock that is held for
  // the lifetime of this object - confirm against the header)
  SequenceOfOperationsJob::Lock::Lock(SequenceOfOperationsJob& that) :
    that_(that),
    lock_(that.mutex_)
  {
  }

  // Tells whether the job has flagged the sequence as over, in which
  // case the modifiers of this class throw
  bool SequenceOfOperationsJob::Lock::IsDone() const
  {
    const bool finished = that_.done_;
    return finished;
  }

  void SequenceOfOperationsJob::Lock::SetTrailingOperationTimeout(unsigned int timeout)
  {
    that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
  }

  
  /**
   * Appends a new operation at the end of the sequence, then wakes up
   * the job if it is waiting for new operations (trailing timeout).
   *
   * The "operation" pointer is wrapped into an "Operation" node that
   * stores it in a smart pointer. Throws
   * "ErrorCode_BadSequenceOfCalls" if the job is already done, and
   * "ErrorCode_NullPointer" if "operation" is NULL.
   *
   * NOTE(review): if the job is already done, the exception is thrown
   * before "operation" is wrapped, so it is not freed by this method
   * - confirm who is expected to own it in that case.
   *
   * Returns the index of the new operation in the sequence.
   **/
  size_t SequenceOfOperationsJob::Lock::AddOperation(IJobOperation* operation)
  {
    if (IsDone())
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls);
    }

    const size_t index = that_.operations_.size();

    // Hold the new node in a smart pointer until the container has
    // actually stored it, so that the node (together with the wrapped
    // operation) is not leaked if "push_back()" throws
    std::unique_ptr<Operation> node(new Operation(index, operation));
    that_.operations_.push_back(node.get());
    node.release();

    that_.operationAdded_.notify_one();

    return index;
  }

  // Returns the number of operations currently stored in the sequence
  size_t SequenceOfOperationsJob::Lock::GetOperationsCount() const
  {
    const size_t count = that_.operations_.size();
    return count;
  }


  // Provides the operation at position "index" with one additional
  // original input. Throws "ErrorCode_BadSequenceOfCalls" if the job
  // is done, and "ErrorCode_ParameterOutOfRange" if "index" does not
  // designate an existing operation, or designates an operation that
  // is located before the current operation of the job.
  void SequenceOfOperationsJob::Lock::AddInput(size_t index,
                                               const IJobOperationValue& value)
  {
    if (IsDone())
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls);
    }

    const bool beyondEnd = (index >= that_.operations_.size());
    const bool alreadyPassed = (index < that_.current_);

    if (beyondEnd || alreadyPassed)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }

    that_.operations_[index]->AddOriginalInput(value);
  }
      

  void SequenceOfOperationsJob::Lock::Connect(size_t input,
                                              size_t output)
  {
    if (IsDone())
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls);
    }
    else if (input >= output ||
             input >= that_.operations_.size() ||
             output >= that_.operations_.size() ||
             input < that_.current_ ||
             output < that_.current_)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }
    else
    {
      Operation& a = *that_.operations_[input];
      Operation& b = *that_.operations_[output];
      a.AddNextOperation(b, false /* not unserializing */);
    }
  }


  // This implementation does nothing: no preparation is done when
  // the job is started
  void SequenceOfOperationsJob::Start()
  {
  }


  // Executes one step of the job, i.e. processes at most one input of
  // the current operation. Once all the operations are done, the job
  // waits on "operationAdded_" for at most the trailing timeout: if
  // no operation shows up in the meantime, the job is flagged as
  // done, the observers are notified, and "Success" is returned. In
  // all the other cases, "Continue" is returned. The "jobId" argument
  // is not used.
  JobStepResult SequenceOfOperationsJob::Step(const std::string& jobId)
  {
    boost::mutex::scoped_lock guard(mutex_);

    if (current_ == operations_.size())
    {
      LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
      operationAdded_.timed_wait(guard, trailingTimeout_);

      if (current_ != operations_.size())
      {
        LOG(INFO) << "New operation were added to the sequence of operations";
      }
      else
      {
        // No operation was added during the trailing timeout: The
        // job is over
        LOG(INFO) << "The sequence of operations is over";
        done_ = true;

        std::list<IObserver*>::iterator observer = observers_.begin();
        while (observer != observers_.end())
        {
          (*observer)->SignalDone(*this);
          ++observer;
        }

        return JobStepResult::Success();
      }
    }

    assert(current_ < operations_.size());

    // Skip the operations that have no pending input anymore
    for (; current_ < operations_.size(); current_++)
    {
      if (!operations_[current_]->IsDone())
      {
        break;
      }
    }

    if (current_ < operations_.size())
    {
      operations_[current_]->Step();
    }

    return JobStepResult::Continue();
  }


  // Rewinds the job to its first operation, clears the "done" flag,
  // and resets each operation of the sequence
  void SequenceOfOperationsJob::Reset()
  {
    boost::mutex::scoped_lock guard(mutex_);

    done_ = false;
    current_ = 0;

    size_t position = 0;
    while (position < operations_.size())
    {
      operations_[position]->Reset();
      ++position;
    }
  }

  // This implementation does nothing: the "reason" argument is ignored
  void SequenceOfOperationsJob::Stop(JobStopReason reason)
  {
  }


  // Returns the index of the current operation divided by the number
  // of operations plus one. Because of this "plus one", the
  // denominator is never zero, and the returned value stays strictly
  // below 1 as long as the current index does not exceed the number
  // of operations.
  float SequenceOfOperationsJob::GetProgress() const
  {
    boost::mutex::scoped_lock guard(mutex_);

    const float completed = static_cast<float>(current_);
    const float total = static_cast<float>(operations_.size() + 1);

    return completed / total;
  }

  // Writes the identifier of this type of jobs into "target". This
  // identifier is also what is stored in the "Type" field by
  // "Serialize()".
  void SequenceOfOperationsJob::GetJobType(std::string& target) const
  {
    target.assign("SequenceOfOperations");
  }


  // Fills "value" with the public information about the job: the
  // number of operations and the description
  void SequenceOfOperationsJob::GetPublicContent(Json::Value& value) const
  {
    boost::mutex::scoped_lock guard(mutex_);

    const unsigned int count = static_cast<unsigned int>(operations_.size());

    value["CountOperations"] = count;
    value["Description"] = description_;
  }


  // Writes the full state of the job into "value" as a JSON object:
  // type of the job, description, trailing timeout in milliseconds,
  // index of the current operation, and the serialization of each
  // operation. Always returns "true".
  bool SequenceOfOperationsJob::Serialize(Json::Value& value) const
  {
    boost::mutex::scoped_lock guard(mutex_);

    std::string type;
    GetJobType(type);

    value = Json::objectValue;
    value[TYPE] = type;
    value[DESCRIPTION] = description_;
    value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
    value[CURRENT] = static_cast<unsigned int>(current_);

    // The operations are stored in the order of the sequence
    Json::Value& serializedOperations = value[OPERATIONS];
    serializedOperations = Json::arrayValue;

    for (size_t position = 0; position < operations_.size(); position++)
    {
      Json::Value item = Json::objectValue;
      operations_[position]->Serialize(item);
      serializedOperations.append(item);
    }

    return true;
  }


  // Notifies the "operationAdded_" condition variable, i.e. the one
  // on which "Step()" waits during the trailing timeout. The mutex of
  // the job is not locked by this method.
  void SequenceOfOperationsJob::AwakeTrailingSleep()
  {
    operationAdded_.notify_one();
  }


  /**
   * Restores a job from the JSON object created by "Serialize()".
   * The individual operations are first unserialized, then they are
   * connected together according to their "Next" field.
   *
   * Throws "ErrorCode_BadFileFormat" if the type of the job does not
   * match, if a mandatory field is missing or has a wrong type, if
   * the index of the current operation is larger than the number of
   * operations, or if a connection refers to a non-existing
   * operation. The exceptions thrown while unserializing or
   * connecting the operations are propagated after the operations
   * created so far have been released.
   **/
  SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
                                                   const Json::Value& serialized) :
    done_(false)
  {
    std::string jobType;
    GetJobType(jobType);
    
    if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
        !serialized.isMember(OPERATIONS) ||
        serialized[OPERATIONS].type() != Json::arrayValue)
    {
      throw OrthancException(ErrorCode_BadFileFormat);
    }

    description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
    trailingTimeout_ = boost::posix_time::milliseconds
      (SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
    current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);

    const Json::Value& ops = serialized[OPERATIONS];

    if (current_ > ops.size())
    {
      // "Step()" relies on the fact that the current index never
      // exceeds the number of operations: otherwise, it would return
      // "Continue" forever without doing any work
      throw OrthancException(ErrorCode_BadFileFormat);
    }

    try
    {
      // Unserialize the individual operations
      operations_.reserve(ops.size());
      for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
      {
        operations_.push_back(new Operation(unserializer, i, ops[i]));
      }

      // Connect the next operations
      for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
      {
        if (!ops[i].isMember(NEXT_OPERATIONS) ||
            ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
        {
          throw OrthancException(ErrorCode_BadFileFormat);
        }

        const Json::Value& next = ops[i][NEXT_OPERATIONS];
        for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
        {
          if (next[j].type() != Json::intValue ||
              next[j].asInt() < 0 ||
              next[j].asUInt() >= operations_.size())
          {
            throw OrthancException(ErrorCode_BadFileFormat);
          }
          else
          {
            operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
          }
        }
      }
    }
    catch (...)
    {
      // The destructor is not invoked if a constructor throws: the
      // operations that are stored as raw pointers must be released
      // by hand in order to avoid a memory leak
      for (size_t i = 0; i < operations_.size(); i++)
      {
        delete operations_[i];
      }

      operations_.clear();
      throw;
    }
  }
}