Mercurial > hg > orthanc-dicomweb
changeset 333:bc903f260e38
refactoring of StowClientJob using SingleFunctionJob
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 21 Jun 2019 17:16:50 +0200 |
parents | bff4b45cbf1f |
children | b995fc9156de |
files | Plugin/DicomWebClient.cpp |
diffstat | 1 files changed, 120 insertions(+), 158 deletions(-) [+] |
line wrap: on
line diff
--- a/Plugin/DicomWebClient.cpp Fri Jun 21 16:12:42 2019 +0200 +++ b/Plugin/DicomWebClient.cpp Fri Jun 21 17:16:50 2019 +0200 @@ -64,7 +64,7 @@ unsigned int maxPosition) { boost::mutex::scoped_lock lock(that_.mutex_); - + if (maxPosition == 0 || position > maxPosition) { @@ -175,8 +175,6 @@ static void Worker(SingleFunctionJob* job, IFunctionFactory* factory) { - printf("=================================> STARTING\n"); - assert(job != NULL && factory != NULL); JobContext context(*job); @@ -247,8 +245,6 @@ virtual OrthancPluginJobStepStatus Step() { - printf("=================================> STEP\n"); - if (factory_ == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); @@ -283,8 +279,6 @@ virtual void Stop(OrthancPluginJobStopReason reason) { - printf("=================================> STOP %d\n", (int) reason); - if (factory_ == NULL) { return; @@ -312,8 +306,6 @@ virtual void Reset() { - printf("=================================> RESET\n"); - boost::mutex::scoped_lock lock(mutex_); assert(worker_.get() == NULL); @@ -564,48 +556,55 @@ } -class StowClientJob : public OrthancPlugins::OrthancJob +class StowClientJob : + public SingleFunctionJob, + private SingleFunctionJob::IFunctionFactory { private: - enum State + enum Action { - State_Running, - State_Stopped, - State_Error, - State_Done + Action_None, + Action_Pause, + Action_Cancel }; - + + boost::mutex mutex_; + std::string serverName_; + std::vector<std::string> instances_; + OrthancPlugins::HttpClient::HttpHeaders headers_; + std::string boundary_; + size_t position_; + Action action_; + size_t networkSize_; + bool debug_; - boost::mutex mutex_; - std::string serverName_; - std::vector<std::string> instances_; - OrthancPlugins::HttpClient::HttpHeaders headers_; - std::string boundary_; - - - std::auto_ptr<boost::thread> worker_; - - - State state_; - size_t position_; - Json::Value content_; - - - bool ReadNextInstance(std::string& dicom) + bool ReadNextInstance(std::string& dicom, + JobContext& context) { boost::mutex::scoped_lock lock(mutex_); - if (state_ != State_Running) + if (action_ != Action_None) { return false; } while (position_ < instances_.size()) { + context.SetProgress(position_, instances_.size()); + size_t i = position_++; + if (debug_) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + if (OrthancPlugins::RestApiGetString(dicom, "/instances/" + instances_[i] + "/file", false)) { + networkSize_ += dicom.size(); + context.SetContent("NetworkSizeMB", boost::lexical_cast<std::string> + (networkSize_ / static_cast<uint64_t>(1024 * 1024))); + return true; } } @@ -618,12 +617,15 @@ { private: StowClientJob& that_; + JobContext& context_; std::string boundary_; bool done_; public: - RequestBody(StowClientJob& that) : + RequestBody(StowClientJob& that, + JobContext& context) : that_(that), + context_(context), boundary_(that.boundary_), done_(false) { @@ -633,13 +635,14 @@ { if (done_) { + context_.SetProgress(1, 1); return false; } else { std::string dicom; - if (that_.ReadNextInstance(dicom)) + if (that_.ReadNextInstance(dicom, context_)) { chunk = ("--" + boundary_ + "\r\n" + "Content-Type: application/dicom\r\n" + @@ -660,11 +663,19 @@ }; - static void Worker(StowClientJob* that) + class F : public IFunction { - try + private: + StowClientJob& that_; + + public: + F(StowClientJob& that) : + that_(that) { - std::string serverName; + } + + virtual void Execute(JobContext& context) + { size_t startPosition; // The lifetime of "body" should be larger than "client" @@ -672,16 +683,16 @@ std::auto_ptr<OrthancPlugins::HttpClient> client; { - boost::mutex::scoped_lock lock(that->mutex_); - serverName = that->serverName_; - startPosition = that->position_; - - body.reset(new RequestBody(*that)); + boost::mutex::scoped_lock lock(that_.mutex_); + context.SetContent("InstancesCount", boost::lexical_cast<std::string>(that_.instances_.size())); + + startPosition = that_.position_; + body.reset(new RequestBody(that_, context)); client.reset(new OrthancPlugins::HttpClient); - OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(*client, that->serverName_, "/studies"); + OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(*client, that_.serverName_, "/studies"); client->SetMethod(OrthancPluginHttpMethod_Post); - client->AddHeaders(that->headers_); + client->AddHeaders(that_.headers_); } OrthancPlugins::HttpClient::HttpHeaders answerHeaders; @@ -693,45 +704,37 @@ size_t endPosition; { - boost::mutex::scoped_lock lock(that->mutex_); - endPosition = that->position_; + boost::mutex::scoped_lock lock(that_.mutex_); + endPosition = that_.position_; + CheckStowAnswer(answerBody, that_.serverName_, endPosition - startPosition); + + if (that_.action_ == Action_Cancel) + { + that_.position_ = 0; + } } - - CheckStowAnswer(answerBody, serverName, endPosition - startPosition); } - catch (Orthanc::OrthancException& e) - { - { - boost::mutex::scoped_lock lock(that->mutex_); - LOG(ERROR) << "Error in STOW-RS client job to server " << that->serverName_ << ": " << e.What(); - that->state_ = State_Error; - } + }; + - that->SetContent("Error", e.What()); - } + virtual void CancelFunction() + { + boost::mutex::scoped_lock lock(mutex_); + action_ = Action_Cancel; + } + + + virtual void PauseFunction() + { + boost::mutex::scoped_lock lock(mutex_); + action_ = Action_Pause; } - - void SetContent(const std::string& key, - const std::string& value) + + virtual IFunction* CreateFunction() { - boost::mutex::scoped_lock lock(mutex_); - content_[key] = value; - UpdateContent(content_); - } - - - void StopWorker() - { - if (worker_.get() != NULL) - { - if (worker_->joinable()) - { - worker_->join(); - } - - worker_.reset(); - } + action_ = Action_None; + return new F(*this); } @@ -739,13 +742,16 @@ StowClientJob(const std::string& serverName, const std::list<std::string>& instances, const OrthancPlugins::HttpClient::HttpHeaders& headers) : - OrthancJob("DicomWebStowClient"), + SingleFunctionJob("DicomWebStowClient"), serverName_(serverName), headers_(headers), - state_(State_Running), position_(0), - content_(Json::objectValue) + action_(Action_None), + networkSize_(0), + debug_(false) { + SetFactory(*this); + instances_.reserve(instances.size()); for (std::list<std::string>::const_iterator @@ -767,77 +773,9 @@ headers_["Content-Type"] = "multipart/related; type=\"application/dicom\"; boundary=" + boundary_; } - - virtual OrthancPluginJobStepStatus Step() + void SetDebug(bool debug) { - State state; - - { - boost::mutex::scoped_lock lock(mutex_); - - if (state_ == State_Stopped) - { - state_ = State_Running; - } - - UpdateProgress(instances_.empty() ? 1 : - static_cast<float>(position_) / static_cast<float>(instances_.size())); - - if (position_ == instances_.size() && - state_ == State_Running) - { - state_ = State_Done; - } - - state = state_; - } - - switch (state) - { - case State_Done: - StopWorker(); - return (state_ == State_Done ? - OrthancPluginJobStepStatus_Success : - OrthancPluginJobStepStatus_Failure); - - case State_Error: - StopWorker(); - return OrthancPluginJobStepStatus_Failure; - - case State_Running: - if (worker_.get() == NULL) - { - worker_.reset(new boost::thread(Worker, this)); - } - - boost::this_thread::sleep(boost::posix_time::milliseconds(500)); - - return OrthancPluginJobStepStatus_Continue; - - default: - throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); - } - } - - - virtual void Stop(OrthancPluginJobStopReason reason) - { - { - boost::mutex::scoped_lock lock(mutex_); - state_ = State_Stopped; - } - - StopWorker(); - } - - - virtual void Reset() - { - boost::mutex::scoped_lock lock(mutex_); - position_ = 0; - state_ = State_Running; - content_ = Json::objectValue; - ClearContent(); + debug_ = debug; } }; @@ -872,8 +810,16 @@ OrthancPlugins::LogInfo("Sending " + boost::lexical_cast<std::string>(instances.size()) + " instances using STOW-RS to DICOMweb server: " + serverName); + std::auto_ptr<StowClientJob> job(new StowClientJob(serverName, instances, httpHeaders)); + + bool debug; + if (OrthancPlugins::LookupBooleanValue(debug, body, "Debug")) + { + job->SetDebug(debug); + } + Json::Value answer; - SubmitJob(output, new StowClientJob(serverName, instances, httpHeaders), body, + SubmitJob(output, job.release(), body, true /* synchronous by default, for compatibility with <= 0.6 */); } @@ -1011,8 +957,6 @@ const void* part, size_t size) { - printf(" part %d\n", size); - std::string contentType; if (!Orthanc::MultipartStreamReader::GetMainContentType(contentType, headers)) { @@ -1052,7 +996,7 @@ if (debug_) { - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + boost::this_thread::sleep(boost::posix_time::milliseconds(50)); } } @@ -1254,7 +1198,7 @@ std::list<std::string> retrievedInstances_; std::auto_ptr<WadoRetrieveAnswer> answer_; uint64_t networkSize_; - + bool debug_; bool SetupNextResource(OrthancPlugins::HttpClient& client, JobContext& context) @@ -1271,7 +1215,7 @@ context.SetProgress(position_, resources_.size()); answer_.reset(new WadoRetrieveAnswer); - answer_->SetDebug(true); // TODO + answer_->SetDebug(debug_); const Resource* resource = resources_[position_++]; if (resource == NULL) @@ -1343,7 +1287,8 @@ serverName_(serverName), position_(0), stopped_(false), - networkSize_(0) + networkSize_(0), + debug_(false) { SetFactory(*this); } @@ -1359,6 +1304,11 @@ } } + void SetDebug(bool debug) + { + debug_ = debug; + } + void AddResource(const std::string uri) { resources_.push_back(new Resource(uri)); @@ -1403,6 +1353,12 @@ std::auto_ptr<WadoRetrieveJob> job(new WadoRetrieveJob(serverName)); job->AddResourceFromRequest(body); + bool debug; + if (OrthancPlugins::LookupBooleanValue(debug, body, "Debug")) + { + job->SetDebug(debug); + } + SubmitJob(output, job.release(), body, false /* asynchronous by default */); } @@ -1499,6 +1455,12 @@ job->AddResource(uri, additionalHeaders); } + bool debug; + if (OrthancPlugins::LookupBooleanValue(debug, body, "Debug")) + { + job->SetDebug(debug); + } + SubmitJob(output, job.release(), body, true /* synchronous by default, for compatibility with <= 0.6 */); }