changeset 2950:dc18d5804746

support of JobsHistorySize set to zero
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 30 Nov 2018 17:19:57 +0100
parents e6204cd21443
children 65b20d922e10
files Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h Core/JobsEngine/Operations/SequenceOfOperationsJob.h NEWS OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerJobs/LuaJobManager.cpp OrthancServer/main.cpp Resources/Configuration.json UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp
diffstat 13 files changed, 153 insertions(+), 90 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/JobsEngine.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/Core/JobsEngine/JobsEngine.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -156,9 +156,9 @@
   }
 
 
-  JobsEngine::JobsEngine() :
+  JobsEngine::JobsEngine(size_t maxCompletedJobs) :
     state_(State_Setup),
-    registry_(new JobsRegistry),
+    registry_(new JobsRegistry(maxCompletedJobs)),
     threadSleep_(200),
     workers_(1)
   {
@@ -198,7 +198,9 @@
       throw OrthancException(ErrorCode_BadSequenceOfCalls);
     }
 
-    registry_.reset(new JobsRegistry(unserializer, serialized));
+    assert(registry_.get() != NULL);
+    const size_t maxCompletedJobs = registry_->GetMaxCompletedJobs();
+    registry_.reset(new JobsRegistry(unserializer, serialized, maxCompletedJobs));
   }
 
 
--- a/Core/JobsEngine/JobsEngine.h	Thu Nov 29 20:36:55 2018 +0100
+++ b/Core/JobsEngine/JobsEngine.h	Fri Nov 30 17:19:57 2018 +0100
@@ -68,7 +68,7 @@
                        size_t workerIndex);
 
   public:
-    JobsEngine();
+    JobsEngine(size_t maxCompletedJobs);
 
     ~JobsEngine();
 
--- a/Core/JobsEngine/JobsRegistry.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/Core/JobsEngine/JobsRegistry.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -47,7 +47,6 @@
   static const char* JOB = "Job";
   static const char* JOBS = "Jobs";
   static const char* JOBS_REGISTRY = "JobsRegistry";
-  static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs";
   static const char* CREATION_TIME = "CreationTime";
   static const char* LAST_CHANGE_TIME = "LastChangeTime";
   static const char* RUNTIME = "Runtime";
@@ -435,20 +434,19 @@
 
   void JobsRegistry::ForgetOldCompletedJobs()
   {
-    if (maxCompletedJobs_ != 0)
+    while (completedJobs_.size() > maxCompletedJobs_)
     {
-      while (completedJobs_.size() > maxCompletedJobs_)
-      {
-        assert(completedJobs_.front() != NULL);
+      assert(completedJobs_.front() != NULL);
+
+      std::string id = completedJobs_.front()->GetId();
+      assert(jobsIndex_.find(id) != jobsIndex_.end());
 
-        std::string id = completedJobs_.front()->GetId();
-        assert(jobsIndex_.find(id) != jobsIndex_.end());
+      jobsIndex_.erase(id);
+      delete(completedJobs_.front());
+      completedJobs_.pop_front();
+    }
 
-        jobsIndex_.erase(id);
-        delete(completedJobs_.front());
-        completedJobs_.pop_front();
-      }
-    }
+    CheckInvariants();
   }
 
 
@@ -458,26 +456,48 @@
     job.SetState(success ? JobState_Success : JobState_Failure);
 
     completedJobs_.push_back(&job);
-    ForgetOldCompletedJobs();
-
     someJobComplete_.notify_all();
   }
 
 
   void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
-                                            bool success)
+                                            CompletedReason reason)
   {
-    LOG(INFO) << "Job has completed with " << (success ? "success" : "failure")
-              << ": " << job.GetId();
+    const char* tmp;
+
+    switch (reason)
+    {
+      case CompletedReason_Success:
+        tmp = "success";
+        break;
+
+      case CompletedReason_Failure:
+        tmp = "success";
+        break;
+
+      case CompletedReason_Canceled:
+        tmp = "cancel";
+        break;
+
+      default:
+        throw OrthancException(ErrorCode_InternalError);
+    }
+    
+    LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId();
 
     CheckInvariants();
 
     assert(job.GetState() == JobState_Running);
-    SetCompletedJob(job, success);
+    SetCompletedJob(job, reason == CompletedReason_Success);
+
+    if (reason == CompletedReason_Canceled)
+    {
+      job.SetLastErrorCode(ErrorCode_CanceledJob);
+    }
 
     if (observer_ != NULL)
     {
-      if (success)
+      if (reason == CompletedReason_Success)
       {
         observer_->SignalJobSuccess(job.GetId());
       }
@@ -487,7 +507,9 @@
       }
     }
 
