view UnitTestsSources/MultiThreadingTests.cpp @ 2568:a46094602346 jobs

improvements
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 15:02:34 +0200
parents 3caca43371f5
children 2af17cd5eb1f
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2018 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * In addition, as a special exception, the copyright holders of this
 * program give permission to link the code of its release with the
 * OpenSSL project's "OpenSSL" library (or with modified versions of it
 * that use the same license as the "OpenSSL" library), and distribute
 * the linked executables. You must obey the GNU General Public License
 * in all respects for all of the code used other than "OpenSSL". If you
 * modify file(s) with this exception, you may extend this exception to
 * your version of the file(s), but you are not obligated to do so. If
 * you do not wish to do so, delete this exception statement from your
 * version. If you delete this exception statement from all source files
 * in the program, then also delete it here.
 * 
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "PrecompiledHeadersUnitTests.h"
#include "gtest/gtest.h"

#include "../OrthancServer/Scheduler/ServerScheduler.h"
#include "../Core/OrthancException.h"
#include "../Core/SystemToolbox.h"
#include "../Core/Toolbox.h"
#include "../Core/MultiThreading/Locker.h"

using namespace Orthanc;

namespace
{
  class DynamicInteger : public IDynamicObject
  {
  private:
    int value_;
    std::set<int>& target_;

  public:
    DynamicInteger(int value, std::set<int>& target) : 
      value_(value), target_(target)
    {
    }

    int GetValue() const
    {
      return value_;
    }
  };
}


TEST(MultiThreading, SharedMessageQueueBasic)
{
  std::set<int> s;

  SharedMessageQueue q;
  ASSERT_TRUE(q.WaitEmpty(0));
  q.Enqueue(new DynamicInteger(10, s));
  ASSERT_FALSE(q.WaitEmpty(1));
  q.Enqueue(new DynamicInteger(20, s));
  q.Enqueue(new DynamicInteger(30, s));
  q.Enqueue(new DynamicInteger(40, s));

  std::auto_ptr<DynamicInteger> i;
  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue());
  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
  ASSERT_FALSE(q.WaitEmpty(1));
  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue());
  ASSERT_TRUE(q.WaitEmpty(0));
  ASSERT_EQ(NULL, q.Dequeue(1));
}


TEST(MultiThreading, SharedMessageQueueClean)
{
  std::set<int> s;

  try
  {
    SharedMessageQueue q;
    q.Enqueue(new DynamicInteger(10, s));
    q.Enqueue(new DynamicInteger(20, s));  
    throw OrthancException(ErrorCode_InternalError);
  }
  catch (OrthancException&)
  {
  }
}




#include "../Core/DicomNetworking/ReusableDicomUserConnection.h"

TEST(ReusableDicomUserConnection, DISABLED_Basic)
{
  ReusableDicomUserConnection c;
  c.SetMillisecondsBeforeClose(200);
  printf("START\n"); fflush(stdout);

  {
    RemoteModalityParameters remote("STORESCP", "localhost", 2000, ModalityManufacturer_Generic);
    ReusableDicomUserConnection::Locker lock(c, "ORTHANC", remote);
    lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281");
  }

  printf("**\n"); fflush(stdout);
  SystemToolbox::USleep(1000000);
  printf("**\n"); fflush(stdout);

  {
    RemoteModalityParameters remote("STORESCP", "localhost", 2000, ModalityManufacturer_Generic);
    ReusableDicomUserConnection::Locker lock(c, "ORTHANC", remote);
    lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676277");
  }

  SystemToolbox::ServerBarrier();
  printf("DONE\n"); fflush(stdout);
}



class Tutu : public IServerCommand
{
private:
  int factor_;

public:
  Tutu(int f) : factor_(f)
  {
  }

  virtual bool Apply(ListOfStrings& outputs,
                     const ListOfStrings& inputs)
  {
    for (ListOfStrings::const_iterator 
           it = inputs.begin(); it != inputs.end(); ++it)
    {
      int a = boost::lexical_cast<int>(*it);
      int b = factor_ * a;

      printf("%d * %d = %d\n", a, factor_, b);

      //if (a == 84) { printf("BREAK\n"); return false; }

      outputs.push_back(boost::lexical_cast<std::string>(b));
    }

    SystemToolbox::USleep(30000);

    return true;
  }
};


static void Tata(ServerScheduler* s, ServerJob* j, bool* done)
{
  typedef IServerCommand::ListOfStrings  ListOfStrings;

  while (!(*done))
  {
    ListOfStrings l;
    s->GetListOfJobs(l);
    for (ListOfStrings::iterator it = l.begin(); it != l.end(); ++it)
    {
      printf(">> %s: %0.1f\n", it->c_str(), 100.0f * s->GetProgress(*it));
    }
    SystemToolbox::USleep(3000);
  }
}


