changeset 2581:8da2cffc2378 jobs

JobsRegistry::Cancel()
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 11 May 2018 17:33:19 +0200
parents 055d7d4a823f
children b3da733d984c
files Core/Enumerations.cpp Core/Enumerations.h Core/JobsEngine/JobStatus.h Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h OrthancServer/main.cpp Plugins/Include/orthanc/OrthancCPlugin.h Resources/ErrorCodes.json Resources/ImplementationNotes/JobsEngineStates.dot Resources/ImplementationNotes/JobsEngineStates.pdf UnitTestsSources/MultiThreadingTests.cpp
diffstat 12 files changed, 316 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/Core/Enumerations.cpp	Fri May 11 09:35:17 2018 +0200
+++ b/Core/Enumerations.cpp	Fri May 11 17:33:19 2018 +0200
@@ -164,6 +164,9 @@
       case ErrorCode_DatabaseUnavailable:
         return "The database is currently not available (probably a transient situation)";
 
+      case ErrorCode_CanceledJob:
+        return "This job was canceled";
+
       case ErrorCode_SQLiteNotOpened:
         return "SQLite: The database is not opened";
 
--- a/Core/Enumerations.h	Fri May 11 09:35:17 2018 +0200
+++ b/Core/Enumerations.h	Fri May 11 17:33:19 2018 +0200
@@ -96,6 +96,7 @@
     ErrorCode_NotAcceptable = 34    /*!< Cannot send a response which is acceptable according to the Accept HTTP header */,
     ErrorCode_NullPointer = 35    /*!< Cannot handle a NULL pointer */,
     ErrorCode_DatabaseUnavailable = 36    /*!< The database is currently not available (probably a transient situation) */,
+    ErrorCode_CanceledJob = 37    /*!< This job was canceled */,
     ErrorCode_SQLiteNotOpened = 1000    /*!< SQLite: The database is not opened */,
     ErrorCode_SQLiteAlreadyOpened = 1001    /*!< SQLite: Connection is already open */,
     ErrorCode_SQLiteCannotOpen = 1002    /*!< SQLite: Unable to open the database */,
--- a/Core/JobsEngine/JobStatus.h	Fri May 11 09:35:17 2018 +0200
+++ b/Core/JobsEngine/JobStatus.h	Fri May 11 17:33:19 2018 +0200
@@ -57,6 +57,11 @@
       return errorCode_;
     }
 
+    void SetErrorCode(ErrorCode error)
+    {
+      errorCode_ = error;
+    }
+
     float GetProgress() const
     {
       return progress_;
--- a/Core/JobsEngine/JobsEngine.cpp	Fri May 11 09:35:17 2018 +0200
+++ b/Core/JobsEngine/JobsEngine.cpp	Fri May 11 17:33:19 2018 +0200
@@ -60,6 +60,13 @@
       return false;
     }
 
+    if (running.IsCancelScheduled())
+    {
+      running.GetJob().ReleaseResources();
+      running.MarkCanceled();
+      return false;
+    }
+
     std::auto_ptr<JobStepResult> result;
 
     {
--- a/Core/JobsEngine/JobsRegistry.cpp	Fri May 11 09:35:17 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Fri May 11 17:33:19 2018 +0200
@@ -52,6 +52,7 @@
     boost::posix_time::time_duration  runtime_;
     boost::posix_time::ptime          retryTime_;
     bool                              pauseScheduled_;
+    bool                              cancelScheduled_;
     JobStatus                         lastStatus_;
 
     void Touch()
@@ -70,6 +71,7 @@
     {
       state_ = state;
       pauseScheduled_ = false;
+      cancelScheduled_ = false;
       Touch();
     }
 
@@ -84,7 +86,8 @@
       lastStateChangeTime_(creationTime_),
       runtime_(boost::posix_time::milliseconds(0)),
       retryTime_(creationTime_),
-      pauseScheduled_(false)
+      pauseScheduled_(false),
+      cancelScheduled_(false)
     {
       if (job == NULL)
       {
@@ -162,11 +165,29 @@
       }
     }
 
+    void ScheduleCancel()
+    {
+      if (state_ == JobState_Running)
+      {
+        cancelScheduled_ = true;
+      }
+      else
+      {
+        // Only valid for running jobs
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
     bool IsPauseScheduled()
     {
       return pauseScheduled_;
     }
 
+    bool IsCancelScheduled()
+    {
+      return cancelScheduled_;
+    }
+
     bool IsRetryReady(const boost::posix_time::ptime& now) const
     {
       if (state_ != JobState_Retry)
@@ -204,6 +225,11 @@
       lastStatus_ = status;
       Touch();
     }
+
+    void SetLastErrorCode(ErrorCode code)
+    {
+      lastStatus_.SetErrorCode(code);
+    }
   };
 
 
@@ -335,6 +361,18 @@
   }
 
 
+  void JobsRegistry::SetCompletedJob(JobHandler& job,
+                                     bool success)
+  {
+    job.SetState(success ? JobState_Success : JobState_Failure);
+
+    completedJobs_.push_back(&job);
+    ForgetOldCompletedJobs();
+
+    someJobComplete_.notify_all();
+  }
+
+
   void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
                                             bool success)
   {
@@ -342,14 +380,9 @@
               << ": " << job.GetId();
 
     CheckInvariants();
+
     assert(job.GetState() == JobState_Running);
-
-    job.SetState(success ? JobState_Success : JobState_Failure);
-
-    completedJobs_.push_back(&job);
-    ForgetOldCompletedJobs();
-
-    someJobComplete_.notify_all();
+    SetCompletedJob(job, success);
 
     CheckInvariants();
   }
