Mercurial > hg > orthanc
changeset 2604:76ef12fa136c jobs
fix race conditions if creating Lua jobs
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 18 May 2018 17:37:14 +0200 |
parents | 988936118354 |
children | 1e11b0229e04 |
files | OrthancServer/ServerJobs/LuaJobManager.cpp OrthancServer/ServerJobs/LuaJobManager.h UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 3 files changed, 147 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/OrthancServer/ServerJobs/LuaJobManager.cpp Fri May 18 17:02:25 2018 +0200 +++ b/OrthancServer/ServerJobs/LuaJobManager.cpp Fri May 18 17:37:14 2018 +0200 @@ -34,6 +34,13 @@ #include "../PrecompiledHeadersServer.h" #include "LuaJobManager.h" +#include "DeleteResourceOperation.h" +#include "StoreScuOperation.h" +#include "../../Core/JobsEngine/Operations/LogJobOperation.h" + +#include "DicomInstanceOperationValue.h" +#include "../../Core/JobsEngine/Operations/NullOperationValue.h" +#include "../../Core/JobsEngine/Operations/StringOperationValue.h" namespace Orthanc { @@ -59,8 +66,7 @@ } - LuaJobManager::LuaJobManager(JobsEngine& engine) : - engine_(engine), + LuaJobManager::LuaJobManager() : currentJob_(NULL), maxOperations_(1000), priority_(0), @@ -102,30 +108,108 @@ } - LuaJobManager::Lock* LuaJobManager::Modify() + LuaJobManager::Lock::Lock(LuaJobManager& that, + JobsEngine& engine) : + that_(that), + lock_(that.mutex_), + engine_(engine) { - boost::mutex::scoped_lock lock(mutex_); - - if (currentJob_ != NULL) + if (that_.currentJob_ == NULL) + { + isNewJob_ = true; + } + else { - std::auto_ptr<Lock> result(new Lock(*currentJob_)); + jobLock_.reset(new SequenceOfOperationsJob::Lock(*that_.currentJob_)); - if (!result->IsDone() && - result->GetOperationsCount() < maxOperations_) + if (jobLock_->IsDone() || + jobLock_->GetOperationsCount() >= that_.maxOperations_) { - return result.release(); + jobLock_.reset(NULL); + isNewJob_ = true; + } + else + { + isNewJob_ = false; } } - // Need to create a new job, as the previous one is either - // finished, or is getting too long - currentJob_ = new SequenceOfOperationsJob; + if (isNewJob_) + { + // Need to create a new job, as the previous one is either + // finished, or is getting too long + that_.currentJob_ = new SequenceOfOperationsJob; + jobLock_.reset(new SequenceOfOperationsJob::Lock(*that_.currentJob_)); + jobLock_->SetTrailingOperationTimeout(that_.trailingTimeout_); + } + + assert(jobLock_.get() != NULL); + } + + + LuaJobManager::Lock::~Lock() + { + assert(jobLock_.get() != NULL); + jobLock_.reset(NULL); + + if (isNewJob_) + { + engine_.GetRegistry().Submit(that_.currentId_, that_.currentJob_, that_.priority_); + } + } + + + size_t LuaJobManager::Lock::AddDeleteResourceOperation(ServerContext& context) + { + assert(jobLock_.get() != NULL); + return jobLock_->AddOperation(new DeleteResourceOperation(context)); + } + + + size_t LuaJobManager::Lock::AddLogOperation() + { + assert(jobLock_.get() != NULL); + return jobLock_->AddOperation(new LogJobOperation); + } + - engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); + size_t LuaJobManager::Lock::AddStoreScuOperation(const std::string& localAet, + const RemoteModalityParameters& modality, + IDicomConnectionManager& manager) + { + assert(jobLock_.get() != NULL); + return jobLock_->AddOperation(new StoreScuOperation(localAet, modality, that_.connectionManager_)); + } + + + void LuaJobManager::Lock::AddNullInput(size_t operation) + { + assert(jobLock_.get() != NULL); + jobLock_->AddInput(operation, NullOperationValue()); + } + - std::auto_ptr<Lock> result(new Lock(*currentJob_)); - result->SetTrailingOperationTimeout(trailingTimeout_); + void LuaJobManager::Lock::AddStringInput(size_t operation, + const std::string& content) + { + assert(jobLock_.get() != NULL); + jobLock_->AddInput(operation, StringOperationValue(content)); + } + - return result.release(); + void LuaJobManager::Lock::AddDicomInstanceInput(size_t operation, + ServerContext& context, + const std::string& instanceId) + { + assert(jobLock_.get() != NULL); + jobLock_->AddInput(operation, DicomInstanceOperationValue(context, instanceId)); + } + + + void LuaJobManager::Lock::Connect(size_t operation1, + size_t operation2) + { + assert(jobLock_.get() != NULL); + jobLock_->Connect(operation1, operation2); } }
--- a/OrthancServer/ServerJobs/LuaJobManager.h Fri May 18 17:02:25 2018 +0200 +++ b/OrthancServer/ServerJobs/LuaJobManager.h Fri May 18 17:37:14 2018 +0200 @@ -37,16 +37,14 @@ #include "../../Core/JobsEngine/JobsEngine.h" #include "../../Core/JobsEngine/Operations/SequenceOfOperationsJob.h" +#include "../ServerContext.h" + namespace Orthanc { class LuaJobManager : private SequenceOfOperationsJob::IObserver { - public: - typedef SequenceOfOperationsJob::Lock Lock; - private: boost::mutex mutex_; - JobsEngine& engine_; TimeoutDicomConnectionManager connectionManager_; std::string currentId_; SequenceOfOperationsJob* currentJob_; @@ -61,7 +59,7 @@ virtual void SignalDone(const SequenceOfOperationsJob& job); public: - LuaJobManager(JobsEngine& engine); + LuaJobManager(); ~LuaJobManager(); @@ -71,6 +69,40 @@ void SetTrailingOperationTimeout(unsigned int timeout); - Lock* Modify(); + class Lock : public boost::noncopyable + { + private: + LuaJobManager& that_; + boost::mutex::scoped_lock lock_; + JobsEngine& engine_; + std::auto_ptr<SequenceOfOperationsJob::Lock> jobLock_; + bool isNewJob_; + + public: + Lock(LuaJobManager& that, + JobsEngine& engine); + + ~Lock(); + + size_t AddLogOperation(); + + size_t AddDeleteResourceOperation(ServerContext& context); + + size_t AddStoreScuOperation(const std::string& localAet, + const RemoteModalityParameters& modality, + IDicomConnectionManager& manager); + + void AddNullInput(size_t operation); + + void AddStringInput(size_t operation, + const std::string& content); + + void AddDicomInstanceInput(size_t operation, + ServerContext& context, + const std::string& instanceId); + + void Connect(size_t operation1, + size_t operation2); + }; }; }
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri May 18 17:02:25 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri May 18 17:37:14 2018 +0200 @@ -788,16 +788,20 @@ engine.SetWorkersCount(2); engine.Start(); - LuaJobManager lua(engine); + LuaJobManager lua; lua.SetMaxOperationsPerJob(5); lua.SetTrailingOperationTimeout(200); for (size_t i = 0; i < 30; i++) { boost::this_thread::sleep(boost::posix_time::milliseconds(150)); - std::auto_ptr<LuaJobManager::Lock> lock(lua.Modify()); - size_t a = lock->AddOperation(new LogJobOperation); - lock->AddInput(a, StringOperationValue(boost::lexical_cast<std::string>(i))); + + LuaJobManager::Lock lock(lua, engine); + size_t a = lock.AddLogOperation(); + size_t b = lock.AddLogOperation(); + lock.AddStringInput(a, boost::lexical_cast<std::string>(i)); + lock.AddNullInput(a); + lock.Connect(a, b); } boost::this_thread::sleep(boost::posix_time::milliseconds(2000));