changeset 2562:1e66fe3ddf9f jobs

refactoring
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 04 May 2018 17:28:47 +0200
parents 7d4a3eca96af
children 98dfc1948d00
files UnitTestsSources/MultiThreadingTests.cpp
diffstat 1 files changed, 177 insertions(+), 160 deletions(-) [+]
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu May 03 18:48:20 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri May 04 17:28:47 2018 +0200
@@ -290,22 +290,22 @@
     JobState_Retry
   };
   
-  enum JobStepStatus
+  enum JobStepCode
   {
-    JobStepStatus_Success,
-    JobStepStatus_Failure,
-    JobStepStatus_Continue,
-    JobStepStatus_Retry
+    JobStepCode_Success,
+    JobStepCode_Failure,
+    JobStepCode_Continue,
+    JobStepCode_Retry
   };
 
 
   class JobStepResult
   {
   private:
-    JobStepStatus status_;
+    JobStepCode status_;
     
   public:
-    explicit JobStepResult(JobStepStatus status) :
+    explicit JobStepResult(JobStepCode status) :
     status_(status)
     {
     }
@@ -314,7 +314,7 @@
     {
     }
 
-    JobStepStatus GetStatus() const
+    JobStepCode GetCode() const
     {
       return status_;
     }
@@ -328,7 +328,7 @@
 
   public:
     RetryResult(unsigned int timeout) :
-    JobStepResult(JobStepStatus_Retry),
+    JobStepResult(JobStepCode_Retry),
     timeout_(timeout)
     {
     }
@@ -357,6 +357,34 @@
   };
 
 
+  struct JobStatus
+  {
+    ErrorCode      errorCode_;
+    float          progress_;
+    Json::Value    description_;
+
+    JobStatus() :
+      errorCode_(ErrorCode_Success),
+      progress_(0),
+      description_(Json::objectValue)
+    {
+    }
+
+    JobStatus(ErrorCode code,
+              float progress) :
+      errorCode_(code),
+      progress_(progress),
+      description_(Json::objectValue)
+    {
+      if (progress < 0 ||
+          progress > 1)
+      {
+        throw OrthancException(ErrorCode_ParameterOutOfRange);
+      }
+    }
+  };
+
+
   class JobInfo
   {
   private:
@@ -364,39 +392,30 @@
     int                               priority_;
     ErrorCode                         errorCode_;
     JobState                          state_;
-    float                             progress_;
     boost::posix_time::ptime          infoTime_;
     boost::posix_time::ptime          creationTime_;
     boost::posix_time::time_duration  runtime_;
     boost::posix_time::ptime          eta_;
-    Json::Value                       status_;
+    JobStatus                         status_;
 
   public:
     JobInfo(const std::string& id,
             int priority,
-            ErrorCode errorCode,
             JobState state,
-            float progress,
+            const JobStatus& status,
             const boost::posix_time::ptime& creationTime,
             const boost::posix_time::time_duration& runtime) :
       id_(id),
       priority_(priority),
-      errorCode_(errorCode),
       state_(state),
-      progress_(progress),
       infoTime_(boost::posix_time::microsec_clock::universal_time()),
       creationTime_(creationTime),
-      runtime_(runtime)
+      runtime_(runtime),
+      status_(status)
     {
-      if (progress < 0 ||
-          progress > 1)
-      {
-        throw OrthancException(ErrorCode_ParameterOutOfRange);
-      }
-
-      float r = static_cast<float>(runtime_.total_milliseconds());
-
-      eta_ = infoTime_ + boost::posix_time::milliseconds(boost::math::llround(1.0f - progress) * r);
+      float ms = static_cast<float>(runtime_.total_milliseconds());
+      float remaining = boost::math::llround(1.0f - status_.progress_) * ms;
+      eta_ = infoTime_ + boost::posix_time::milliseconds(remaining);
     }
 
     const std::string& GetIdentifier() const
@@ -419,11 +438,6 @@
       return state_;
     }
 
-    float GetProgress() const
-    {
-      return progress_;
-    }
-
     const boost::posix_time::ptime& GetInfoTime() const
     {
       return infoTime_;
@@ -444,12 +458,12 @@
       return eta_;
     }
 
