changeset 2563:98dfc1948d00 jobs

RunningJob::ExecuteStep()
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 04 May 2018 17:47:02 +0200
parents 1e66fe3ddf9f
children f8681f251caa
files UnitTestsSources/MultiThreadingTests.cpp
diffstat 1 files changed, 658 insertions(+), 620 deletions(-) [+]
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri May 04 17:28:47 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri May 04 17:47:02 2018 +0200
@@ -306,7 +306,7 @@
     
   public:
     explicit JobStepResult(JobStepCode status) :
-    status_(status)
+      status_(status)
     {
     }
 
@@ -328,8 +328,8 @@
 
   public:
     RetryResult(unsigned int timeout) :
-    JobStepResult(JobStepCode_Retry),
-    timeout_(timeout)
+      JobStepResult(JobStepCode_Retry),
+      timeout_(timeout)
     {
     }
 
@@ -353,16 +353,18 @@
 
     virtual float GetProgress() = 0;
 
-    virtual void FormatStatus(Json::Value& value) = 0;
+    virtual void GetDescription(Json::Value& value) = 0;
   };
 
 
-  struct JobStatus
+  class JobStatus
   {
+  private:
     ErrorCode      errorCode_;
     float          progress_;
     Json::Value    description_;
 
+  public:
     JobStatus() :
       errorCode_(ErrorCode_Success),
       progress_(0),
@@ -371,16 +373,32 @@
     }
 
     JobStatus(ErrorCode code,
-              float progress) :
+              IJob& job) :
       errorCode_(code),
-      progress_(progress),
-      description_(Json::objectValue)
+      progress_(job.GetProgress())
     {
-      if (progress < 0 ||
-          progress > 1)
+      if (progress_ < 0 ||
+          progress_ > 1)
       {
         throw OrthancException(ErrorCode_ParameterOutOfRange);
       }
+
+      job.GetDescription(description_);
+    }
+
+    ErrorCode GetErrorCode() const  // Read-only accessor for the error code stored by the constructors
+    {
+      return errorCode_;  // ErrorCode_Success for a default-constructed status
+    }
+
+    float GetProgress() const  // Read-only accessor for the progress value captured at construction
+    {
+      return progress_;  // 0 by default; otherwise job.GetProgress(), which the constructor restricts to [0, 1]
+    }
+
+    const Json::Value& GetDescription() const  // Read-only view of the JSON description held by this status
+    {
+      return description_;  // reference to a member: valid only as long as this JobStatus is alive
     }
   };
 
@@ -414,7 +432,7 @@
       status_(status)
     {
       float ms = static_cast<float>(runtime_.total_milliseconds());
-      float remaining = boost::math::llround(1.0f - status_.progress_) * ms;
+      float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms;
       eta_ = infoTime_ + boost::posix_time::milliseconds(remaining);
     }
 
@@ -601,14 +619,14 @@
       }
     }
 
-    JobStatus& GetLastStatus()
+    const JobStatus& GetLastStatus() const
     {
       return lastStatus_;
     }
 
-    const JobStatus& GetLastStatus() const
+    void SetLastStatus(const JobStatus& status)
     {
-      return lastStatus_;
+      lastStatus_ = status;
     }
   };
 
@@ -626,747 +644,767 @@
     };
 
     typedef std::map<std::string, JobHandler*>              JobsIndex;
-    typedef std::list<const JobHandler*>                    CompletedJobs;
+    typedef std::list<JobHandler*>                          CompletedJobs;
     typedef std::set<JobHandler*>                           RetryJobs;
     typedef std::priority_queue<JobHandler*, 
                                 std::vector<JobHandler*>,   // Could be a "std::deque"
                                 PriorityComparator>         PendingJobs;
 
-  boost::mutex               mutex_;
-  JobsIndex                  jobsIndex_;
-  PendingJobs                pendingJobs_;
-  CompletedJobs              completedJobs_;
-  RetryJobs                  retryJobs_;
+    boost::mutex               mutex_;
+    JobsIndex                  jobsIndex_;
+    PendingJobs                pendingJobs_;
+    CompletedJobs              completedJobs_;
+    RetryJobs                  retryJobs_;
 
-  boost::condition_variable  pendingJobAvailable_;
-  size_t                     maxCompletedJobs_;
+    boost::condition_variable  pendingJobAvailable_;
+    size_t                     maxCompletedJobs_;
 
 
 #ifndef NDEBUG