TEST(MultiThreading, ServerScheduler)
{
  ServerScheduler scheduler(10);

  ServerJob job;
  ServerCommandInstance& f2 = job.AddCommand(new Tutu(2));
  ServerCommandInstance& f3 = job.AddCommand(new Tutu(3));
  ServerCommandInstance& f4 = job.AddCommand(new Tutu(4));
  ServerCommandInstance& f5 = job.AddCommand(new Tutu(5));
  f2.AddInput(boost::lexical_cast<std::string>(42));
  //f3.AddInput(boost::lexical_cast<std::string>(42));
  //f4.AddInput(boost::lexical_cast<std::string>(42));
  f2.ConnectOutput(f3);
  f3.ConnectOutput(f4);
  f4.ConnectOutput(f5);

  f3.SetConnectedToSink(true);
  f5.SetConnectedToSink(true);

  job.SetDescription("tutu");

  bool done = false;
  boost::thread t(Tata, &scheduler, &job, &done);


  //scheduler.Submit(job);

  IServerCommand::ListOfStrings l;
  scheduler.SubmitAndWait(l, job);

  ASSERT_EQ(2u, l.size());
  ASSERT_EQ(42 * 2 * 3, boost::lexical_cast<int>(l.front()));
  ASSERT_EQ(42 * 2 * 3 * 4 * 5, boost::lexical_cast<int>(l.back()));

  for (IServerCommand::ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
  {
    printf("** %s\n", i->c_str());
  }

  //SystemToolbox::ServerBarrier();
  //SystemToolbox::USleep(3000000);

  scheduler.Stop();

  done = true;
  if (t.joinable())
  {
    t.join();
  }
}





#if !defined(ORTHANC_SANDBOXED)
#  error The macro ORTHANC_SANDBOXED must be defined
#endif

#if ORTHANC_SANDBOXED == 1
#  error The job engine cannot be used in sandboxed environments
#endif

#include "../Core/Logging.h"

#include <boost/math/special_functions/round.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <queue>

namespace Orthanc
{
  enum JobState
  {
    JobState_Pending,
    JobState_Running,
    JobState_Success,
    JobState_Failure,
    JobState_Paused,
    JobState_Retry
  };

  static const char* EnumerationToString(JobState state)
  {
    switch (state)
    {
      case JobState_Pending:
        return "Pending";
        
      case JobState_Running:
        return "Running";
        
      case JobState_Success:
        return "Success";
        
      case JobState_Failure:
        return "Failure";
        
      case JobState_Paused:
        return "Paused";
        
      case JobState_Retry:
        return "Retry";
        
      default:
        throw OrthancException(ErrorCode_ParameterOutOfRange);
    }
  }
  
  enum JobStepCode
  {
    JobStepCode_Success,
    JobStepCode_Failure,
    JobStepCode_Continue,
    JobStepCode_Retry
  };    
    
  class JobStepResult
  {
  private:
    JobStepCode code_;
    
  public:
    explicit JobStepResult(JobStepCode code) :
      code_(code)
    {
    }

    virtual ~JobStepResult()
    {
    }

    JobStepCode GetCode() const
    {
      return code_;
    }
  };


  class JobStepRetry : public JobStepResult
  {
  private:
    unsigned int  timeout_;   // Retry after "timeout_" milliseconds

  public:
    JobStepRetry(unsigned int timeout) :
      JobStepResult(JobStepCode_Retry),
      timeout_(timeout)
    {
    }

    unsigned int  GetTimeout() const
    {
      return timeout_;
    }
  };

  
  class IJob : public boost::noncopyable
  {
  public:
    virtual ~IJob()
    {
    }

    virtual JobStepResult* ExecuteStep() = 0;

    virtual void ReleaseResources() = 0;   // For pausing jobs

    virtual float GetProgress() = 0;

    virtual void GetDescription(Json::Value& value) = 0;
  };


  class JobStatus
  {
  private:
    ErrorCode      errorCode_;
    float          progress_;
    Json::Value    description_;

  public:
    JobStatus() :
      errorCode_(ErrorCode_InternalError),
      progress_(0),
      description_(Json::objectValue)
    {
    }

    JobStatus(ErrorCode code,
              IJob& job) :
      errorCode_(code),
      progress_(job.GetProgress())
    {
      if (progress_ < 0)
      {
        progress_ = 0;
      }
      
      if (progress_ > 1)
      {
        progress_ = 1;
      }

      job.GetDescription(description_);
    }

    ErrorCode GetErrorCode() const
    {
      return errorCode_;
    }

    float GetProgress() const
    {
      return progress_;
    }

    const Json::Value& GetDescription() const
    {
      return description_;
    }
  };


  class JobInfo
  {
  private:
    std::string                       id_;
    int                               priority_;
    JobState                          state_;
    boost::posix_time::ptime          timestamp_;
    boost::posix_time::ptime          creationTime_;
    boost::posix_time::ptime          lastStateChangeTime_;
    boost::posix_time::time_duration  runtime_;
    bool                              hasEta_;
    boost::posix_time::ptime          eta_;
    JobStatus                         status_;

  public:
    JobInfo(const std::string& id,
            int priority,
            JobState state,
            const JobStatus& status,
            const boost::posix_time::ptime& creationTime,
            const boost::posix_time::ptime& lastStateChangeTime,
            const boost::posix_time::time_duration& runtime) :
      id_(id),
      priority_(priority),
      state_(state),
      timestamp_(boost::posix_time::microsec_clock::universal_time()),
      creationTime_(creationTime),
      lastStateChangeTime_(lastStateChangeTime),
      runtime_(runtime),
      hasEta_(false),
      status_(status)
    {
      if (state_ == JobState_Running)
      {
        float ms = static_cast<float>(runtime_.total_milliseconds());

        if (status_.GetProgress() > 0.01f &&
            ms > 0.01f)
        {
          float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms;
          eta_ = timestamp_ + boost::posix_time::milliseconds(remaining);
          hasEta_ = true;
        }
      }
    }

    JobInfo() :
      priority_(0),
      state_(JobState_Failure),
      timestamp_(boost::posix_time::microsec_clock::universal_time()),
      creationTime_(timestamp_),
      lastStateChangeTime_(timestamp_),
      runtime_(boost::posix_time::milliseconds(0)),
      hasEta_(false)
    {
    }

    const std::string& GetIdentifier() const
    {
      return id_;
    }

    int GetPriority() const
    {
      return priority_;
    }

    JobState GetState() const
    {
      return state_;
    }

    const boost::posix_time::ptime& GetInfoTime() const
    {
      return timestamp_;
    }

    const boost::posix_time::ptime& GetCreationTime() const
    {
      return creationTime_;
    }

    const boost::posix_time::time_duration& GetRuntime() const
    {
      return runtime_;
    }

    bool HasEstimatedTimeOfArrival() const
    {
      return hasEta_;
    }

    bool HasCompletionTime() const
    {
      return (state_ == JobState_Success ||
              state_ == JobState_Failure);
    }

    const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const
    {
      if (hasEta_)
      {
        return eta_;
      }
      else
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }
    }

    const boost::posix_time::ptime& GetCompletionTime() const
    {
      if (HasCompletionTime())
      {
        return lastStateChangeTime_;
      }
      else
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }
    }

    const JobStatus& GetStatus() const
    {
      return status_;
    }

    JobStatus& GetStatus()
    {
      return status_;
    }

    void Format(Json::Value& target) const
    {
      target = Json::objectValue;
      target["ID"] = id_;
      target["Priority"] = priority_;
      target["ErrorCode"] = static_cast<int>(status_.GetErrorCode());
      target["ErrorDescription"] = EnumerationToString(status_.GetErrorCode());
      target["State"] = EnumerationToString(state_);
      target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_);
      target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_);
      target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds());      
      target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f);
      target["Description"] = status_.GetDescription();

      if (HasEstimatedTimeOfArrival())
      {
        target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(GetEstimatedTimeOfArrival());
      }

      if (HasCompletionTime())
      {
        target["CompletionTime"] = boost::posix_time::to_iso_string(GetCompletionTime());
      }
    }
  };




  class JobsRegistry : public boost::noncopyable
  {
  private:
    class JobHandler : public boost::noncopyable
    {   
    private:
      std::string                       id_;
      JobState                          state_;
      std::auto_ptr<IJob>               job_;
      int                               priority_;  // "+inf()" means highest priority
      boost::posix_time::ptime          creationTime_;
      boost::posix_time::ptime          lastStateChangeTime_;
      boost::posix_time::time_duration  runtime_;
      boost::posix_time::ptime          retryTime_;
      bool                              pauseScheduled_;
      JobStatus                         lastStatus_;

      void Touch()
      {
        const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();

        if (state_ == JobState_Running)
        {
          runtime_ += (now - lastStateChangeTime_);
        }

        lastStateChangeTime_ = now;
      }

      void SetStateInternal(JobState state) 
      {
        state_ = state;
        pauseScheduled_ = false;
        Touch();
      }

    public:
      JobHandler(IJob* job,
                 int priority) :
        id_(Toolbox::GenerateUuid()),
        state_(JobState_Pending),
        job_(job),
        priority_(priority),
        creationTime_(boost::posix_time::microsec_clock::universal_time()),
        lastStateChangeTime_(creationTime_),
        runtime_(boost::posix_time::milliseconds(0)),
        retryTime_(creationTime_),
        pauseScheduled_(false)
      {
        if (job == NULL)
        {
          throw OrthancException(ErrorCode_NullPointer);
        }

        lastStatus_ = JobStatus(ErrorCode_Success, *job);
      }

      const std::string& GetId() const
      {
        return id_;
      }

      IJob& GetJob() const
      {
        assert(job_.get() != NULL);
        return *job_;
      }

      void SetPriority(int priority)
      {
        priority_ = priority;
      }

      int GetPriority() const
      {
        return priority_;
      }

      JobState GetState() const
      {
        return state_;
      }

      void SetState(JobState state) 
      {
        if (state == JobState_Retry)
        {
          // Use "SetRetryState()"
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          SetStateInternal(state);
        }
      }

      void SetRetryState(unsigned int timeout)
      {
        if (state_ == JobState_Running)
        {
          SetStateInternal(JobState_Retry);
          retryTime_ = (boost::posix_time::microsec_clock::universal_time() + 
                        boost::posix_time::milliseconds(timeout));
        }
        else
        {
          // Only valid for running jobs
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
      }

      void SchedulePause()
      {
        if (state_ == JobState_Running)
        {
          pauseScheduled_ = true;
        }
        else
        {
          // Only valid for running jobs
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
      }

      bool IsPauseScheduled()
      {
        return pauseScheduled_;
      }

      bool IsRetryReady(const boost::posix_time::ptime& now) const
      {
        if (state_ != JobState_Retry)
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          return retryTime_ <= now;
        }
      }

      const boost::posix_time::ptime& GetCreationTime() const
      {
        return creationTime_;
      }

      const boost::posix_time::ptime& GetLastStateChangeTime() const
      {
        return lastStateChangeTime_;
      }

      const boost::posix_time::time_duration& GetRuntime() const
      {
        return runtime_;
      }

      const JobStatus& GetLastStatus() const
      {
        return lastStatus_;
      }

      void SetLastStatus(const JobStatus& status)
      {
        lastStatus_ = status;
        Touch();
      }
    };

    struct PriorityComparator
    {
      bool operator() (JobHandler*& a,
                       JobHandler*& b) const
      {
        return a->GetPriority() < b->GetPriority();
      }                       
    };

    typedef std::map<std::string, JobHandler*>              JobsIndex;
    typedef std::list<JobHandler*>                          CompletedJobs;
    typedef std::set<JobHandler*>                           RetryJobs;
    typedef std::priority_queue<JobHandler*, 
                                std::vector<JobHandler*>,   // Could be a "std::deque"
                                PriorityComparator>         PendingJobs;

    boost::mutex               mutex_;
    JobsIndex                  jobsIndex_;
    PendingJobs                pendingJobs_;
    CompletedJobs              completedJobs_;
    RetryJobs                  retryJobs_;

    boost::condition_variable  pendingJobAvailable_;
    size_t                     maxCompletedJobs_;


#ifndef NDEBUG
    bool IsPendingJob(const JobHandler& job) const
    {
      PendingJobs copy = pendingJobs_;
      while (!copy.empty())
      {
        if (copy.top() == &job)
        {
          return true;
        }

        copy.pop();
      }

      return false;
    }

    bool IsCompletedJob(JobHandler& job) const
    {
      for (CompletedJobs::const_iterator it = completedJobs_.begin();
           it != completedJobs_.end(); ++it)
      {
        if (*it == &job)
        {
          return true;
        }
      }

      return false;
    }

    bool IsRetryJob(JobHandler& job) const
    {
      return retryJobs_.find(&job) != retryJobs_.end();
    }
#endif


    void CheckInvariants() const
    {
#ifndef NDEBUG
      {
        PendingJobs copy = pendingJobs_;
        while (!copy.empty())
        {
          assert(copy.top()->GetState() == JobState_Pending);
          copy.pop();
        }
      }

      assert(completedJobs_.size() <= maxCompletedJobs_);

      for (CompletedJobs::const_iterator it = completedJobs_.begin();
           it != completedJobs_.end(); ++it)
      {
        assert((*it)->GetState() == JobState_Success ||
               (*it)->GetState() == JobState_Failure);
      }

      for (RetryJobs::const_iterator it = retryJobs_.begin();
           it != retryJobs_.end(); ++it)
      {
        assert((*it)->GetState() == JobState_Retry);
      }

      for (JobsIndex::const_iterator it = jobsIndex_.begin();
           it != jobsIndex_.end(); ++it)
      {
        JobHandler& job = *it->second;

        assert(job.GetId() == it->first);

        switch (job.GetState())
        {
          case JobState_Pending:
            assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
            break;
            
          case JobState_Success:
          case JobState_Failure:
            assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
            break;
            
          case JobState_Retry:
            assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
            break;
            
          case JobState_Running:
          case JobState_Paused:
            assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
            break;

          default:
            throw OrthancException(ErrorCode_InternalError);
        }
      }
#endif
    }


    void ForgetOldCompletedJobs()
    {
      if (maxCompletedJobs_ != 0)
      {
        while (completedJobs_.size() > maxCompletedJobs_)
        {
          assert(completedJobs_.front() != NULL);

          std::string id = completedJobs_.front()->GetId();
          assert(jobsIndex_.find(id) != jobsIndex_.end());

          jobsIndex_.erase(id);
          delete(completedJobs_.front());
          completedJobs_.pop_front();
        }
      }
    }


    void MarkRunningAsCompleted(JobHandler& job,
                                bool success)
    {
      LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
                << ": " << job.GetId();

      CheckInvariants();
      assert(job.GetState() == JobState_Running);

      job.SetState(success ? JobState_Success : JobState_Failure);

      completedJobs_.push_back(&job);
      ForgetOldCompletedJobs();

      CheckInvariants();
    }


    void MarkRunningAsRetry(JobHandler& job,
                            unsigned int timeout)
    {
      LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();

      CheckInvariants();

      assert(job.GetState() == JobState_Running &&
             retryJobs_.find(&job) == retryJobs_.end());

      retryJobs_.insert(&job);
      job.SetRetryState(timeout);

      CheckInvariants();
    }


    void MarkRunningAsPaused(JobHandler& job)
    {
      LOG(INFO) << "Job paused: " << job.GetId();

      CheckInvariants();
      assert(job.GetState() == JobState_Running);

      job.SetState(JobState_Paused);

      CheckInvariants();
    }


  public:
    JobsRegistry() :
      maxCompletedJobs_(10)
    {
    }


    ~JobsRegistry()
    {
      for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
      {
        assert(it->second != NULL);
        delete it->second;
      }
    }


    void SetMaxCompletedJobs(size_t i)
    {
      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      maxCompletedJobs_ = i;
      ForgetOldCompletedJobs();

      CheckInvariants();
    }


    void ListJobs(std::set<std::string>& target)
    {
      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      for (JobsIndex::const_iterator it = jobsIndex_.begin();
           it != jobsIndex_.end(); ++it)
      {
        target.insert(it->first);
      }
    }


    bool GetJobInfo(JobInfo& target,
                    const std::string& id)
    {
      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      JobsIndex::const_iterator found = jobsIndex_.find(id);

      if (found == jobsIndex_.end())
      {
        return false;
      }
      else
      {
        const JobHandler& handler = *found->second;
        target = JobInfo(handler.GetId(),
                         handler.GetPriority(),
                         handler.GetState(),
                         handler.GetLastStatus(),
                         handler.GetCreationTime(),
                         handler.GetLastStateChangeTime(),
                         handler.GetRuntime());
        return true;
      }
    }


    void Submit(std::string& id,
                IJob* job,        // Takes ownership
                int priority)
    {
      std::auto_ptr<JobHandler>  handler(new JobHandler(job, priority));

      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();
      
      id = handler->GetId();

      pendingJobs_.push(handler.get());
      pendingJobAvailable_.notify_one();

      jobsIndex_.insert(std::make_pair(id, handler.release()));

      LOG(INFO) << "New job submitted with priority " << priority << ": " << id;

      CheckInvariants();
    }


    void Submit(IJob* job,        // Takes ownership
                int priority)
    {
      std::string id;
      Submit(id, job, priority);
    }


    void SetPriority(const std::string& id,
                     int priority)
    {
      LOG(INFO) << "Changing priority to " << priority << " for job: " << id;

      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      JobsIndex::iterator found = jobsIndex_.find(id);

      if (found == jobsIndex_.end())
      {
        LOG(WARNING) << "Unknown job: " << id;
      }
      else
      {
        found->second->SetPriority(priority);

        if (found->second->GetState() == JobState_Pending)
        {
          // If the job is pending, we need to reconstruct the
          // priority queue, as the heap condition has changed

          PendingJobs copy;
          std::swap(copy, pendingJobs_);

          assert(pendingJobs_.empty());
          while (!copy.empty())
          {
            pendingJobs_.push(copy.top());
            copy.pop();
          }
        }
      }

      CheckInvariants();
    }


    void Pause(const std::string& id)
    {
      LOG(INFO) << "Pausing job: " << id;

      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      JobsIndex::iterator found = jobsIndex_.find(id);

      if (found == jobsIndex_.end())
      {
        LOG(WARNING) << "Unknown job: " << id;
      }
      else
      {
        switch (found->second->GetState())
        {
          case JobState_Pending:
          {
            // If the job is pending, we need to reconstruct the
            // priority queue to remove it
            PendingJobs copy;
            std::swap(copy, pendingJobs_);

            assert(pendingJobs_.empty());
            while (!copy.empty())
            {
              if (copy.top()->GetId() != id)
              {
                pendingJobs_.push(copy.top());
              }

              copy.pop();
            }

            found->second->SetState(JobState_Paused);

            break;
          }

          case JobState_Retry:
          {
            RetryJobs::iterator item = retryJobs_.find(found->second);
            assert(item != retryJobs_.end());            
            retryJobs_.erase(item);

            found->second->SetState(JobState_Paused);

            break;
          }

          case JobState_Paused:
          case JobState_Success:
          case JobState_Failure:
            // Nothing to be done
            break;

          case JobState_Running:
            found->second->SchedulePause();
            break;

          default:
            throw OrthancException(ErrorCode_InternalError);
        }
      }

      CheckInvariants();
    }


    void Resume(const std::string& id)
    {
      LOG(INFO) << "Resuming job: " << id;

      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      JobsIndex::iterator found = jobsIndex_.find(id);

      if (found == jobsIndex_.end())
      {
        LOG(WARNING) << "Unknown job: " << id;
      }
      else if (found->second->GetState() != JobState_Paused)
      {
        LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
      }
      else
      {
        found->second->SetState(JobState_Pending);
        pendingJobs_.push(found->second);
        pendingJobAvailable_.notify_one();
      }

      CheckInvariants();
    }


    void Resubmit(const std::string& id)
    {
      LOG(INFO) << "Resubmitting failed job: " << id;

      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      JobsIndex::iterator found = jobsIndex_.find(id);

      if (found == jobsIndex_.end())
      {
        LOG(WARNING) << "Unknown job: " << id;
      }
      else if (found->second->GetState() != JobState_Failure)
      {
        LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
      }
      else
      {
        bool ok = false;
        for (CompletedJobs::iterator it = completedJobs_.begin(); 
             it != completedJobs_.end(); ++it)
        {
          if (*it == found->second)
          {
            ok = true;
            completedJobs_.erase(it);
            break;
          }
        }

        assert(ok);

        found->second->SetState(JobState_Pending);
        pendingJobs_.push(found->second);
        pendingJobAvailable_.notify_one();
      }

      CheckInvariants();
    }


    void ScheduleRetries()
    {
      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      RetryJobs copy;
      std::swap(copy, retryJobs_);

      const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();

      assert(retryJobs_.empty());
      for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
      {
        if ((*it)->IsRetryReady(now))
        {
          LOG(INFO) << "Retrying job: " << (*it)->GetId();
          (*it)->SetState(JobState_Pending);
          pendingJobs_.push(*it);
          pendingJobAvailable_.notify_one();
        }
        else
        {
          retryJobs_.insert(*it);
        }
      }

      CheckInvariants();
    }


    bool GetState(JobState& state,
                  const std::string& id)
    {
      boost::mutex::scoped_lock lock(mutex_);
      CheckInvariants();

      JobsIndex::const_iterator it = jobsIndex_.find(id);
      if (it == jobsIndex_.end())
      {
        return false;
      }
      else
      {
        state = it->second->GetState();
        return true;
      }
    }


    class RunningJob : public boost::noncopyable
    {
    private:
      JobsRegistry&  registry_;
      JobHandler*    handler_;  // Can only be accessed if the registry
      // mutex is locked!
      IJob*          job_;  // Will by design be in mutual exclusion,
      // because only one RunningJob can be
      // executed at a time on a JobHandler

      std::string    id_;
      int            priority_;
      JobState       targetState_;
      unsigned int   targetRetryTimeout_;
      
    public:
      RunningJob(JobsRegistry& registry,
                 unsigned int timeout) :
        registry_(registry),
        handler_(NULL),
        targetState_(JobState_Failure),
        targetRetryTimeout_(0)
      {
        {
          boost::mutex::scoped_lock lock(registry_.mutex_);

          while (registry_.pendingJobs_.empty())
          {
            if (timeout == 0)
            {
              registry_.pendingJobAvailable_.wait(lock);
            }
            else
            {
              bool success = registry_.pendingJobAvailable_.timed_wait
                (lock, boost::posix_time::milliseconds(timeout));
              if (!success)
              {
                // No pending job
                return;
              }
            }
          }

          handler_ = registry_.pendingJobs_.top();
          registry_.pendingJobs_.pop();

          assert(handler_->GetState() == JobState_Pending);
          handler_->SetState(JobState_Running);

          job_ = &handler_->GetJob();
          id_ = handler_->GetId();
          priority_ = handler_->GetPriority();
        }
      }

      ~RunningJob()
      {
        if (IsValid())
        {
          boost::mutex::scoped_lock lock(registry_.mutex_);

          switch (targetState_)
          {
            case JobState_Failure:
              registry_.MarkRunningAsCompleted(*handler_, false);
              break;

            case JobState_Success:
              registry_.MarkRunningAsCompleted(*handler_, true);
              break;

            case JobState_Paused:
              registry_.MarkRunningAsPaused(*handler_);
              break;            

            case JobState_Retry:
              registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
              break;
            
            default:
              assert(0);
          }
        }
      }

      bool IsValid() const
      {
        return (handler_ != NULL &&
                job_ != NULL);
      }

      const std::string& GetId() const
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          return id_;
        }
      }

      int GetPriority() const
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          return priority_;
        }
      }

      IJob& GetJob()
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          return *job_;
        }
      }

      bool IsPauseScheduled()
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          boost::mutex::scoped_lock lock(registry_.mutex_);
          registry_.CheckInvariants();
          assert(handler_->GetState() == JobState_Running);
        
          return handler_->IsPauseScheduled();
        }
      }

      void MarkSuccess()
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          targetState_ = JobState_Success;
        }
      }

      void MarkFailure()
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          targetState_ = JobState_Failure;
        }
      }

      void MarkPause()
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          targetState_ = JobState_Paused;
        }
      }

      void MarkRetry(unsigned int timeout)
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          targetState_ = JobState_Retry;
          targetRetryTimeout_ = timeout;
        }
      }

      void UpdateStatus(ErrorCode code)
      {
        if (!IsValid())
        {
          throw OrthancException(ErrorCode_BadSequenceOfCalls);
        }
        else
        {
          JobStatus status(code, *job_);
          
          boost::mutex::scoped_lock lock(registry_.mutex_);
          registry_.CheckInvariants();
          assert(handler_->GetState() == JobState_Running);
        
          handler_->SetLastStatus(status);
        }
      }
    };
  };



  class JobsEngine
  {
  private:
    enum State
    {
      State_Setup,
      State_Running,
      State_Stopping,
      State_Done
    };

    boost::mutex                stateMutex_;
    State                       state_;
    JobsRegistry                registry_;
    boost::thread               retryHandler_;
    std::vector<boost::thread>  workers_;

    bool ExecuteStep(JobsRegistry::RunningJob& running,
                     size_t workerIndex)
    {
      assert(running.IsValid());

      LOG(INFO) << "Executing job with priority " << running.GetPriority()
                << " in worker thread " << workerIndex << ": " << running.GetId();

      if (running.IsPauseScheduled())
      {
        running.GetJob().ReleaseResources();
        running.MarkPause();
        return false;
      }

      std::auto_ptr<JobStepResult> result;

      {
        try
        {
          result.reset(running.GetJob().ExecuteStep());

          if (result->GetCode() == JobStepCode_Failure)
          {
            running.UpdateStatus(ErrorCode_InternalError);
          }
          else
          {
            running.UpdateStatus(ErrorCode_Success);
          }
        }
        catch (OrthancException& e)
        {
          running.UpdateStatus(e.GetErrorCode());
        }
        catch (boost::bad_lexical_cast&)
        {
          running.UpdateStatus(ErrorCode_BadFileFormat);
        }
        catch (...)
        {
          running.UpdateStatus(ErrorCode_InternalError);
        }

        if (result.get() == NULL)
        {
          result.reset(new JobStepResult(JobStepCode_Failure));
        }
      }

      switch (result->GetCode())
      {
        case JobStepCode_Success:
          running.MarkSuccess();
          return false;

        case JobStepCode_Failure:
          running.MarkFailure();
          return false;

        case JobStepCode_Retry:
          running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout());
          return false;

        case JobStepCode_Continue:
          return true;
            
        default:
          throw OrthancException(ErrorCode_InternalError);
      }
    }

    static void RetryHandler(JobsEngine* engine)
    {
      assert(engine != NULL);

      for (;;)
      {
        boost::this_thread::sleep(boost::posix_time::milliseconds(200));

        {
          boost::mutex::scoped_lock lock(engine->stateMutex_);

          if (engine->state_ != State_Running)
          {
            return;
          }
        }

        engine->GetRegistry().ScheduleRetries();
      }
    }

    static void Worker(JobsEngine* engine,
                       size_t workerIndex)
    {
      assert(engine != NULL);

      LOG(INFO) << "Worker thread " << workerIndex << " has started";

      for (;;)
      {
        {
          boost::mutex::scoped_lock lock(engine->stateMutex_);

          if (engine->state_ != State_Running)
          {
            return;
          }
        }

        JobsRegistry::RunningJob running(engine->GetRegistry(), 100);

        if (running.IsValid())
        {
          for (;;)
          {
            if (!engine->ExecuteStep(running, workerIndex))
            {
              break;
            }
          }
        }
      }      
    }

  public:
    JobsEngine() :
      state_(State_Setup),
      workers_(1)
    {
    }

    ~JobsEngine()
    {
      if (state_ != State_Setup &&
          state_ != State_Done)
      {
        LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!";
        Stop();
      }
    }

    void SetWorkersCount(size_t count)
    {
      if (count == 0)
      {
        throw OrthancException(ErrorCode_ParameterOutOfRange);
      }
      
      boost::mutex::scoped_lock lock(stateMutex_);
      
      if (state_ != State_Setup)
      {
        // Can only be invoked before calling "Start()"
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }

      workers_.resize(count);
    }
    
    JobsRegistry& GetRegistry()
    {
      return registry_;
    }

    void Start()
    {
      boost::mutex::scoped_lock lock(stateMutex_);

      if (state_ != State_Setup)
      {
        throw OrthancException(ErrorCode_BadSequenceOfCalls);
      }

      retryHandler_ = boost::thread(RetryHandler, this);

      for (size_t i = 0; i < workers_.size(); i++)
      {
        workers_[i] = boost::thread(Worker, this, i);
      }

      state_ = State_Running;

      LOG(WARNING) << "The jobs engine has started";
    }


    void Stop()
    {
      {
        boost::mutex::scoped_lock lock(stateMutex_);

        if (state_ != State_Running)
        {
          return;
        }
        
        state_ = State_Stopping;
      }

      LOG(INFO) << "Stopping the jobs engine";
      
      if (retryHandler_.joinable())
      {
        retryHandler_.join();
      }
      
      for (size_t i = 0; i < workers_.size(); i++)
      {
        if (workers_[i].joinable())
        {
          workers_[i].join();
        }
      }
      
      {
        boost::mutex::scoped_lock lock(stateMutex_);
        state_ = State_Done;
      }

      LOG(WARNING) << "The jobs engine has stopped";
    }
  };
}



