changeset 2557:b4516a6f214b jobs

state machine
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 May 2018 13:45:31 +0200
parents 91e944c8389b
children 57f81b988713
files UnitTestsSources/MultiThreadingTests.cpp
diffstat 1 files changed, 918 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu May 03 10:27:39 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Thu May 03 13:45:31 2018 +0200
@@ -272,7 +272,10 @@
 #  error The job engine cannot be used in sandboxed environments
 #endif
 
+#include "../Core/Logging.h"
+
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <queue>
 
 namespace Orthanc
 {
@@ -289,24 +292,24 @@
   enum JobStepStatus
   {
     JobStepStatus_Success,
-    JobStepStatus_Error,
+    JobStepStatus_Failure,
     JobStepStatus_Continue,
     JobStepStatus_Retry
   };
 
 
-  class IJobStepResult : public boost::noncopyable
+  class JobStepResult
   {
   private:
     JobStepStatus status_;
     
   public:
-    explicit IJobStepResult(JobStepStatus status) :
-      status_(status)
+    explicit JobStepResult(JobStepStatus status) :
+    status_(status)
     {
     }
 
-    virtual ~IJobStepResult()
+    virtual ~JobStepResult()
     {
     }
 
@@ -317,15 +320,15 @@
   };
 
 
-  class RetryResult : public IJobStepResult
+  class RetryResult : public JobStepResult
   {
   private:
     unsigned int  timeout_;   // Retry after "timeout_" milliseconds
 
   public:
     RetryResult(unsigned int timeout) :
-      IJobStepResult(JobStepStatus_Retry),
-      timeout_(timeout)
+    JobStepResult(JobStepStatus_Retry),
+    timeout_(timeout)
     {
     }
 
@@ -343,7 +346,7 @@
     {
     }
 
-    virtual IJobStepResult* ExecuteStep() = 0;
+    virtual JobStepResult* ExecuteStep() = 0;
 
     virtual void ReleaseResources() = 0;   // For pausing jobs
 
@@ -353,83 +356,690 @@
   };
 
 
+  class JobHandler : public boost::noncopyable
+  {
+  private:
+    std::string               id_;
+    JobState                  state_;
+    std::auto_ptr<IJob>       job_;
+    int                       priority_;  // "+inf()" means highest priority
+    boost::posix_time::ptime  creationTime_;
+    boost::posix_time::ptime  lastUpdateTime_;
+    boost::posix_time::ptime  retryTime_;
+    uint64_t                  runtime_;  // In milliseconds
+    bool                      pauseScheduled_;
+
+    void SetStateInternal(JobState state) 
+    {
+      const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
+
+      if (state_ == JobState_Running)
+      {
+        runtime_ += (now - lastUpdateTime_).total_milliseconds();
+      }
+
+      state_ = state;
+      lastUpdateTime_ = now;
+      pauseScheduled_ = false;
+    }
+
+  public:
+    JobHandler(IJob* job,
+               int priority) :
+      id_(Toolbox::GenerateUuid()),
+      state_(JobState_Pending),
+      job_(job),
+      priority_(priority),
+      creationTime_(boost::posix_time::microsec_clock::universal_time()),
+      lastUpdateTime_(creationTime_),
+      runtime_(0),
+      pauseScheduled_(false)
+    {
+      if (job == NULL)
+      {
+        throw OrthancException(ErrorCode_NullPointer);
+      }
+    }
+
+    const std::string& GetId() const
+    {
+      return id_;
+    }
+
+    IJob& GetJob() const
+    {
+      assert(job_.get() != NULL);
+      return *job_;
+    }
+
+    void SetPriority(int priority)
+    {
+      priority_ = priority;
+    }
+
+    int GetPriority() const
+    {
+      return priority_;
+    }
+
+    JobState GetState() const
+    {
+      return state_;
+    }
+
+    void SetState(JobState state) 
+    {
+      if (state == JobState_Retry)
+      {
+        // Use "SetRetryState()"
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        SetStateInternal(state);
+      }
+    }
+
+    void SetRetryState(unsigned int timeout)
+    {
+      if (state_ == JobState_Running)
+      {
+        SetStateInternal(JobState_Retry);
+        retryTime_ = (boost::posix_time::microsec_clock::universal_time() + 
+                      boost::posix_time::milliseconds(timeout));
+      }
+      else
+      {
+        // Only valid for running jobs
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
+    void SchedulePause()
+    {
+      if (state_ == JobState_Running)
+      {
+        pauseScheduled_ = true;
+      }
+      else
+      {
+        // Only valid for running jobs
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
+    bool IsPauseScheduled()
+    {
+      return pauseScheduled_;
+    }
+
+    bool IsRetryReady(const boost::posix_time::ptime& now) const
+    {
+      if (state_ != JobState_Retry)
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        return retryTime_ >= now;
+      }
+    }
+  };
+
+
   class JobsMonitor : public boost::noncopyable
   {
   private:
-    class JobHandler : public boost::noncopyable
+    struct PriorityComparator
     {
-    private:
-      std::string               id_;
-      JobState                  state_;
-      std::auto_ptr<IJob>       job_;
-      int                       priority_;  // "+inf()" means highest priority
-      boost::posix_time::ptime  creationTime_;
-      boost::posix_time::ptime  lastUpdateTime_;
-      uint64_t                  runtime_;  // In milliseconds
+      bool operator() (JobHandler*& a,
+                       JobHandler*& b) const
+      {
+        return a->GetPriority() < b->GetPriority();
+      }                       
+    };
+
+    typedef std::map<std::string, JobHandler*>              JobsIndex;
+    typedef std::list<const JobHandler*>                    CompletedJobs;
+    typedef std::set<JobHandler*>                           RetryJobs;
+    typedef std::priority_queue<JobHandler*, 
+                                std::vector<JobHandler*>,   // Could be a "std::deque"
+                                PriorityComparator>         PendingJobs;
+
+    boost::mutex               mutex_;
+    JobsIndex                  jobsIndex_;
+    PendingJobs                pendingJobs_;
+    CompletedJobs              completedJobs_;
+    RetryJobs                  retryJobs_;
+
+    boost::condition_variable  pendingJobAvailable_;
+    size_t                     maxCompletedJobs_;
+
+
+#ifndef NDEBUG
+    bool IsPendingJob(const JobHandler& job) const
+    {
+      PendingJobs copy = pendingJobs_;
+      while (!copy.empty())
+      {
+        if (copy.top() == &job)
+        {
+          return true;
+        }
 
-    public:
-      JobHandler(IJob* job,
-                 int priority) :
-        id_(Toolbox::GenerateUuid()),
-        state_(JobState_Pending),
-        job_(job),
-        priority_(priority),
-        creationTime_(boost::posix_time::microsec_clock::universal_time()),
-        lastUpdateTime_(creationTime_),
-        runtime_(0)
+        copy.pop();
+      }
+
+      return false;
+    }
+
+    bool IsCompletedJob(const JobHandler& job) const
+    {
+      for (CompletedJobs::const_iterator it = completedJobs_.begin();
+           it != completedJobs_.end(); ++it)
       {
-        if (job == NULL)
+        if (*it == &job)
         {
-          throw OrthancException(ErrorCode_NullPointer);
+          return true;
+        }
+      }
+
+      return false;
+    }
+
+    bool IsRetryJob(JobHandler& job) const
+    {
+      return retryJobs_.find(&job) != retryJobs_.end();
+    }
+#endif
+
+
+    void CheckInvariants()
+    {
+#ifndef NDEBUG
+      {
+        PendingJobs copy = pendingJobs_;
+        while (!copy.empty())
+        {
+          assert(copy.top()->GetState() == JobState_Pending);
+          copy.pop();
         }
       }
 
-      const std::string& GetId() const
+      assert(completedJobs_.size() <= maxCompletedJobs_);
+
+      for (CompletedJobs::const_iterator it = completedJobs_.begin();
+           it != completedJobs_.end(); ++it)
+      {
+        assert((*it)->GetState() == JobState_Success ||
+               (*it)->GetState() == JobState_Failure);
+      }
+
+      for (RetryJobs::const_iterator it = retryJobs_.begin();
+           it != retryJobs_.end(); ++it)
+      {
+        assert((*it)->GetState() == JobState_Retry);
+      }
+
+      for (JobsIndex::iterator it = jobsIndex_.begin();
+           it != jobsIndex_.end(); ++it)
+      {
+        JobHandler& job = *it->second;
+
+        assert(job.GetId() == it->first);
+
+        switch (job.GetState())
+        {
+          case JobState_Pending:
+            assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
+            break;
+            
+          case JobState_Success:
+          case JobState_Failure:
+            assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
+            break;
+            
+          case JobState_Retry:
+            assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
+            break;
+            
+          case JobState_Running:
+          case JobState_Paused:
+            assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+      }
+#endif
+    }
+
+
+    void ForgetOldCompletedJobs()
+    {
+      if (maxCompletedJobs_ != 0)
       {
-        return id_;
+        while (completedJobs_.size() > maxCompletedJobs_)
+        {
+          assert(completedJobs_.front() != NULL);
+
+          std::string id = completedJobs_.front()->GetId();
+
+          assert(jobsIndex_.find(id) != jobsIndex_.end());
+
+          jobsIndex_.erase(id);
+          delete(completedJobs_.front());
+          completedJobs_.pop_front();
+        }
       }
-    };
+    }
+
+
+    void MarkRunningAsCompleted(JobHandler& job,
+                                bool success)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+      assert(job.GetState() == JobState_Running);
+
+      job.SetState(success ? JobState_Success : JobState_Failure);
+
+      completedJobs_.push_back(&job);
+      ForgetOldCompletedJobs();
+
+      CheckInvariants();
+    }
+
+
+    void MarkRunningAsRetry(JobHandler& job,
+                            unsigned int timeout)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      assert(job.GetState() == JobState_Running &&
+             retryJobs_.find(&job) == retryJobs_.end());
+
+      retryJobs_.insert(&job);
+      job.SetRetryState(timeout);
+
+      CheckInvariants();
+    }
+
+
+    void MarkRunningAsPaused(JobHandler& job)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+      assert(job.GetState() == JobState_Running);
+
+      job.SetState(JobState_Paused);
+
+      CheckInvariants();
+    }
+
+
+    JobHandler* WaitPendingJob(unsigned int timeout)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      while (pendingJobs_.empty())
+      {
+        if (timeout == 0)
+        {
+          pendingJobAvailable_.wait(lock);
+        }
+        else
+        {
+          bool success = pendingJobAvailable_.timed_wait
+            (lock, boost::posix_time::milliseconds(timeout));
+          if (!success)
+          {
+            return NULL;
+          }
+        }
+      }
+
+      JobHandler* job = pendingJobs_.top();
+      pendingJobs_.pop();
+      
+      job->SetState(JobState_Running);
+      return job;
+    }
+
 
   public:
+    JobsMonitor() :
+      maxCompletedJobs_(10)
+    {
+    }
+
+
+    ~JobsMonitor()
+    {
+      for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
+      {
+        assert(it->second != NULL);
+        delete it->second;
+      }
+    }
+
+
+    void SetMaxCompletedJobs(size_t i)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      maxCompletedJobs_ = i;
+      ForgetOldCompletedJobs();
+
+      CheckInvariants();
+    }
+
+
+    void ListJobs(std::set<std::string>& target)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      for (JobsIndex::const_iterator it = jobsIndex_.begin();
+           it != jobsIndex_.end(); ++it)
+      {
+        target.insert(it->first);
+      }
+    }
+
+
     void Submit(std::string& id,
-                IJob* job,
+                IJob* job,        // Takes ownership
                 int priority)
     {
       std::auto_ptr<JobHandler>  handler(new JobHandler(job, priority));
+
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+      
       id = handler->GetId();
+      pendingJobs_.push(handler.get());
+      jobsIndex_.insert(std::make_pair(id, handler.release()));
+
+      pendingJobAvailable_.notify_one();
+
+      CheckInvariants();
     }
 
