view OrthancFramework/Sources/MultiThreading/ThreadPool.cpp @ 6939:ae404101b2ae streaming tip

renamed DicomDataSource threads to differentiate them from the current DICOM network threads
author Alain Mazy <am@orthanc.team>
date Tue, 09 Jun 2026 09:53:51 +0200
parents 91a63f8a96ec
children
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2023 Osimis S.A., Belgium
 * Copyright (C) 2024-2026 Orthanc Team SRL, Belgium
 * Copyright (C) 2021-2026 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 **/


#include "../PrecompiledHeaders.h"
#include "ThreadPool.h"

#include "../Logging.h"
#include "../OrthancException.h"
#include "FutureState.h"

#include <boost/lexical_cast.hpp>
#include <boost/weak_ptr.hpp>


static const unsigned int DEFAULT_DEQUEUE_TIMEOUT_MS = 100;


namespace Orthanc
{
  class ThreadPool::ITask : public IDynamicObject
  {
  public:
    virtual ~ITask()
    {
    }

    virtual void Execute() = 0;

    virtual void Cancel() = 0;
  };


  class ThreadPool::CallableTask : public ITask
  {
  private:
    std::unique_ptr<ICallable>               callable_;
    boost::weak_ptr<Internals::FutureState>  state_;

  public:
    CallableTask(ICallable* callable,
                 boost::shared_ptr<Internals::FutureState>& state) :
      callable_(callable),
      state_(state)
    {
      assert(callable != NULL);
    }

    void Execute() ORTHANC_OVERRIDE
    {
      boost::shared_ptr<Internals::FutureState> locked = state_.lock();

      if (locked)
      {
        try
        {
          locked->AcquireResult(callable_->Call());
        }
        catch (const OrthancException& e)
        {
          locked->SetError(e);
        }
        catch (...)
        {
          locked->SetError(OrthancException(ErrorCode_InternalError, "Unknown exception in ICallable::Call()"));
        }
      }
      else
      {
        // Nothing to do: The future was canceled before we even started
      }
    }

    void Cancel() ORTHANC_OVERRIDE
    {
      boost::shared_ptr<Internals::FutureState> locked = state_.lock();

      if (locked)
      {
        locked->SetError(OrthancException(ErrorCode_CanceledJob));
      }
      else
      {
        // Nothing to do: The future was canceled before we even started
      }
    }
  };


  class ThreadPool::RunnableTask : public ITask
  {
  private:
    std::unique_ptr<IRunnable>  runnable_;

  public:
    explicit RunnableTask(IRunnable* runnable) :
      runnable_(runnable)
    {
      assert(runnable != NULL);
    }

    void Execute() ORTHANC_OVERRIDE
    {
      runnable_->Run();
    }

    void Cancel() ORTHANC_OVERRIDE
    {
    }
  };


  template <bool throws>
  void ThreadPool::StopInternal()
  {
    {
      boost::mutex::scoped_lock lock(mutex_);

      switch (state_)
      {
        case State_Initialization:
          if (throws)
          {
            throw OrthancException(ErrorCode_BadSequenceOfCalls, "Start() has not been called");
          }
          else
          {
            return;  // This is for the destructor
          }

        case State_Finalization:
          if (throws)
          {
            throw OrthancException(ErrorCode_BadSequenceOfCalls, "Concurrent access to Stop()");
          }
          else
          {
            return;  // This is for the destructor, should never happen
          }

        case State_Running:
          state_ = State_Finalization;
          break;

        default:
          if (throws)
          {
            THROW_WITH_FILE_AND_LINE_INFO(ErrorCode_InternalError);
          }
          else
          {
            return;  // This is for the destructor
          }
      }
    }

    assert(workers_.get() != NULL);
    workers_->join_all();
    workers_.reset(NULL);

    // Cancel all the remaining tasks in the queue
    for (;;)
    {
      std::unique_ptr<IDynamicObject> task(queue_.Dequeue(1));

      if (task.get() == NULL)
      {
        break;
      }
      else
      {
        try
        {
          dynamic_cast<ITask&>(*task).Cancel();
        }
        catch (OrthancException& e)
        {
          LOG(ERROR) << "Error while canceling task during shutdown: " << e.What();
        }
      }
    }

    {
      boost::mutex::scoped_lock lock(mutex_);
      state_ = State_Initialization;
    }
  }


