# HG changeset patch # User Sebastien Jodogne # Date 1526490043 -7200 # Node ID 593d6b0f4cba9d67e6893b0940e75cd1f4a486e7 # Parent 34dc57f4a7d210a4e76784b57d5f762833494195 IJobOperation diff -r 34dc57f4a7d2 -r 593d6b0f4cba UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Wed May 16 16:23:20 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Wed May 16 19:00:43 2018 +0200 @@ -272,8 +272,6 @@ virtual JobStepResult ExecuteStep() { - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - if (fails_) { return JobStepResult::Failure(ErrorCode_ParameterOutOfRange); @@ -728,77 +726,525 @@ - -TEST(JobsEngine, Basic) +TEST(JobsEngine, SubmitAndWait) { JobsEngine engine; - - std::string s; - - for (size_t i = 0; i < 20; i++) - engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10); - engine.SetWorkersCount(3); engine.Start(); - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10)); + ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10)); + + engine.Stop(); +} + + + + +#include "../OrthancServer/ServerContext.h" +#include "../Core/Logging.h" + +namespace +{ + class JobOperationValue : public boost::noncopyable { - typedef std::set Jobs; + public: + enum Type + { + Type_DicomInstance, + Type_Null + }; + + private: + Type type_; + + protected: + JobOperationValue(Type type) : + type_(type) + { + } + + public: + virtual ~JobOperationValue() + { + } - Jobs jobs; - engine.GetRegistry().ListJobs(jobs); + Type GetType() const + { + return type_; + } + + virtual JobOperationValue* Clone() const = 0; + }; + + + class IDicomConnectionProvider : public boost::noncopyable + { + public: + virtual ~IDicomConnectionProvider() + { + } + + class IResource : public boost::noncopyable + { + public: + virtual ~IResource() + { + } + + virtual DicomUserConnection& GetConnection() = 0; + }; - Json::Value v = Json::arrayValue; - for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) + virtual IResource* Acquire(const std::string& localAet, + const RemoteModalityParameters& remote) = 0; + }; + + + class JobOperationValues : public boost::noncopyable + { + private: + std::vector values_; + + public: + ~JobOperationValues() { - JobInfo info; + Clear(); + } + + void Append(JobOperationValues& target, + bool clear) + { + target.Reserve(target.GetSize() + GetSize()); + + for (size_t i = 0; i < values_.size(); i++) + { + if (clear) + { + target.Append(values_[i]); + values_[i] = NULL; + } + else + { + target.Append(GetValue(i).Clone()); + } + } - if (engine.GetRegistry().GetJobInfo(info, *it)) + if (clear) + { + Clear(); + } + } + + void Clear() + { + for (size_t i = 0; i < values_.size(); i++) { - Json::Value vv; - info.Serialize(vv, true); - v.append(vv); + if (values_[i] != NULL) + { + delete values_[i]; + } + } + + values_.clear(); + } + + void Reserve(size_t count) + { + values_.reserve(count); + } + + void Append(JobOperationValue* value) // Takes ownership + { + if (value == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + else + { + values_.push_back(value); } } - std::cout << v << std::endl; - } - std::cout << "====================================================" << std::endl; + size_t GetSize() const + { + return values_.size(); + } + + JobOperationValue& GetValue(size_t index) const + { + if (index >= values_.size()) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + assert(values_[index] != NULL); + return *values_[index]; + } + } + }; + + + + class IJobOperation : public boost::noncopyable + { + public: + virtual ~IJobOperation() + { + } + + virtual void Apply(JobOperationValues& outputs, + const JobOperationValue& input, + IDicomConnectionProvider& provider); + }; + - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + class DicomInstanceValue : public JobOperationValue + { + private: + ServerContext& context_; + std::string id_; + + public: + DicomInstanceValue(ServerContext& context, + const std::string& id) : + JobOperationValue(Type_DicomInstance), + context_(context), + id_(id) + { + } - if (1) + const std::string& GetId() const + { + return id_; + } + + void ReadContent(std::string& dicom) const + { + context_.ReadDicom(dicom, id_); + } + + virtual JobOperationValue* Clone() const + { + return new DicomInstanceValue(context_, id_); + } + }; + + + class StoreScuOperation : public IJobOperation { - ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10)); - ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10)); - } + private: + std::string localAet_; + RemoteModalityParameters modality_; + + public: + StoreScuOperation(const std::string& localAet, + const RemoteModalityParameters& modality) : + localAet_(localAet), + modality_(modality) + { + } + + virtual void Apply(JobOperationValues& outputs, + const JobOperationValue& input, + IDicomConnectionProvider& provider) + { + std::auto_ptr resource(provider.Acquire(localAet_, modality_)); + + if (resource.get() == NULL) + { + LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); + return; + } + + if (input.GetType() != JobOperationValue::Type_DicomInstance) + { + throw OrthancException(ErrorCode_BadParameterType); + } + + const DicomInstanceValue& instance = dynamic_cast(input); + + LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" + << modality_.GetApplicationEntityTitle() << "\""; - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + try + { + std::string dicom; + instance.ReadContent(dicom); + resource->GetConnection().Store(dicom); + outputs.Append(instance.Clone()); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" + << modality_.GetApplicationEntityTitle() << "\": " << e.What(); + } + } + }; + - - engine.Stop(); + class DeleteResourceOperation : public IJobOperation + { + private: + ServerContext& context_; + + public: + DeleteResourceOperation(ServerContext& context) : + context_(context) + { + } + + virtual void Apply(JobOperationValues& outputs, + const JobOperationValue& input, + IDicomConnectionProvider& provider) + { + switch (input.GetType()) + { + case JobOperationValue::Type_DicomInstance: + { + const DicomInstanceValue& instance = dynamic_cast(input); + LOG(INFO) << "Deleting instance: " << instance.GetId(); - if (0) + try + { + Json::Value tmp; + context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What(); + } + + break; + } + + default: + break; + } + } + }; + + + class SequenceOfOperationsJob : + public IJob, + private IDicomConnectionProvider { - typedef std::set Jobs; + private: + /*class DicomConnection + { + private: + boost::posix_time::ptime lastUse_; + + void Touch() + { + lastUse_ = boost::posix_time::microsec_clock::universal_time(); + } + + public: + class Resource : public IDicomConnectionProvider::IResource + { + private: + DicomConnection() + }; + };*/ - Jobs jobs; - engine.GetRegistry().ListJobs(jobs); + class Operation : public boost::noncopyable + { + private: + JobOperationValues originalInputs_; + JobOperationValues workInputs_; + std::auto_ptr operation_; + std::list nextOperations_; + size_t currentInput_; + + public: + Operation(IJobOperation* operation) : + operation_(operation), + currentInput_(0) + { + if (operation == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + } + + void AddOriginalInput(const JobOperationValue& value) + { + if (currentInput_ != 0) + { + // Cannot add input after processing has started + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + originalInputs_.Append(value.Clone()); + } + } - Json::Value v = Json::arrayValue; - for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) - { - JobInfo info; + const JobOperationValues& GetOriginalInputs() const + { + return originalInputs_; + } + + void Reset() + { + workInputs_.Clear(); + currentInput_ = 0; + } + + void AddNextOperation(Operation& other) + { + if (currentInput_ != 0) + { + // Cannot add input after processing has started + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + nextOperations_.push_back(&other); + } + } + + bool IsDone() const + { + return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); + } + + void Step(IDicomConnectionProvider& provider) + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + const JobOperationValue* input; - if (engine.GetRegistry().GetJobInfo(info, *it)) + if (currentInput_ < originalInputs_.GetSize()) + { + input = &originalInputs_.GetValue(currentInput_); + } + else + { + input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); + } + + JobOperationValues outputs; + operation_->Apply(outputs, *input, provider); + + if (!nextOperations_.empty()) + { + // TODO + } + + currentInput_ += 1; + } + }; + + + boost::mutex mutex_; + std::vector operations_; + size_t currentOperation_; + boost::condition_variable operationAdded_; + + public: + SequenceOfOperationsJob() : + currentOperation_(0) + { + } + + virtual ~SequenceOfOperationsJob() + { + for (size_t i = 0; i < operations_.size(); i++) { - Json::Value vv; - info.Serialize(vv, true); - v.append(vv); + if (operations_[i] != NULL) + { + delete operations_[i]; + } } } - std::cout << v << std::endl; - } + class Lock : public boost::noncopyable + { + private: + SequenceOfOperationsJob& that_; + boost::mutex::scoped_lock lock_; + + public: + Lock(SequenceOfOperationsJob& that) : + that_(that), + lock_(that.mutex_) + { + } + + size_t AddOperation(IJobOperation* operation) + { + that_.operations_.push_back(new Operation(operation)); + that_.operationAdded_.notify_one(); + + return that_.operations_.size() - 1; + } + + void AddInput(size_t index, + const JobOperationValue& value) + { + if (index >= that_.operations_.size() || + index < that_.currentOperation_) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + that_.operations_[index]->AddOriginalInput(value); + } + } + + void Connect(size_t input, + size_t output) + { + if (input >= output || + input >= that_.operations_.size() || + output >= that_.operations_.size() || + input < that_.currentOperation_ || + output < that_.currentOperation_) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + Operation& a = *that_.operations_[input]; + Operation& b = *that_.operations_[output]; + a.AddNextOperation(b); + } + } + }; + + virtual void Start() + { + } + + virtual JobStepResult ExecuteStep() = 0; + + virtual void SignalResubmit() + { + boost::mutex::scoped_lock lock(mutex_); + + currentOperation_ = 0; + + for (size_t i = 0; i < operations_.size(); i++) + { + operations_[i]->Reset(); + } + } + + virtual void ReleaseResources() = 0; // For pausing/canceling jobs + + virtual float GetProgress() = 0; + + virtual void GetJobType(std::string& target) = 0; + + virtual void GetPublicContent(Json::Value& value) = 0; + + virtual void GetInternalContent(Json::Value& value) = 0; + }; }