+
+    void Submit(IJob* job,        // Takes ownership
+                int priority)
+    {
+      std::string id;
+      Submit(id, job, priority);
+    }
+
+
     void SetPriority(const std::string& id,
                      int priority)
     {
-      // TODO
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
+      {
+        LOG(WARNING) << "Unknown job: " << id;
+      }
+      else
+      {
+        found->second->SetPriority(priority);
+
+        if (found->second->GetState() == JobState_Pending)
+        {
+          // If the job is pending, we need to reconstruct the
+          // priority queue, as the heap condition has changed
+
+          PendingJobs copy;
+          std::swap(copy, pendingJobs_);
+
+          assert(pendingJobs_.empty());
+          while (!copy.empty())
+          {
+            pendingJobs_.push(copy.top());
+            copy.pop();
+          }
+        }
+      }
+
+      CheckInvariants();
     }
 
+
     void Pause(const std::string& id)
     {
-      // TODO
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
+      {
+        LOG(WARNING) << "Unknown job: " << id;
+      }
+      else
+      {
+        switch (found->second->GetState())
+        {
+          case JobState_Pending:
+          {
+            // If the job is pending, we need to reconstruct the
+            // priority queue to remove it
+            PendingJobs copy;
+            std::swap(copy, pendingJobs_);
+
+            assert(pendingJobs_.empty());
+            while (!copy.empty())
+            {
+              if (copy.top()->GetId() != id)
+              {
+                pendingJobs_.push(copy.top());
+              }
+
+              copy.pop();
+            }
+
+            found->second->SetState(JobState_Paused);
+
+            break;
+          }
+
+          case JobState_Retry:
+          {
+            RetryJobs::iterator item = retryJobs_.find(found->second);
+            assert(item != retryJobs_.end());            
+            retryJobs_.erase(item);
+
+            found->second->SetState(JobState_Paused);
+
+            break;
+          }
+
+          case JobState_Paused:
+          case JobState_Success:
+          case JobState_Failure:
+            // Nothing to be done
+            break;
+
+          case JobState_Running:
+            found->second->SchedulePause();
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+      }
+
+      CheckInvariants();
     }
 
+
     void Resume(const std::string& id)
     {
-      // TODO
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
+      {
+        LOG(WARNING) << "Unknown job: " << id;
+      }
+      else if (found->second->GetState() != JobState_Paused)
+      {
+        LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
+      }
+      else
+      {
+        found->second->SetState(JobState_Pending);
+        pendingJobs_.push(found->second);
+        pendingJobAvailable_.notify_one();
+      }
+
+      CheckInvariants();
     }
 
+
     void Resubmit(const std::string& id)
     {
-      // TODO
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
+      {
+        LOG(WARNING) << "Unknown job: " << id;
+      }
+      else if (found->second->GetState() != JobState_Failure)
+      {
+        LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
+      }
+      else
+      {
+        bool ok = false;
+        for (CompletedJobs::iterator it = completedJobs_.begin(); 
+             it != completedJobs_.end(); ++it)
+        {
+          if (*it == found->second)
+          {
+            ok = true;
+            completedJobs_.erase(it);
+            break;
+          }
+        }
+
+        assert(ok);
+
+        found->second->SetState(JobState_Pending);
+        pendingJobs_.push(found->second);
+        pendingJobAvailable_.notify_one();
+      }
+
+      CheckInvariants();
     }
 
-    class JobToRun : public boost::noncopyable
+
+    void ScheduleRetries()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      RetryJobs copy;
+      std::swap(copy, retryJobs_);
+
+      const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
+
+      assert(retryJobs_.empty());
+      for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
+      {
+        if ((*it)->IsRetryReady(now))
+        {
+          (*it)->SetState(JobState_Pending);
+        }
+        else
+        {
+          retryJobs_.insert(*it);
+        }
+      }
+
+      CheckInvariants();
+    }
+
+
+    bool GetState(JobState& state,
+                  const std::string& id)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::const_iterator it = jobsIndex_.find(id);
+      if (it == jobsIndex_.end())
+      {
+        return false;
+      }
+      else
+      {
+        state = it->second->GetState();
+        return true;
+      }
+    }
+
+
+    class RunningJob : public boost::noncopyable
     {
     private:
-      JobHandler*  handler_;
+      JobsMonitor&  that_;
+      JobHandler*   handler_;
+      JobState      targetState_;
+      unsigned int  retryTimeout_;
       
     public:
-      JobToRun(JobsMonitor& that,
-               unsigned int timeout) :
-        handler_(NULL)
+      RunningJob(JobsMonitor& that,
+                 unsigned int timeout) :
+        that_(that),
+        handler_(NULL),
+        targetState_(JobState_Failure),
+        retryTimeout_(0)
+      {
+        handler_ = that_.WaitPendingJob(timeout);
+      }
+
+      ~RunningJob()
       {
+        if (IsValid())
+        {
+          switch (targetState_)
+          {
+            case JobState_Failure:
+              that_.MarkRunningAsCompleted(*handler_, false);
+              break;
+
+            case JobState_Success:
+              that_.MarkRunningAsCompleted(*handler_, true);
+              break;
+
+            case JobState_Paused:
+              that_.MarkRunningAsPaused(*handler_);
+              break;            
+
+            case JobState_Retry:
+              that_.MarkRunningAsRetry(*handler_, retryTimeout_);
+              break;
+            
+            default:
+              assert(0);
+          }
+        }
       }
 
       bool IsValid() const
@@ -437,7 +1047,271 @@
         return handler_ != NULL;
       }
 
