changeset 2665:389d050a2e66 jobs

fix deadlock, speed up unit tests
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 13:51:31 +0200
parents a21b244efb37
children 2540ac79ab6c
files Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp OrthancServer/LuaScripting.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h OrthancServer/main.cpp UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp
diffstat 11 files changed, 104 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/JobsEngine.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/Core/JobsEngine/JobsEngine.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -120,7 +120,7 @@
 
     while (engine->IsRunning())
     {
-      boost::this_thread::sleep(boost::posix_time::milliseconds(200));
+      boost::this_thread::sleep(boost::posix_time::milliseconds(engine->threadSleep_));
       engine->GetRegistry().ScheduleRetries();
     }
   }
@@ -135,7 +135,7 @@
 
     while (engine->IsRunning())
     {
-      JobsRegistry::RunningJob running(engine->GetRegistry(), 100);
+      JobsRegistry::RunningJob running(engine->GetRegistry(), engine->threadSleep_);
 
       if (running.IsValid())
       {
@@ -156,6 +156,7 @@
 
   JobsEngine::JobsEngine() :
     state_(State_Setup),
+    threadSleep_(200),
     workers_(1)
   {
   }
@@ -184,7 +185,21 @@
 
     workers_.resize(count);
   }
-    
+
+
+  void JobsEngine::SetThreadSleep(unsigned int sleep)
+  {
+    boost::mutex::scoped_lock lock(stateMutex_);
+      
+    if (state_ != State_Setup)
+    {
+      // Can only be invoked before calling "Start()"
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    threadSleep_ = sleep;
+  }
+
 
   void JobsEngine::Start()
   {
--- a/Core/JobsEngine/JobsEngine.h	Thu Jun 07 21:37:40 2018 +0200
+++ b/Core/JobsEngine/JobsEngine.h	Fri Jun 08 13:51:31 2018 +0200
@@ -54,6 +54,7 @@
     State                        state_;
     JobsRegistry                 registry_;
     boost::thread                retryHandler_;
+    unsigned int                 threadSleep_;
     std::vector<boost::thread*>  workers_;
 
     bool IsRunning();
@@ -72,6 +73,8 @@
     ~JobsEngine();
 
     void SetWorkersCount(size_t count);
+
+    void SetThreadSleep(unsigned int sleep);
     
     JobsRegistry& GetRegistry()
     {
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -365,6 +365,8 @@
   {
     boost::mutex::scoped_lock lock(mutex_);
 
+    value = Json::objectValue;
+    
     Json::Value tmp = Json::arrayValue;
     for (size_t i = 0; i < operations_.size(); i++)
     {
--- a/OrthancServer/LuaScripting.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/OrthancServer/LuaScripting.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -581,6 +581,7 @@
     {
       LOG(INFO) << "Starting the Lua engine";
       eventThread_ = boost::thread(EventThread, this);
+      state_ = State_Running;
     }
   }
 
--- a/OrthancServer/ServerContext.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/OrthancServer/ServerContext.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -107,8 +107,9 @@
 
 
   ServerContext::ServerContext(IDatabaseWrapper& database,
-                               IStorageArea& area) :
-    index_(*this, database),
+                               IStorageArea& area,
+                               bool unitTesting) :
+    index_(*this, database, (unitTesting ? 20 : 500)),
     area_(area),
     compressionEnabled_(false),
     storeMD5_(true),
@@ -126,6 +127,7 @@
 
     jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
     //jobsEngine_.SetMaxCompleted   // TODO
+    jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
     jobsEngine_.Start();
 
     changeThread_ = boost::thread(ChangeThread, this);
--- a/OrthancServer/ServerContext.h	Thu Jun 07 21:37:40 2018 +0200
+++ b/OrthancServer/ServerContext.h	Fri Jun 08 13:51:31 2018 +0200
@@ -158,7 +158,8 @@
     };
 
     ServerContext(IDatabaseWrapper& database,
-                  IStorageArea& area);
+                  IStorageArea& area,
+                  bool unitTesting);
 
     ~ServerContext();
 
--- a/OrthancServer/ServerIndex.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/OrthancServer/ServerIndex.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -342,7 +342,8 @@
   }
 
 
-  void ServerIndex::FlushThread(ServerIndex* that)
+  void ServerIndex::FlushThread(ServerIndex* that,
+                                unsigned int threadSleep)
   {
     // By default, wait for 10 seconds before flushing
     unsigned int sleep = 10;
@@ -368,7 +369,7 @@
 
     while (!that->done_)
     {
-      boost::this_thread::sleep(boost::posix_time::seconds(1));
+      boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleep));
       count++;
       if (count < sleep)
       {
@@ -538,7 +539,8 @@
 
 
   ServerIndex::ServerIndex(ServerContext& context,
-                           IDatabaseWrapper& db) : 
+                           IDatabaseWrapper& db,
+                           unsigned int threadSleep) : 
     done_(false),
     db_(db),
     maximumStorageSize_(0),
@@ -555,10 +557,11 @@
 
     if (db.HasFlushToDisk())
     {
-      flushThread_ = boost::thread(FlushThread, this);
+      flushThread_ = boost::thread(FlushThread, this, threadSleep);
     }
 
-    unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this);
+    unstableResourcesMonitorThread_ = boost::thread
+      (UnstableResourcesMonitorThread, this, threadSleep);
   }
 
 