class DummyJob : public Orthanc::IJob
{
private:
  JobStepResult  result_;
  unsigned int count_;
  unsigned int steps_;

public:
  DummyJob() :
    result_(Orthanc::JobStepCode_Success),
    count_(0),
    steps_(4)
  {
  }

  explicit DummyJob(JobStepResult result) :
    result_(result),
    count_(0),
    steps_(4)
  {
  }

  virtual JobStepResult* ExecuteStep()
  {
    boost::this_thread::sleep(boost::posix_time::milliseconds(50));

    if (count_ == steps_ - 1)
    {
      return new JobStepResult(result_);
    }
    else
    {
      count_++;
      return new JobStepResult(JobStepCode_Continue);
    }
  }

  virtual void ReleaseResources()
  {
  }

  virtual float GetProgress()
  {
    return static_cast<float>(count_) / static_cast<float>(steps_ - 1);
  }

  virtual void GetDescription(Json::Value& value)
  {
    value["hello"] = "world";
  }
};


static bool CheckState(Orthanc::JobsRegistry& registry,
                       const std::string& id,
                       Orthanc::JobState state)
{
  Orthanc::JobState s;
  if (registry.GetState(s, id))
  {
    return state == s;
  }
  else
  {
    return false;
  }
}


TEST(JobsRegistry, Priority)
{
  JobsRegistry registry;

  std::string i1, i2, i3, i4;
  registry.Submit(i1, new DummyJob(), 10);
  registry.Submit(i2, new DummyJob(), 30);
  registry.Submit(i3, new DummyJob(), 20);
  registry.Submit(i4, new DummyJob(), 5);  

  registry.SetMaxCompletedJobs(2);

  std::set<std::string> id;
  registry.ListJobs(id);

  ASSERT_EQ(4u, id.size());
  ASSERT_TRUE(id.find(i1) != id.end());
  ASSERT_TRUE(id.find(i2) != id.end());
  ASSERT_TRUE(id.find(i3) != id.end());
  ASSERT_TRUE(id.find(i4) != id.end());

  ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    ASSERT_EQ(30, job.GetPriority());
    ASSERT_EQ(i2, job.GetId());

    ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Failure));
  ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    ASSERT_EQ(20, job.GetPriority());
    ASSERT_EQ(i3, job.GetId());

    job.MarkSuccess();

    ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Success));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    ASSERT_EQ(10, job.GetPriority());
    ASSERT_EQ(i1, job.GetId());
  }

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    ASSERT_EQ(5, job.GetPriority());
    ASSERT_EQ(i4, job.GetId());
  }

  {
    JobsRegistry::RunningJob job(registry, 1);
    ASSERT_FALSE(job.IsValid());
  }

  Orthanc::JobState s;
  ASSERT_TRUE(registry.GetState(s, i1));
  ASSERT_FALSE(registry.GetState(s, i2));  // Removed because oldest
  ASSERT_FALSE(registry.GetState(s, i3));  // Removed because second oldest
  ASSERT_TRUE(registry.GetState(s, i4));

  registry.SetMaxCompletedJobs(1);  // (*)
  ASSERT_FALSE(registry.GetState(s, i1));  // Just discarded by (*)
  ASSERT_TRUE(registry.GetState(s, i4));
}


