changeset 2559:9b7680dee75d jobs

cont
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 03 May 2018 18:45:47 +0200
parents 57f81b988713
children 7d4a3eca96af
files Resources/ImplementationNotes/JobsEngineStates.dot UnitTestsSources/MultiThreadingTests.cpp
diffstat 2 files changed, 227 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/Resources/ImplementationNotes/JobsEngineStates.dot	Thu May 03 15:24:33 2018 +0200
+++ b/Resources/ImplementationNotes/JobsEngineStates.dot	Thu May 03 18:45:47 2018 +0200
@@ -20,4 +20,4 @@
   pending -> paused  [label="Pause()" fontcolor="red"];
   retry -> paused  [label="Pause()" fontcolor="red"];
   running -> paused  [label="Pause()" fontcolor="red"];
- }
+}
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu May 03 15:24:33 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Thu May 03 18:45:47 2018 +0200
@@ -274,6 +274,7 @@
 
 #include "../Core/Logging.h"
 
+#include <boost/math/special_functions/round.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <queue>
 
@@ -356,18 +357,121 @@
   };
 
 
+  class JobInfo
+  {
+  private:
+    std::string                       id_;
+    int                               priority_;
+    ErrorCode                         errorCode_;
+    JobState                          state_;
+    float                             progress_;
+    boost::posix_time::ptime          infoTime_;
+    boost::posix_time::ptime          creationTime_;
+    boost::posix_time::time_duration  runtime_;
+    boost::posix_time::ptime          eta_;
+    Json::Value                       status_;
+
+  public:
+    JobInfo(const std::string& id,
+            int priority,
+            ErrorCode errorCode,
+            JobState state,
+            float progress,
+            const boost::posix_time::ptime& creationTime,
+            const boost::posix_time::time_duration& runtime) :
+      id_(id),
+      priority_(priority),
+      errorCode_(errorCode),
+      state_(state),
+      progress_(progress),
+      infoTime_(boost::posix_time::microsec_clock::universal_time()),
+      creationTime_(creationTime),
+      runtime_(runtime)
+    {
+      if (progress < 0 ||
+          progress > 1)
+      {
+        throw OrthancException(ErrorCode_ParameterOutOfRange);
+      }
+
+      float r = static_cast<float>(runtime_.total_milliseconds());
+
+      eta_ = infoTime_ + boost::posix_time::milliseconds(boost::math::llround(1.0f - progress) * r);
+    }
+
+    const std::string& GetIdentifier() const
+    {
+      return id_;
+    }
+
+    int GetPriority() const
+    {
+      return priority_;
+    }
+
+    ErrorCode GetErrorCode() const
+    {
+      return errorCode_;
+    }
+
+    JobState GetState() const
+    {
+      return state_;
+    }
+
+    float GetProgress() const
+    {
+      return progress_;
+    }
+
+    const boost::posix_time::ptime& GetInfoTime() const
+    {
+      return infoTime_;
+    }
+
+    const boost::posix_time::ptime& GetCreationTime() const
+    {
+      return creationTime_;
+    }
+
+    const boost::posix_time::time_duration& GetRuntime() const
+    {
+      return runtime_;
+    }
+
+    const boost::posix_time::ptime& GetEstimatedTimeOfArrival() const
+    {
+      return eta_;
+    }
+
+    const Json::Value& GetStatus() const
+    {
+      return status_;
+    }
+
+    Json::Value& GetStatus()
+    {
+      return status_;
+    }
+  };
+
+
   class JobHandler : public boost::noncopyable
   {
   private:
-    std::string               id_;
-    JobState                  state_;
-    std::auto_ptr<IJob>       job_;
-    int                       priority_;  // "+inf()" means highest priority
-    boost::posix_time::ptime  creationTime_;
-    boost::posix_time::ptime  lastUpdateTime_;
-    boost::posix_time::ptime  retryTime_;
-    uint64_t                  runtime_;  // In milliseconds
-    bool                      pauseScheduled_;
+    std::string                       id_;
+    JobState                          state_;
+    boost::mutex                      jobMutex_;
+    std::auto_ptr<IJob>               job_;
+    int                               priority_;  // "+inf()" means highest priority
+    boost::posix_time::ptime          creationTime_;
+    boost::posix_time::ptime          lastStateChangeTime;
+    boost::posix_time::ptime          retryTime_;
+    boost::posix_time::time_duration  runtime_;
+    bool                              pauseScheduled_;
+    ErrorCode                         lastErrorCode_;
+    float                             lastProgress_;
+    Json::Value                       lastStatus_;
 
     void SetStateInternal(JobState state) 
     {
@@ -375,11 +479,11 @@
 
       if (state_ == JobState_Running)
       {
-        runtime_ += (now - lastUpdateTime_).total_milliseconds();
+        runtime_ += (now - lastStateChangeTime);
       }
 
       state_ = state;
-      lastUpdateTime_ = now;
+      lastStateChangeTime = now;
       pauseScheduled_ = false;
     }
 
@@ -391,9 +495,11 @@
       job_(job),
       priority_(priority),
       creationTime_(boost::posix_time::microsec_clock::universal_time()),
-      lastUpdateTime_(creationTime_),
-      runtime_(0),
-      pauseScheduled_(false)
+      lastStateChangeTime(creationTime_),
+      runtime_(boost::posix_time::milliseconds(0)),
+      pauseScheduled_(false),
+      lastErrorCode_(ErrorCode_Success),
+      lastProgress_(0)
     {
       if (job == NULL)
       {
@@ -406,12 +512,6 @@
       return id_;
     }
 
-    IJob& GetJob() const
-    {
-      assert(job_.get() != NULL);
-      return *job_;
-    }
-
     void SetPriority(int priority)
     {
       priority_ = priority;
@@ -484,6 +584,36 @@
         return retryTime_ <= now;
       }
     }
+
+    class JobLock
+    {
+    private:
+      boost::mutex::scoped_lock  lock_;
+      JobHandler&                handler_;
+
+    public:
+      JobLock(JobHandler& handler) :
+      lock_(handler.jobMutex_),
+      handler_(handler)
+      {
+      }
+
+      IJob& GetJob()
+      {
+        return *handler_.job_;
+      }
+
+      void UpdateStatus()
+      {
+        handler_.lastProgress_ = handler_.job_->GetProgress();
+        handler_.job_->FormatStatus(handler_.lastStatus_);
+      }
+
+      void SetLastErrorCode(ErrorCode code)
+      {
+        handler_.lastErrorCode_ = code;
+      }
+    };
   };
 
 
