Mercurial > hg > orthanc
changeset 2583:1b6a6d80b6f2 jobs
OrthancPeerStoreJob
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Mon, 14 May 2018 20:43:16 +0200 |
parents | b3da733d984c |
children | 38b5045f2bff |
files | Core/JobsEngine/IJob.h Core/JobsEngine/JobsRegistry.cpp OrthancExplorer/explorer.js OrthancServer/OrthancRestApi/OrthancRestModalities.cpp UnitTestsSources/MultiThreadingTests.cpp |
diffstat | 5 files changed, 249 insertions(+), 154 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/JobsEngine/IJob.h Fri May 11 17:58:06 2018 +0200 +++ b/Core/JobsEngine/IJob.h Mon May 14 20:43:16 2018 +0200 @@ -52,7 +52,10 @@ virtual JobStepResult* ExecuteStep() = 0; - virtual void ReleaseResources() = 0; // For pausing jobs + // Method called once the job is resubmitted after a failure + virtual void SignalResubmit() = 0; + + virtual void ReleaseResources() = 0; // For pausing/canceling jobs virtual float GetProgress() = 0;
--- a/Core/JobsEngine/JobsRegistry.cpp Fri May 11 17:58:06 2018 +0200 +++ b/Core/JobsEngine/JobsRegistry.cpp Mon May 14 20:43:16 2018 +0200 @@ -773,6 +773,8 @@ } else { + found->second->GetJob().SignalResubmit(); + bool ok = false; for (CompletedJobs::iterator it = completedJobs_.begin(); it != completedJobs_.end(); ++it)
--- a/OrthancExplorer/explorer.js Fri May 11 17:58:06 2018 +0200 +++ b/OrthancExplorer/explorer.js Mon May 14 20:43:16 2018 +0200 @@ -1151,6 +1151,22 @@ var target = $('#all-jobs'); $('li', target).remove(); + var running = $('<li>') + .attr('data-role', 'list-divider') + .text('Currently running'); + + var pending = $('<li>') + .attr('data-role', 'list-divider') + .text('Pending jobs'); + + var inactive = $('<li>') + .attr('data-role', 'list-divider') + .text('Inactive jobs'); + + target.append(running); + target.append(pending); + target.append(inactive); + jobs.map(function(job) { var li = $('<li>'); var item = $('<a>'); @@ -1164,7 +1180,16 @@ AddJobDateField(item, 'Creation time: ', job.CreationTime); AddJobDateField(item, 'Completion time: ', job.CompletionTime); AddJobDateField(item, 'ETA: ', job.EstimatedTimeOfArrival); - target.append(li); + + if (job.State == 'Running') { + AddJobField(item, 'Progress: ', job.Progress); + li.insertAfter(running); + } else if (job.State == 'Pending' || + job.State == 'Paused') { + li.insertAfter(pending); + } else { + li.insertAfter(inactive); + } }); target.listview('refresh'); @@ -1210,8 +1235,14 @@ .text('Detailed information')); var block = $('<li>'); - for (var i in job.PublicContent) { - AddJobField(block, i + ': ', JSON.stringify(job.PublicContent[i])); + + for (var item in job.PublicContent) { + var value = job.PublicContent[item]; + if (typeof value !== 'string') { + value = JSON.stringify(value); + } + + AddJobField(block, item + ': ', value); } target.append(block); @@ -1251,6 +1282,7 @@ url: '../jobs/' + $.mobile.pageData.uuid + '/' + action, type: 'POST', async: false, + cache: false, complete: function(s) { window.location.reload(); }
--- a/OrthancServer/OrthancRestApi/OrthancRestModalities.cpp Fri May 11 17:58:06 2018 +0200 +++ b/OrthancServer/OrthancRestApi/OrthancRestModalities.cpp Mon May 14 20:43:16 2018 +0200 @@ -48,16 +48,22 @@ namespace Orthanc { - class InstancesIteratorJob : public IJob + class SetOfInstancesJob : public IJob { private: bool started_; std::vector<std::string> instances_; + bool permissive_; size_t position_; + std::set<std::string> failedInstances_; + + protected: + virtual bool HandleInstance(const std::string& instance) = 0; public: - InstancesIteratorJob() : + SetOfInstancesJob() : started_(false), + permissive_(false), position_(0) { } @@ -90,7 +96,37 @@ instances_.push_back(instance); } } - + + bool IsPermissive() const + { + return permissive_; + } + + void SetPermissive(bool permissive) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + permissive_ = permissive; + } + } + + virtual void SignalResubmit() + { + if (started_) + { + position_ = 0; + failedInstances_.clear(); + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + virtual void Start() { started_ = true; @@ -142,22 +178,98 @@ return instances_[position_]; } } + + + const std::vector<std::string>& GetInstances() const + { + return instances_; + } + + + const std::set<std::string>& GetFailedInstances() const + { + return failedInstances_; + } + + + virtual JobStepResult* ExecuteStep() + { + if (IsDone()) + { + return new JobStepResult(JobStepCode_Failure); + } + + bool ok; + + try + { + ok = HandleInstance(GetCurrentInstance()); + } + catch (OrthancException& e) + { + ok = false; + } + + if (!ok) + { + if (permissive_) + { + failedInstances_.insert(GetCurrentInstance()); + } + else + { + return new JobStepResult(JobStepCode_Failure); + } + } + + Next(); + + if (IsDone()) + { + return new JobStepResult(JobStepCode_Success); + } + else + { + return new JobStepResult(JobStepCode_Continue); + } + } + + virtual void GetInternalContent(Json::Value& value) + { + Json::Value v = Json::arrayValue; + + for (size_t i = 0; i < instances_.size(); i++) + { + v.append(instances_[i]); + } + + value["Instances"] = v; + + + v = Json::arrayValue; + + for (std::set<std::string>::const_iterator it = failedInstances_.begin(); + it != failedInstances_.end(); ++it) + { + v.append(*it); + } + + value["FailedInstances"] = v; + } }; - class StoreScuJob : public InstancesIteratorJob + class DicomStoreJob : public SetOfInstancesJob { private: ServerContext& context_; std::string localAet_; RemoteModalityParameters remote_; - bool permissive_; std::string moveOriginatorAet_; uint16_t moveOriginatorId_; std::auto_ptr<DicomUserConnection> connection_; - std::set<std::string> failedInstances_; - void CreateConnection() + void OpenConnection() { if (connection_.get() == NULL) { @@ -166,32 +278,40 @@ connection_->SetRemoteModality(remote_); } } + + protected: + virtual bool HandleInstance(const std::string& instance) + { + OpenConnection(); + + LOG(INFO) << "Sending instance " << instance << " to modality \"" + << remote_.GetApplicationEntityTitle() << "\""; + + std::string dicom; + context_.ReadDicom(dicom, GetCurrentInstance()); + + if (HasMoveOriginator()) + { + connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_); + } + else + { + connection_->Store(dicom); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + return true; + } public: - StoreScuJob(ServerContext& context) : + DicomStoreJob(ServerContext& context) : context_(context), localAet_("ORTHANC"), - permissive_(false), moveOriginatorId_(0) // By default, not a C-MOVE { } - void AddResource(const std::string& publicId) - { - typedef std::list<std::string> Instances; - - Instances instances; - context_.GetIndex().GetChildInstances(instances, publicId); - - Reserve(GetInstancesCount() + instances.size()); - - for (Instances::const_iterator it = instances.begin(); - it != instances.end(); ++it) - { - AddInstance(*it); - } - } - const std::string& GetLocalAet() const { return localAet_; @@ -226,23 +346,6 @@ } } - bool IsPermissive() const - { - return permissive_; - } - - void SetPermissive(bool permissive) - { - if (IsStarted()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - permissive_ = permissive; - } - } - bool HasMoveOriginator() const { return moveOriginatorId_ != 0; @@ -291,63 +394,6 @@ } } - virtual JobStepResult* ExecuteStep() - { - if (IsDone()) - { - return new JobStepResult(JobStepCode_Success); - } - - CreateConnection(); - - bool ok = false; - - try - { - std::string dicom; - context_.ReadDicom(dicom, GetCurrentInstance()); - - if (HasMoveOriginator()) - { - connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_); - } - else - { - connection_->Store(dicom); - } - - boost::this_thread::sleep(boost::posix_time::milliseconds(300)); - - ok = true; - } - catch (OrthancException& e) - { - } - - if (!ok) - { - if (permissive_) - { - failedInstances_.insert(GetCurrentInstance()); - } - else - { - return new JobStepResult(JobStepCode_Failure); - } - } - - Next(); - - if (IsDone()) - { - return new JobStepResult(JobStepCode_Success); - } - else - { - return new JobStepResult(JobStepCode_Continue); - } - } - virtual void ReleaseResources() // For pausing jobs { connection_.reset(NULL); @@ -355,7 +401,7 @@ virtual void GetJobType(std::string& target) { - target = "C-Store"; + target = "DicomStore"; } virtual void GetPublicContent(Json::Value& value) @@ -369,19 +415,66 @@ value["MoveOriginatorID"] = GetMoveOriginatorId(); } - Json::Value v = Json::arrayValue; - for (std::set<std::string>::const_iterator it = failedInstances_.begin(); - it != failedInstances_.end(); ++it) + value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size()); + value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size()); + } + }; + + + class OrthancPeerStoreJob : public SetOfInstancesJob + { + private: + ServerContext& context_; + WebServiceParameters peer_; + std::auto_ptr<HttpClient> client_; + + protected: + virtual bool HandleInstance(const std::string& instance) + { + if (client_.get() == NULL) { - v.append(*it); + client_.reset(new HttpClient(peer_, "instances")); + client_->SetMethod(HttpMethod_Post); } + + LOG(INFO) << "Sending instance " << instance << " to peer \"" + << peer_.GetUrl() << "\""; - value["FailedInstances"] = v; + context_.ReadDicom(client_->GetBody(), GetCurrentInstance()); + + std::string answer; + return client_->Apply(answer); + } + + public: + OrthancPeerStoreJob(ServerContext& context) : + context_(context) + { } - virtual void GetInternalContent(Json::Value& value) + const WebServiceParameters& GetPeer() const + { + return peer_; + } + + virtual void ReleaseResources() // For pausing jobs + { + client_.reset(NULL); + } + + virtual void GetJobType(std::string& target) { - // TODO + target = "OrthancPeerStore"; + } + + virtual void GetPublicContent(Json::Value& value) + { + Json::Value v; + peer_.ToJson(v); + value["Peer"] = v; + + value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size()); + value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size()); } }; } @@ -1036,25 +1129,20 @@ int moveOriginatorID = Toolbox::GetJsonIntegerField(request, "MoveOriginatorID", 0 /* By default, not a C-MOVE */); int priority = Toolbox::GetJsonIntegerField(request, "Priority", 0); - if (moveOriginatorID < 0 || - moveOriginatorID >= 65536) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - RemoteModalityParameters p = Configuration::GetModalityUsingSymbolicName(remote); -#if 1 - std::auto_ptr<StoreScuJob> job(new StoreScuJob(context)); + std::auto_ptr<DicomStoreJob> job(new DicomStoreJob(context)); job->SetLocalAet(localAet); job->SetRemoteModality(p); job->SetPermissive(permissive); if (moveOriginatorID != 0) { - job->SetMoveOriginator(moveOriginatorAET, static_cast<uint16_t>(moveOriginatorID)); + job->SetMoveOriginator(moveOriginatorAET, moveOriginatorID); } + job->Reserve(instances.size()); + for (std::list<std::string>::const_iterator it = instances.begin(); it != instances.end(); ++it) { @@ -1080,40 +1168,6 @@ { call.GetOutput().SignalError(HttpStatus_500_InternalServerError); } - -#else - ServerJob job; - for (std::list<std::string>::const_iterator - it = instances.begin(); it != instances.end(); ++it) - { - std::auto_ptr<StoreScuCommand> command(new StoreScuCommand(context, localAet, p, permissive)); - - if (moveOriginatorID != 0) - { - command->SetMoveOriginator(moveOriginatorAET, static_cast<uint16_t>(moveOriginatorID)); - } - - job.AddCommand(command.release()).AddInput(*it); - } - - job.SetDescription("HTTP request: Store-SCU to peer \"" + remote + "\""); - - if (asynchronous) - { - // Asynchronous mode: Submit the job, but don't wait for its completion - context.GetScheduler().Submit(job); - call.GetOutput().AnswerBuffer("{}", "application/json"); - } - else if (context.GetScheduler().SubmitAndWait(job)) - { - // Synchronous mode: We have submitted and waited for completion - call.GetOutput().AnswerBuffer("{}", "application/json"); - } - else - { - call.GetOutput().SignalError(HttpStatus_500_InternalServerError); - } -#endif }