-    CheckInvariants();
+    // WARNING: The following call might make "job" invalid if the job
+    // history size is empty
+    ForgetOldCompletedJobs();
   }
 
 
@@ -558,8 +580,14 @@
 
     maxCompletedJobs_ = n;
     ForgetOldCompletedJobs();
+  }
 
+
+  size_t JobsRegistry::GetMaxCompletedJobs()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
     CheckInvariants();
+    return maxCompletedJobs_;
   }
 
 
@@ -604,17 +632,14 @@
 
 
   void JobsRegistry::SubmitInternal(std::string& id,
-                                    JobHandler* handlerRaw,
-                                    bool keepLastChangeTime)
+                                    JobHandler* handler)
   {
-    if (handlerRaw == NULL)
+    if (handler == NULL)
     {
       throw OrthancException(ErrorCode_NullPointer);
     }
     
-    std::auto_ptr<JobHandler>  handler(handlerRaw);
-
-    boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime();
+    std::auto_ptr<JobHandler>  protection(handler);
 
     {
       boost::mutex::scoped_lock lock(mutex_);
@@ -623,13 +648,15 @@
       id = handler->GetId();
       int priority = handler->GetPriority();
 
+      jobsIndex_.insert(std::make_pair(id, protection.release()));
+
       switch (handler->GetState())
       {
         case JobState_Pending:
         case JobState_Retry:
         case JobState_Running:
           handler->SetState(JobState_Pending);
-          pendingJobs_.push(handler.get());
+          pendingJobs_.push(handler);
           pendingJobAvailable_.notify_one();
           break;
  
@@ -650,13 +677,6 @@
           throw OrthancException(ErrorCode_InternalError);
       }
 
-      if (keepLastChangeTime)
-      {
-        handler->SetLastStateChangeTime(lastChangeTime);
-      }
-    
-      jobsIndex_.insert(std::make_pair(id, handler.release()));
-
       LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
 
       if (observer_ != NULL)
@@ -664,7 +684,9 @@
         observer_->SignalJobSubmitted(id);
       }
 
-      CheckInvariants();
+      // WARNING: The following call might make "handler" invalid if
+      // the job history size is empty
+      ForgetOldCompletedJobs();
     }
   }
 
@@ -673,7 +695,7 @@
                             IJob* job,        // Takes ownership
                             int priority)
   {
-    SubmitInternal(id, new JobHandler(job, priority), false);
+    SubmitInternal(id, new JobHandler(job, priority));
   }
 
 
@@ -681,7 +703,7 @@
                             int priority)
   {
     std::string id;
-    SubmitInternal(id, new JobHandler(job, priority), false);
+    SubmitInternal(id, new JobHandler(job, priority));
   }
 
 
@@ -904,7 +926,10 @@
           throw OrthancException(ErrorCode_InternalError);
       }
 
-      CheckInvariants();
+      // WARNING: The following call might make "handler" invalid if
+      // the job history size is empty
+      ForgetOldCompletedJobs();
+
       return true;
     }
   }
@@ -1091,17 +1116,12 @@
       switch (targetState_)
       {
         case JobState_Failure:
-          registry_.MarkRunningAsCompleted(*handler_, false);
-
-          if (canceled_)
-          {
-            handler_->SetLastErrorCode(ErrorCode_CanceledJob);
-          }
-          
+          registry_.MarkRunningAsCompleted
+            (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure);
           break;
 
         case JobState_Success:
-          registry_.MarkRunningAsCompleted(*handler_, true);
+          registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success);
           break;
 
         case JobState_Paused:
@@ -1293,7 +1313,6 @@
 
     target = Json::objectValue;
     target[TYPE] = JOBS_REGISTRY;
-    target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_);
     target[JOBS] = Json::objectValue;
     
     for (JobsIndex::const_iterator it = jobsIndex_.begin(); 
@@ -1309,7 +1328,9 @@
 
 
   JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
-                             const Json::Value& s) :
+                             const Json::Value& s,
+                             size_t maxCompletedJobs) :
+    maxCompletedJobs_(maxCompletedJobs),
     observer_(NULL)
   {
     if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
@@ -1319,17 +1340,28 @@
       throw OrthancException(ErrorCode_BadFileFormat);
     }
 
