Mercurial > hg > orthanc
changeset 2665:389d050a2e66 jobs
fix deadlock, speed up unit tests
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 08 Jun 2018 13:51:31 +0200 |
parents | a21b244efb37 |
children | 2540ac79ab6c |
files | Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp OrthancServer/LuaScripting.cpp OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h OrthancServer/main.cpp UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp |
diffstat | 11 files changed, 104 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/JobsEngine/JobsEngine.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/Core/JobsEngine/JobsEngine.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -120,7 +120,7 @@ while (engine->IsRunning()) { - boost::this_thread::sleep(boost::posix_time::milliseconds(200)); + boost::this_thread::sleep(boost::posix_time::milliseconds(engine->threadSleep_)); engine->GetRegistry().ScheduleRetries(); } } @@ -135,7 +135,7 @@ while (engine->IsRunning()) { - JobsRegistry::RunningJob running(engine->GetRegistry(), 100); + JobsRegistry::RunningJob running(engine->GetRegistry(), engine->threadSleep_); if (running.IsValid()) { @@ -156,6 +156,7 @@ JobsEngine::JobsEngine() : state_(State_Setup), + threadSleep_(200), workers_(1) { } @@ -184,7 +185,21 @@ workers_.resize(count); } - + + + void JobsEngine::SetThreadSleep(unsigned int sleep) + { + boost::mutex::scoped_lock lock(stateMutex_); + + if (state_ != State_Setup) + { + // Can only be invoked before calling "Start()" + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + threadSleep_ = sleep; + } + void JobsEngine::Start() {
--- a/Core/JobsEngine/JobsEngine.h Thu Jun 07 21:37:40 2018 +0200 +++ b/Core/JobsEngine/JobsEngine.h Fri Jun 08 13:51:31 2018 +0200 @@ -54,6 +54,7 @@ State state_; JobsRegistry registry_; boost::thread retryHandler_; + unsigned int threadSleep_; std::vector<boost::thread*> workers_; bool IsRunning(); @@ -72,6 +73,8 @@ ~JobsEngine(); void SetWorkersCount(size_t count); + + void SetThreadSleep(unsigned int sleep); JobsRegistry& GetRegistry() {
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -365,6 +365,8 @@ { boost::mutex::scoped_lock lock(mutex_); + value = Json::objectValue; + Json::Value tmp = Json::arrayValue; for (size_t i = 0; i < operations_.size(); i++) {
--- a/OrthancServer/LuaScripting.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/OrthancServer/LuaScripting.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -581,6 +581,7 @@ { LOG(INFO) << "Starting the Lua engine"; eventThread_ = boost::thread(EventThread, this); + state_ = State_Running; } }
--- a/OrthancServer/ServerContext.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/OrthancServer/ServerContext.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -107,8 +107,9 @@ ServerContext::ServerContext(IDatabaseWrapper& database, - IStorageArea& area) : - index_(*this, database), + IStorageArea& area, + bool unitTesting) : + index_(*this, database, (unitTesting ? 20 : 500)), area_(area), compressionEnabled_(false), storeMD5_(true), @@ -126,6 +127,7 @@ jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); //jobsEngine_.SetMaxCompleted // TODO + jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); jobsEngine_.Start(); changeThread_ = boost::thread(ChangeThread, this);
--- a/OrthancServer/ServerContext.h Thu Jun 07 21:37:40 2018 +0200 +++ b/OrthancServer/ServerContext.h Fri Jun 08 13:51:31 2018 +0200 @@ -158,7 +158,8 @@ }; ServerContext(IDatabaseWrapper& database, - IStorageArea& area); + IStorageArea& area, + bool unitTesting); ~ServerContext();
--- a/OrthancServer/ServerIndex.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/OrthancServer/ServerIndex.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -342,7 +342,8 @@ } - void ServerIndex::FlushThread(ServerIndex* that) + void ServerIndex::FlushThread(ServerIndex* that, + unsigned int threadSleep) { // By default, wait for 10 seconds before flushing unsigned int sleep = 10; @@ -368,7 +369,7 @@ while (!that->done_) { - boost::this_thread::sleep(boost::posix_time::seconds(1)); + boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleep)); count++; if (count < sleep) { @@ -538,7 +539,8 @@ ServerIndex::ServerIndex(ServerContext& context, - IDatabaseWrapper& db) : + IDatabaseWrapper& db, + unsigned int threadSleep) : done_(false), db_(db), maximumStorageSize_(0), @@ -555,10 +557,11 @@ if (db.HasFlushToDisk()) { - flushThread_ = boost::thread(FlushThread, this); + flushThread_ = boost::thread(FlushThread, this, threadSleep); } - unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this); + unstableResourcesMonitorThread_ = boost::thread + (UnstableResourcesMonitorThread, this, threadSleep); } @@ -1878,7 +1881,8 @@ } - void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that) + void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that, + unsigned int threadSleep) { int stableAge = Configuration::GetGlobalUnsignedIntegerParameter("StableAge", 60); if (stableAge <= 0) @@ -1890,8 +1894,8 @@ while (!that->done_) { - // Check for stable resources each second - boost::this_thread::sleep(boost::posix_time::seconds(1)); + // Check for stable resources each few seconds + boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleep)); boost::mutex::scoped_lock lock(that->mutex_);
--- a/OrthancServer/ServerIndex.h Thu Jun 07 21:37:40 2018 +0200 +++ b/OrthancServer/ServerIndex.h Fri Jun 08 13:51:31 2018 +0200 @@ -74,9 +74,11 @@ uint64_t maximumStorageSize_; unsigned int maximumPatients_; - static void FlushThread(ServerIndex* that); + static void FlushThread(ServerIndex* that, + unsigned int threadSleep); - static void UnstableResourcesMonitorThread(ServerIndex* that); + static void UnstableResourcesMonitorThread(ServerIndex* that, + unsigned int threadSleep); void MainDicomTagsToJson(Json::Value& result, int64_t resourceId, @@ -124,7 +126,8 @@ public: ServerIndex(ServerContext& context, - IDatabaseWrapper& database); + IDatabaseWrapper& database, + unsigned int threadSleep); ~ServerIndex();
--- a/OrthancServer/main.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/OrthancServer/main.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -969,7 +969,7 @@ DicomUserConnection::SetDefaultTimeout(Configuration::GetGlobalUnsignedIntegerParameter("DicomScuTimeout", 10)); - ServerContext context(database, storageArea); + ServerContext context(database, storageArea, false /* not running unit tests */); context.SetCompressionEnabled(Configuration::GetGlobalBoolParameter("StorageCompression", false)); context.SetStoreMD5ForAttachments(Configuration::GetGlobalBoolParameter("StoreMD5ForAttachments", true));
--- a/UnitTestsSources/MultiThreadingTests.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -666,6 +666,7 @@ TEST(JobsEngine, SubmitAndWait) { JobsEngine engine; + engine.SetThreadSleep(10); engine.SetWorkersCount(3); engine.Start(); @@ -679,6 +680,7 @@ TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) { JobsEngine engine; + engine.SetThreadSleep(10); engine.SetWorkersCount(3); engine.Start(); @@ -714,6 +716,7 @@ TEST(JobsEngine, DISABLED_Lua) { JobsEngine engine; + engine.SetThreadSleep(10); engine.SetWorkersCount(2); engine.Start(); @@ -778,6 +781,8 @@ values.Append(new NullOperationValue); values.Append(new StringOperationValue("hello")); values.Append(new StringOperationValue("world")); + + s = 42; values.Serialize(s); } @@ -801,6 +806,8 @@ { NullOperationValue null; + + s = 42; null.Serialize(s); } @@ -815,6 +822,8 @@ { StringOperationValue str("Hello"); + + s = 42; str.Serialize(s); } @@ -833,6 +842,8 @@ { LogJobOperation operation; + + s = 42; operation.Serialize(s); } @@ -868,6 +879,8 @@ job.Start(); job.ExecuteStep(); job.ExecuteStep(); + + s = 42; ASSERT_TRUE(job.Serialize(s)); } @@ -891,8 +904,6 @@ ASSERT_EQ("world", tmp.GetInstance(2)); ASSERT_TRUE(tmp.IsFailedInstance("nope")); } - - // TODO : Test SequenceOfOperationsJob.h } @@ -927,6 +938,8 @@ modification.Replace(DICOM_TAG_PATIENT_NAME, "Test 4", true); modification.Apply(*modified); + + s = 42; modification.Serialize(s); } @@ -959,6 +972,8 @@ { DicomInstanceOrigin origin; + + s = 42; origin.Serialize(s); } @@ -973,6 +988,8 @@ { DicomInstanceOrigin origin(DicomInstanceOrigin::FromDicomProtocol("host", "aet", "called")); + + s = 42; origin.Serialize(s); } @@ -987,6 +1004,8 @@ { DicomInstanceOrigin origin(DicomInstanceOrigin::FromHttp("host", "username")); + + s = 42; origin.Serialize(s); } @@ -1001,6 +1020,8 @@ { DicomInstanceOrigin origin(DicomInstanceOrigin::FromLua()); + + s = 42; origin.Serialize(s); } @@ -1011,6 +1032,8 @@ { DicomInstanceOrigin origin(DicomInstanceOrigin::FromPlugins()); + + s = 42; origin.Serialize(s); } @@ -1041,7 +1064,7 @@ OrthancJobsSerialization() { db_.Open(); - context_.reset(new ServerContext(db_, storage_)); + context_.reset(new ServerContext(db_, storage_, true /* running unit tests */)); } virtual ~OrthancJobsSerialization() @@ -1081,6 +1104,8 @@ { DicomInstanceOperationValue instance(GetContext(), id); + + s = 42; instance.Serialize(s); } @@ -1113,6 +1138,8 @@ { DeleteResourceOperation operation(GetContext()); + + s = 42; operation.Serialize(s); } @@ -1136,6 +1163,8 @@ peer.SetPkcs11Enabled(true); StorePeerOperation operation(peer); + + s = 42; operation.Serialize(s); } @@ -1159,6 +1188,8 @@ modality.SetManufacturer(ModalityManufacturer_StoreScp); StoreScuOperation operation("TEST", modality); + + s = 42; operation.Serialize(s); } @@ -1180,6 +1211,8 @@ operation.AddPreArgument("a"); operation.AddPreArgument("b"); operation.AddPostArgument("c"); + + s = 42; operation.Serialize(s); } @@ -1202,6 +1235,8 @@ modification->SetupAnonymization(DicomVersion_2008); ModifyInstanceOperation operation(GetContext(), RequestOrigin_Lua, modification.release()); + + s = 42; operation.Serialize(s); } @@ -1240,7 +1275,8 @@ job.SetLocalAet("LOCAL"); job.SetRemoteModality(modality); job.SetMoveOriginator("MOVESCU", 42); - + + s = 42; ASSERT_TRUE(job.Serialize(s)); } @@ -1273,6 +1309,7 @@ OrthancPeerStoreJob job(GetContext()); job.SetPeer(peer); + s = 42; ASSERT_TRUE(job.Serialize(s)); } @@ -1297,6 +1334,7 @@ job.SetModification(modification.release(), true); job.SetOrigin(DicomInstanceOrigin::FromLua()); + s = 42; ASSERT_TRUE(job.Serialize(s)); } @@ -1309,4 +1347,15 @@ ASSERT_EQ(RequestOrigin_Lua, tmp.GetOrigin().GetRequestOrigin()); ASSERT_TRUE(tmp.GetModification().IsRemoved(DICOM_TAG_STUDY_DESCRIPTION)); } + + // SequenceOfOperationsJob.h + + { + SequenceOfOperationsJob job; + + s = 42; + ASSERT_TRUE(job.Serialize(s)); + } + + std::cout << s; }
--- a/UnitTestsSources/ServerIndexTests.cpp Thu Jun 07 21:37:40 2018 +0200 +++ b/UnitTestsSources/ServerIndexTests.cpp Fri Jun 08 13:51:31 2018 +0200 @@ -675,7 +675,7 @@ FilesystemStorage storage(path); DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage); + ServerContext context(db, storage, true /* running unit tests */); ServerIndex& index = context.GetIndex(); ASSERT_EQ(1u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence)); @@ -773,7 +773,7 @@ FilesystemStorage storage(path); DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage); + ServerContext context(db, storage, true /* running unit tests */); ServerIndex& index = context.GetIndex(); index.SetMaximumStorageSize(10);