# HG changeset patch # User Sebastien Jodogne # Date 1526566960 -7200 # Node ID 140a539b4eba9b75981baa4a611d8ceca2bd3bc9 # Parent 593d6b0f4cba9d67e6893b0940e75cd1f4a486e7 SequenceOfOperationsJob diff -r 593d6b0f4cba -r 140a539b4eba Core/DicomNetworking/DicomUserConnection.cpp --- a/Core/DicomNetworking/DicomUserConnection.cpp Wed May 16 19:00:43 2018 +0200 +++ b/Core/DicomNetworking/DicomUserConnection.cpp Thu May 17 16:22:40 2018 +0200 @@ -1238,10 +1238,21 @@ } - void DicomUserConnection::SetDefaultTimeout(uint32_t seconds) + void SetDefaultTimeout(uint32_t seconds) { LOG(INFO) << "Default timeout for DICOM connections if Orthanc acts as SCU (client): " << seconds << " seconds (0 = no timeout)"; defaultTimeout_ = seconds; } + + + bool DicomUserConnection::IsSameAssociation(const std::string& localAet, + const RemoteModalityParameters& remote) const + { + return (localAet_ == localAet && + remoteAet_ == remote.GetApplicationEntityTitle() && + remoteHost_ == remote.GetHost() && + remotePort_ == remote.GetPort() && + manufacturer_ == remote.GetManufacturer()); + } } diff -r 593d6b0f4cba -r 140a539b4eba Core/DicomNetworking/DicomUserConnection.h --- a/Core/DicomNetworking/DicomUserConnection.h Wed May 16 19:00:43 2018 +0200 +++ b/Core/DicomNetworking/DicomUserConnection.h Thu May 17 16:22:40 2018 +0200 @@ -208,5 +208,8 @@ ParsedDicomFile& query); static void SetDefaultTimeout(uint32_t seconds); + + bool IsSameAssociation(const std::string& localAet, + const RemoteModalityParameters& remote) const; }; } diff -r 593d6b0f4cba -r 140a539b4eba Core/JobsEngine/JobsRegistry.cpp --- a/Core/JobsEngine/JobsRegistry.cpp Wed May 16 19:00:43 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Thu May 17 16:22:40 2018 +0200 @@ -768,6 +768,7 @@ } else if (found->second->GetState() != JobState_Failure) { + printf("%s\n", EnumerationToString(found->second->GetState())); LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; return false; } diff -r 593d6b0f4cba -r 140a539b4eba UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Wed May 16 19:00:43 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Thu May 17 16:22:40 2018 +0200 @@ -753,7 +753,8 @@ enum Type { Type_DicomInstance, - Type_Null + Type_Null, + Type_String }; private: @@ -779,10 +780,49 @@ }; - class IDicomConnectionProvider : public boost::noncopyable + class NullOperationValue : public JobOperationValue { public: - virtual ~IDicomConnectionProvider() + NullOperationValue() : + JobOperationValue(Type_Null) + { + } + + virtual JobOperationValue* Clone() const + { + return new NullOperationValue; + } + }; + + + class StringOperationValue : public JobOperationValue + { + private: + std::string content_; + + public: + StringOperationValue(const std::string& content) : + JobOperationValue(JobOperationValue::Type_String), + content_(content) + { + } + + virtual JobOperationValue* Clone() const + { + return new StringOperationValue(content_); + } + + const std::string& GetContent() const + { + return content_; + } + }; + + + class IDicomConnectionManager : public boost::noncopyable + { + public: + virtual ~IDicomConnectionManager() { } @@ -796,8 +836,8 @@ virtual DicomUserConnection& GetConnection() = 0; }; - virtual IResource* Acquire(const std::string& localAet, - const RemoteModalityParameters& remote) = 0; + virtual IResource* AcquireConnection(const std::string& localAet, + const RemoteModalityParameters& remote) = 0; }; @@ -806,12 +846,6 @@ private: std::vector values_; - public: - ~JobOperationValues() - { - Clear(); - } - void Append(JobOperationValues& target, bool clear) { @@ -836,6 +870,22 @@ } } + public: + ~JobOperationValues() + { + Clear(); + } + + void Move(JobOperationValues& target) + { + return Append(target, true); + } + + void Copy(JobOperationValues& target) + { + return Append(target, false); + } + void Clear() { for (size_t i = 0; i < values_.size(); i++) @@ -896,7 +946,34 @@ virtual void Apply(JobOperationValues& outputs, const JobOperationValue& input, - IDicomConnectionProvider& provider); + IDicomConnectionManager& manager) = 0; + }; + + + class LogJobOperation : public IJobOperation + { + public: + virtual void Apply(JobOperationValues& outputs, + const JobOperationValue& input, + IDicomConnectionManager& manager) + { + switch (input.GetType()) + { + case JobOperationValue::Type_String: + LOG(INFO) << "Job value: " << dynamic_cast(input).GetContent(); + break; + + case JobOperationValue::Type_Null: + LOG(INFO) << "Job value: (null)"; + break; + + default: + LOG(INFO) << "Job value: (unsupport)"; + break; + } + + outputs.Append(input.Clone()); + } }; @@ -948,9 +1025,10 @@ virtual void Apply(JobOperationValues& outputs, const JobOperationValue& input, - IDicomConnectionProvider& provider) + IDicomConnectionManager& manager) { - std::auto_ptr resource(provider.Acquire(localAet_, modality_)); + std::auto_ptr resource + (manager.AcquireConnection(localAet_, modality_)); if (resource.get() == NULL) { @@ -997,7 +1075,7 @@ virtual void Apply(JobOperationValues& outputs, const JobOperationValue& input, - IDicomConnectionProvider& provider) + IDicomConnectionManager& manager) { switch (input.GetType()) { @@ -1026,30 +1104,127 @@ }; - class SequenceOfOperationsJob : - public IJob, - private IDicomConnectionProvider + + class TimeoutDicomConnectionManager : public IDicomConnectionManager { private: - /*class DicomConnection + class Resource : public IDicomConnectionManager::IResource { private: - boost::posix_time::ptime lastUse_; - - void Touch() - { - lastUse_ = boost::posix_time::microsec_clock::universal_time(); - } + TimeoutDicomConnectionManager& that_; + boost::mutex::scoped_lock lock_; public: - class Resource : public IDicomConnectionProvider::IResource + Resource(TimeoutDicomConnectionManager& that) : + that_(that), + lock_(that.mutex_) + { + } + + virtual ~Resource() + { + that_.Touch(); + } + + virtual DicomUserConnection& GetConnection() + { + if (that_.connection_.get() == NULL) + { + throw OrthancException(ErrorCode_InternalError); + } + + return *that_.connection_; + } + }; + + boost::mutex mutex_; + std::auto_ptr connection_; + boost::posix_time::ptime lastUse_; + boost::posix_time::time_duration timeout_; + + static boost::posix_time::ptime GetNow() + { + return boost::posix_time::microsec_clock::universal_time(); + } + + void Touch() + { + lastUse_ = GetNow(); + } + + void CheckTimeoutInternal() + { + if (connection_.get() != NULL && + (GetNow() - lastUse_) >= timeout_) { - private: - DicomConnection() - }; - };*/ + connection_.reset(NULL); + } + } + + public: + TimeoutDicomConnectionManager() : + timeout_(boost::posix_time::milliseconds(1000)) + { + } + + void SetTimeout(unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); + + timeout_ = boost::posix_time::milliseconds(timeout); + CheckTimeout(); + } + + unsigned int GetTimeout() + { + boost::mutex::scoped_lock lock(mutex_); + return timeout_.total_milliseconds(); + } + + void Close() + { + boost::mutex::scoped_lock lock(mutex_); + connection_.reset(NULL); + } - class Operation : public boost::noncopyable + void CheckTimeout() + { + boost::mutex::scoped_lock lock(mutex_); + CheckTimeoutInternal(); + } + + virtual IResource* AcquireConnection(const std::string& localAet, + const RemoteModalityParameters& remote) + { + boost::mutex::scoped_lock lock(mutex_); + + if (connection_.get() == NULL || + !connection_->IsSameAssociation(localAet, remote)) + { + connection_.reset(new DicomUserConnection(localAet, remote)); + } + + return new Resource(*this); + } + }; + + + + class SequenceOfOperationsJob : public IJob + { + public: + class IObserver : public boost::noncopyable + { + public: + virtual ~IObserver() + { + } + + virtual void SignalDone(const SequenceOfOperationsJob& job) = 0; + }; + + private: + class Operation : public boost::noncopyable { private: JobOperationValues originalInputs_; @@ -1111,7 +1286,7 @@ return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); } - void Step(IDicomConnectionProvider& provider) + void Step(IDicomConnectionManager& manager) { if (IsDone()) { @@ -1126,15 +1301,25 @@ } else { - input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); + input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); } JobOperationValues outputs; - operation_->Apply(outputs, *input, provider); + operation_->Apply(outputs, *input, manager); if (!nextOperations_.empty()) { - // TODO + std::list::iterator first = nextOperations_.begin(); + outputs.Move((*first)->workInputs_); + + std::list::iterator current = first; + ++current; + + while (current != nextOperations_.end()) + { + (*first)->workInputs_.Copy((*current)->workInputs_); + ++current; + } } currentInput_ += 1; @@ -1142,16 +1327,39 @@ }; - boost::mutex mutex_; - std::vector operations_; - size_t currentOperation_; - boost::condition_variable operationAdded_; + std::string jobType_; + bool done_; + boost::mutex mutex_; + std::vector operations_; + size_t current_; + boost::condition_variable operationAdded_; + TimeoutDicomConnectionManager& connectionManager_; + boost::posix_time::time_duration trailingTimeout_; + std::list observers_; + + // Invoked from constructors + void Setup() + { + done_ = false; + current_ = 0; + trailingTimeout_ = boost::posix_time::milliseconds(1000); + } public: - SequenceOfOperationsJob() : - currentOperation_(0) + SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) : + jobType_("SequenceOfOperations"), + connectionManager_(manager) { - } + Setup(); + } + + SequenceOfOperationsJob(const std::string& jobType, + TimeoutDicomConnectionManager& manager) : + jobType_(jobType), + connectionManager_(manager) + { + Setup(); + } virtual ~SequenceOfOperationsJob() { @@ -1164,6 +1372,15 @@ } } + void Register(IObserver& observer) + { + boost::mutex::scoped_lock lock(mutex_); + observers_.push_back(&observer); + } + + // This lock allows adding new operations to the end of the job, + // from another thread than the worker thread, after the job has + // been submitted for processing class Lock : public boost::noncopyable { private: @@ -1177,19 +1394,48 @@ { } + bool IsDone() const + { + return that_.done_; + } + + void SetDicomConnectionTimeout(unsigned int timeout) + { + that_.connectionManager_.SetTimeout(timeout); + } + + void SetTrailingOperationTimeout(unsigned int timeout) + { + that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); + } + size_t AddOperation(IJobOperation* operation) { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + that_.operations_.push_back(new Operation(operation)); that_.operationAdded_.notify_one(); return that_.operations_.size() - 1; } + size_t GetOperationsCount() const + { + return that_.operations_.size(); + } + void AddInput(size_t index, const JobOperationValue& value) { - if (index >= that_.operations_.size() || - index < that_.currentOperation_) + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (index >= that_.operations_.size() || + index < that_.current_) { throw OrthancException(ErrorCode_ParameterOutOfRange); } @@ -1202,11 +1448,15 @@ void Connect(size_t input, size_t output) { - if (input >= output || - input >= that_.operations_.size() || - output >= that_.operations_.size() || - input < that_.currentOperation_ || - output < that_.currentOperation_) + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (input >= output || + input >= that_.operations_.size() || + output >= that_.operations_.size() || + input < that_.current_ || + output < that_.current_) { throw OrthancException(ErrorCode_ParameterOutOfRange); } @@ -1223,13 +1473,58 @@ { } - virtual JobStepResult ExecuteStep() = 0; + virtual JobStepResult ExecuteStep() + { + boost::mutex::scoped_lock lock(mutex_); + + if (current_ == operations_.size()) + { + LOG(INFO) << "Executing the trailing timeout in the sequence of operations"; + operationAdded_.timed_wait(lock, trailingTimeout_); + + if (current_ == operations_.size()) + { + // No operation was added during the trailing timeout: The + // job is over + LOG(INFO) << "The sequence of operations is over"; + done_ = true; + + for (std::list::iterator it = observers_.begin(); + it != observers_.end(); ++it) + { + (*it)->SignalDone(*this); + } + + return JobStepResult::Success(); + } + else + { + LOG(INFO) << "New operation added to the sequence of operations"; + } + } + + assert(current_ < operations_.size()); + + while (current_ < operations_.size() && + operations_[current_]->IsDone()) + { + current_++; + } + + if (current_ < operations_.size()) + { + operations_[current_]->Step(connectionManager_); + } + + return JobStepResult::Continue(); + } virtual void SignalResubmit() { boost::mutex::scoped_lock lock(mutex_); - currentOperation_ = 0; + current_ = 0; + done_ = false; for (size_t i = 0; i < operations_.size(); i++) { @@ -1237,14 +1532,177 @@ } } - virtual void ReleaseResources() = 0; // For pausing/canceling jobs + virtual void ReleaseResources() + { + boost::mutex::scoped_lock lock(mutex_); + connectionManager_.Close(); + } + + virtual float GetProgress() + { + boost::mutex::scoped_lock lock(mutex_); + + return (static_cast(current_) / + static_cast(operations_.size() + 1)); + } + + virtual void GetJobType(std::string& target) + { + target = jobType_; + } + + virtual void GetPublicContent(Json::Value& value) + { + boost::mutex::scoped_lock lock(mutex_); + + value["CountOperations"] = static_cast(operations_.size()); + } - virtual float GetProgress() = 0; + virtual void GetInternalContent(Json::Value& value) + { + // TODO + } + }; + + + class LuaJobManager : private SequenceOfOperationsJob::IObserver + { + public: + typedef SequenceOfOperationsJob::Lock Lock; + + private: + boost::mutex mutex_; + JobsEngine& engine_; + TimeoutDicomConnectionManager manager_; + std::string currentId_; + SequenceOfOperationsJob* currentJob_; + size_t maxOperations_; + int priority_; + unsigned int trailingTimeout_; + + virtual void SignalDone(const SequenceOfOperationsJob& job) + { + boost::mutex::scoped_lock lock(mutex_); - virtual void GetJobType(std::string& target) = 0; - - virtual void GetPublicContent(Json::Value& value) = 0; + if (&job == currentJob_) + { + currentId_.clear(); + currentJob_ = NULL; + } + } + + public: + LuaJobManager(JobsEngine& engine) : + engine_(engine), + currentJob_(NULL), + maxOperations_(1000), + priority_(0) + { + } + + void SetMaxOperationsPerJob(size_t count) + { + boost::mutex::scoped_lock lock(mutex_); + maxOperations_ = count; + } + + void SetPriority(int priority) + { + boost::mutex::scoped_lock lock(mutex_); + priority_ = priority; + } - virtual void GetInternalContent(Json::Value& value) = 0; + void SetTrailingOperationTimeout(unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); + trailingTimeout_ = timeout; + } + + Lock* Modify() + { + boost::mutex::scoped_lock lock(mutex_); + + if (currentJob_ != NULL) + { + std::auto_ptr result(new Lock(*currentJob_)); + + if (!result->IsDone() && + result->GetOperationsCount() < maxOperations_) + { + return result.release(); + } + } + + // Need to create a new job, as the previous one is either + // finished, or is getting too long + currentJob_ = new SequenceOfOperationsJob(manager_); + engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); + + std::auto_ptr result(new Lock(*currentJob_)); + result->SetTrailingOperationTimeout(trailingTimeout_); + + return result.release(); + } }; } + + +TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) +{ + TimeoutDicomConnectionManager manager; + JobsEngine engine; + engine.SetWorkersCount(3); + engine.Start(); + + std::string id; + SequenceOfOperationsJob* job = NULL; + + { + std::auto_ptr a(new SequenceOfOperationsJob(manager)); + job = a.get(); + engine.GetRegistry().Submit(id, a.release(), 0); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + { + SequenceOfOperationsJob::Lock lock(*job); + size_t i = lock.AddOperation(new LogJobOperation); + size_t j = lock.AddOperation(new LogJobOperation); + size_t k = lock.AddOperation(new LogJobOperation); + lock.AddInput(i, StringOperationValue("Hello")); + lock.AddInput(i, StringOperationValue("World")); + lock.Connect(i, j); + lock.Connect(j, k); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(2000)); + + engine.Stop(); + +} + + +TEST(JobsEngine, Lua) +{ + JobsEngine engine; + engine.SetWorkersCount(2); + engine.Start(); + + LuaJobManager lua(engine); + lua.SetMaxOperationsPerJob(5); + lua.SetTrailingOperationTimeout(200); + + for (size_t i = 0; i < 30; i++) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(150)); + std::auto_ptr lock(lua.Modify()); + size_t a = lock->AddOperation(new LogJobOperation); + lock->AddInput(a, StringOperationValue(boost::lexical_cast(i))); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(2000)); + + engine.Stop(); + +}