-    const Json::Value& GetStatus() const
+    const JobStatus& GetStatus() const
     {
       return status_;
     }
 
-    Json::Value& GetStatus()
+    JobStatus& GetStatus()
     {
       return status_;
     }
@@ -457,21 +471,18 @@
 
 
   class JobHandler : public boost::noncopyable
-  {
+  {   
   private:
     std::string                       id_;
     JobState                          state_;
-    boost::mutex                      jobMutex_;
     std::auto_ptr<IJob>               job_;
     int                               priority_;  // "+inf()" means highest priority
     boost::posix_time::ptime          creationTime_;
-    boost::posix_time::ptime          lastStateChangeTime;
+    boost::posix_time::ptime          lastStateChangeTime_;
+    boost::posix_time::time_duration  runtime_;
     boost::posix_time::ptime          retryTime_;
-    boost::posix_time::time_duration  runtime_;
     bool                              pauseScheduled_;
-    ErrorCode                         lastErrorCode_;
-    float                             lastProgress_;
-    Json::Value                       lastStatus_;
+    JobStatus                         lastStatus_;
 
     void SetStateInternal(JobState state) 
     {
@@ -479,11 +490,11 @@
 
       if (state_ == JobState_Running)
       {
-        runtime_ += (now - lastStateChangeTime);
+        runtime_ += (now - lastStateChangeTime_);
       }
 
       state_ = state;
-      lastStateChangeTime = now;
+      lastStateChangeTime_ = now;
       pauseScheduled_ = false;
     }
 
@@ -495,11 +506,10 @@
       job_(job),
       priority_(priority),
       creationTime_(boost::posix_time::microsec_clock::universal_time()),
-      lastStateChangeTime(creationTime_),
+      lastStateChangeTime_(creationTime_),
       runtime_(boost::posix_time::milliseconds(0)),
-      pauseScheduled_(false),
-      lastErrorCode_(ErrorCode_Success),
-      lastProgress_(0)
+      retryTime_(creationTime_),
+      pauseScheduled_(false)
     {
       if (job == NULL)
       {
@@ -512,6 +522,12 @@
       return id_;
     }
 
+    IJob& GetJob() const
+    {
+      assert(job_.get() != NULL);
+      return *job_;
+    }
+
     void SetPriority(int priority)
     {
       priority_ = priority;
@@ -585,35 +601,15 @@
       }
     }
 
-    class JobLock
+    JobStatus& GetLastStatus()
     {
-    private:
-      boost::mutex::scoped_lock  lock_;
-      JobHandler&                handler_;
-
-    public:
-      JobLock(JobHandler& handler) :
-      lock_(handler.jobMutex_),
-      handler_(handler)
-      {
-      }
+      return lastStatus_;
+    }
 
-      IJob& GetJob()
-      {
-        return *handler_.job_;
-      }
-
-      void UpdateStatus()
-      {
-        handler_.lastProgress_ = handler_.job_->GetProgress();
-        handler_.job_->FormatStatus(handler_.lastStatus_);
-      }
-
-      void SetLastErrorCode(ErrorCode code)
-      {
-        handler_.lastErrorCode_ = code;
-      }
-    };
+    const JobStatus& GetLastStatus() const
+    {
+      return lastStatus_;
+    }
   };
 
 
@@ -771,7 +767,6 @@
     LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
               << ": " << job.GetId();
 
-    boost::mutex::scoped_lock lock(mutex_);
     CheckInvariants();
     assert(job.GetState() == JobState_Running);
 
