diff Core/JobsEngine/JobsRegistry.cpp @ 2581:8da2cffc2378 jobs

JobsRegistry::Cancel()
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 11 May 2018 17:33:19 +0200
parents 3372c5255333
children 1b6a6d80b6f2
line wrap: on
line diff
--- a/Core/JobsEngine/JobsRegistry.cpp	Fri May 11 09:35:17 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Fri May 11 17:33:19 2018 +0200
@@ -52,6 +52,7 @@
     boost::posix_time::time_duration  runtime_;
     boost::posix_time::ptime          retryTime_;
     bool                              pauseScheduled_;
+    bool                              cancelScheduled_;
     JobStatus                         lastStatus_;
 
     void Touch()
@@ -70,6 +71,7 @@
     {
       state_ = state;
       pauseScheduled_ = false;
+      cancelScheduled_ = false;
       Touch();
     }
 
@@ -84,7 +86,8 @@
       lastStateChangeTime_(creationTime_),
       runtime_(boost::posix_time::milliseconds(0)),
       retryTime_(creationTime_),
-      pauseScheduled_(false)
+      pauseScheduled_(false),
+      cancelScheduled_(false)
     {
       if (job == NULL)
       {
@@ -162,11 +165,29 @@
       }
     }
 
+    void ScheduleCancel()
+    {
+      if (state_ == JobState_Running)
+      {
+        cancelScheduled_ = true;
+      }
+      else
+      {
+        // Only valid for running jobs
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
     bool IsPauseScheduled()
     {
       return pauseScheduled_;
     }
 
+    bool IsCancelScheduled()
+    {
+      return cancelScheduled_;
+    }
+
     bool IsRetryReady(const boost::posix_time::ptime& now) const
     {
       if (state_ != JobState_Retry)
@@ -204,6 +225,11 @@
       lastStatus_ = status;
       Touch();
     }
+
+    void SetLastErrorCode(ErrorCode code)
+    {
+      lastStatus_.SetErrorCode(code);
+    }
   };
 
 
@@ -335,6 +361,18 @@
   }
 
 
+  void JobsRegistry::SetCompletedJob(JobHandler& job,
+                                     bool success)
+  {
+    job.SetState(success ? JobState_Success : JobState_Failure);
+
+    completedJobs_.push_back(&job);
+    ForgetOldCompletedJobs();
+
+    someJobComplete_.notify_all();
+  }
+
+
   void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
                                             bool success)
   {
@@ -342,14 +380,9 @@
               << ": " << job.GetId();
 
     CheckInvariants();
+
     assert(job.GetState() == JobState_Running);
-
-    job.SetState(success ? JobState_Success : JobState_Failure);
-
-    completedJobs_.push_back(&job);
-    ForgetOldCompletedJobs();
-
-    someJobComplete_.notify_all();
+    SetCompletedJob(job, success);
 
     CheckInvariants();
   }
@@ -501,8 +534,6 @@
     std::string id;
     Submit(id, job, priority);
 
-    printf(">> %s\n", id.c_str()); fflush(stdout);
-
     JobState state;
 
     {
@@ -561,6 +592,34 @@
   }
 
 