TEST(JobsRegistry, Simultaneous)
{
  JobsRegistry registry;

  std::string i1, i2;
  registry.Submit(i1, new DummyJob(), 20);
  registry.Submit(i2, new DummyJob(), 10);

  ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Pending));
  ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job1(registry, 0);
    JobsRegistry::RunningJob job2(registry, 0);

    ASSERT_TRUE(job1.IsValid());
    ASSERT_TRUE(job2.IsValid());

    job1.MarkFailure();
    job2.MarkSuccess();

    ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Running));
    ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Failure));
  ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Success));
}


TEST(JobsRegistry, Resubmit)
{
  JobsRegistry registry;

  std::string id;
  registry.Submit(id, new DummyJob(), 10);

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  registry.Resubmit(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    job.MarkFailure();

    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));

    registry.Resubmit(id);
    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));

  registry.Resubmit(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    ASSERT_EQ(id, job.GetId());

    job.MarkSuccess();
    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));

  registry.Resubmit(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
}


TEST(JobsRegistry, Retry)
{
  JobsRegistry registry;

  std::string id;
  registry.Submit(id, new DummyJob(), 10);

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    job.MarkRetry(0);

    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));

  registry.Resubmit(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));
  
  registry.ScheduleRetries();
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());
    job.MarkSuccess();

    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
}


