Mercurial > hg > orthanc
diff UnitTestsSources/MultiThreadingTests.cpp @ 2601:5b6c3d77a2a1 jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 17 May 2018 17:03:40 +0200 |
parents | 140a539b4eba |
children | 988936118354 |
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp Thu May 17 16:22:40 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Thu May 17 17:03:40 2018 +0200 @@ -745,238 +745,13 @@ #include "../OrthancServer/ServerContext.h" #include "../Core/Logging.h" -namespace -{ - class JobOperationValue : public boost::noncopyable - { - public: - enum Type - { - Type_DicomInstance, - Type_Null, - Type_String - }; - - private: - Type type_; - - protected: - JobOperationValue(Type type) : - type_(type) - { - } - - public: - virtual ~JobOperationValue() - { - } - - Type GetType() const - { - return type_; - } - - virtual JobOperationValue* Clone() const = 0; - }; - - - class NullOperationValue : public JobOperationValue - { - public: - NullOperationValue() : - JobOperationValue(Type_Null) - { - } - - virtual JobOperationValue* Clone() const - { - return new NullOperationValue; - } - }; - - - class StringOperationValue : public JobOperationValue - { - private: - std::string content_; - - public: - StringOperationValue(const std::string& content) : - JobOperationValue(JobOperationValue::Type_String), - content_(content) - { - } - - virtual JobOperationValue* Clone() const - { - return new StringOperationValue(content_); - } - - const std::string& GetContent() const - { - return content_; - } - }; - - - class IDicomConnectionManager : public boost::noncopyable - { - public: - virtual ~IDicomConnectionManager() - { - } - - class IResource : public boost::noncopyable - { - public: - virtual ~IResource() - { - } - - virtual DicomUserConnection& GetConnection() = 0; - }; - - virtual IResource* AcquireConnection(const std::string& localAet, - const RemoteModalityParameters& remote) = 0; - }; - - - class JobOperationValues : public boost::noncopyable - { - private: - std::vector<JobOperationValue*> values_; - - void Append(JobOperationValues& target, - bool clear) - { - target.Reserve(target.GetSize() + GetSize()); +#include "../Core/DicomNetworking/IDicomConnectionManager.h" +#include "../Core/JobsEngine/Operations/JobOperationValues.h" +#include "../Core/JobsEngine/Operations/StringOperationValue.h" +#include "../Core/JobsEngine/Operations/LogJobOperation.h" - for (size_t i = 0; i < values_.size(); i++) - { - if (clear) - { - target.Append(values_[i]); - values_[i] = NULL; - } - else - { - target.Append(GetValue(i).Clone()); - } - } - - if (clear) - { - Clear(); - } - } - - public: - ~JobOperationValues() - { - Clear(); - } - - void Move(JobOperationValues& target) - { - return Append(target, true); - } - - void Copy(JobOperationValues& target) - { - return Append(target, false); - } - - void Clear() - { - for (size_t i = 0; i < values_.size(); i++) - { - if (values_[i] != NULL) - { - delete values_[i]; - } - } - - values_.clear(); - } - - void Reserve(size_t count) - { - values_.reserve(count); - } - - void Append(JobOperationValue* value) // Takes ownership - { - if (value == NULL) - { - throw OrthancException(ErrorCode_NullPointer); - } - else - { - values_.push_back(value); - } - } - - size_t GetSize() const - { - return values_.size(); - } - - JobOperationValue& GetValue(size_t index) const - { - if (index >= values_.size()) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - else - { - assert(values_[index] != NULL); - return *values_[index]; - } - } - }; - - - - class IJobOperation : public boost::noncopyable - { - public: - virtual ~IJobOperation() - { - } - - virtual void Apply(JobOperationValues& outputs, - const JobOperationValue& input, - IDicomConnectionManager& manager) = 0; - }; - - - class LogJobOperation : public IJobOperation - { - public: - virtual void Apply(JobOperationValues& outputs, - const JobOperationValue& input, - IDicomConnectionManager& manager) - { - switch (input.GetType()) - { - case JobOperationValue::Type_String: - LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent(); - break; - - case JobOperationValue::Type_Null: - LOG(INFO) << "Job value: (null)"; - break; - - default: - LOG(INFO) << "Job value: (unsupport)"; - break; - } - - outputs.Append(input.Clone()); - } - }; - - +namespace Orthanc +{ class DicomInstanceValue : public JobOperationValue { private: @@ -1172,7 +947,7 @@ boost::mutex::scoped_lock lock(mutex_); timeout_ = boost::posix_time::milliseconds(timeout); - CheckTimeout(); + CheckTimeoutInternal(); } unsigned int GetTimeout() @@ -1286,7 +1061,7 @@ return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); } - void Step(IDicomConnectionManager& manager) + void Step() { if (IsDone()) { @@ -1305,7 +1080,7 @@ } JobOperationValues outputs; - operation_->Apply(outputs, *input, manager); + operation_->Apply(outputs, *input); if (!nextOperations_.empty()) { @@ -1333,7 +1108,6 @@ std::vector<Operation*> operations_; size_t current_; boost::condition_variable operationAdded_; - TimeoutDicomConnectionManager& connectionManager_; boost::posix_time::time_duration trailingTimeout_; std::list<IObserver*> observers_; @@ -1346,17 +1120,14 @@ } public: - SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) : - jobType_("SequenceOfOperations"), - connectionManager_(manager) + SequenceOfOperationsJob() : + jobType_("SequenceOfOperations") { Setup(); } - SequenceOfOperationsJob(const std::string& jobType, - TimeoutDicomConnectionManager& manager) : - jobType_(jobType), - connectionManager_(manager) + SequenceOfOperationsJob(const std::string& jobType) : + jobType_(jobType) { Setup(); } @@ -1399,11 +1170,6 @@ return that_.done_; } - void SetDicomConnectionTimeout(unsigned int timeout) - { - that_.connectionManager_.SetTimeout(timeout); - } - void SetTrailingOperationTimeout(unsigned int timeout) { that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); @@ -1513,7 +1279,7 @@ if (current_ < operations_.size()) { - operations_[current_]->Step(connectionManager_); + operations_[current_]->Step(); } return JobStepResult::Continue(); @@ -1534,8 +1300,6 @@ virtual void ReleaseResources() { - boost::mutex::scoped_lock lock(mutex_); - connectionManager_.Close(); } virtual float GetProgress() @@ -1573,13 +1337,24 @@ private: boost::mutex mutex_; JobsEngine& engine_; - TimeoutDicomConnectionManager manager_; + TimeoutDicomConnectionManager connectionManager_; std::string currentId_; SequenceOfOperationsJob* currentJob_; size_t maxOperations_; int priority_; unsigned int trailingTimeout_; + bool continue_; + boost::thread connectionTimeoutThread_; + static void ConnectionTimeoutThread(LuaJobManager* manager) + { + while (manager->continue_) + { + manager->connectionManager_.CheckTimeout(); + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + } + virtual void SignalDone(const SequenceOfOperationsJob& job) { boost::mutex::scoped_lock lock(mutex_); @@ -1596,8 +1371,20 @@ engine_(engine), currentJob_(NULL), maxOperations_(1000), - priority_(0) + priority_(0), + continue_(true) { + connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this); + } + + ~LuaJobManager() + { + continue_ = false; + + if (connectionTimeoutThread_.joinable()) + { + connectionTimeoutThread_.join(); + } } void SetMaxOperationsPerJob(size_t count) @@ -1635,7 +1422,7 @@ // Need to create a new job, as the previous one is either // finished, or is getting too long - currentJob_ = new SequenceOfOperationsJob(manager_); + currentJob_ = new SequenceOfOperationsJob; engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); std::auto_ptr<Lock> result(new Lock(*currentJob_)); @@ -1649,7 +1436,6 @@ TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) { - TimeoutDicomConnectionManager manager; JobsEngine engine; engine.SetWorkersCount(3); engine.Start(); @@ -1658,7 +1444,7 @@ SequenceOfOperationsJob* job = NULL; { - std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager)); + std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob); job = a.get(); engine.GetRegistry().Submit(id, a.release(), 0); }