-      
+      const std::string& GetId() const
+      {
+        if (IsValid())
+        {
+          return handler_->GetId();
+        }
+        else
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }      
+      }
+
+      int GetPriority() const
+      {
+        if (IsValid())
+        {
+          return handler_->GetPriority();
+        }
+        else
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }      
+      }
+
+      bool IsPauseScheduled()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        boost::mutex::scoped_lock lock(that_.mutex_);
+        that_.CheckInvariants();
+        assert(handler_->GetState() == JobState_Running);
+
+        return handler_->IsPauseScheduled();
+      }
+
+      IJob& GetJob()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        boost::mutex::scoped_lock lock(that_.mutex_);
+        that_.CheckInvariants();
+        assert(handler_->GetState() == JobState_Running);
+
+        return handler_->GetJob();
+      }
+
+      void MarkSuccess()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Success;
+      }
+
+      void MarkFailure()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Failure;
+      }
+
+      void MarkPause()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Paused;
+      }
+
+      void MarkRetry(unsigned int timeout)
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Retry;
+        retryTimeout_ = timeout;
+      }
     };
   };
 }
+
+
+
+class DummyJob : public Orthanc::IJob
+{
+private:
+  JobStepResult  result_;
+
+public:
+  DummyJob() :
+    result_(Orthanc::JobStepStatus_Success)
+  {
+  }
+
+  explicit DummyJob(JobStepResult result) :
+  result_(result)
+  {
+  }
+
+  virtual JobStepResult* ExecuteStep()
+  {
+    return new JobStepResult(result_);
+  }
+
+  virtual void ReleaseResources()
+  {
+  }
+
+  virtual float GetProgress()
+  {
+    return 0;
+  }
+
+  virtual void FormatStatus(Json::Value& value)
+  {
+  }
+};
+
+
+static bool CheckState(Orthanc::JobsMonitor& monitor,
+                       const std::string& id,
+                       Orthanc::JobState state)
+{
+  Orthanc::JobState s;
+  if (monitor.GetState(s, id))
+  {
+    return state == s;
+  }
+  else
+  {
+    return false;
+  }
+}
+
+
+TEST(JobsMonitor, Priority)
+{
+  JobsMonitor monitor;
+
+  std::string i1, i2, i3, i4;
+  monitor.Submit(i1, new DummyJob(), 10);
+  monitor.Submit(i2, new DummyJob(), 30);
+  monitor.Submit(i3, new DummyJob(), 20);
+  monitor.Submit(i4, new DummyJob(), 5);  
+
+  monitor.SetMaxCompletedJobs(2);
+
+  std::set<std::string> id;
+  monitor.ListJobs(id);
+
+  ASSERT_EQ(4u, id.size());
+  ASSERT_TRUE(id.find(i1) != id.end());
+  ASSERT_TRUE(id.find(i2) != id.end());
+  ASSERT_TRUE(id.find(i3) != id.end());
+  ASSERT_TRUE(id.find(i4) != id.end());
+
+  ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Pending));
+
+  {
+    JobsMonitor::RunningJob job(monitor, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(30, job.GetPriority());
+    ASSERT_EQ(i2, job.GetId());
+
+    ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Running));
+  }
+
+  ASSERT_TRUE(CheckState(monitor, i2, Orthanc::JobState_Failure));
+  ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Pending));
+
+  {
+    JobsMonitor::RunningJob job(monitor, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(20, job.GetPriority());
+    ASSERT_EQ(i3, job.GetId());
+
+    job.MarkSuccess();
+
+    ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Running));
+  }
+
+  ASSERT_TRUE(CheckState(monitor, i3, Orthanc::JobState_Success));
+
+  {
+    JobsMonitor::RunningJob job(monitor, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(10, job.GetPriority());
+    ASSERT_EQ(i1, job.GetId());
+  }
+
+  {
+    JobsMonitor::RunningJob job(monitor, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(5, job.GetPriority());
+    ASSERT_EQ(i4, job.GetId());
+  }
+
+  {
+    JobsMonitor::RunningJob job(monitor, 1);
+    ASSERT_FALSE(job.IsValid());
+  }
+
+  Orthanc::JobState s;
+  ASSERT_TRUE(monitor.GetState(s, i1));
+  ASSERT_FALSE(monitor.GetState(s, i2));  // Removed because oldest
+  ASSERT_FALSE(monitor.GetState(s, i3));  // Removed because second oldest
+  ASSERT_TRUE(monitor.GetState(s, i4));
+
+  monitor.SetMaxCompletedJobs(1);  // (*)
+  ASSERT_FALSE(monitor.GetState(s, i1));  // Just discarded by (*)
+  ASSERT_TRUE(monitor.GetState(s, i4));
+}
+
+
+TEST(JobsMonitor, Resubmit)
+{
+  JobsMonitor monitor;
+
+  std::string id;
+  monitor.Submit(id, new DummyJob(), 10);
+
+  ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending));
+
+  monitor.Resubmit(id);
+  ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending));
+
+  {
+    JobsMonitor::RunningJob job(monitor, 0);
+    ASSERT_TRUE(job.IsValid());
+    job.MarkFailure();
+
+    ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running));
+
+    monitor.Resubmit(id);
+    ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running));
+  }
+
+  ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Failure));
+
+  monitor.Resubmit(id);
+  ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Pending));
+
+  {
+    JobsMonitor::RunningJob job(monitor, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(id, job.GetId());
+
+    job.MarkSuccess();
+    ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Running));
+  }
+
+  ASSERT_TRUE(CheckState(monitor, id, Orthanc::JobState_Success));
+}