  void ThreadPool::WorkerLoop(const std::string& threadName)
  {
    Logging::SetCurrentThreadName(threadName);

    while (true)
    {
      unsigned int timeout;

      {
        boost::mutex::scoped_lock lock(mutex_);

        if (state_ == State_Running)
        {
          timeout = dequeueTimeoutMilliseconds_;
        }
        else
        {
          assert(state_ == State_Finalization);
          return;
        }
      }

      std::unique_ptr<IDynamicObject> task(queue_.Dequeue(timeout));

      if (task.get() != NULL)
      {
        try
        {
          dynamic_cast<ITask&>(*task).Execute();
        }
        catch (const OrthancException& e)
        {
          LOG(ERROR) << "Exception while executing a task: " << e.What();
        }
        catch (...)
        {
          LOG(ERROR) << "Native exception while executing a task";
        }
      }
    }
  }


  ThreadPool::ThreadPool() :
    loggingThreadName_("POOL"),
    countThreads_(1),
    state_(State_Initialization),
    dequeueTimeoutMilliseconds_(DEFAULT_DEQUEUE_TIMEOUT_MS)
  {
  }


  ThreadPool::~ThreadPool()
  {
    StopInternal<false>();  // don't throw in destructor
  }


  void ThreadPool::SetLoggingThreadName(const std::string& name)
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (state_ == State_Initialization)
    {
      loggingThreadName_ = name;
    }
    else
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls, "Start() has already been called");
    }
  }


  void ThreadPool::SetCountThreads(unsigned int count)
  {
    if (count < 1)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }

    {
      boost::mutex::scoped_lock lock(mutex_);

      if (state_ == State_Initialization)
      {
        countThreads_ = count;
      }
      else
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls, "Start() has already been called");
      }
    }
  }


  unsigned int ThreadPool::GetCountThreads()
  {
    boost::mutex::scoped_lock lock(mutex_);
    return countThreads_;
  }


  void ThreadPool::SetDequeueTimeout(unsigned int milliseconds)
  {
    if (milliseconds < 1)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }

    {
      boost::mutex::scoped_lock lock(mutex_);

      if (state_ == State_Initialization)
      {
        dequeueTimeoutMilliseconds_ = milliseconds;
      }
      else
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls, "Start() has already been called");
      }
    }
  }


  unsigned int ThreadPool::GetDequeueTimeout()
  {
    boost::mutex::scoped_lock lock(mutex_);
    return dequeueTimeoutMilliseconds_;
  }


  void ThreadPool::Start()
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (state_ == State_Initialization)
    {
      assert(countThreads_ >= 1);

      state_ = State_Running;

      assert(workers_.get() == NULL);
      workers_.reset(new boost::thread_group);

      for (unsigned int i = 0; i < countThreads_; i++)
      {
        const std::string threadName = loggingThreadName_ + "-" + boost::lexical_cast<std::string>(i);
        workers_->create_thread(boost::bind(&ThreadPool::WorkerLoop, this, threadName));
      }
    }
    else
    {
      throw OrthancException(ErrorCode_BadSequenceOfCalls, "Start() has not been called");
    }
  }


  Future* ThreadPool::Submit(ICallable* callable)
  {
    std::unique_ptr<ICallable> protection(callable);

    if (callable == NULL)
    {
      throw OrthancException(ErrorCode_NullPointer);
    }

    {
      boost::mutex::scoped_lock lock(mutex_);

      if (state_ != State_Running)
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls, "The thread pool is not running");
      }
    }

    boost::shared_ptr<Internals::FutureState> state(boost::make_shared<Internals::FutureState>());

    queue_.Enqueue(new CallableTask(protection.release(), state));

    return new Future(state);
  }


  void ThreadPool::Submit(IRunnable* runnable)
  {
    std::unique_ptr<IRunnable> protection(runnable);

    if (runnable == NULL)
    {
      throw OrthancException(ErrorCode_NullPointer);
    }

    {
      boost::mutex::scoped_lock lock(mutex_);

      if (state_ != State_Running)
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls, "The thread pool is not running");
      }
    }

    queue_.Enqueue(new RunnableTask(protection.release()));
  }


  void ThreadPool::Stop()
  {
    StopInternal<true>();
  }
}