# HG changeset patch # User Sebastien Jodogne # Date 1525722124 -7200 # Node ID 2e879c796ec7c5bf283624a20d074f5cae46a6ea # Parent 2af17cd5eb1fbd50da829d776ef2134415515499 JobsRegistry::SubmitAndWait(), StoreScuJob diff -r 2af17cd5eb1f -r 2e879c796ec7 Core/JobsEngine/IJob.h --- a/Core/JobsEngine/IJob.h Mon May 07 15:37:20 2018 +0200 +++ b/Core/JobsEngine/IJob.h Mon May 07 21:42:04 2018 +0200 @@ -47,6 +47,9 @@ { } + // Method called once the job enters the jobs engine + virtual void Start() = 0; + virtual JobStepResult* ExecuteStep() = 0; virtual void ReleaseResources() = 0; // For pausing jobs diff -r 2af17cd5eb1f -r 2e879c796ec7 Core/JobsEngine/JobsEngine.cpp --- a/Core/JobsEngine/JobsEngine.cpp Mon May 07 15:37:20 2018 +0200 +++ b/Core/JobsEngine/JobsEngine.cpp Mon May 07 21:42:04 2018 +0200 @@ -102,6 +102,7 @@ return false; case JobStepCode_Retry: + running.GetJob().ReleaseResources(); running.MarkRetry(dynamic_cast(*result).GetTimeout()); return false; @@ -190,11 +191,6 @@ void JobsEngine::SetWorkersCount(size_t count) { - if (count == 0) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - boost::mutex::scoped_lock lock(stateMutex_); if (state_ != State_Setup) @@ -218,6 +214,19 @@ retryHandler_ = boost::thread(RetryHandler, this); + if (workers_.size() == 0) + { + // Use all the available CPUs + size_t n = boost::thread::hardware_concurrency(); + + if (n == 0) + { + n = 1; + } + + workers_.resize(n); + } + for (size_t i = 0; i < workers_.size(); i++) { workers_[i] = boost::thread(Worker, this, i); @@ -225,7 +234,7 @@ state_ = State_Running; - LOG(WARNING) << "The jobs engine has started"; + LOG(WARNING) << "The jobs engine has started with " << workers_.size() << " threads"; } diff -r 2af17cd5eb1f -r 2e879c796ec7 Core/JobsEngine/JobsRegistry.cpp --- a/Core/JobsEngine/JobsRegistry.cpp Mon May 07 15:37:20 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Mon May 07 21:42:04 2018 +0200 @@ -92,6 +92,7 @@ } lastStatus_ = JobStatus(ErrorCode_Success, *job); + job->Start(); } const std::string& GetId() const @@ -348,6 +349,8 @@ completedJobs_.push_back(&job); ForgetOldCompletedJobs(); + someJobComplete_.notify_all(); + CheckInvariants(); } @@ -382,6 +385,24 @@ } + bool JobsRegistry::GetStateInternal(JobState& state, + const std::string& id) + { + CheckInvariants(); + + JobsIndex::const_iterator it = jobsIndex_.find(id); + if (it == jobsIndex_.end()) + { + return false; + } + else + { + state = it->second->GetState(); + return true; + } + } + + JobsRegistry::~JobsRegistry() { for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) @@ -474,6 +495,31 @@ } + bool JobsRegistry::SubmitAndWait(IJob* job, // Takes ownership + int priority) + { + std::string id; + Submit(id, job, priority); + + printf(">> %s\n", id.c_str()); fflush(stdout); + + JobState state; + + { + boost::mutex::scoped_lock lock(mutex_); + + while (GetStateInternal(state, id) && + state != JobState_Success && + state != JobState_Failure) + { + someJobComplete_.wait(lock); + } + } + + return (state == JobState_Success); + } + + void JobsRegistry::SetPriority(const std::string& id, int priority) { @@ -687,18 +733,7 @@ const std::string& id) { boost::mutex::scoped_lock lock(mutex_); - CheckInvariants(); - - JobsIndex::const_iterator it = jobsIndex_.find(id); - if (it == jobsIndex_.end()) - { - return false; - } - else - { - state = it->second->GetState(); - return true; - } + return GetStateInternal(state, id); } diff -r 2af17cd5eb1f -r 2e879c796ec7 Core/JobsEngine/JobsRegistry.h --- a/Core/JobsEngine/JobsRegistry.h Mon May 07 15:37:20 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.h Mon May 07 21:42:04 2018 +0200 @@ -77,6 +77,7 @@ RetryJobs retryJobs_; boost::condition_variable pendingJobAvailable_; + boost::condition_variable someJobComplete_; size_t maxCompletedJobs_; @@ -99,6 +100,9 @@ unsigned int timeout); void MarkRunningAsPaused(JobHandler& job); + + bool GetStateInternal(JobState& state, + const std::string& id); public: JobsRegistry() : @@ -122,6 +126,9 @@ void Submit(IJob* job, // Takes ownership int priority); + + bool SubmitAndWait(IJob* job, // Takes ownership + int priority); void SetPriority(const std::string& id, int priority); diff -r 2af17cd5eb1f -r 2e879c796ec7 OrthancServer/ServerContext.cpp --- a/OrthancServer/ServerContext.cpp Mon May 07 15:37:20 2018 +0200 +++ b/OrthancServer/ServerContext.cpp Mon May 07 21:42:04 2018 +0200 @@ -134,6 +134,10 @@ listeners_.push_back(ServerListener(lua_, "Lua")); + jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); + //jobsEngine_.SetMaxCompleted // TODO + jobsEngine_.Start(); + changeThread_ = boost::thread(ChangeThread, this); } @@ -168,6 +172,7 @@ scu_.Finalize(); // Do not change the order below! + jobsEngine_.Stop(); scheduler_.Stop(); index_.Stop(); } diff -r 2af17cd5eb1f -r 2e879c796ec7 OrthancServer/ServerContext.h --- a/OrthancServer/ServerContext.h Mon May 07 15:37:20 2018 +0200 +++ b/OrthancServer/ServerContext.h Mon May 07 21:42:04 2018 +0200 @@ -48,6 +48,7 @@ #include "Scheduler/ServerScheduler.h" #include "ServerIndex.h" #include "OrthancHttpHandler.h" +#include "../Core/JobsEngine/JobsEngine.h" #include #include @@ -120,6 +121,7 @@ MemoryCache dicomCache_; ReusableDicomUserConnection scu_; ServerScheduler scheduler_; + JobsEngine jobsEngine_; LuaScripting lua_; @@ -248,6 +250,11 @@ return scheduler_; } + JobsEngine& GetJobsEngine() + { + return jobsEngine_; + } + bool DeleteResource(Json::Value& target, const std::string& uuid, ResourceType expectedType); diff -r 2af17cd5eb1f -r 2e879c796ec7 Resources/Configuration.json --- a/Resources/Configuration.json Mon May 07 15:37:20 2018 +0200 +++ b/Resources/Configuration.json Mon May 07 21:42:04 2018 +0200 @@ -43,6 +43,11 @@ "Plugins" : [ ], + // Maximum number of processing jobs that are simultanously running + // at any given time. A value of "0" indicates to use all the + // available CPU logical cores. To emulate Orthanc <= 1.3.2, set + // this value to "1". + "ConcurrentJobs" : 2, /** diff -r 2af17cd5eb1f -r 2e879c796ec7 UnitTestsSources/MultiThreadingTests.cpp --- a/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 15:37:20 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Mon May 07 21:42:04 2018 +0200 @@ -263,9 +263,13 @@ { } + virtual void Start() + { + } + virtual JobStepResult* ExecuteStep() { - boost::this_thread::sleep(boost::posix_time::milliseconds(50)); + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); if (count_ == steps_ - 1) { @@ -598,6 +602,323 @@ } + +#include "../OrthancServer/ServerContext.h" + +namespace Orthanc +{ + class InstancesIteratorJob : public IJob + { + private: + bool started_; + std::vector instances_; + size_t position_; + + public: + InstancesIteratorJob() : + started_(false), + position_(0) + { + } + + void Reserve(size_t size) + { + if (started_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + instances_.reserve(size); + } + } + + void AddInstance(const std::string& instance) + { + if (started_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + instances_.push_back(instance); + } + } + + virtual void Start() + { + started_ = true; + } + + virtual float GetProgress() + { + if (instances_.size() == 0) + { + return 0; + } + else + { + return (static_cast(position_) / + static_cast(instances_.size())); + } + } + + bool IsStarted() const + { + return started_; + } + + bool IsDone() const + { + if (instances_.size() == 0) + { + return true; + } + else + { + return (position_ == instances_.size() - 1); + } + } + + void Next() + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + position_ += 1; + } + } + + const std::string& GetCurrentInstance() const + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + return instances_[position_]; + } + } + }; + + + class StoreScuJob : public InstancesIteratorJob + { + private: + ServerContext& context_; + std::string localAet_; + RemoteModalityParameters remote_; + bool permissive_; + std::string moveOriginatorAet_; + uint16_t moveOriginatorId_; + std::auto_ptr connection_; + std::set failedInstances_; + + void Open() + { + if (connection_.get() == NULL) + { + connection_.reset(new DicomUserConnection); + connection_->SetLocalApplicationEntityTitle(localAet_); + connection_->SetRemoteModality(remote_); + connection_->Open(); + } + } + + public: + StoreScuJob(ServerContext& context) : + context_(context), + localAet_("ORTHANC"), + permissive_(false), + moveOriginatorId_(0) // By default, not a C-MOVE + { + } + + const std::string& GetLocalAet() const + { + return localAet_; + } + + void SetLocalAet(const std::string& aet) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + localAet_ = aet; + } + } + + const RemoteModalityParameters& GetRemoteModality() const + { + return remote_; + } + + void SetRemoteModality(const RemoteModalityParameters& remote) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + remote_ = remote; + } + } + + bool IsPermissive() const + { + return permissive_; + } + + void SetPermissive(bool permissive) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + permissive_ = permissive; + } + } + + bool HasMoveOriginator() const + { + return moveOriginatorId_ != 0; + } + + const std::string& GetMoveOriginatorAet() const + { + if (HasMoveOriginator()) + { + return moveOriginatorAet_; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + uint16_t GetMoveOriginatorId() const + { + if (HasMoveOriginator()) + { + return moveOriginatorId_; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + void SetMoveOriginator(const std::string& aet, + int id) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (id < 0 || + id >= 65536) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + moveOriginatorId_ = static_cast(id); + moveOriginatorAet_ = aet; + } + } + + virtual JobStepResult* ExecuteStep() + { + if (IsDone()) + { + return new JobStepResult(JobStepCode_Success); + } + + Open(); + + bool ok = false; + + try + { + std::string dicom; + context_.ReadDicom(dicom, GetCurrentInstance()); + + if (HasMoveOriginator()) + { + connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_); + } + else + { + connection_->Store(dicom); + } + + ok = true; + } + catch (OrthancException& e) + { + } + + if (!ok) + { + if (permissive_) + { + failedInstances_.insert(GetCurrentInstance()); + } + else + { + return new JobStepResult(JobStepCode_Failure); + } + } + + Next(); + + return new JobStepResult(IsDone() ? JobStepCode_Success : JobStepCode_Continue); + } + + virtual void ReleaseResources() // For pausing jobs + { + connection_.release(); + } + + virtual void GetDescription(Json::Value& value) + { + value["Type"] = "C-STORE"; + value["LocalAet"] = localAet_; + + Json::Value v; + remote_.ToJson(v); + value["Target"] = v; + + if (HasMoveOriginator()) + { + value["MoveOriginatorAET"] = GetMoveOriginatorAet(); + value["MoveOriginatorID"] = GetMoveOriginatorId(); + } + + v = Json::arrayValue; + for (std::set::const_iterator it = failedInstances_.begin(); + it != failedInstances_.end(); ++it) + { + v.append(*it); + } + + value["FailedInstances"] = v; + } + }; +} + + + TEST(JobsEngine, Basic) { JobsEngine engine; @@ -636,10 +957,18 @@ std::cout << "====================================================" << std::endl; boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + + if (1) + { + printf(">> %d\n", engine.GetRegistry().SubmitAndWait(new DummyJob(JobStepResult(Orthanc::JobStepCode_Failure)), rand() % 10)); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + engine.Stop(); - + if (0) { typedef std::set Jobs;