view Core/MultiThreading/BagOfTasksProcessor.cpp @ 2248:69b0f4e8a49b

Escape multipart type parameter value in Content-Type header ## Summary Multipart responses do not quote/escape the value of their type parameter (the subtype) even though it always contains at least one special character (the slash "/"), which confuses standard-compliant HTTP clients. ## Details The Content-Type header in HTTP is in RFC 7231, Section 3.1.1.5: https://tools.ietf.org/html/rfc7231#section-3.1.1.5 The section defers to the media type section (3.1.1.1) for the syntax of the media type: https://tools.ietf.org/html/rfc7231#section-3.1.1.1 This states that a parameter value can be quoted: parameter = token "=" ( token / quoted-string ) A parameter value that matches the token production can be transmitted either as a token or within a quoted-string. The quoted and unquoted values are equivalent. Tokens are defined in RFC 7230, Section 3.2.6 (via RFC 7231, appendix C): https://tools.ietf.org/html/rfc7231#appendix-C https://tools.ietf.org/html/rfc7230#section-3.2.6 Here we observe that tokens cannot contain a slash "/" character: token = 1*tchar tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA ; any VCHAR, except delimiters Delimiters are chosen from the set of US-ASCII visual characters not allowed in a token (DQUOTE and "(),/:;<=>?@[\]{}"). However, the current implementation does not quote/escape the value of the type parameter: multipart/related; type=application/dicom Instead, it should be: multipart/related; type="application/dicom" All of this also seems to apply to the MIME Content-Type header definition, even though it is a little different: https://www.iana.org/assignments/message-headers https://tools.ietf.org/html/rfc2045#section-5.1 https://tools.ietf.org/html/rfc2387
author Thibault Nélis <tn@osimis.io>
date Mon, 16 Jan 2017 13:07:11 +0100
parents a3a65de1840f
children 878b59270859
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017 Osimis, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * In addition, as a special exception, the copyright holders of this
 * program give permission to link the code of its release with the
 * OpenSSL project's "OpenSSL" library (or with modified versions of it
 * that use the same license as the "OpenSSL" library), and distribute
 * the linked executables. You must obey the GNU General Public License
 * in all respects for all of the code used other than "OpenSSL". If you
 * modify file(s) with this exception, you may extend this exception to
 * your version of the file(s), but you are not obligated to do so. If
 * you do not wish to do so, delete this exception statement from your
 * version. If you delete this exception statement from all source files
 * in the program, then also delete it here.
 * 
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "../PrecompiledHeaders.h"
#include "BagOfTasksProcessor.h"

#include "../Logging.h"
#include "../OrthancException.h"

#include <stdio.h>

namespace Orthanc
{
  /**
   * One unit of work stored in the queue of the processor: a command,
   * together with the identifier of the bag of tasks it belongs to.
   * The task takes ownership of the command it is given.
   **/
  class BagOfTasksProcessor::Task : public IDynamicObject
  {
  private:
    uint64_t                 bag_;      // Identifier of the parent bag
    std::auto_ptr<ICommand>  command_;  // Command to run, owned by the task

  public:
    /**
     * @param bag Identifier of the bag of tasks this task is part of.
     * @param command The command to be executed (ownership is taken).
     **/
    Task(uint64_t  bag,
         ICommand* command) :
      bag_(bag),
      command_(command)
    {
    }

    /**
     * Runs the wrapped command. No exception leaves this method: any
     * exception thrown by the command is logged and reported as a failure.
     *
     * @return The value returned by the command, or "false" if the
     * command has thrown.
     **/
    bool Execute()
    {
      // Stays "false" whenever one of the handlers below is entered
      bool success = false;

      try
      {
        success = command_->Execute();
      }
      catch (OrthancException& e)
      {
        LOG(ERROR) << "Exception while processing a bag of tasks: " << e.What();
      }
      catch (std::runtime_error& e)
      {
        LOG(ERROR) << "Runtime exception while processing a bag of tasks: " << e.what();
      }
      catch (...)
      {
        LOG(ERROR) << "Native exception while processing a bag of tasks";
      }

      return success;
    }

    /**
     * @return The identifier of the bag of tasks this task belongs to.
     **/
    uint64_t GetBag()
    {
      return bag_;
    }
  };


  /**
   * Records that one more task of "bag" has been dealt with (either
   * executed or skipped). Once all the tasks of the bag are accounted
   * for, the exit status of the bag is published and the threads
   * waiting on "bagFinished_" are woken up.
   *
   * In this file, this method is only invoked by "Worker()", while
   * "mutex_" is locked.
   *
   * @param task The task that has just been handled.
   * @param bag The bookkeeping entry of the bag owning "task".
   **/
  void BagOfTasksProcessor::SignalProgress(Task& task,
                                           Bag& bag)
  {
    assert(bag.done_ < bag.size_);

    ++bag.done_;

    if (bag.done_ != bag.size_)
    {
      // Some tasks of this bag are still pending
      return;
    }

    // The bag succeeds only if it was neither canceled nor marked as failed
    const bool success = (bag.status_ == BagStatus_Running);
    exitStatus_[task.GetBag()] = success;
    bagFinished_.notify_all();
  }