@@ -1878,7 +1881,8 @@
   }
 
 
-  void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that)
+  void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that,
+                                                   unsigned int threadSleep)
   {
     int stableAge = Configuration::GetGlobalUnsignedIntegerParameter("StableAge", 60);
     if (stableAge <= 0)
@@ -1890,8 +1894,8 @@
 
     while (!that->done_)
     {
-      // Check for stable resources each second
-      boost::this_thread::sleep(boost::posix_time::seconds(1));
+      // Check for stable resources each few seconds
+      boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleep));
 
       boost::mutex::scoped_lock lock(that->mutex_);
 
--- a/OrthancServer/ServerIndex.h	Thu Jun 07 21:37:40 2018 +0200
+++ b/OrthancServer/ServerIndex.h	Fri Jun 08 13:51:31 2018 +0200
@@ -74,9 +74,11 @@
     uint64_t maximumStorageSize_;
     unsigned int maximumPatients_;
 
-    static void FlushThread(ServerIndex* that);
+    static void FlushThread(ServerIndex* that,
+                            unsigned int threadSleep);
 
-    static void UnstableResourcesMonitorThread(ServerIndex* that);
+    static void UnstableResourcesMonitorThread(ServerIndex* that,
+                                               unsigned int threadSleep);
 
     void MainDicomTagsToJson(Json::Value& result,
                              int64_t resourceId,
@@ -124,7 +126,8 @@
 
   public:
     ServerIndex(ServerContext& context,
-                IDatabaseWrapper& database);
+                IDatabaseWrapper& database,
+                unsigned int threadSleep);
 
     ~ServerIndex();
 
--- a/OrthancServer/main.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/OrthancServer/main.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -969,7 +969,7 @@
 
   DicomUserConnection::SetDefaultTimeout(Configuration::GetGlobalUnsignedIntegerParameter("DicomScuTimeout", 10));
 
-  ServerContext context(database, storageArea);
+  ServerContext context(database, storageArea, false /* not running unit tests */);
   context.SetCompressionEnabled(Configuration::GetGlobalBoolParameter("StorageCompression", false));
   context.SetStoreMD5ForAttachments(Configuration::GetGlobalBoolParameter("StoreMD5ForAttachments", true));
 
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -666,6 +666,7 @@
 TEST(JobsEngine, SubmitAndWait)
 {
   JobsEngine engine;
+  engine.SetThreadSleep(10);
   engine.SetWorkersCount(3);
   engine.Start();
 
@@ -679,6 +680,7 @@
 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
 {
   JobsEngine engine;
+  engine.SetThreadSleep(10);
   engine.SetWorkersCount(3);
   engine.Start();
 
@@ -714,6 +716,7 @@
 TEST(JobsEngine, DISABLED_Lua)
 {
   JobsEngine engine;
+  engine.SetThreadSleep(10);
   engine.SetWorkersCount(2);
   engine.Start();
 
@@ -778,6 +781,8 @@
     values.Append(new NullOperationValue);
     values.Append(new StringOperationValue("hello"));
     values.Append(new StringOperationValue("world"));
+
+    s = 42;
     values.Serialize(s);
   }
 
@@ -801,6 +806,8 @@
 
   {
     NullOperationValue null;
+
+    s = 42;
     null.Serialize(s);
   }
 
@@ -815,6 +822,8 @@
 
   {
     StringOperationValue str("Hello");
+
+    s = 42;
     str.Serialize(s);
   }
 
@@ -833,6 +842,8 @@
 
   {
     LogJobOperation operation;
+
+    s = 42;
     operation.Serialize(s);
   }
 
@@ -868,6 +879,8 @@
     job.Start();
     job.ExecuteStep();
     job.ExecuteStep();
+
+    s = 42;
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -891,8 +904,6 @@
     ASSERT_EQ("world", tmp.GetInstance(2));
     ASSERT_TRUE(tmp.IsFailedInstance("nope"));
   }
-
-  // TODO : Test SequenceOfOperationsJob.h
 }
 
 
@@ -927,6 +938,8 @@
     modification.Replace(DICOM_TAG_PATIENT_NAME, "Test 4", true);
 
     modification.Apply(*modified);
+
+    s = 42;
     modification.Serialize(s);
   }
 