@@ -789,7 +784,6 @@
   {
     LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
 
-    boost::mutex::scoped_lock lock(mutex_);
     CheckInvariants();
 
     assert(job.GetState() == JobState_Running &&
@@ -806,7 +800,6 @@
   {
     LOG(INFO) << "Job paused: " << job.GetId();
 
-    boost::mutex::scoped_lock lock(mutex_);
     CheckInvariants();
     assert(job.GetState() == JobState_Running);
 
@@ -816,35 +809,6 @@
   }
 
 
-  JobHandler* WaitPendingJob(unsigned int timeout)
-  {
-    boost::mutex::scoped_lock lock(mutex_);
-
-    while (pendingJobs_.empty())
-    {
-      if (timeout == 0)
-      {
-        pendingJobAvailable_.wait(lock);
-      }
-      else
-      {
-        bool success = pendingJobAvailable_.timed_wait
-          (lock, boost::posix_time::milliseconds(timeout));
-        if (!success)
-        {
-          return NULL;
-        }
-      }
-    }
-
-    JobHandler* job = pendingJobs_.top();
-    pendingJobs_.pop();
-      
-    job->SetState(JobState_Running);
-    return job;
-  }
-
-
 public:
   JobsRegistry() :
     maxCompletedJobs_(10)
@@ -1148,42 +1112,81 @@
   class RunningJob : public boost::noncopyable
   {
   private:
-    JobsRegistry&  that_;
-    JobHandler*    handler_;
+    JobsRegistry&  registry_;
+    JobHandler*    handler_;  // Can only be accessed if the registry
+                              // mutex is locked!
+    IJob*          job_;  // Will by design be in mutual exclusion,
+                          // because only one RunningJob can be
+                          // executed at a time on a JobHandler
+
+    std::string    id_;
+    int            priority_;
     JobState       targetState_;
-    unsigned int   retryTimeout_;
+    unsigned int   targetRetryTimeout_;
       
   public:
-    RunningJob(JobsRegistry& that,
+    RunningJob(JobsRegistry& registry,
                unsigned int timeout) :
-      that_(that),
+      registry_(registry),
       handler_(NULL),
       targetState_(JobState_Failure),
-      retryTimeout_(0)
+      targetRetryTimeout_(0)
     {
-      handler_ = that_.WaitPendingJob(timeout);
+      {
+        boost::mutex::scoped_lock lock(registry_.mutex_);
+
+        while (registry_.pendingJobs_.empty())
+        {
+          if (timeout == 0)
+          {
+            registry_.pendingJobAvailable_.wait(lock);
+          }
+          else
+          {
+            bool success = registry_.pendingJobAvailable_.timed_wait
+              (lock, boost::posix_time::milliseconds(timeout));
+            if (!success)
+            {
+              // No pending job
+              return;
+            }
+          }
+        }
+
+        handler_ = registry_.pendingJobs_.top();
+        registry_.pendingJobs_.pop();
+
+        assert(handler_->GetState() == JobState_Pending);
+        handler_->SetState(JobState_Running);
+
+        job_ = &handler_->GetJob();
+        id_ = handler_->GetId();
+        priority_ = handler_->GetPriority();
+      }
     }
 
     ~RunningJob()
     {
       if (IsValid())
       {
+        boost::mutex::scoped_lock lock(registry_.mutex_);
+
         switch (targetState_)
         {
           case JobState_Failure:
-            that_.MarkRunningAsCompleted(*handler_, false);
+            registry_.MarkRunningAsCompleted(*handler_, false);
             break;
 
           case JobState_Success:
-            that_.MarkRunningAsCompleted(*handler_, true);
+            registry_.MarkRunningAsCompleted(*handler_, true);
             break;
 
           case JobState_Paused:
-            that_.MarkRunningAsPaused(*handler_);
+            registry_.MarkRunningAsPaused(*handler_);
             break;            
 
           case JobState_Retry:
-            that_.MarkRunningAsRetry(*handler_, retryTimeout_);
+            registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
             break;
             
           default:
@@ -1194,31 +1197,32 @@
 
     bool IsValid() const
     {
-      return handler_ != NULL;
+      return (handler_ != NULL &&
+              job_ != NULL);
     }
 
     const std::string& GetId() const
     {
-      if (IsValid())
+      if (!IsValid())
       {
-        return handler_->GetId();
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
       else
       {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }      
+        return id_;
+      }
     }
 
     int GetPriority() const
     {
-      if (IsValid())
+      if (!IsValid())
       {
-        return handler_->GetPriority();
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
       else
       {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }      
+        return priority_;
+      }
     }
 
     bool IsPauseScheduled()
@@ -1227,12 +1231,14 @@
       {
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
-
-      boost::mutex::scoped_lock lock(that_.mutex_);
-      that_.CheckInvariants();
-      assert(handler_->GetState() == JobState_Running);
-
-      return handler_->IsPauseScheduled();
+      else
+      {
+        boost::mutex::scoped_lock lock(registry_.mutex_);
+        registry_.CheckInvariants();
+        assert(handler_->GetState() == JobState_Running);
+        
+        return handler_->IsPauseScheduled();
+      }
     }
 
     void MarkSuccess()
@@ -1241,8 +1247,10 @@
       {
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
-
-      targetState_ = JobState_Success;
+      else
+      {
+        targetState_ = JobState_Success;
+      }
     }
 
     void MarkFailure()
@@ -1251,18 +1259,22 @@
       {
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
-
-      targetState_ = JobState_Failure;
+      else
+      {
+        targetState_ = JobState_Failure;
+      }
     }
 
-    void SchedulePause()
+    void MarkPause()
     {
       if (!IsValid())
       {
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
-
-      targetState_ = JobState_Paused;
+      else
+      {
+        targetState_ = JobState_Paused;
+      }
     }
 
     void MarkRetry(unsigned int timeout)
@@ -1271,83 +1283,88 @@
       {
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
-
-      targetState_ = JobState_Retry;
-      retryTimeout_ = timeout;
+      else
+      {
+        targetState_ = JobState_Retry;
+        targetRetryTimeout_ = timeout;
+      }
     }
 
-    void ExecuteStep()
+    /*void ExecuteStep()
     {
       if (!IsValid())
       {
         throw OrthancException(ErrorCode_BadSequenceOfCalls);
       }
 
-      if (handler_->IsPauseScheduled())
+      if (IsPauseScheduled())
       {
         targetState_ = JobState_Paused;
         return;
       }
 
       std::auto_ptr<JobStepResult> result;
+      ErrorCode code;
 
       {
-        JobHandler::JobLock lock(*handler_);
-
         bool ok = false;
 
         try
         {
-          result.reset(lock.GetJob().ExecuteStep());
-          lock.UpdateStatus();
+          result.reset(job_->ExecuteStep());
           ok = true;
+
+          if (result->GetCode() == JobStepCode_Failure)
+          {
+            code = ErrorCode_InternalError;            
+          }
         }
         catch (OrthancException& e)
         {
-          lock.SetLastErrorCode(e.GetErrorCode());
+          code = e.GetErrorCode();
         }
         catch (boost::bad_lexical_cast&)
         {
-          lock.SetLastErrorCode(ErrorCode_BadFileFormat);
+          code = ErrorCode_BadFileFormat;
         }
         catch (...)
         {
-          lock.SetLastErrorCode(ErrorCode_InternalError);
+          code = ErrorCode_InternalError;
         }
 
         if (ok)
         {
-          lock.SetLastErrorCode(ErrorCode_Success);
+          code = ErrorCode_Success;
         }
         else
         {
-          result.reset(new JobStepResult(JobStepStatus_Failure));
+          result.reset(new JobStepResult(JobStepCode_Failure));
         }
       }
 
-      switch (result->GetStatus())
+      switch (result->GetCode())
       {
-        case JobStepStatus_Success:
+        case JobStepCode_Success:
           targetState_ = JobState_Success;
           break;
 
-        case JobStepStatus_Failure:
+        case JobStepCode_Failure:
           targetState_ = JobState_Failure;
           break;
 
-        case JobStepStatus_Continue:
+        case JobStepCode_Continue:
           targetState_ = JobState_Running;
           break;
 
-        case JobStepStatus_Retry:
+        case JobStepCode_Retry:
           targetState_ = JobState_Retry;
-          retryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
+          targetRetryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
           break;
 
         default:
           throw OrthancException(ErrorCode_InternalError);
       }
-    }
+      }*/
   };
 };
 }
@@ -1361,7 +1378,7 @@
 
 public:
   DummyJob() :
-    result_(Orthanc::JobStepStatus_Success)
+    result_(Orthanc::JobStepCode_Success)
   {
   }
 
@@ -1633,7 +1650,7 @@
     ASSERT_TRUE(job.IsValid());
 
     registry.Resubmit(id);
-    job.SchedulePause();
+    job.MarkPause();
     ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
   }