  /**
   * Body of the worker threads. As long as the processor is not asked
   * to stop, tasks are taken from the shared queue and executed. The
   * mutex of the processor is only held while the bookkeeping of the
   * parent bag is read or updated, never while a task is running.
   *
   * @param that The processor this worker thread belongs to.
   **/
  void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that)
  {
    while (that->continue_)
    {
      // NOTE(review): "100" is presumably a timeout (in milliseconds)
      // that lets the loop poll "continue_" - confirm against the queue API
      std::auto_ptr<IDynamicObject> dequeued(that->queue_.Dequeue(100));
      if (dequeued.get() == NULL)
      {
        // Nothing was dequeued, check again whether we must stop
        continue;
      }

      Task& task = *dynamic_cast<Task*>(dequeued.get());

      {
        boost::mutex::scoped_lock lock(that->mutex_);

        Bags::iterator parent = that->bags_.find(task.GetBag());
        assert(parent != that->bags_.end());
        assert(parent->second.done_ < parent->second.size_);

        if (parent->second.status_ != BagStatus_Running)
        {
          // The parent bag of tasks has failed or was canceled: the
          // task is skipped, but still counted as handled
          that->SignalProgress(task, parent->second);
          continue;
        }
      }

      // The task is executed without holding the mutex
      const bool success = task.Execute();

      {
        boost::mutex::scoped_lock lock(that->mutex_);

        // The bag is looked up again, as the mutex was released meanwhile
        Bags::iterator parent = that->bags_.find(task.GetBag());
        assert(parent != that->bags_.end());

        if (!success)
        {
          parent->second.status_ = BagStatus_Failed;
        }

        that->SignalProgress(task, parent->second);
      }
    }
  }


  void BagOfTasksProcessor::Cancel(int64_t bag)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    Bags::iterator it = bags_.find(bag);
    if (it != bags_.end())
    {
      it->second.status_ = BagStatus_Canceled;
    }
  }


  bool BagOfTasksProcessor::Join(int64_t bag)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    while (continue_)
    {
      ExitStatus::iterator it = exitStatus_.find(bag);
      if (it == exitStatus_.end())  // The bag is still running
      {
        bagFinished_.wait(lock);
      }
      else
      {
        bool status = it->second;
        exitStatus_.erase(it);
        return status;
      }
    }

    return false;   // The processor is stopping
  }


  float BagOfTasksProcessor::GetProgress(int64_t bag)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    Bags::const_iterator it = bags_.find(bag);
    if (it == bags_.end())
    {
      // The bag of tasks has finished
      return 1.0f;
    }
    else
    {
      return (static_cast<float>(it->second.done_) / 
              static_cast<float>(it->second.size_));
    }
  }


  /**
   * Waits for the completion of the bag of tasks referred to by this
   * handle. The processor is only queried on the first call: the
   * result is cached and returned as such by the subsequent calls.
   *
   * @return The status obtained from "BagOfTasksProcessor::Join()".
   **/
  bool BagOfTasksProcessor::Handle::Join()
  {
    if (!hasJoined_)
    {
      // First call: actually wait for the bag, then remember the outcome
      status_ = that_.Join(bag_);
      hasJoined_ = true;
    }

    return status_;
  }


  /**
   * Creates the processor and immediately starts its worker threads,
   * each of them running "Worker()" on this object.
   *
   * @param countThreads Number of worker threads to start.
   * @throws OrthancException (ErrorCode_ParameterOutOfRange) if
   * "countThreads" is zero.
   **/
  BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) :
    countBags_(0),
    continue_(true)
  {
    const bool hasWorkers = (countThreads != 0);
    if (!hasWorkers)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }

    // One slot per worker thread, filled in by the loop below
    threads_.resize(countThreads);

    for (size_t worker = 0; worker < countThreads; ++worker)
    {
      threads_[worker] = new boost::thread(Worker, this);
    }
  }


  /**
   * Stops the processor: the worker threads and the pending calls to
   * "Join()" are asked to stop, then each worker thread is joined and
   * released.
   **/
  BagOfTasksProcessor::~BagOfTasksProcessor()
  {
    {
      // "Join()" tests "continue_" while holding "mutex_", right before
      // waiting on "bagFinished_". The flag must thus be cleared under
      // the same mutex: otherwise, the notification below could be
      // emitted between the test and the wait of a "Join()", which
      // would then sleep forever (lost wake-up).
      boost::mutex::scoped_lock lock(mutex_);
      continue_ = false;
    }

    // The mutex is released before notifying and before joining the
    // worker threads, which need it to update the bags of tasks
    bagFinished_.notify_all();   // Wakes up all the pending "Join()"

    for (size_t i = 0; i < threads_.size(); i++)
    {
      if (threads_[i])
      {
        if (threads_[i]->joinable())
        {
          threads_[i]->join();
        }

        delete threads_[i];
        threads_[i] = NULL;
      }
    }
  }


  /**
   * Registers a new bag of tasks and pushes all its commands into the
   * queue read by the worker threads. The commands are popped from
   * "tasks", which is therefore empty once this method returns.
   *
   * @param tasks The commands to execute.
   * @return A newly allocated handle to the submitted bag, created
   * with "new" (presumably to be deleted by the caller). If "tasks"
   * is empty, nothing is registered nor enqueued, and the handle is
   * constructed with bag 0 and "true" as its last argument.
   **/
  BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks)
  {
    if (tasks.GetSize() == 0)
    {
      // Empty bag: no bookkeeping is needed
      return new Handle(*this, 0, true);
    }

    boost::mutex::scoped_lock lock(mutex_);

    // Allocate a fresh identifier for this bag
    const uint64_t id = countBags_;
    countBags_ += 1;

    // The bag must be registered before any of its tasks is enqueued,
    // as the worker threads look it up for each task they dequeue
    bags_[id] = Bag(tasks.GetSize());

    while (!tasks.IsEmpty())
    {
      ICommand* command = tasks.Pop();
      queue_.Enqueue(new Task(id, command));
    }

    return new Handle(*this, id, false);
  }
}