TEST(JobsRegistry, PausePending)
{
  JobsRegistry registry;

  std::string id;
  registry.Submit(id, new DummyJob(), 10);

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  registry.Pause(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));

  registry.Pause(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));

  registry.Resubmit(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));

  registry.Resume(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
}


TEST(JobsRegistry, PauseRunning)
{
  JobsRegistry registry;

  std::string id;
  registry.Submit(id, new DummyJob(), 10);

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());

    registry.Resubmit(id);
    job.MarkPause();
    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));

  registry.Resubmit(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));

  registry.Resume(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());

    job.MarkSuccess();
    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
}


TEST(JobsRegistry, PauseRetry)
{
  JobsRegistry registry;

  std::string id;
  registry.Submit(id, new DummyJob(), 10);

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());

    job.MarkRetry(0);
    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));

  registry.Pause(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));

  registry.Resume(id);
  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));

  {
    JobsRegistry::RunningJob job(registry, 0);
    ASSERT_TRUE(job.IsValid());

    job.MarkSuccess();
    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
  }

  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
}


TEST(JobsEngine, Basic)
{
  JobsEngine engine;

  std::string s;

  for (size_t i = 0; i < 20; i++)
    engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10);

  engine.SetWorkersCount(3);
  engine.Start();

  boost::this_thread::sleep(boost::posix_time::milliseconds(100));

  {
    typedef std::set<std::string> Jobs;

    Jobs jobs;
    engine.GetRegistry().ListJobs(jobs);

    Json::Value v = Json::arrayValue;
    for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
    {
      JobInfo info;

      if (engine.GetRegistry().GetJobInfo(info, *it))
      {
        Json::Value vv;
        info.Format(vv);
        v.append(vv);
      }
    }

    std::cout << v << std::endl;
  }
  std::cout << "====================================================" << std::endl;

  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  
  engine.Stop();


  {
    typedef std::set<std::string> Jobs;

    Jobs jobs;
    engine.GetRegistry().ListJobs(jobs);

    Json::Value v = Json::arrayValue;
    for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
    {
      JobInfo info;

      if (engine.GetRegistry().GetJobInfo(info, *it))
      {
        Json::Value vv;
        info.Format(vv);
        v.append(vv);
      }
    }

    std::cout << v << std::endl;
  }
}