-  bool IsPendingJob(const JobHandler& job) const
-  {
-    PendingJobs copy = pendingJobs_;
-    while (!copy.empty())
-    {
-      if (copy.top() == &job)
-      {
-        return true;
-      }
-
-      copy.pop();
-    }
-
-    return false;
-  }
-
-  bool IsCompletedJob(const JobHandler& job) const
-  {
-    for (CompletedJobs::const_iterator it = completedJobs_.begin();
-         it != completedJobs_.end(); ++it)
-    {
-      if (*it == &job)
-      {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  bool IsRetryJob(JobHandler& job) const
-  {
-    return retryJobs_.find(&job) != retryJobs_.end();
-  }
-#endif
-
-
-  void CheckInvariants()
-  {
-#ifndef NDEBUG
+    bool IsPendingJob(const JobHandler& job) const
     {
       PendingJobs copy = pendingJobs_;
       while (!copy.empty())
       {
-        assert(copy.top()->GetState() == JobState_Pending);
+        if (copy.top() == &job)
+        {
+          return true;
+        }
+
         copy.pop();
       }
-    }
 
-    assert(completedJobs_.size() <= maxCompletedJobs_);
-
-    for (CompletedJobs::const_iterator it = completedJobs_.begin();
-         it != completedJobs_.end(); ++it)
-    {
-      assert((*it)->GetState() == JobState_Success ||
-             (*it)->GetState() == JobState_Failure);
-    }
-
-    for (RetryJobs::const_iterator it = retryJobs_.begin();
-         it != retryJobs_.end(); ++it)
-    {
-      assert((*it)->GetState() == JobState_Retry);
+      return false;
     }
 
-    for (JobsIndex::iterator it = jobsIndex_.begin();
-         it != jobsIndex_.end(); ++it)
+    bool IsCompletedJob(JobHandler& job) const
     {
-      JobHandler& job = *it->second;
-
-      assert(job.GetId() == it->first);
-
-      switch (job.GetState())
+      for (CompletedJobs::const_iterator it = completedJobs_.begin();
+           it != completedJobs_.end(); ++it)
       {
-        case JobState_Pending:
-          assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
-          break;
-            
-        case JobState_Success:
-        case JobState_Failure:
-          assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
-          break;
-            
-        case JobState_Retry:
-          assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
-          break;
-            
-        case JobState_Running:
-        case JobState_Paused:
-          assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
-          break;
+        if (*it == &job)
+        {
+          return true;
+        }
+      }
 
-        default:
-          throw OrthancException(ErrorCode_InternalError);
-      }
+      return false;
+    }
+
+    bool IsRetryJob(JobHandler& job) const
+    {
+      return retryJobs_.find(&job) != retryJobs_.end();
     }
 #endif
-  }
-
-
-  void ForgetOldCompletedJobs()
-  {
-    if (maxCompletedJobs_ != 0)
-    {
-      while (completedJobs_.size() > maxCompletedJobs_)
-      {
-        assert(completedJobs_.front() != NULL);
-
-        std::string id = completedJobs_.front()->GetId();
-        assert(jobsIndex_.find(id) != jobsIndex_.end());
-
-        jobsIndex_.erase(id);
-        delete(completedJobs_.front());
-        completedJobs_.pop_front();
-      }
-    }
-  }
-
-
-  void MarkRunningAsCompleted(JobHandler& job,
-                              bool success)
-  {
-    LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
-              << ": " << job.GetId();
-
-    CheckInvariants();
-    assert(job.GetState() == JobState_Running);
-
-    job.SetState(success ? JobState_Success : JobState_Failure);
-
-    completedJobs_.push_back(&job);
-    ForgetOldCompletedJobs();
-
-    CheckInvariants();
-  }
-
-
-  void MarkRunningAsRetry(JobHandler& job,
-                          unsigned int timeout)
-  {
-    LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
-
-    CheckInvariants();
-
-    assert(job.GetState() == JobState_Running &&
-           retryJobs_.find(&job) == retryJobs_.end());
-
-    retryJobs_.insert(&job);
-    job.SetRetryState(timeout);
-
-    CheckInvariants();
-  }
 
 
-  void MarkRunningAsPaused(JobHandler& job)
-  {
-    LOG(INFO) << "Job paused: " << job.GetId();
+    void CheckInvariants() const
+    {
+#ifndef NDEBUG
+      {
+        PendingJobs copy = pendingJobs_;
+        while (!copy.empty())
+        {
+          assert(copy.top()->GetState() == JobState_Pending);
+          copy.pop();
+        }
+      }
 
-    CheckInvariants();
-    assert(job.GetState() == JobState_Running);
+      assert(completedJobs_.size() <= maxCompletedJobs_);
 
-    job.SetState(JobState_Paused);
+      for (CompletedJobs::const_iterator it = completedJobs_.begin();
+           it != completedJobs_.end(); ++it)
+      {
+        assert((*it)->GetState() == JobState_Success ||
+               (*it)->GetState() == JobState_Failure);
+      }
 
-    CheckInvariants();
-  }
-
+      for (RetryJobs::const_iterator it = retryJobs_.begin();
+           it != retryJobs_.end(); ++it)
+      {
+        assert((*it)->GetState() == JobState_Retry);
+      }
 
-public:
-  JobsRegistry() :
-    maxCompletedJobs_(10)
-  {
-  }
+      for (JobsIndex::const_iterator it = jobsIndex_.begin();
+           it != jobsIndex_.end(); ++it)
+      {
+        JobHandler& job = *it->second;
 
+        assert(job.GetId() == it->first);
 
-  ~JobsRegistry()
-  {
-    for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
-    {
-      assert(it->second != NULL);
-      delete it->second;
+        switch (job.GetState())
+        {
+          case JobState_Pending:
+            assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
+            break;
+            
+          case JobState_Success:
+          case JobState_Failure:
+            assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
+            break;
+            
+          case JobState_Retry:
+            assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
+            break;
+            
+          case JobState_Running:
+          case JobState_Paused:
+            assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+      }
+#endif
     }
-  }
-
-
-  void SetMaxCompletedJobs(size_t i)
-  {
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    maxCompletedJobs_ = i;
-    ForgetOldCompletedJobs();
-
-    CheckInvariants();
-  }
-
-
-  void ListJobs(std::set<std::string>& target)
-  {
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    for (JobsIndex::const_iterator it = jobsIndex_.begin();
-         it != jobsIndex_.end(); ++it)
-    {
-      target.insert(it->first);
-    }
-  }
 
 
-  void Submit(std::string& id,
-              IJob* job,        // Takes ownership
-              int priority)
-  {
-    std::auto_ptr<JobHandler>  handler(new JobHandler(job, priority));
-
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-      
-    id = handler->GetId();
-
-    pendingJobs_.push(handler.get());
-    pendingJobAvailable_.notify_one();
-
-    jobsIndex_.insert(std::make_pair(id, handler.release()));
-
-    LOG(INFO) << "New job submitted: " << id;
-
-    CheckInvariants();
-  }
-
-
-  void Submit(IJob* job,        // Takes ownership
-              int priority)
-  {
-    std::string id;
-    Submit(id, job, priority);
-  }
-
+    void ForgetOldCompletedJobs()
+    {
+      if (maxCompletedJobs_ != 0)
+      {
+        while (completedJobs_.size() > maxCompletedJobs_)
+        {
+          assert(completedJobs_.front() != NULL);
 
-  void SetPriority(const std::string& id,
-                   int priority)
-  {
-    LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
-
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    JobsIndex::iterator found = jobsIndex_.find(id);
+          std::string id = completedJobs_.front()->GetId();
+          assert(jobsIndex_.find(id) != jobsIndex_.end());
 
-    if (found == jobsIndex_.end())
-    {
-      LOG(WARNING) << "Unknown job: " << id;
-    }
-    else
-    {
-      found->second->SetPriority(priority);
-
-      if (found->second->GetState() == JobState_Pending)
-      {
-        // If the job is pending, we need to reconstruct the
-        // priority queue, as the heap condition has changed
-
-        PendingJobs copy;
-        std::swap(copy, pendingJobs_);
-
-        assert(pendingJobs_.empty());
-        while (!copy.empty())
-        {
-          pendingJobs_.push(copy.top());
-          copy.pop();
+          jobsIndex_.erase(id);
+          delete(completedJobs_.front());
+          completedJobs_.pop_front();
         }
       }
     }
 
-    CheckInvariants();
-  }
+
+    void MarkRunningAsCompleted(JobHandler& job,  // Running -> Success/Failure, then archived in completedJobs_
+                                bool success)     // true => JobState_Success, false => JobState_Failure
+    {  // No locking here: the visible caller, ~RunningJob, already holds the registry mutex
+      LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
+                << ": " << job.GetId();
+
+      CheckInvariants();
+      assert(job.GetState() == JobState_Running);  // precondition (debug builds only)
+
+      job.SetState(success ? JobState_Success : JobState_Failure);
+
+      completedJobs_.push_back(&job);  // newest completed job goes to the back of the list
+      ForgetOldCompletedJobs();        // may delete the oldest handlers, taken from the front
+
+      CheckInvariants();
+    }
+
+
+    void MarkRunningAsRetry(JobHandler& job,       // Parks a running job in retryJobs_ until its retry delay elapses
+                            unsigned int timeout)  // forwarded to SetRetryState(); logged as milliseconds
+    {  // No locking here: the visible caller, ~RunningJob, already holds the registry mutex
+      LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
+
+      CheckInvariants();
+
+      assert(job.GetState() == JobState_Running &&
+             retryJobs_.find(&job) == retryJobs_.end());  // must be running and not already parked
+
+      retryJobs_.insert(&job);
+      job.SetRetryState(timeout);  // presumably switches the state to JobState_Retry, as CheckInvariants() requires for retryJobs_ members -- confirm in JobHandler
+
+      CheckInvariants();
+    }
+
+
+    void MarkRunningAsPaused(JobHandler& job)  // Running -> Paused; the job is not added to any queue
+    {  // No locking here: the visible caller, ~RunningJob, already holds the registry mutex
+      LOG(INFO) << "Job paused: " << job.GetId();
+
+      CheckInvariants();
+      assert(job.GetState() == JobState_Running);  // precondition (debug builds only)
+
+      job.SetState(JobState_Paused);  // CheckInvariants() expects paused jobs in none of pending/retry/completed
+
+      CheckInvariants();
+    }
+
+
+  public:
+    JobsRegistry() :  // Creates an empty registry
+      maxCompletedJobs_(10)  // default limit on remembered completed jobs; see SetMaxCompletedJobs()
+    {
+    }
+
+
+    ~JobsRegistry()  // Destroys every handler still referenced by the index
+    {
+      for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
+      {
+        assert(it->second != NULL);
+        delete it->second;  // jobsIndex_ owns the handlers: Submit() releases its auto_ptr into it
+      }
+    }
 
 
-  void Pause(const std::string& id)
-  {
-    LOG(INFO) << "Pausing job: " << id;
+    void SetMaxCompletedJobs(size_t i)  // Changes the limit on remembered completed jobs and trims the list at once
+    {
+      boost::mutex::scoped_lock lock(mutex_);  // held until the function returns
+      CheckInvariants();
+
+      maxCompletedJobs_ = i;
+      ForgetOldCompletedJobs();  // NOTE(review): this is a no-op when the limit is 0
+
+      CheckInvariants();  // NOTE(review): asserts completedJobs_.size() <= maxCompletedJobs_, so i == 0 with completed jobs present trips it in debug builds -- confirm intended meaning of 0
+    }
+
+
+    void ListJobs(std::set<std::string>& target)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
 
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
+      for (JobsIndex::const_iterator it = jobsIndex_.begin();
+           it != jobsIndex_.end(); ++it)
+      {
+        target.insert(it->first);
+      }
+    }
+
 
-    JobsIndex::iterator found = jobsIndex_.find(id);
+    void Submit(std::string& id,
+                IJob* job,        // Takes ownership
+                int priority)
+    {
+      std::auto_ptr<JobHandler>  handler(new JobHandler(job, priority));
+
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+      
+      id = handler->GetId();
 
-    if (found == jobsIndex_.end())
+      pendingJobs_.push(handler.get());
+      pendingJobAvailable_.notify_one();
+
+      jobsIndex_.insert(std::make_pair(id, handler.release()));
+
+      LOG(INFO) << "New job submitted: " << id;
+
+      CheckInvariants();
+    }
+
+
+    void Submit(IJob* job,        // Takes ownership
+                int priority)
     {
-      LOG(WARNING) << "Unknown job: " << id;
+      std::string id;
+      Submit(id, job, priority);
     }
-    else
+
+
+    void SetPriority(const std::string& id,
+                     int priority)
     {
-      switch (found->second->GetState())
+      LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
+
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
       {
-        case JobState_Pending:
+        LOG(WARNING) << "Unknown job: " << id;
+      }
+      else
+      {
+        found->second->SetPriority(priority);
+
+        if (found->second->GetState() == JobState_Pending)
         {
           // If the job is pending, we need to reconstruct the
-          // priority queue to remove it
+          // priority queue, as the heap condition has changed
+
           PendingJobs copy;
           std::swap(copy, pendingJobs_);
 
           assert(pendingJobs_.empty());
           while (!copy.empty())
           {
-            if (copy.top()->GetId() != id)
-            {
-              pendingJobs_.push(copy.top());
-            }
-
+            pendingJobs_.push(copy.top());
             copy.pop();
           }
-
-          found->second->SetState(JobState_Paused);
-
-          break;
-        }
-
-        case JobState_Retry:
-        {
-          RetryJobs::iterator item = retryJobs_.find(found->second);
-          assert(item != retryJobs_.end());            
-          retryJobs_.erase(item);
-
-          found->second->SetState(JobState_Paused);
-
-          break;
-        }
-
-        case JobState_Paused:
-        case JobState_Success:
-        case JobState_Failure:
-          // Nothing to be done
-          break;
-
-        case JobState_Running:
-          found->second->SchedulePause();
-          break;
-
-        default:
-          throw OrthancException(ErrorCode_InternalError);
-      }
-    }
-
-    CheckInvariants();
-  }
-
-
-  void Resume(const std::string& id)
-  {
-    LOG(INFO) << "Resuming job: " << id;
-
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    JobsIndex::iterator found = jobsIndex_.find(id);
-
-    if (found == jobsIndex_.end())
-    {
-      LOG(WARNING) << "Unknown job: " << id;
-    }
-    else if (found->second->GetState() != JobState_Paused)
-    {
-      LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
-    }
-    else
-    {
-      found->second->SetState(JobState_Pending);
-      pendingJobs_.push(found->second);
-      pendingJobAvailable_.notify_one();
-    }
-
-    CheckInvariants();
-  }
-
-
-  void Resubmit(const std::string& id)
-  {
-    LOG(INFO) << "Resubmitting failed job: " << id;
-
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    JobsIndex::iterator found = jobsIndex_.find(id);
-
-    if (found == jobsIndex_.end())
-    {
-      LOG(WARNING) << "Unknown job: " << id;
-    }
-    else if (found->second->GetState() != JobState_Failure)
-    {
-      LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
-    }
-    else
-    {
-      bool ok = false;
-      for (CompletedJobs::iterator it = completedJobs_.begin(); 
-           it != completedJobs_.end(); ++it)
-      {
-        if (*it == found->second)
-        {
-          ok = true;
-          completedJobs_.erase(it);
-          break;
         }
       }
 
-      assert(ok);
-
-      found->second->SetState(JobState_Pending);
-      pendingJobs_.push(found->second);
-      pendingJobAvailable_.notify_one();
+      CheckInvariants();
     }
 
-    CheckInvariants();
-  }
-
 
-  void ScheduleRetries()
-  {
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
-
-    RetryJobs copy;
-    std::swap(copy, retryJobs_);
-
-    const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
+    void Pause(const std::string& id)
+    {
+      LOG(INFO) << "Pausing job: " << id;
 
-    assert(retryJobs_.empty());
-    for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
-    {
-      if ((*it)->IsRetryReady(now))
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
       {
-        LOG(INFO) << "Retrying job: " << (*it)->GetId();
-        (*it)->SetState(JobState_Pending);
-        pendingJobs_.push(*it);
-        pendingJobAvailable_.notify_one();
+        LOG(WARNING) << "Unknown job: " << id;
       }
       else
       {
-        retryJobs_.insert(*it);
-      }
-    }
+        switch (found->second->GetState())
+        {
+          case JobState_Pending:
+          {
+            // If the job is pending, we need to reconstruct the
+            // priority queue to remove it
+            PendingJobs copy;
+            std::swap(copy, pendingJobs_);
 
-    CheckInvariants();
-  }
+            assert(pendingJobs_.empty());
+            while (!copy.empty())
+            {
+              if (copy.top()->GetId() != id)
+              {
+                pendingJobs_.push(copy.top());
+              }
 
+              copy.pop();
+            }
+
+            found->second->SetState(JobState_Paused);
+
+            break;
+          }
 
-  bool GetState(JobState& state,
-                const std::string& id)
-  {
-    boost::mutex::scoped_lock lock(mutex_);
-    CheckInvariants();
+          case JobState_Retry:
+          {
+            RetryJobs::iterator item = retryJobs_.find(found->second);
+            assert(item != retryJobs_.end());            
+            retryJobs_.erase(item);
+
+            found->second->SetState(JobState_Paused);
+
+            break;
+          }
 
-    JobsIndex::const_iterator it = jobsIndex_.find(id);
-    if (it == jobsIndex_.end())
-    {
-      return false;
+          case JobState_Paused:
+          case JobState_Success:
+          case JobState_Failure:
+            // Nothing to be done
+            break;
+
+          case JobState_Running:
+            found->second->SchedulePause();
+            break;
+
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
+      }
+
+      CheckInvariants();
     }
-    else
-    {
-      state = it->second->GetState();
-      return true;
-    }
-  }
 
 
-  class RunningJob : public boost::noncopyable
-  {
-  private:
-    JobsRegistry&  registry_;
-    JobHandler*    handler_;  // Can only be accessed if the registry
-                              // mutex is locked!
-    IJob*          job_;  // Will by design be in mutual exclusion,
-                          // because only one RunningJob can be
-                          // executed at a time on a JobHandler
+    void Resume(const std::string& id)  // Puts a paused job back into the pending queue
+    {
+      LOG(INFO) << "Resuming job: " << id;
+
+      boost::mutex::scoped_lock lock(mutex_);  // held until the function returns
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
+
+      if (found == jobsIndex_.end())
+      {
+        LOG(WARNING) << "Unknown job: " << id;  // unknown id: only logged, nothing is thrown
+      }
+      else if (found->second->GetState() != JobState_Paused)
+      {
+        LOG(WARNING) << "Cannot resume a job that is not paused: " << id;  // state left unchanged
+      }
+      else
+      {
+        found->second->SetState(JobState_Pending);
+        pendingJobs_.push(found->second);
+        pendingJobAvailable_.notify_one();  // wakes one waiter; RunningJob's constructor waits on this condition
+      }
+
+      CheckInvariants();
+    }
+
 
-    std::string    id_;
-    int            priority_;
-    JobState       targetState_;
-    unsigned int   targetRetryTimeout_;
-      
-  public:
-    RunningJob(JobsRegistry& registry,
-               unsigned int timeout) :
-      registry_(registry),
-      handler_(NULL),
-      targetState_(JobState_Failure),
-      targetRetryTimeout_(0)
+    void Resubmit(const std::string& id)
     {
-      {
-        boost::mutex::scoped_lock lock(registry_.mutex_);
+      LOG(INFO) << "Resubmitting failed job: " << id;
+
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::iterator found = jobsIndex_.find(id);
 
-        while (registry_.pendingJobs_.empty())
+      if (found == jobsIndex_.end())
+      {
+        LOG(WARNING) << "Unknown job: " << id;
+      }
+      else if (found->second->GetState() != JobState_Failure)
+      {
+        LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
+      }
+      else
+      {
+        bool ok = false;
+        for (CompletedJobs::iterator it = completedJobs_.begin(); 
+             it != completedJobs_.end(); ++it)
         {
-          if (timeout == 0)
-          {
-            registry_.pendingJobAvailable_.wait(lock);
-          }
-          else
+          if (*it == found->second)
           {
-            bool success = registry_.pendingJobAvailable_.timed_wait
-              (lock, boost::posix_time::milliseconds(timeout));
-            if (!success)
-            {
-              // No pending job
-              return;
-            }
+            ok = true;
+            completedJobs_.erase(it);
+            break;
           }
         }
 
-        handler_ = registry_.pendingJobs_.top();
-        registry_.pendingJobs_.pop();
+        assert(ok);
 
-        assert(handler_->GetState() == JobState_Pending);
-        handler_->SetState(JobState_Running);
+        found->second->SetState(JobState_Pending);
+        pendingJobs_.push(found->second);
+        pendingJobAvailable_.notify_one();
+      }
 
-        job_ = &handler_->GetJob();
-        id_ = handler_->GetId();
-        priority_ = handler_->GetPriority();
-      }
+      CheckInvariants();
     }
 
-    ~RunningJob()
-    {
-      if (IsValid())
-      {
-        boost::mutex::scoped_lock lock(registry_.mutex_);
 
-        switch (targetState_)
-        {
-          case JobState_Failure:
-            registry_.MarkRunningAsCompleted(*handler_, false);
-            break;
+    void ScheduleRetries()
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      RetryJobs copy;
+      std::swap(copy, retryJobs_);
+
+      const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
 
-          case JobState_Success:
-            registry_.MarkRunningAsCompleted(*handler_, true);
-            break;
-
-          case JobState_Paused:
-            registry_.MarkRunningAsPaused(*handler_);
-            break;            
-
-          case JobState_Retry:
-            registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
-            break;
-            
-          default:
-            assert(0);
+      assert(retryJobs_.empty());
+      for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
+      {
+        if ((*it)->IsRetryReady(now))
+        {
+          LOG(INFO) << "Retrying job: " << (*it)->GetId();
+          (*it)->SetState(JobState_Pending);
+          pendingJobs_.push(*it);
+          pendingJobAvailable_.notify_one();
+        }
+        else
+        {
+          retryJobs_.insert(*it);
         }
       }
+
+      CheckInvariants();
     }
 
-    bool IsValid() const
-    {
-      return (handler_ != NULL &&
-              job_ != NULL);
-    }
 
-    const std::string& GetId() const
+    bool GetState(JobState& state,
+                  const std::string& id)
     {
-      if (!IsValid())
+      boost::mutex::scoped_lock lock(mutex_);
+      CheckInvariants();
+
+      JobsIndex::const_iterator it = jobsIndex_.find(id);
+      if (it == jobsIndex_.end())
       {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        return false;
       }
       else
       {
-        return id_;
-      }
-    }
-
-    int GetPriority() const
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
-      {
-        return priority_;
-      }
-    }
-
-    bool IsPauseScheduled()
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
-      {
-        boost::mutex::scoped_lock lock(registry_.mutex_);
-        registry_.CheckInvariants();
-        assert(handler_->GetState() == JobState_Running);
-        
-        return handler_->IsPauseScheduled();
-      }
-    }
-
-    void MarkSuccess()
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
-      {
-        targetState_ = JobState_Success;
+        state = it->second->GetState();
+        return true;
       }
     }
 
-    void MarkFailure()
+
+    class RunningJob : public boost::noncopyable
     {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
-      {
-        targetState_ = JobState_Failure;
-      }
-    }
+    private:
+      JobsRegistry&  registry_;
+      JobHandler*    handler_;  // Can only be accessed if the registry
+      // mutex is locked!
+      IJob*          job_;  // Will by design be in mutual exclusion,
+      // because only one RunningJob can be
+      // executed at a time on a JobHandler
 
-    void MarkPause()
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
+      std::string    id_;
+      int            priority_;
+      JobState       targetState_;
+      unsigned int   targetRetryTimeout_;
+      
+    public:
+      RunningJob(JobsRegistry& registry,
+                 unsigned int timeout) :
+        registry_(registry),
+        handler_(NULL),
+        targetState_(JobState_Failure),
+        targetRetryTimeout_(0)
       {
-        targetState_ = JobState_Paused;
-      }
-    }
+        {
+          boost::mutex::scoped_lock lock(registry_.mutex_);
 
-    void MarkRetry(unsigned int timeout)
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
-      {
-        targetState_ = JobState_Retry;
-        targetRetryTimeout_ = timeout;
-      }
-    }
+          while (registry_.pendingJobs_.empty())
+          {
+            if (timeout == 0)
+            {
+              registry_.pendingJobAvailable_.wait(lock);
+            }
+            else
+            {
+              bool success = registry_.pendingJobAvailable_.timed_wait
+                (lock, boost::posix_time::milliseconds(timeout));
+              if (!success)
+              {
+                // No pending job
+                return;
+              }
+            }
+          }
 
-    /*void ExecuteStep()
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+          handler_ = registry_.pendingJobs_.top();
+          registry_.pendingJobs_.pop();
+
+          assert(handler_->GetState() == JobState_Pending);
+          handler_->SetState(JobState_Running);
+
+          job_ = &handler_->GetJob();
+          id_ = handler_->GetId();
+          priority_ = handler_->GetPriority();
+        }
       }
 
-      if (IsPauseScheduled())
+      ~RunningJob()
       {
-        targetState_ = JobState_Paused;
-        return;
+        if (IsValid())
+        {
+          boost::mutex::scoped_lock lock(registry_.mutex_);
+
+          switch (targetState_)
+          {
+            case JobState_Failure:
+              registry_.MarkRunningAsCompleted(*handler_, false);
+              break;
+
+            case JobState_Success:
+              registry_.MarkRunningAsCompleted(*handler_, true);
+              break;
+
+            case JobState_Paused:
+              registry_.MarkRunningAsPaused(*handler_);
+              break;            
+
+            case JobState_Retry:
+              registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
+              break;
+            
+            default:
+              assert(0);
+          }
+        }
       }
 
-      std::auto_ptr<JobStepResult> result;
-      ErrorCode code;
-
+      bool IsValid() const
       {
-        bool ok = false;
+        return (handler_ != NULL &&
+                job_ != NULL);
+      }
 
-        try
+      const std::string& GetId() const  // Returns id_; throws BadSequenceOfCalls unless IsValid()
+      {
+        if (!IsValid())
         {
-          result.reset(job_->ExecuteStep());
-          ok = true;
-
-          if (result->GetCode() == JobStepCode_Failure)
-          {
-            code = ErrorCode_InternalError;            
-          }
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else
+        {
+          return id_;
         }
-        catch (OrthancException& e)
+      }
+
+      int GetPriority() const  // Returns priority_; throws BadSequenceOfCalls unless IsValid()
+      {
+        if (!IsValid())
         {
-          code = e.GetErrorCode();
-        }
-        catch (boost::bad_lexical_cast&)
-        {
-          code = ErrorCode_BadFileFormat;
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
         }
-        catch (...)
+        else
         {
-          code = ErrorCode_InternalError;
+          return priority_;
         }
+      }
 
-        if (ok)
+      bool IsPauseScheduled()  // Asks the handler whether a pause is scheduled; throws unless IsValid()
+      {
+        if (!IsValid())
         {
-          code = ErrorCode_Success;
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
         }
         else
         {
-          result.reset(new JobStepResult(JobStepCode_Failure));
+          boost::mutex::scoped_lock lock(registry_.mutex_);  // the handler is read under the registry mutex
+          registry_.CheckInvariants();
+          assert(handler_->GetState() == JobState_Running);  // debug-only check of the handler state
+        
+          return handler_->IsPauseScheduled();
+        }
+      }
+
+      // Selects JobState_Success as the state that the destructor reports
+      // to the registry. Throws BadSequenceOfCalls unless IsValid().
+      void MarkSuccess()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Success;
+      }
+
+      // Selects JobState_Failure as the state that the destructor reports
+      // to the registry. Throws BadSequenceOfCalls unless IsValid().
+      void MarkFailure()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Failure;
+      }
+
+      // Selects JobState_Paused as the state that the destructor reports
+      // to the registry. Throws BadSequenceOfCalls unless IsValid().
+      void MarkPause()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetState_ = JobState_Paused;
+      }
+
+      // Selects JobState_Retry; the destructor forwards "timeout" to
+      // MarkRunningAsRetry(). Throws BadSequenceOfCalls unless IsValid().
+      void MarkRetry(unsigned int timeout)
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        targetRetryTimeout_ = timeout;
+        targetState_ = JobState_Retry;
+      }
+
+      void UpdateStatus(const JobStatus& status)  // Stores "status" in the handler; throws unless IsValid()
+      {
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+        else
+        {
+          boost::mutex::scoped_lock lock(registry_.mutex_);  // the handler is updated under the registry mutex
+          registry_.CheckInvariants();
+          assert(handler_->GetState() == JobState_Running);  // debug-only check of the handler state
+        
+          handler_->SetLastStatus(status);
         }
       }
 
-      switch (result->GetCode())
+      // Runs one step of the job and stores the resulting status in the
+      // handler. Returns "true" iff the job asked to continue; otherwise
+      // "targetState_" holds the state to apply in the destructor. An
+      // exception or a NULL result from the job is recorded as a failure.
+      // Throws ErrorCode_BadSequenceOfCalls if the object is not valid.
+      bool ExecuteStep()
       {
-        case JobStepCode_Success:
-          targetState_ = JobState_Success;
-          break;
+        if (!IsValid())
+        {
+          throw OrthancException(ErrorCode_BadSequenceOfCalls);
+        }
+
+        if (IsPauseScheduled())
+        {
+          targetState_ = JobState_Paused;
+          return false;
+        }
 
-        case JobStepCode_Failure:
-          targetState_ = JobState_Failure;
-          break;
+        std::auto_ptr<JobStepResult> result;
+        ErrorCode code;
+
+        {
+          bool ok = false;
+
+          try
+          {
+            result.reset(job_->ExecuteStep());
+            ok = (result.get() != NULL);  // a NULL result is handled as a failure
 
-        case JobStepCode_Continue:
-          targetState_ = JobState_Running;
-          break;
+            code = (ok && result->GetCode() != JobStepCode_Failure ?
+                    ErrorCode_Success : ErrorCode_InternalError);
+          }
+          catch (OrthancException& e)
+          {
+            code = e.GetErrorCode();
+          }
+          catch (boost::bad_lexical_cast&)
+          {
+            code = ErrorCode_BadFileFormat;
+          }
+          catch (...)
+          {
+            code = ErrorCode_InternalError;
+          }
+
+          if (!ok)
+          {
+            // The step threw or returned NULL: substitute a plain failure
+            result.reset(new JobStepResult(JobStepCode_Failure));
+          }
+        }
 
-        case JobStepCode_Retry:
-          targetState_ = JobState_Retry;
-          targetRetryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
-          break;
+        {
+          JobStatus status(code, *job_);
+          UpdateStatus(status);
+        }
+
+        switch (result->GetCode())
+        {
+          case JobStepCode_Success:
+            targetState_ = JobState_Success;
+            return false;
+
+          case JobStepCode_Failure:
+            targetState_ = JobState_Failure;
+            return false;
 
-        default:
-          throw OrthancException(ErrorCode_InternalError);
+          case JobStepCode_Retry:
+            targetState_ = JobState_Retry;
+            targetRetryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
+            return false;
+
+          case JobStepCode_Continue:
+            return true;
+            
+          default:
+            throw OrthancException(ErrorCode_InternalError);
+        }
       }
-      }*/
+    };
   };
-};
 }
 
 
@@ -1383,7 +1421,7 @@
   }
 
   explicit DummyJob(JobStepResult result) :
-  result_(result)
+    result_(result)  // initializes the result_ member from the argument
   {
   }
 
@@ -1401,7 +1439,7 @@
     return 0;
   }
 
-  virtual void FormatStatus(Json::Value& value)
+  virtual void GetDescription(Json::Value& value)  // empty body: "value" is not modified
   {
   }
 };