+  void JobsRegistry::RemovePendingJob(const std::string& id)
+  {
+    // If the job is pending, we need to reconstruct the priority
+    // queue to remove it
+    PendingJobs copy;
+    std::swap(copy, pendingJobs_);
+
+    assert(pendingJobs_.empty());
+    while (!copy.empty())
+    {
+      if (copy.top()->GetId() != id)
+      {
+        pendingJobs_.push(copy.top());
+      }
+
+      copy.pop();
+    }
+  }
+
+
+  void JobsRegistry::RemoveRetryJob(JobHandler* handler)
+  {
+    RetryJobs::iterator item = retryJobs_.find(handler);
+    assert(item != retryJobs_.end());            
+    retryJobs_.erase(item);
+  }
+
+
   bool JobsRegistry::Pause(const std::string& id)
   {
     LOG(INFO) << "Pausing job: " << id;
@@ -580,38 +639,14 @@
       switch (found->second->GetState())
       {
         case JobState_Pending:
-        {
-          // If the job is pending, we need to reconstruct the
-          // priority queue to remove it
-          PendingJobs copy;
-          std::swap(copy, pendingJobs_);
-
-          assert(pendingJobs_.empty());
-          while (!copy.empty())
-          {
-            if (copy.top()->GetId() != id)
-            {
-              pendingJobs_.push(copy.top());
-            }
-
-            copy.pop();
-          }
-
+          RemovePendingJob(id);
           found->second->SetState(JobState_Paused);
-
           break;
-        }
 
         case JobState_Retry:
-        {
-          RetryJobs::iterator item = retryJobs_.find(found->second);
-          assert(item != retryJobs_.end());            
-          retryJobs_.erase(item);
-
+          RemoveRetryJob(found->second);
           found->second->SetState(JobState_Paused);
-
           break;
-        }
 
         case JobState_Paused:
         case JobState_Success:
@@ -633,6 +668,60 @@
   }
 
 
+  bool JobsRegistry::Cancel(const std::string& id)
+  {
+    LOG(INFO) << "Canceling job: " << id;
+
+    boost::mutex::scoped_lock lock(mutex_);
+    CheckInvariants();
+
+    JobsIndex::iterator found = jobsIndex_.find(id);
+
+    if (found == jobsIndex_.end())
+    {
+      LOG(WARNING) << "Unknown job: " << id;
+      return false;
+    }
+    else
+    {
+      switch (found->second->GetState())
+      {
+        case JobState_Pending:
+          RemovePendingJob(id);
+          SetCompletedJob(*found->second, false);
+          found->second->SetLastErrorCode(ErrorCode_CanceledJob);
+          break;
+
+        case JobState_Retry:
+          RemoveRetryJob(found->second);
+          SetCompletedJob(*found->second, false);
+          found->second->SetLastErrorCode(ErrorCode_CanceledJob);
+          break;
+
+        case JobState_Paused:
+          SetCompletedJob(*found->second, false);
+          found->second->SetLastErrorCode(ErrorCode_CanceledJob);
+          break;
+        
+        case JobState_Success:
+        case JobState_Failure:
+          // Nothing to be done
+          break;
+
+        case JobState_Running:
+          found->second->ScheduleCancel();
+          break;
+
+        default:
+          throw OrthancException(ErrorCode_InternalError);
+      }
+
+      CheckInvariants();
+      return true;
+    }
+  }
+
+
   bool JobsRegistry::Resume(const std::string& id)
   {
     LOG(INFO) << "Resuming job: " << id;
@@ -751,7 +840,8 @@
     registry_(registry),
     handler_(NULL),
     targetState_(JobState_Failure),
-    targetRetryTimeout_(0)
+    targetRetryTimeout_(0),
+    canceled_(false)
   {
     {
       boost::mutex::scoped_lock lock(registry_.mutex_);
@@ -779,6 +869,7 @@
 
       assert(handler_->GetState() == JobState_Pending);
       handler_->SetState(JobState_Running);
+      handler_->SetLastErrorCode(ErrorCode_Success);
 
       job_ = &handler_->GetJob();
       id_ = handler_->GetId();
@@ -797,6 +888,12 @@
       {
         case JobState_Failure:
           registry_.MarkRunningAsCompleted(*handler_, false);
+
+          if (canceled_)
+          {
+            handler_->SetLastErrorCode(ErrorCode_CanceledJob);
+          }
+          
           break;
 
         case JobState_Success:
@@ -881,6 +978,23 @@
   }
 
       
+  bool JobsRegistry::RunningJob::IsCancelScheduled()
+  {
+    if (!IsValid())
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      boost::mutex::scoped_lock lock(registry_.mutex_);
+      registry_.CheckInvariants();
+      assert(handler_->GetState() == JobState_Running);
+        
+      return handler_->IsCancelScheduled();
+    }
+  }
+
+      
   void JobsRegistry::RunningJob::MarkSuccess()
   {
     if (!IsValid())
@@ -907,6 +1021,20 @@
   }
 
       
+  void JobsRegistry::RunningJob::MarkCanceled()
+  {
+    if (!IsValid())
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      targetState_ = JobState_Failure;
+      canceled_ = true;
+    }
+  }
+
+      
   void JobsRegistry::RunningJob::MarkPause()
   {
     if (!IsValid())