view OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h @ 5911:bfae0fc2ea1b get-scu-test

Started to work on handling errors as warnings when trying to store instances whose SOPClassUID has not been accepted during the negotiation. Work to be finalized later
author Alain Mazy <am@orthanc.team>
date Mon, 09 Dec 2024 10:07:19 +0100
parents 8329d28611ad
children
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2023 Osimis S.A., Belgium
 * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium
 * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#pragma once

#include "../../../OrthancFramework/Sources/Compatibility.h"  // For ORTHANC_OVERRIDE
#include "../../../OrthancFramework/Sources/JobsEngine/IJob.h"
#include "../../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h"

#include <set>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread.hpp>

namespace Orthanc
{
  class ServerContext;

  // This class is a threaded version of SetOfInstancesJob merged with CleaningInstancesJob
  class ORTHANC_PUBLIC ThreadedSetOfInstancesJob : public IJob
  {
  public:
    enum ThreadedJobStep  // cannot use "Step" since there is a method with this name !
    {
      ThreadedJobStep_NotStarted,
      ThreadedJobStep_ProcessingInstances,
      ThreadedJobStep_PostProcessingInstances,
      ThreadedJobStep_Cleanup,
      ThreadedJobStep_Done
    };

  private:
    std::set<std::string>               instancesToProcess_;  // the list of source instances ids to process
    std::set<std::string>               failedInstances_;     // the list of source instances ids that failed processing
    std::set<std::string>               processedInstances_;  // the list of source instances ids that have been processed (including failed ones)

    SharedMessageQueue                  instancesToProcessQueue_;
    std::vector<boost::shared_ptr<boost::thread> >         instancesWorkers_;

    bool                    hasPostProcessing_;  // final step before "KeepSource" cleanup
    bool                    started_;
    bool                    stopRequested_;
    bool                    permissive_;
    ThreadedJobStep         currentStep_;
    std::string             description_;
    size_t                  workersCount_;

    ServerContext&          context_;
    bool                    keepSource_;
    ErrorCode               errorCode_;
  
  protected:
    mutable boost::recursive_mutex      mutex_;
    std::set<std::string>               parentResources_;

  public:
    ThreadedSetOfInstancesJob(ServerContext& context,
                              bool hasTrailingStep,
                              bool keepSource,
                              size_t workersCount);

    explicit ThreadedSetOfInstancesJob(ServerContext& context,
                                       const Json::Value& source,
                                       bool hasTrailingStep,
                                       bool defaultKeepSource);

    virtual ~ThreadedSetOfInstancesJob();

  protected:
    virtual bool HandleInstance(const std::string& instance) = 0;

    virtual void PostProcessInstances();

    void InitWorkers(size_t workersCount);

    void StopWorkers();

    void WaitWorkersComplete();

    static void InstanceWorkerThread(ThreadedSetOfInstancesJob* that);

    const std::string& GetInstance(size_t index) const;

    bool HasPostProcessingStep() const;

    bool HasCleanupStep() const;

    void SetErrorCode(ErrorCode errorCode);

    ErrorCode GetErrorCode() const;

  public:

    ThreadedJobStep GetCurrentStep() const;

    void SetDescription(const std::string& description);

    const std::string& GetDescription() const;

    void SetKeepSource(bool keep);

    bool IsKeepSource() const;

    void GetInstances(std::set<std::string>& target) const;

    void GetFailedInstances(std::set<std::string>& target) const;

    size_t GetInstancesCount() const;

    void AddInstances(const std::list<std::string>& instances);

    void AddParentResource(const std::string &resource);

    bool IsPermissive() const;

    void SetPermissive(bool permissive);

    virtual void Reset() ORTHANC_OVERRIDE;
    
    virtual void Start() ORTHANC_OVERRIDE;
    
    virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE;

    virtual float GetProgress() const ORTHANC_OVERRIDE;

    bool IsStarted() const;

    virtual JobStepResult Step(const std::string& jobId) ORTHANC_OVERRIDE;
    
    virtual void GetPublicContent(Json::Value& value) const ORTHANC_OVERRIDE;
    
    virtual bool Serialize(Json::Value& target) const ORTHANC_OVERRIDE;

    virtual bool GetOutput(std::string& output,
                           MimeType& mime,
                           std::string& filename,
                           const std::string& key) ORTHANC_OVERRIDE
    {
      return false;
    }

    virtual bool DeleteOutput(const std::string& key) ORTHANC_OVERRIDE
    {
      return false;
    }

    bool IsFailedInstance(const std::string& instance) const;

    ServerContext& GetContext() const
    {
      return context_;
    }

  };
}