@@ -959,6 +972,8 @@
 
   {
     DicomInstanceOrigin origin;
+
+    s = 42;
     origin.Serialize(s);
   }
 
@@ -973,6 +988,8 @@
 
   {
     DicomInstanceOrigin origin(DicomInstanceOrigin::FromDicomProtocol("host", "aet", "called"));
+
+    s = 42;
     origin.Serialize(s);
   }
 
@@ -987,6 +1004,8 @@
 
   {
     DicomInstanceOrigin origin(DicomInstanceOrigin::FromHttp("host", "username"));
+
+    s = 42;
     origin.Serialize(s);
   }
 
@@ -1001,6 +1020,8 @@
 
   {
     DicomInstanceOrigin origin(DicomInstanceOrigin::FromLua());
+
+    s = 42;
     origin.Serialize(s);
   }
 
@@ -1011,6 +1032,8 @@
 
   {
     DicomInstanceOrigin origin(DicomInstanceOrigin::FromPlugins());
+
+    s = 42;
     origin.Serialize(s);
   }
 
@@ -1041,7 +1064,7 @@
     OrthancJobsSerialization()
     {
       db_.Open();
-      context_.reset(new ServerContext(db_, storage_));
+      context_.reset(new ServerContext(db_, storage_, true /* running unit tests */));
     }
 
     virtual ~OrthancJobsSerialization()
@@ -1081,6 +1104,8 @@
 
   {
     DicomInstanceOperationValue instance(GetContext(), id);
+
+    s = 42;
     instance.Serialize(s);
   }
 
@@ -1113,6 +1138,8 @@
   
   {
     DeleteResourceOperation operation(GetContext());
+
+    s = 42;
     operation.Serialize(s);
   }
 
@@ -1136,6 +1163,8 @@
     peer.SetPkcs11Enabled(true);
 
     StorePeerOperation operation(peer);
+
+    s = 42;
     operation.Serialize(s);
   }
 
@@ -1159,6 +1188,8 @@
     modality.SetManufacturer(ModalityManufacturer_StoreScp);
 
     StoreScuOperation operation("TEST", modality);
+
+    s = 42;
     operation.Serialize(s);
   }
 
@@ -1180,6 +1211,8 @@
     operation.AddPreArgument("a");
     operation.AddPreArgument("b");
     operation.AddPostArgument("c");
+
+    s = 42;
     operation.Serialize(s);
   }
 
@@ -1202,6 +1235,8 @@
     modification->SetupAnonymization(DicomVersion_2008);
     
     ModifyInstanceOperation operation(GetContext(), RequestOrigin_Lua, modification.release());
+
+    s = 42;
     operation.Serialize(s);
   }
 
@@ -1240,7 +1275,8 @@
     job.SetLocalAet("LOCAL");
     job.SetRemoteModality(modality);
     job.SetMoveOriginator("MOVESCU", 42);
-    
+
+    s = 42;
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -1273,6 +1309,7 @@
     OrthancPeerStoreJob job(GetContext());
     job.SetPeer(peer);
     
+    s = 42;
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -1297,6 +1334,7 @@
     job.SetModification(modification.release(), true);
     job.SetOrigin(DicomInstanceOrigin::FromLua());
     
+    s = 42;
     ASSERT_TRUE(job.Serialize(s));
   }
 
@@ -1309,4 +1347,15 @@
     ASSERT_EQ(RequestOrigin_Lua, tmp.GetOrigin().GetRequestOrigin());
     ASSERT_TRUE(tmp.GetModification().IsRemoved(DICOM_TAG_STUDY_DESCRIPTION));
   }
+
+  // SequenceOfOperationsJob.h
+
+  {
+    SequenceOfOperationsJob job;
+    
+    s = 42;
+    ASSERT_TRUE(job.Serialize(s));
+  }
+
+  std::cout << s;
 }
--- a/UnitTestsSources/ServerIndexTests.cpp	Thu Jun 07 21:37:40 2018 +0200
+++ b/UnitTestsSources/ServerIndexTests.cpp	Fri Jun 08 13:51:31 2018 +0200
@@ -675,7 +675,7 @@
   FilesystemStorage storage(path);
   DatabaseWrapper db;   // The SQLite DB is in memory
   db.Open();
-  ServerContext context(db, storage);
+  ServerContext context(db, storage, true /* running unit tests */);
   ServerIndex& index = context.GetIndex();
 
   ASSERT_EQ(1u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence));
@@ -773,7 +773,7 @@
   FilesystemStorage storage(path);
   DatabaseWrapper db;   // The SQLite DB is in memory
   db.Open();
-  ServerContext context(db, storage);
+  ServerContext context(db, storage, true /* running unit tests */);
   ServerIndex& index = context.GetIndex();
 
   index.SetMaximumStorageSize(10);