view Framework/Oracle/ThreadedOracle.cpp @ 977:262a0244e9b2 toa2019090201

Added missing Unregister for objects that register by the broker + logs + guard in FetchContext
author Benjamin Golinvaux <bgo@osimis.io>
date Mon, 02 Sep 2019 17:29:26 +0200
parents a7351ad54960
children 18d53a8b41b7
line wrap: on
line source

/**
 * Stone of Orthanc
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2019 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "ThreadedOracle.h"

#include "GetOrthancImageCommand.h"
#include "GetOrthancWebViewerJpegCommand.h"
#include "OrthancRestApiCommand.h"
#include "SleepOracleCommand.h"
#include "OracleCommandExceptionMessage.h"

#include <Core/Compression/GzipCompressor.h>
#include <Core/HttpClient.h>
#include <Core/OrthancException.h>
#include <Core/Toolbox.h>


namespace OrthancStone
{
  class ThreadedOracle::Item : public Orthanc::IDynamicObject
  {
  private:
    const IObserver&                receiver_;
    std::auto_ptr<IOracleCommand>   command_;

  public:
    Item(const IObserver& receiver,
         IOracleCommand* command) :
      receiver_(receiver),
      command_(command)
    {
      if (command == NULL)
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
      }
    }

    const IObserver& GetReceiver() const
    {
      return receiver_;
    }

    IOracleCommand& GetCommand()
    {
      assert(command_.get() != NULL);
      return *command_;
    }
  };


  class ThreadedOracle::SleepingCommands : public boost::noncopyable
  {
  private:
    class Item
    {
    private:
      const IObserver&                   receiver_;
      std::auto_ptr<SleepOracleCommand>  command_;
      boost::posix_time::ptime           expiration_;

    public:
      Item(const IObserver& receiver,
           SleepOracleCommand* command) :
        receiver_(receiver),
        command_(command)
      {
        if (command == NULL)
        {
          throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
        }

        expiration_ = (boost::posix_time::microsec_clock::local_time() + 
                       boost::posix_time::milliseconds(command_->GetDelay()));
      }

      const boost::posix_time::ptime& GetExpirationTime() const
      {
        return expiration_;
      }

      void Awake(IMessageEmitter& emitter)
      {
        assert(command_.get() != NULL);

        SleepOracleCommand::TimeoutMessage message(*command_);
        emitter.EmitMessage(receiver_, message);
      }
    };

    typedef std::list<Item*>  Content;

    boost::mutex  mutex_;
    Content       content_;

  public:
    ~SleepingCommands()
    {
      for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
      {
        if (*it != NULL)
        {
          delete *it;
        }
      }
    }

    void Add(const IObserver& receiver,
             SleepOracleCommand* command)   // Takes ownership
    {
      boost::mutex::scoped_lock lock(mutex_);

      content_.push_back(new Item(receiver, command));
    }

    void AwakeExpired(IMessageEmitter& emitter)
    {
      boost::mutex::scoped_lock lock(mutex_);

      const boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();

      Content  stillSleeping;
        
      for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
      {
        if (*it != NULL &&
            (*it)->GetExpirationTime() <= now)
        {
          (*it)->Awake(emitter);
          delete *it;
          *it = NULL;
        }
        else
        {
          stillSleeping.push_back(*it);
        }
      }

      // Compact the still-sleeping commands
      content_ = stillSleeping;
    }
  };


  static void CopyHttpHeaders(Orthanc::HttpClient& client,
                              const Orthanc::HttpClient::HttpHeaders& headers)
  {
    for (Orthanc::HttpClient::HttpHeaders::const_iterator
           it = headers.begin(); it != headers.end(); it++ )
    {
      client.AddHeader(it->first, it->second);
    }
  }


  static void DecodeAnswer(std::string& answer,
                           const Orthanc::HttpClient::HttpHeaders& headers)
  {
    Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None;

    for (Orthanc::HttpClient::HttpHeaders::const_iterator it = headers.begin(); 
         it != headers.end(); ++it)
    {
      std::string s;
      Orthanc::Toolbox::ToLowerCase(s, it->first);

      if (s == "content-encoding")
      {
        if (it->second == "gzip")
        {
          contentEncoding = Orthanc::HttpCompression_Gzip;
        }
        else 
        {
          throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol,
                                          "Unsupported HTTP Content-Encoding: " + it->second);
        }

        break;
      }
    }

    if (contentEncoding == Orthanc::HttpCompression_Gzip)
    {
      std::string compressed;
      answer.swap(compressed);
          
      Orthanc::GzipCompressor compressor;
      compressor.Uncompress(answer, compressed.c_str(), compressed.size());

      LOG(INFO) << "Uncompressing gzip Encoding: from " << compressed.size()
                << " to " << answer.size() << " bytes";
    }
  }


  static void Execute(IMessageEmitter& emitter,
                      const Orthanc::WebServiceParameters& orthanc,
                      const IObserver& receiver,
                      const OrthancRestApiCommand& command)
  {
    Orthanc::HttpClient client(orthanc, command.GetUri());
    client.SetMethod(command.GetMethod());
    client.SetTimeout(command.GetTimeout());

    CopyHttpHeaders(client, command.GetHttpHeaders());

    if (command.GetMethod() == Orthanc::HttpMethod_Post ||
        command.GetMethod() == Orthanc::HttpMethod_Put)
    {
      client.SetBody(command.GetBody());
    }

    std::string answer;
    Orthanc::HttpClient::HttpHeaders answerHeaders;
    client.ApplyAndThrowException(answer, answerHeaders);

    DecodeAnswer(answer, answerHeaders);

    OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer);
    emitter.EmitMessage(receiver, message);
  }


  static void Execute(IMessageEmitter& emitter,
                      const Orthanc::WebServiceParameters& orthanc,
                      const IObserver& receiver,
                      const GetOrthancImageCommand& command)
  {
    Orthanc::HttpClient client(orthanc, command.GetUri());
    client.SetTimeout(command.GetTimeout());

    CopyHttpHeaders(client, command.GetHttpHeaders());
    
    std::string answer;
    Orthanc::HttpClient::HttpHeaders answerHeaders;
    client.ApplyAndThrowException(answer, answerHeaders);

    DecodeAnswer(answer, answerHeaders);

    command.ProcessHttpAnswer(emitter, receiver, answer, answerHeaders);
  }


  static void Execute(IMessageEmitter& emitter,
                      const Orthanc::WebServiceParameters& orthanc,
                      const IObserver& receiver,
                      const GetOrthancWebViewerJpegCommand& command)
  {
    Orthanc::HttpClient client(orthanc, command.GetUri());
    client.SetTimeout(command.GetTimeout());

    CopyHttpHeaders(client, command.GetHttpHeaders());

    std::string answer;
    Orthanc::HttpClient::HttpHeaders answerHeaders;
    client.ApplyAndThrowException(answer, answerHeaders);

    DecodeAnswer(answer, answerHeaders);

    command.ProcessHttpAnswer(emitter, receiver, answer);
  }


  void ThreadedOracle::Step()
  {
    std::auto_ptr<Orthanc::IDynamicObject>  object(queue_.Dequeue(100));

    if (object.get() != NULL)
    {
      Item& item = dynamic_cast<Item&>(*object);

      try
      {
        switch (item.GetCommand().GetType())
        {
          case IOracleCommand::Type_Sleep:
          {
            SleepOracleCommand& command = dynamic_cast<SleepOracleCommand&>(item.GetCommand());

            std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay()));

            if (command.HasPayload())
            {
              copy->SetPayload(command.ReleasePayload());
            }

            sleepingCommands_->Add(item.GetReceiver(), copy.release());

            break;
          }

          case IOracleCommand::Type_OrthancRestApi:
            Execute(emitter_, orthanc_, item.GetReceiver(), 
                    dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand()));
            break;

          case IOracleCommand::Type_GetOrthancImage:
            Execute(emitter_, orthanc_, item.GetReceiver(), 
                    dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand()));
            break;

          case IOracleCommand::Type_GetOrthancWebViewerJpeg:
            Execute(emitter_, orthanc_, item.GetReceiver(), 
                    dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand()));
            break;

          default:
            throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented);
        }
      }
      catch (Orthanc::OrthancException& e)
      {
        LOG(ERROR) << "Exception within the oracle: " << e.What();
        emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e));
      }
      catch (...)
      {
        LOG(ERROR) << "Threaded exception within the oracle";
        emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage
                             (item.GetCommand(), Orthanc::ErrorCode_InternalError));
      }
    }
  }


  void ThreadedOracle::Worker(ThreadedOracle* that)
  {
    assert(that != NULL);
      
    for (;;)
    {
      {
        boost::mutex::scoped_lock lock(that->mutex_);
        if (that->state_ != State_Running)
        {
          return;
        }
      }

      that->Step();
    }
  }


  void ThreadedOracle::SleepingWorker(ThreadedOracle* that)
  {
    assert(that != NULL);
      
    for (;;)
    {
      {
        boost::mutex::scoped_lock lock(that->mutex_);
        if (that->state_ != State_Running)
        {
          return;
        }
      }

      that->sleepingCommands_->AwakeExpired(that->emitter_);

      boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_));
    }
  }


  void ThreadedOracle::StopInternal()
  {
    {
      boost::mutex::scoped_lock lock(mutex_);

      if (state_ == State_Setup ||
          state_ == State_Stopped)
      {
        return;
      }
      else
      {
        state_ = State_Stopped;
      }
    }

    if (sleepingWorker_.joinable())
    {
      sleepingWorker_.join();
    }

    for (size_t i = 0; i < workers_.size(); i++)
    {
      if (workers_[i] != NULL)
      {
        if (workers_[i]->joinable())
        {
          workers_[i]->join();
        }

        delete workers_[i];
      }
    } 
  }


  ThreadedOracle::ThreadedOracle(IMessageEmitter& emitter) :
    emitter_(emitter),
    state_(State_Setup),
    workers_(4),
    sleepingCommands_(new SleepingCommands),
    sleepingTimeResolution_(50)  // By default, time resolution of 50ms
  {
  }


  ThreadedOracle::~ThreadedOracle()
  {
    if (state_ == State_Running)
    {
      LOG(ERROR) << "The threaded oracle is still running, explicit call to "
                 << "Stop() is mandatory to avoid crashes";
    }

    try
    {
      StopInternal();
    }
    catch (Orthanc::OrthancException& e)
    {
      LOG(ERROR) << "Exception while stopping the threaded oracle: " << e.What();
    }
    catch (...)
    {
      LOG(ERROR) << "Native exception while stopping the threaded oracle";
    }           
  }

  
  void ThreadedOracle::SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc)
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (state_ != State_Setup)
    {
      LOG(ERROR) << "ThreadedOracle::SetOrthancParameters(): (state_ != State_Setup)";
      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
    }
    else
    {
      orthanc_ = orthanc;
    }
  }


  void ThreadedOracle::SetThreadsCount(unsigned int count)
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (count <= 0)
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
    }
    else if (state_ != State_Setup)
    {
      LOG(ERROR) << "ThreadedOracle::SetThreadsCount(): (state_ != State_Setup)";
      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
    }
    else
    {
      workers_.resize(count);
    }
  }


  void ThreadedOracle::SetSleepingTimeResolution(unsigned int milliseconds)
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (milliseconds <= 0)
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
    }
    else if (state_ != State_Setup)
    {
      LOG(ERROR) << "ThreadedOracle::SetSleepingTimeResolution(): (state_ != State_Setup)";
      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
    }
    else
    {
      sleepingTimeResolution_ = milliseconds;
    }
  }


  void ThreadedOracle::Start()
  {
    boost::mutex::scoped_lock lock(mutex_);

    if (state_ != State_Setup)
    {
      LOG(ERROR) << "ThreadedOracle::Start(): (state_ != State_Setup)";
      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
    }
    else
    {
      state_ = State_Running;

      for (unsigned int i = 0; i < workers_.size(); i++)
      {
        workers_[i] = new boost::thread(Worker, this);
      }

      sleepingWorker_ = boost::thread(SleepingWorker, this);
    }      
  }


  void ThreadedOracle::Schedule(const IObserver& receiver,
                                IOracleCommand* command)
  {
    queue_.Enqueue(new Item(receiver, command));
  }
}