Mercurial > hg > orthanc
diff UnitTestsSources/MultiThreadingTests.cpp @ 2603:988936118354 jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 18 May 2018 17:02:25 +0200 |
parents | 5b6c3d77a2a1 |
children | 76ef12fa136c |
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri May 18 15:34:11 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri May 18 17:02:25 2018 +0200 @@ -742,697 +742,10 @@ -#include "../OrthancServer/ServerContext.h" -#include "../Core/Logging.h" - -#include "../Core/DicomNetworking/IDicomConnectionManager.h" -#include "../Core/JobsEngine/Operations/JobOperationValues.h" +#include "../OrthancServer/ServerJobs/LuaJobManager.h" #include "../Core/JobsEngine/Operations/StringOperationValue.h" #include "../Core/JobsEngine/Operations/LogJobOperation.h" -namespace Orthanc -{ - class DicomInstanceValue : public JobOperationValue - { - private: - ServerContext& context_; - std::string id_; - - public: - DicomInstanceValue(ServerContext& context, - const std::string& id) : - JobOperationValue(Type_DicomInstance), - context_(context), - id_(id) - { - } - - const std::string& GetId() const - { - return id_; - } - - void ReadContent(std::string& dicom) const - { - context_.ReadDicom(dicom, id_); - } - - virtual JobOperationValue* Clone() const - { - return new DicomInstanceValue(context_, id_); - } - }; - - - class StoreScuOperation : public IJobOperation - { - private: - std::string localAet_; - RemoteModalityParameters modality_; - - public: - StoreScuOperation(const std::string& localAet, - const RemoteModalityParameters& modality) : - localAet_(localAet), - modality_(modality) - { - } - - virtual void Apply(JobOperationValues& outputs, - const JobOperationValue& input, - IDicomConnectionManager& manager) - { - std::auto_ptr<IDicomConnectionManager::IResource> resource - (manager.AcquireConnection(localAet_, modality_)); - - if (resource.get() == NULL) - { - LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); - return; - } - - if (input.GetType() != JobOperationValue::Type_DicomInstance) - { - throw OrthancException(ErrorCode_BadParameterType); - } - - const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); - - LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" - << modality_.GetApplicationEntityTitle() << "\""; - - try - { - std::string dicom; - instance.ReadContent(dicom); - resource->GetConnection().Store(dicom); - outputs.Append(instance.Clone()); - } - catch (OrthancException& e) - { - LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" - << modality_.GetApplicationEntityTitle() << "\": " << e.What(); - } - } - }; - - - class DeleteResourceOperation : public IJobOperation - { - private: - ServerContext& context_; - - public: - DeleteResourceOperation(ServerContext& context) : - context_(context) - { - } - - virtual void Apply(JobOperationValues& outputs, - const JobOperationValue& input, - IDicomConnectionManager& manager) - { - switch (input.GetType()) - { - case JobOperationValue::Type_DicomInstance: - { - const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); - LOG(INFO) << "Deleting instance: " << instance.GetId(); - - try - { - Json::Value tmp; - context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance); - } - catch (OrthancException& e) - { - LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What(); - } - - break; - } - - default: - break; - } - } - }; - - - - class TimeoutDicomConnectionManager : public IDicomConnectionManager - { - private: - class Resource : public IDicomConnectionManager::IResource - { - private: - TimeoutDicomConnectionManager& that_; - boost::mutex::scoped_lock lock_; - - public: - Resource(TimeoutDicomConnectionManager& that) : - that_(that), - lock_(that.mutex_) - { - } - - virtual ~Resource() - { - that_.Touch(); - } - - virtual DicomUserConnection& GetConnection() - { - if (that_.connection_.get() == NULL) - { - throw OrthancException(ErrorCode_InternalError); - } - - return *that_.connection_; - } - }; - - boost::mutex mutex_; - std::auto_ptr<DicomUserConnection> connection_; - boost::posix_time::ptime lastUse_; - boost::posix_time::time_duration timeout_; - - static boost::posix_time::ptime GetNow() - { - return boost::posix_time::microsec_clock::universal_time(); - } - - void Touch() - { - lastUse_ = GetNow(); - } - - void CheckTimeoutInternal() - { - if (connection_.get() != NULL && - (GetNow() - lastUse_) >= timeout_) - { - connection_.reset(NULL); - } - } - - public: - TimeoutDicomConnectionManager() : - timeout_(boost::posix_time::milliseconds(1000)) - { - } - - void SetTimeout(unsigned int timeout) - { - boost::mutex::scoped_lock lock(mutex_); - - timeout_ = boost::posix_time::milliseconds(timeout); - CheckTimeoutInternal(); - } - - unsigned int GetTimeout() - { - boost::mutex::scoped_lock lock(mutex_); - return timeout_.total_milliseconds(); - } - - void Close() - { - boost::mutex::scoped_lock lock(mutex_); - connection_.reset(NULL); - } - - void CheckTimeout() - { - boost::mutex::scoped_lock lock(mutex_); - CheckTimeoutInternal(); - } - - virtual IResource* AcquireConnection(const std::string& localAet, - const RemoteModalityParameters& remote) - { - boost::mutex::scoped_lock lock(mutex_); - - if (connection_.get() == NULL || - !connection_->IsSameAssociation(localAet, remote)) - { - connection_.reset(new DicomUserConnection(localAet, remote)); - } - - return new Resource(*this); - } - }; - - - - class SequenceOfOperationsJob : public IJob - { - public: - class IObserver : public boost::noncopyable - { - public: - virtual ~IObserver() - { - } - - virtual void SignalDone(const SequenceOfOperationsJob& job) = 0; - }; - - private: - class Operation : public boost::noncopyable - { - private: - JobOperationValues originalInputs_; - JobOperationValues workInputs_; - std::auto_ptr<IJobOperation> operation_; - std::list<Operation*> nextOperations_; - size_t currentInput_; - - public: - Operation(IJobOperation* operation) : - operation_(operation), - currentInput_(0) - { - if (operation == NULL) - { - throw OrthancException(ErrorCode_NullPointer); - } - } - - void AddOriginalInput(const JobOperationValue& value) - { - if (currentInput_ != 0) - { - // Cannot add input after processing has started - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - originalInputs_.Append(value.Clone()); - } - } - - const JobOperationValues& GetOriginalInputs() const - { - return originalInputs_; - } - - void Reset() - { - workInputs_.Clear(); - currentInput_ = 0; - } - - void AddNextOperation(Operation& other) - { - if (currentInput_ != 0) - { - // Cannot add input after processing has started - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - nextOperations_.push_back(&other); - } - } - - bool IsDone() const - { - return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); - } - - void Step() - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - const JobOperationValue* input; - - if (currentInput_ < originalInputs_.GetSize()) - { - input = &originalInputs_.GetValue(currentInput_); - } - else - { - input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); - } - - JobOperationValues outputs; - operation_->Apply(outputs, *input); - - if (!nextOperations_.empty()) - { - std::list<Operation*>::iterator first = nextOperations_.begin(); - outputs.Move((*first)->workInputs_); - - std::list<Operation*>::iterator current = first; - ++current; - - while (current != nextOperations_.end()) - { - (*first)->workInputs_.Copy((*current)->workInputs_); - ++current; - } - } - - currentInput_ += 1; - } - }; - - - std::string jobType_; - bool done_; - boost::mutex mutex_; - std::vector<Operation*> operations_; - size_t current_; - boost::condition_variable operationAdded_; - boost::posix_time::time_duration trailingTimeout_; - std::list<IObserver*> observers_; - - // Invoked from constructors - void Setup() - { - done_ = false; - current_ = 0; - trailingTimeout_ = boost::posix_time::milliseconds(1000); - } - - public: - SequenceOfOperationsJob() : - jobType_("SequenceOfOperations") - { - Setup(); - } - - SequenceOfOperationsJob(const std::string& jobType) : - jobType_(jobType) - { - Setup(); - } - - virtual ~SequenceOfOperationsJob() - { - for (size_t i = 0; i < operations_.size(); i++) - { - if (operations_[i] != NULL) - { - delete operations_[i]; - } - } - } - - void Register(IObserver& observer) - { - boost::mutex::scoped_lock lock(mutex_); - observers_.push_back(&observer); - } - - // This lock allows adding new operations to the end of the job, - // from another thread than the worker thread, after the job has - // been submitted for processing - class Lock : public boost::noncopyable - { - private: - SequenceOfOperationsJob& that_; - boost::mutex::scoped_lock lock_; - - public: - Lock(SequenceOfOperationsJob& that) : - that_(that), - lock_(that.mutex_) - { - } - - bool IsDone() const - { - return that_.done_; - } - - void SetTrailingOperationTimeout(unsigned int timeout) - { - that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); - } - - size_t AddOperation(IJobOperation* operation) - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - that_.operations_.push_back(new Operation(operation)); - that_.operationAdded_.notify_one(); - - return that_.operations_.size() - 1; - } - - size_t GetOperationsCount() const - { - return that_.operations_.size(); - } - - void AddInput(size_t index, - const JobOperationValue& value) - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else if (index >= that_.operations_.size() || - index < that_.current_) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - else - { - that_.operations_[index]->AddOriginalInput(value); - } - } - - void Connect(size_t input, - size_t output) - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else if (input >= output || - input >= that_.operations_.size() || - output >= that_.operations_.size() || - input < that_.current_ || - output < that_.current_) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - else - { - Operation& a = *that_.operations_[input]; - Operation& b = *that_.operations_[output]; - a.AddNextOperation(b); - } - } - }; - - virtual void Start() - { - } - - virtual JobStepResult ExecuteStep() - { - boost::mutex::scoped_lock lock(mutex_); - - if (current_ == operations_.size()) - { - LOG(INFO) << "Executing the trailing timeout in the sequence of operations"; - operationAdded_.timed_wait(lock, trailingTimeout_); - - if (current_ == operations_.size()) - { - // No operation was added during the trailing timeout: The - // job is over - LOG(INFO) << "The sequence of operations is over"; - done_ = true; - - for (std::list<IObserver*>::iterator it = observers_.begin(); - it != observers_.end(); ++it) - { - (*it)->SignalDone(*this); - } - - return JobStepResult::Success(); - } - else - { - LOG(INFO) << "New operation added to the sequence of operations"; - } - } - - assert(current_ < operations_.size()); - - while (current_ < operations_.size() && - operations_[current_]->IsDone()) - { - current_++; - } - - if (current_ < operations_.size()) - { - operations_[current_]->Step(); - } - - return JobStepResult::Continue(); - } - - virtual void SignalResubmit() - { - boost::mutex::scoped_lock lock(mutex_); - - current_ = 0; - done_ = false; - - for (size_t i = 0; i < operations_.size(); i++) - { - operations_[i]->Reset(); - } - } - - virtual void ReleaseResources() - { - } - - virtual float GetProgress() - { - boost::mutex::scoped_lock lock(mutex_); - - return (static_cast<float>(current_) / - static_cast<float>(operations_.size() + 1)); - } - - virtual void GetJobType(std::string& target) - { - target = jobType_; - } - - virtual void GetPublicContent(Json::Value& value) - { - boost::mutex::scoped_lock lock(mutex_); - - value["CountOperations"] = static_cast<unsigned int>(operations_.size()); - } - - virtual void GetInternalContent(Json::Value& value) - { - // TODO - } - }; - - - class LuaJobManager : private SequenceOfOperationsJob::IObserver - { - public: - typedef SequenceOfOperationsJob::Lock Lock; - - private: - boost::mutex mutex_; - JobsEngine& engine_; - TimeoutDicomConnectionManager connectionManager_; - std::string currentId_; - SequenceOfOperationsJob* currentJob_; - size_t maxOperations_; - int priority_; - unsigned int trailingTimeout_; - bool continue_; - boost::thread connectionTimeoutThread_; - - static void ConnectionTimeoutThread(LuaJobManager* manager) - { - while (manager->continue_) - { - manager->connectionManager_.CheckTimeout(); - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); - } - } - - virtual void SignalDone(const SequenceOfOperationsJob& job) - { - boost::mutex::scoped_lock lock(mutex_); - - if (&job == currentJob_) - { - currentId_.clear(); - currentJob_ = NULL; - } - } - - public: - LuaJobManager(JobsEngine& engine) : - engine_(engine), - currentJob_(NULL), - maxOperations_(1000), - priority_(0), - continue_(true) - { - connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this); - } - - ~LuaJobManager() - { - continue_ = false; - - if (connectionTimeoutThread_.joinable()) - { - connectionTimeoutThread_.join(); - } - } - - void SetMaxOperationsPerJob(size_t count) - { - boost::mutex::scoped_lock lock(mutex_); - maxOperations_ = count; - } - - void SetPriority(int priority) - { - boost::mutex::scoped_lock lock(mutex_); - priority_ = priority; - } - - void SetTrailingOperationTimeout(unsigned int timeout) - { - boost::mutex::scoped_lock lock(mutex_); - trailingTimeout_ = timeout; - } - - Lock* Modify() - { - boost::mutex::scoped_lock lock(mutex_); - - if (currentJob_ != NULL) - { - std::auto_ptr<Lock> result(new Lock(*currentJob_)); - - if (!result->IsDone() && - result->GetOperationsCount() < maxOperations_) - { - return result.release(); - } - } - - // Need to create a new job, as the previous one is either - // finished, or is getting too long - currentJob_ = new SequenceOfOperationsJob; - engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); - - std::auto_ptr<Lock> result(new Lock(*currentJob_)); - result->SetTrailingOperationTimeout(trailingTimeout_); - - return result.release(); - } - }; -} - TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) {