-    maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS);
-
     Json::Value::Members members = s[JOBS].getMemberNames();
 
     for (Json::Value::Members::const_iterator it = members.begin();
          it != members.end(); ++it)
     {
       std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][*it], *it));
-      
+
+      const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime();
+
       std::string id;
-      SubmitInternal(id, job.release(), true);
+      SubmitInternal(id, job.release());
+
+      // Check whether the job has not been removed (which could be
+      // the case if the "maxCompletedJobs_" value gets smaller)
+      JobsIndex::iterator found = jobsIndex_.find(id);
+      if (found != jobsIndex_.end())
+      {
+        // The job still lies in the history: Update the time of its
+        // last change to the time that was serialized
+        assert(found->second != NULL);
+        found->second->SetLastStateChangeTime(lastChangeTime);
+      }
     }
   }
 }
--- a/Core/JobsEngine/JobsRegistry.h	Thu Nov 29 20:36:55 2018 +0100
+++ b/Core/JobsEngine/JobsRegistry.h	Fri Nov 30 17:19:57 2018 +0100
@@ -71,6 +71,13 @@
     };
     
   private:
+    enum CompletedReason
+    {
+      CompletedReason_Success,
+      CompletedReason_Failure,
+      CompletedReason_Canceled
+    };
+    
     class JobHandler;
 
     struct PriorityComparator
@@ -115,7 +122,7 @@
                          bool success);
     
     void MarkRunningAsCompleted(JobHandler& job,
-                                bool success);
+                                CompletedReason reason);
 
     void MarkRunningAsRetry(JobHandler& job,
                             unsigned int timeout);
@@ -130,23 +137,25 @@
     void RemoveRetryJob(JobHandler* handler);
       
     void SubmitInternal(std::string& id,
-                        JobHandler* handler,
-                        bool keepLastChangeTime);
+                        JobHandler* handler);
     
   public:
-    JobsRegistry() :
-      maxCompletedJobs_(10),
+    JobsRegistry(size_t maxCompletedJobs) :
+      maxCompletedJobs_(maxCompletedJobs),
       observer_(NULL)
     {
     }
 
     JobsRegistry(IJobUnserializer& unserializer,
-                 const Json::Value& s);
+                 const Json::Value& s,
+                 size_t maxCompletedJobs);
 
     ~JobsRegistry();
 
     void SetMaxCompletedJobs(size_t i);
     
+    size_t GetMaxCompletedJobs();
+
     void ListJobs(std::set<std::string>& target);
 
     bool GetJobInfo(JobInfo& target,
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.h	Thu Nov 29 20:36:55 2018 +0100
+++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.h	Fri Nov 30 17:19:57 2018 +0100
@@ -71,6 +71,8 @@
     std::list<IObserver*>             observers_;
     TimeoutDicomConnectionManager     connectionManager_;
 
+    void NotifyDone() const;
+
   public:
     SequenceOfOperationsJob();
 
@@ -96,8 +98,8 @@
 
     public:
       Lock(SequenceOfOperationsJob& that) :
-      that_(that),
-      lock_(that.mutex_)
+        that_(that),
+        lock_(that.mutex_)
       {
       }
 
--- a/NEWS	Thu Nov 29 20:36:55 2018 +0100
+++ b/NEWS	Fri Nov 30 17:19:57 2018 +0100
@@ -39,6 +39,7 @@
 * Orthanc starts even if jobs from a previous execution cannot be unserialized
 * New CMake option "ENABLE_DCMTK_LOG" to disable logging internal to DCMTK
 * Fix issue 114 (Boost 1.68 doesn't support SHA-1 anymore)
+* Support of "JobsHistorySize" set to zero
 * Upgraded dependencies for static and Windows builds:
   - boost 1.68.0
   - lua 5.3.5
--- a/OrthancServer/ServerContext.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/OrthancServer/ServerContext.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -214,7 +214,8 @@
 
   ServerContext::ServerContext(IDatabaseWrapper& database,
                                IStorageArea& area,
-                               bool unitTesting) :
+                               bool unitTesting,
+                               size_t maxCompletedJobs) :
     index_(*this, database, (unitTesting ? 20 : 500)),
     area_(area),
     compressionEnabled_(false),
@@ -224,6 +225,7 @@
     mainLua_(*this),
     filterLua_(*this),
     luaListener_(*this),
+    jobsEngine_(maxCompletedJobs),
 #if ORTHANC_ENABLE_PLUGINS == 1
     plugins_(NULL),
 #endif
--- a/OrthancServer/ServerContext.h	Thu Nov 29 20:36:55 2018 +0100
+++ b/OrthancServer/ServerContext.h	Fri Nov 30 17:19:57 2018 +0100
@@ -160,12 +160,17 @@
     DicomCacheProvider provider_;
     boost::mutex dicomCacheMutex_;
     MemoryCache dicomCache_;
-    JobsEngine jobsEngine_;
 
     LuaScripting mainLua_;
     LuaScripting filterLua_;
     LuaServerListener  luaListener_;
 
+    // The "JobsEngine" must be *after* "LuaScripting", as
+    // "LuaScripting" embeds "LuaJobManager" that registers as an
+    // observer to "SequenceOfOperationsJob", whose lifetime
+    // corresponds to that of "JobsEngine"
+    JobsEngine jobsEngine_;
+    
 #if ORTHANC_ENABLE_PLUGINS == 1
     OrthancPlugins* plugins_;
 #endif
@@ -206,7 +211,8 @@
 
     ServerContext(IDatabaseWrapper& database,
                   IStorageArea& area,
-                  bool unitTesting);
+                  bool unitTesting,
+                  size_t maxCompletedJobs);
 
     ~ServerContext();
 