@@ -1019,9 +1149,9 @@
   {
   private:
     JobsRegistry&  that_;
-    JobHandler*   handler_;
-    JobState      targetState_;
-    unsigned int  retryTimeout_;
+    JobHandler*    handler_;
+    JobState       targetState_;
+    unsigned int   retryTimeout_;
       
   public:
     RunningJob(JobsRegistry& that,
@@ -1105,20 +1235,6 @@
       return handler_->IsPauseScheduled();
     }
 
-    IJob& GetJob()
-    {
-      if (!IsValid())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-
-      boost::mutex::scoped_lock lock(that_.mutex_);
-      that_.CheckInvariants();
-      assert(handler_->GetState() == JobState_Running);
-
-      return handler_->GetJob();
-    }
-
     void MarkSuccess()
     {
       if (!IsValid())
@@ -1159,6 +1275,79 @@
       targetState_ = JobState_Retry;
       retryTimeout_ = timeout;
     }
+
+    void ExecuteStep()
+    {
+      if (!IsValid())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+
+      if (handler_->IsPauseScheduled())
+      {
+        targetState_ = JobState_Paused;
+        return;
+      }
+
+      std::auto_ptr<JobStepResult> result;
+
+      {
+        JobHandler::JobLock lock(*handler_);
+
+        bool ok = false;
+
+        try
+        {
+          result.reset(lock.GetJob().ExecuteStep());
+          lock.UpdateStatus();
+          ok = true;
+        }
+        catch (OrthancException& e)
+        {
+          lock.SetLastErrorCode(e.GetErrorCode());
+        }
+        catch (boost::bad_lexical_cast&)
+        {
+          lock.SetLastErrorCode(ErrorCode_BadFileFormat);
+        }
+        catch (...)
+        {
+          lock.SetLastErrorCode(ErrorCode_InternalError);
+        }
+
+        if (ok)
+        {
+          lock.SetLastErrorCode(ErrorCode_Success);
+        }
+        else
+        {
+          result.reset(new JobStepResult(JobStepStatus_Failure));
+        }
+      }
+
+      switch (result->GetStatus())
+      {
+        case JobStepStatus_Success:
+          targetState_ = JobState_Success;
+          break;
+
+        case JobStepStatus_Failure:
+          targetState_ = JobState_Failure;
+          break;
+
+        case JobStepStatus_Continue:
+          targetState_ = JobState_Running;
+          break;
+
+        case JobStepStatus_Retry:
+          targetState_ = JobState_Retry;
+          retryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout();
+          break;
+
+        default:
+          throw OrthancException(ErrorCode_InternalError);
+      }
+    }
   };
 };
 }