@@ -501,8 +534,6 @@
     std::string id;
     Submit(id, job, priority);
 
-    printf(">> %s\n", id.c_str()); fflush(stdout);
-
     JobState state;
 
     {
@@ -561,6 +592,34 @@
   }
 
 
+  void JobsRegistry::RemovePendingJob(const std::string& id)
+  {
+    // If the job is pending, we need to reconstruct the priority
+    // queue to remove it
+    PendingJobs copy;
+    std::swap(copy, pendingJobs_);
+
+    assert(pendingJobs_.empty());
+    while (!copy.empty())
+    {
+      if (copy.top()->GetId() != id)
+      {
+        pendingJobs_.push(copy.top());
+      }
+
+      copy.pop();
+    }
+  }
+
+
+  void JobsRegistry::RemoveRetryJob(JobHandler* handler)
+  {
+    RetryJobs::iterator item = retryJobs_.find(handler);
+    assert(item != retryJobs_.end());            
+    retryJobs_.erase(item);
+  }
+
+
   bool JobsRegistry::Pause(const std::string& id)
   {
     LOG(INFO) << "Pausing job: " << id;
@@ -580,38 +639,14 @@
       switch (found->second->GetState())
       {
         case JobState_Pending:
-        {
-          // If the job is pending, we need to reconstruct the
-          // priority queue to remove it
-          PendingJobs copy;
-          std::swap(copy, pendingJobs_);
-
-          assert(pendingJobs_.empty());
-          while (!copy.empty())
-          {
-            if (copy.top()->GetId() != id)
-            {
-              pendingJobs_.push(copy.top());
-            }
-
-            copy.pop();
-          }
-
+          RemovePendingJob(id);
           found->second->SetState(JobState_Paused);
-
           break;
-        }
 
         case JobState_Retry:
-        {
-          RetryJobs::iterator item = retryJobs_.find(found->second);
-          assert(item != retryJobs_.end());            
-          retryJobs_.erase(item);
-
+          RemoveRetryJob(found->second);
           found->second->SetState(JobState_Paused);
-
           break;
-        }
 
         case JobState_Paused:
         case JobState_Success:
@@ -633,6 +668,60 @@
   }
 
 
+  bool JobsRegistry::Cancel(const std::string& id)
+  {
+    LOG(INFO) << "Canceling job: " << id;
+
+    boost::mutex::scoped_lock lock(mutex_);
+    CheckInvariants();
+
+    JobsIndex::iterator found = jobsIndex_.find(id);
+
+    if (found == jobsIndex_.end())
+    {
+      LOG(WARNING) << "Unknown job: " << id;
+      return false;
+    }
+    else
+    {
+      switch (found->second->GetState())
+      {
+        case JobState_Pending:
+          RemovePendingJob(id);
+          SetCompletedJob(*found->second, false);
+          found->second->SetLastErrorCode(ErrorCode_CanceledJob);
+          break;
+
+        case JobState_Retry:
+          RemoveRetryJob(found->second);
+          SetCompletedJob(*found->second, false);
+          found->second->SetLastErrorCode(ErrorCode_CanceledJob);
+          break;
+
+        case JobState_Paused:
+          SetCompletedJob(*found->second, false);
+          found->second->SetLastErrorCode(ErrorCode_CanceledJob);
+          break;
+        
+        case JobState_Success:
+        case JobState_Failure:
+          // Nothing to be done
+          break;
+
+        case JobState_Running:
+          found->second->ScheduleCancel();
+          break;
+
+        default:
+          throw OrthancException(ErrorCode_InternalError);
+      }
+
+      CheckInvariants();
+      return true;
+    }
+  }
+
+
   bool JobsRegistry::Resume(const std::string& id)
   {
     LOG(INFO) << "Resuming job: " << id;
@@ -751,7 +840,8 @@
     registry_(registry),
     handler_(NULL),
     targetState_(JobState_Failure),
-    targetRetryTimeout_(0)
+    targetRetryTimeout_(0),
+    canceled_(false)
   {
     {
       boost::mutex::scoped_lock lock(registry_.mutex_);
@@ -779,6 +869,7 @@
 
       assert(handler_->GetState() == JobState_Pending);
       handler_->SetState(JobState_Running);
+      handler_->SetLastErrorCode(ErrorCode_Success);
 
       job_ = &handler_->GetJob();
       id_ = handler_->GetId();
@@ -797,6 +888,12 @@
       {
         case JobState_Failure:
           registry_.MarkRunningAsCompleted(*handler_, false);
+
+          if (canceled_)
+          {
+            handler_->SetLastErrorCode(ErrorCode_CanceledJob);
+          }
+          
           break;
 
         case JobState_Success:
@@ -881,6 +978,23 @@
   }
 
       
+  bool JobsRegistry::RunningJob::IsCancelScheduled()
+  {
+    if (!IsValid())
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      boost::mutex::scoped_lock lock(registry_.mutex_);
+      registry_.CheckInvariants();
+      assert(handler_->GetState() == JobState_Running);
+        
+      return handler_->IsCancelScheduled();
+    }
+  }
+
+      
   void JobsRegistry::RunningJob::MarkSuccess()
   {
     if (!IsValid())
@@ -907,6 +1021,20 @@
   }
 
       
+  void JobsRegistry::RunningJob::MarkCanceled()
+  {
+    if (!IsValid())
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      targetState_ = JobState_Failure;
+      canceled_ = true;
+    }
+  }
+
+      
   void JobsRegistry::RunningJob::MarkPause()
   {
     if (!IsValid())
--- a/Core/JobsEngine/JobsRegistry.h	Fri May 11 09:35:17 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.h	Fri May 11 17:33:19 2018 +0200
@@ -93,6 +93,9 @@
 
     void ForgetOldCompletedJobs();
 
+    void SetCompletedJob(JobHandler& job,
+                         bool success);
+    
     void MarkRunningAsCompleted(JobHandler& job,
                                 bool success);
 
@@ -104,6 +107,10 @@
     bool GetStateInternal(JobState& state,
                           const std::string& id);
 
+    void RemovePendingJob(const std::string& id);
+      
+    void RemoveRetryJob(JobHandler* handler);
+      
   public:
     JobsRegistry() :
       maxCompletedJobs_(10)
@@ -138,6 +145,8 @@
     bool Resume(const std::string& id);
 
     bool Resubmit(const std::string& id);
+
+    bool Cancel(const std::string& id);
     
     void ScheduleRetries();
     
@@ -158,6 +167,7 @@
       int            priority_;
       JobState       targetState_;
       unsigned int   targetRetryTimeout_;
+      bool           canceled_;
       
     public:
       RunningJob(JobsRegistry& registry,
@@ -175,12 +185,16 @@
 
       bool IsPauseScheduled();
 
+      bool IsCancelScheduled();
+
       void MarkSuccess();
 
       void MarkFailure();
 
       void MarkPause();
 
+      void MarkCanceled();
+
       void MarkRetry(unsigned int timeout);
 
       void UpdateStatus(ErrorCode code);
--- a/OrthancServer/main.cpp	Fri May 11 09:35:17 2018 +0200
+++ b/OrthancServer/main.cpp	Fri May 11 17:33:19 2018 +0200
@@ -574,6 +574,7 @@
     PrintErrorCode(ErrorCode_NotAcceptable, "Cannot send a response which is acceptable according to the Accept HTTP header");
     PrintErrorCode(ErrorCode_NullPointer, "Cannot handle a NULL pointer");
     PrintErrorCode(ErrorCode_DatabaseUnavailable, "The database is currently not available (probably a transient situation)");
+    PrintErrorCode(ErrorCode_CanceledJob, "This job was canceled");
     PrintErrorCode(ErrorCode_SQLiteNotOpened, "SQLite: The database is not opened");
     PrintErrorCode(ErrorCode_SQLiteAlreadyOpened, "SQLite: Connection is already open");
     PrintErrorCode(ErrorCode_SQLiteCannotOpen, "SQLite: Unable to open the database");
--- a/Plugins/Include/orthanc/OrthancCPlugin.h	Fri May 11 09:35:17 2018 +0200
+++ b/Plugins/Include/orthanc/OrthancCPlugin.h	Fri May 11 17:33:19 2018 +0200
@@ -235,6 +235,7 @@
     OrthancPluginErrorCode_NotAcceptable = 34    /*!< Cannot send a response which is acceptable according to the Accept HTTP header */,
     OrthancPluginErrorCode_NullPointer = 35    /*!< Cannot handle a NULL pointer */,
     OrthancPluginErrorCode_DatabaseUnavailable = 36    /*!< The database is currently not available (probably a transient situation) */,
+    OrthancPluginErrorCode_CanceledJob = 37    /*!< This job was canceled */,
     OrthancPluginErrorCode_SQLiteNotOpened = 1000    /*!< SQLite: The database is not opened */,
     OrthancPluginErrorCode_SQLiteAlreadyOpened = 1001    /*!< SQLite: Connection is already open */,
     OrthancPluginErrorCode_SQLiteCannotOpen = 1002    /*!< SQLite: Unable to open the database */,
--- a/Resources/ErrorCodes.json	Fri May 11 09:35:17 2018 +0200
+++ b/Resources/ErrorCodes.json	Fri May 11 17:33:19 2018 +0200
@@ -207,6 +207,11 @@
     "Name": "DatabaseUnavailable", 
     "Description": "The database is currently not available (probably a transient situation)"
   }, 
+  {
+    "Code": 37, 
+    "Name": "CanceledJob", 
+    "Description": "This job was canceled"
+  }, 
 
 
 
--- a/Resources/ImplementationNotes/JobsEngineStates.dot	Fri May 11 09:35:17 2018 +0200
+++ b/Resources/ImplementationNotes/JobsEngineStates.dot	Fri May 11 17:33:19 2018 +0200
@@ -20,4 +20,9 @@
   pending -> paused  [label="Pause()" fontcolor="red"];
   retry -> paused  [label="Pause()" fontcolor="red"];
   running -> paused  [label="Pause()" fontcolor="red"];
+
+  paused -> failure  [label="Cancel()" fontcolor="red"];
+  pending -> failure  [label="Cancel()" fontcolor="red"];
+  retry -> failure  [label="Cancel()" fontcolor="red"];
+  running -> failure  [label="Cancel()" fontcolor="red"];
 }
Binary file Resources/ImplementationNotes/JobsEngineStates.pdf has changed
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri May 11 09:35:17 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri May 11 17:33:19 2018 +0200
@@ -323,6 +323,22 @@
 }
 
 
+static bool CheckErrorCode(Orthanc::JobsRegistry& registry,
+                           const std::string& id,
+                           Orthanc::ErrorCode code)
+{
+  Orthanc::JobInfo s;
+  if (registry.GetJobInfo(s, id))
+  {
+    return code == s.GetStatus().GetErrorCode();
+  }
+  else
+  {
+    return false;
+  }
+}
+
+
 TEST(JobsRegistry, Priority)
 {
   JobsRegistry registry;
@@ -611,6 +627,99 @@
 }
 
 
+TEST(JobsRegistry, Cancel)
+{
+  JobsRegistry registry;
+
+  std::string id;
+  registry.Submit(id, new DummyJob(), 10);
+
+  ASSERT_FALSE(registry.Cancel("nope"));
+
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+            
+  ASSERT_TRUE(registry.Cancel(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+  
+  ASSERT_TRUE(registry.Cancel(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+  
+  ASSERT_TRUE(registry.Resubmit(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+  
+  {
+    JobsRegistry::RunningJob job(registry, 0);
+    ASSERT_TRUE(job.IsValid());
+
+    ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+
+    job.MarkSuccess();
+    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
+  }
+
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+
+  ASSERT_TRUE(registry.Cancel(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+
+  registry.Submit(id, new DummyJob(), 10);
+
+  {
+    JobsRegistry::RunningJob job(registry, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(id, job.GetId());
+
+    ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
+
+    job.MarkCanceled();
+  }
+
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+
+  ASSERT_TRUE(registry.Resubmit(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+
+  ASSERT_TRUE(registry.Pause(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+
+  ASSERT_TRUE(registry.Cancel(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+
+  ASSERT_TRUE(registry.Resubmit(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+
+  {
+    JobsRegistry::RunningJob job(registry, 0);
+    ASSERT_TRUE(job.IsValid());
+    ASSERT_EQ(id, job.GetId());
+
+    ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+    ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
+
+    job.MarkRetry(500);
+  }
+
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_Success));
+
+  ASSERT_TRUE(registry.Cancel(id));
+  ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure));
+  ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
+}
+
+
 
 
 TEST(JobsEngine, Basic)