--- a/OrthancServer/ServerJobs/LuaJobManager.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/OrthancServer/ServerJobs/LuaJobManager.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -143,6 +143,7 @@
       // Need to create a new job, as the previous one is either
       // finished, or is getting too long
       that_.currentJob_ = new SequenceOfOperationsJob;
+      that_.currentJob_->Register(that_);
       that_.currentJob_->SetDescription("Lua");
 
       {
--- a/OrthancServer/main.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/OrthancServer/main.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -1085,8 +1085,8 @@
                                    OrthancPlugins *plugins,
                                    bool loadJobsFromDatabase)
 {
-  ServerContext context(database, storageArea, false /* not running unit tests */);
-
+  size_t maxCompletedJobs;
+  
   {
     OrthancConfiguration::ReaderLock lock;
 
@@ -1101,6 +1101,15 @@
     HttpClient::SetDefaultProxy(lock.GetConfiguration().GetStringParameter("HttpProxy", ""));
     
     DicomUserConnection::SetDefaultTimeout(lock.GetConfiguration().GetUnsignedIntegerParameter("DicomScuTimeout", 10));
+
+    maxCompletedJobs = lock.GetConfiguration().GetUnsignedIntegerParameter("JobsHistorySize", 10);
+  }
+  
+  ServerContext context(database, storageArea, false /* not running unit tests */, maxCompletedJobs);
+
+  {
+    OrthancConfiguration::ReaderLock lock;
+
     context.SetCompressionEnabled(lock.GetConfiguration().GetBooleanParameter("StorageCompression", false));
     context.SetStoreMD5ForAttachments(lock.GetConfiguration().GetBooleanParameter("StoreMD5ForAttachments", true));
 
@@ -1125,9 +1134,6 @@
     {
       context.GetIndex().SetMaximumStorageSize(0);
     }
-
-    context.GetJobsEngine().GetRegistry().SetMaxCompletedJobs
-      (lock.GetConfiguration().GetUnsignedIntegerParameter("JobsHistorySize", 10));
   }
 
   {
--- a/Resources/Configuration.json	Thu Nov 29 20:36:55 2018 +0100
+++ b/Resources/Configuration.json	Fri Nov 30 17:19:57 2018 +0100
@@ -430,7 +430,9 @@
 
   // Maximum number of completed jobs that are kept in memory. A
   // processing job is considered as complete once it is tagged as
-  // "Success" or "Failure".
+  // "Success" or "Failure". Since Orthanc 1.4.3, a value of "0"
+  // indicates to keep no job in memory (i.e. jobs are removed from
+  // the history as soon as they are completed).
   "JobsHistorySize" : 10,
 
   // Specifies how Orthanc reacts when it receives a DICOM instance
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -329,7 +329,7 @@
 
 TEST(JobsRegistry, Priority)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string i1, i2, i3, i4;
   registry.Submit(i1, new DummyJob(), 10);
@@ -408,7 +408,7 @@
 
 TEST(JobsRegistry, Simultaneous)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string i1, i2;
   registry.Submit(i1, new DummyJob(), 20);
@@ -438,7 +438,7 @@
 
 TEST(JobsRegistry, Resubmit)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string id;
   registry.Submit(id, new DummyJob(), 10);
@@ -482,7 +482,7 @@
 
 TEST(JobsRegistry, Retry)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string id;
   registry.Submit(id, new DummyJob(), 10);
@@ -519,7 +519,7 @@
 
 TEST(JobsRegistry, PausePending)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string id;
   registry.Submit(id, new DummyJob(), 10);
@@ -542,7 +542,7 @@
 
 TEST(JobsRegistry, PauseRunning)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string id;
   registry.Submit(id, new DummyJob(), 10);
@@ -580,7 +580,7 @@
 
 TEST(JobsRegistry, PauseRetry)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string id;
   registry.Submit(id, new DummyJob(), 10);
@@ -617,7 +617,7 @@
 
 TEST(JobsRegistry, Cancel)
 {
-  JobsRegistry registry;
+  JobsRegistry registry(10);
 
   std::string id;
   registry.Submit(id, new DummyJob(), 10);
@@ -711,7 +711,7 @@
 
 TEST(JobsEngine, SubmitAndWait)
 {
-  JobsEngine engine;
+  JobsEngine engine(10);
   engine.SetThreadSleep(10);
   engine.SetWorkersCount(3);
   engine.Start();
@@ -731,7 +731,7 @@
 
 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
 {
-  JobsEngine engine;
+  JobsEngine engine(10);
   engine.SetThreadSleep(10);
   engine.SetWorkersCount(3);
   engine.Start();
@@ -771,7 +771,7 @@
 
 TEST(JobsEngine, DISABLED_Lua)
 {
-  JobsEngine engine;
+  JobsEngine engine(10);
   engine.SetThreadSleep(10);
   engine.SetWorkersCount(2);
   engine.Start();
@@ -1282,7 +1282,7 @@
     OrthancJobsSerialization()
     {
       db_.Open();
-      context_.reset(new ServerContext(db_, storage_, true /* running unit tests */));
+      context_.reset(new ServerContext(db_, storage_, true /* running unit tests */, 10));
       context_->SetupJobsEngine(true, false);
     }
 
@@ -1704,7 +1704,7 @@
   std::string i1, i2;
 
   {
-    JobsRegistry registry;
+    JobsRegistry registry(10);
     registry.Submit(i1, new DummyJob(), 10);
     registry.Submit(i2, new SequenceOfOperationsJob(), 30);
     registry.Serialize(s);
@@ -1712,7 +1712,7 @@
 
   {
     DummyUnserializer unserializer;
-    JobsRegistry registry(unserializer, s);
+    JobsRegistry registry(unserializer, s, 10);
 
     Json::Value t;
     registry.Serialize(t);
--- a/UnitTestsSources/ServerIndexTests.cpp	Thu Nov 29 20:36:55 2018 +0100
+++ b/UnitTestsSources/ServerIndexTests.cpp	Fri Nov 30 17:19:57 2018 +0100
@@ -677,7 +677,7 @@
   FilesystemStorage storage(path);
   DatabaseWrapper db;   // The SQLite DB is in memory
   db.Open();
-  ServerContext context(db, storage, true /* running unit tests */);
+  ServerContext context(db, storage, true /* running unit tests */, 10);
   context.SetupJobsEngine(true, false);
 
   ServerIndex& index = context.GetIndex();
@@ -777,7 +777,7 @@
   FilesystemStorage storage(path);
   DatabaseWrapper db;   // The SQLite DB is in memory
   db.Open();
-  ServerContext context(db, storage, true /* running unit tests */);
+  ServerContext context(db, storage, true /* running unit tests */, 10);
   context.SetupJobsEngine(true, false);
   ServerIndex& index = context.GetIndex();
 
@@ -865,7 +865,7 @@
     MemoryStorageArea storage;
     DatabaseWrapper db;   // The SQLite DB is in memory
     db.Open();
-    ServerContext context(db, storage, true /* running unit tests */);
+    ServerContext context(db, storage, true /* running unit tests */, 10);
     context.SetupJobsEngine(true, false);
     context.SetCompressionEnabled(true);