Mercurial > hg > orthanc-dicomweb
changeset 317:ffe1c92c73c7 refactoring
WadoRetrieveJob
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 20 Jun 2019 18:37:20 +0200 |
parents | 9313f6157eef |
children | 1f948ba78c5d |
files | Plugin/DicomWebClient.cpp Plugin/DicomWebClient.h Plugin/Plugin.cpp |
diffstat | 3 files changed, 584 insertions(+), 35 deletions(-) [+] |
line wrap: on
line diff
--- a/Plugin/DicomWebClient.cpp Thu Jun 20 12:55:26 2019 +0200 +++ b/Plugin/DicomWebClient.cpp Thu Jun 20 18:37:20 2019 +0200 @@ -31,6 +31,7 @@ #include <Core/HttpServer/MultipartStreamReader.h> #include <Core/ChunkedBuffer.h> #include <Core/Toolbox.h> +#include <Plugins/Samples/Common/OrthancPluginCppWrapper.h> #include <boost/thread.hpp> @@ -284,7 +285,7 @@ enum State { State_Running, - State_Paused, + State_Stopped, State_Error, State_Done }; @@ -489,7 +490,7 @@ { boost::mutex::scoped_lock lock(mutex_); - if (state_ == State_Paused) + if (state_ == State_Stopped) { state_ = State_Running; } @@ -538,7 +539,7 @@ { { boost::mutex::scoped_lock lock(mutex_); - state_ = State_Paused; + state_ = State_Stopped; } StopWorker(); @@ -673,13 +674,35 @@ -static void ConfigureGetFromServer(OrthancPlugins::HttpClient& client, - const OrthancPluginHttpRequest* request) +static void ParseGetFromServer(std::string& uri, + std::map<std::string, std::string>& additionalHeaders, + const Json::Value& resource) { static const char* URI = "Uri"; static const char* HTTP_HEADERS = "HttpHeaders"; static const char* GET_ARGUMENTS = "Arguments"; + std::string tmp; + if (resource.type() != Json::objectValue || + !GetStringValue(tmp, resource, URI)) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat, + "A request to the DICOMweb client must provide a JSON object " + "with the field \"Uri\" containing the URI of interest"); + } + + std::map<std::string, std::string> getArguments; + OrthancPlugins::ParseAssociativeArray(getArguments, resource, GET_ARGUMENTS); + OrthancPlugins::DicomWebServers::UriEncode(uri, tmp, getArguments); + + OrthancPlugins::ParseAssociativeArray(additionalHeaders, resource, HTTP_HEADERS); +} + + + +static void ConfigureGetFromServer(OrthancPlugins::HttpClient& client, + const OrthancPluginHttpRequest* request) +{ if (request->method != OrthancPluginHttpMethod_Post) { throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); @@ -688,25 +711,11 @@ Json::Value body; OrthancPlugins::ParseJsonBody(body, request); - std::string tmp; - if (body.type() != Json::objectValue || - !GetStringValue(tmp, body, URI)) - { - throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat, - "A request to the DICOMweb client must provide a JSON object " - "with the field \"Uri\" containing the URI of interest"); - } - - std::map<std::string, std::string> getArguments; - OrthancPlugins::ParseAssociativeArray(getArguments, body, GET_ARGUMENTS); - std::string uri; - OrthancPlugins::DicomWebServers::UriEncode(uri, tmp, getArguments); + std::map<std::string, std::string> additionalHeaders; + ParseGetFromServer(uri, additionalHeaders, body); OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(client, request->groups[0], uri); - - std::map<std::string, std::string> additionalHeaders; - OrthancPlugins::ParseAssociativeArray(additionalHeaders, body, HTTP_HEADERS); client.AddHeaders(additionalHeaders); } @@ -1042,13 +1051,14 @@ { State_Headers, State_Body, - State_Stopped + State_Canceled }; boost::mutex mutex_; State state_; std::list<std::string> instances_; std::auto_ptr<Orthanc::MultipartStreamReader> reader_; + uint64_t networkSize_; virtual void HandlePart(const Orthanc::MultipartStreamReader::HttpHeaders& headers, const void* part, @@ -1095,7 +1105,8 @@ public: WadoRetrieveAnswer() : - state_(State_Headers) + state_(State_Headers), + networkSize_(0) { } @@ -1107,7 +1118,7 @@ { boost::mutex::scoped_lock lock(mutex_); - if (state_ != State_Stopped && + if (state_ != State_Canceled && reader_.get() != NULL) { reader_->CloseStream(); @@ -1119,7 +1130,7 @@ { boost::mutex::scoped_lock lock(mutex_); - if (state_ == State_Stopped) + if (state_ == State_Canceled) { return; } @@ -1163,20 +1174,21 @@ { boost::mutex::scoped_lock lock(mutex_); - if (state_ == State_Stopped) + if (state_ == State_Canceled) { - return; + throw Orthanc::OrthancException(Orthanc::ErrorCode_CanceledJob); } - - if (reader_.get() == NULL) + else if (reader_.get() == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, "No Content-Type provided by the remote WADO-RS server"); } - - state_ = State_Body; - - reader_->AddChunk(data, size); + else + { + state_ = State_Body; + networkSize_ += size; + reader_->AddChunk(data, size); + } } void GetReceivedInstances(std::list<std::string>& target) @@ -1185,11 +1197,17 @@ target = instances_; } - void Stop() + void Cancel() { boost::mutex::scoped_lock lock(mutex_); + LOG(ERROR) << "A WADO-RS retrieve job has been canceled, expect \"Error in the network protocol\" errors"; + state_ = State_Canceled; + } - state_ = State_Stopped; + uint64_t GetNetworkSize() + { + boost::mutex::scoped_lock lock(mutex_); + return networkSize_; } }; @@ -1207,6 +1225,7 @@ // Do some loop { WadoRetrieveAnswer answer; + answer.Cancel(); client.Execute(answer); answer.Close(); @@ -1223,3 +1242,524 @@ tmp.c_str(), tmp.size(), "application/json"); } + + + + + +#include <Core/IDynamicObject.h> + + +class SingleFunctionJob : public OrthancPlugins::OrthancJob +{ +public: + class JobContext : public boost::noncopyable + { + private: + SingleFunctionJob& that_; + + public: + JobContext(SingleFunctionJob& that) : + that_(that) + { + } + + void SetContent(const std::string& key, + const std::string& value) + { + that_.SetContent(key, value); + } + + void SetProgress(unsigned int position, + unsigned int maxPosition) + { + boost::mutex::scoped_lock lock(that_.mutex_); + + if (maxPosition == 0 || + position > maxPosition) + { + that_.UpdateProgress(1); + } + else + { + that_.UpdateProgress(static_cast<float>(position) / static_cast<float>(maxPosition)); + } + } + }; + + + class IFunction : public boost::noncopyable + { + public: + virtual ~IFunction() + { + } + + // Must return "true" if the job has completed with success, or + // "false" if the job has been canceled. Pausing the job + // corresponds to canceling it. + virtual bool Execute(JobContext& context) = 0; + }; + + + class IFunctionFactory : public boost::noncopyable + { + public: + virtual ~IFunctionFactory() + { + } + + // WARNING: "CancelFunction()" will be invoked while "Execute()" + // is running. Mutex is probably necessary. + virtual void CancelFunction() = 0; + + // Only called when no function is running, to deal with + // "Resubmit()" after job cancelation/failure. + virtual void ResetFunction() = 0; + + virtual IFunction* CreateFunction() = 0; + }; + + +protected: + void SetFactory(IFunctionFactory& factory) + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ != State_Setup) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + else + { + factory_ = &factory; + } + } + + +private: + enum State + { + State_Setup, + State_Running, + State_Success, + State_Failure + }; + + boost::mutex mutex_; + State state_; // Can only be modified by the "Worker()" function + std::auto_ptr<boost::thread> worker_; + Json::Value content_; + IFunctionFactory* factory_; + + void JoinWorker() + { + assert(factory_ != NULL); + + if (worker_.get() != NULL) + { + if (worker_->joinable()) + { + worker_->join(); + } + + worker_.reset(); + } + } + + void StartWorker() + { + assert(factory_ != NULL); + + if (worker_.get() == NULL && + factory_ != NULL) + { + worker_.reset(new boost::thread(Worker, this, factory_)); + } + } + + void SetContent(const std::string& key, + const std::string& value) + { + boost::mutex::scoped_lock lock(mutex_); + content_[key] = value; + UpdateContent(content_); + } + + static void Worker(SingleFunctionJob* job, + IFunctionFactory* factory) + { + assert(job != NULL && factory != NULL); + + JobContext context(*job); + + { + boost::mutex::scoped_lock lock(job->mutex_); + job->state_ = State_Running; + } + + try + { + std::auto_ptr<IFunction> function(factory->CreateFunction()); + bool success = function->Execute(context); + + { + boost::mutex::scoped_lock lock(job->mutex_); + job->state_ = (success ? State_Success : State_Failure); + if (success) + { + job->UpdateProgress(1); + } + } + } + catch (Orthanc::OrthancException& e) + { + LOG(ERROR) << "Error in a job: " << e.What(); + + { + boost::mutex::scoped_lock lock(job->mutex_); + job->state_ = State_Failure; + job->content_["FunctionErrorCode"] = e.GetErrorCode(); + job->content_["FunctionErrorDescription"] = e.What(); + if (e.HasDetails()) + { + job->content_["FunctionErrorDetails"] = e.GetDetails(); + } + job->UpdateContent(job->content_); + } + } + } + +public: + SingleFunctionJob(const std::string& jobName) : + OrthancJob(jobName), + state_(State_Setup), + content_(Json::objectValue), + factory_(NULL) + { + } + + virtual ~SingleFunctionJob() + { + if (worker_.get() != NULL) + { + LOG(ERROR) << "Classes deriving from SingleFunctionJob must " + << "explicitly call Finalize() in their destructor"; + + try + { + JoinWorker(); + } + catch (Orthanc::OrthancException&) + { + } + } + } + + void Finalize() + { + try + { + if (factory_ != NULL) + { + factory_->CancelFunction(); + JoinWorker(); + } + } + catch (Orthanc::OrthancException&) + { + } + } + + virtual OrthancPluginJobStepStatus Step() + { + if (factory_ == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + + State state; + + { + boost::mutex::scoped_lock lock(mutex_); + state = state_; + } + + switch (state) + { + case State_Setup: + StartWorker(); + break; + + case State_Running: + break; + + case State_Success: + JoinWorker(); + return OrthancPluginJobStepStatus_Success; + + case State_Failure: + JoinWorker(); + return OrthancPluginJobStepStatus_Failure; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + return OrthancPluginJobStepStatus_Continue; + } + + virtual void Stop(OrthancPluginJobStopReason reason) + { + if (factory_ == NULL) + { + return; + } + + if (reason == OrthancPluginJobStopReason_Paused || + reason == OrthancPluginJobStopReason_Canceled) + { + factory_->CancelFunction(); + } + + JoinWorker(); + + if (reason == OrthancPluginJobStopReason_Paused) + { + // This type of job cannot be paused: Reset under the hood + Reset(); + } + } + + virtual void Reset() + { + boost::mutex::scoped_lock lock(mutex_); + + if (factory_ != NULL) + { + factory_->ResetFunction(); + } + + state_ = State_Setup; + + content_ = Json::objectValue; + ClearContent(); + } +}; + + + +class WadoRetrieveJob : + public SingleFunctionJob, + private SingleFunctionJob::IFunctionFactory +{ +private: + class Resource : public boost::noncopyable + { + private: + std::string uri_; + std::map<std::string, std::string> additionalHeaders_; + + public: + Resource(const std::string& uri) : + uri_(uri) + { + } + + Resource(const std::string& uri, + const std::map<std::string, std::string>& additionalHeaders) : + uri_(uri), + additionalHeaders_(additionalHeaders) + { + } + + const std::string& GetUri() const + { + return uri_; + } + + const std::map<std::string, std::string>& GetAdditionalHeaders() const + { + return additionalHeaders_; + } + }; + + + enum Status + { + Status_Done, + Status_Canceled, + Status_Continue + }; + + + class F : public IFunction + { + private: + WadoRetrieveJob& that_; + + public: + F(WadoRetrieveJob& that) : + that_(that) + { + } + + virtual bool Execute(JobContext& context) + { + for (;;) + { + OrthancPlugins::HttpClient client; + + switch (that_.SetupNextResource(client)) + { + case Status_Continue: + client.Execute(*that_.answer_); + that_.CloseResource(context); + break; + + case Status_Canceled: + return false; + + case Status_Done: + return true; + } + } + } + }; + + + boost::mutex mutex_; + std::string serverName_; + size_t position_; + std::vector<Resource*> resources_; + bool canceled_; + std::list<std::string> retrievedInstances_; + std::auto_ptr<WadoRetrieveAnswer> answer_; + uint64_t networkSize_; + + + Status SetupNextResource(OrthancPlugins::HttpClient& client) + { + boost::mutex::scoped_lock lock(mutex_); + + if (canceled_) + { + return Status_Canceled; + } + else if (position_ == resources_.size()) + { + return Status_Done; + } + else + { + answer_.reset(new WadoRetrieveAnswer); + + const Resource* resource = resources_[position_++]; + if (resource == NULL) + { + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } + + OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient + (client, serverName_, resource->GetUri()); + client.AddHeaders(resource->GetAdditionalHeaders()); + + return Status_Continue; + } + } + + + void CloseResource(JobContext& context) + { + boost::mutex::scoped_lock lock(mutex_); + answer_->Close(); + + std::list<std::string> instances; + answer_->GetReceivedInstances(instances); + networkSize_ += answer_->GetNetworkSize(); + + answer_.reset(); + + retrievedInstances_.splice(retrievedInstances_.end(), instances); + + context.SetProgress(position_, resources_.size()); + context.SetContent("NetworkUsageMB", boost::lexical_cast<std::string>(networkSize_ / (1024llu * 1024llu))); + context.SetContent("ReceivedInstancesCount", boost::lexical_cast<std::string>(retrievedInstances_.size())); + } + + + virtual void CancelFunction() + { + boost::mutex::scoped_lock lock(mutex_); + canceled_ = true; + if (answer_.get() != NULL) + { + answer_->Cancel(); + } + } + + virtual void ResetFunction() + { + boost::mutex::scoped_lock lock(mutex_); + canceled_ = false; + position_ = 0; + retrievedInstances_.clear(); + } + + virtual IFunction* CreateFunction() + { + return new F(*this); + } + +public: + WadoRetrieveJob(const std::string& serverName) : + SingleFunctionJob("DicomWebWadoRetrieveClient"), + serverName_(serverName), + position_(0), + canceled_(false), + networkSize_(0) + { + SetFactory(*this); + } + + virtual ~WadoRetrieveJob() + { + SingleFunctionJob::Finalize(); + + for (size_t i = 0; i < resources_.size(); i++) + { + assert(resources_[i] != NULL); + delete resources_[i]; + } + } + + void AddResource(const std::string uri) + { + resources_.push_back(new Resource(uri)); + } + + void AddResourceFromRequest(const Json::Value& resource) + { + std::string uri; + std::map<std::string, std::string> additionalHeaders; + ParseGetFromServer(uri, additionalHeaders, resource); + + resources_.push_back(new Resource(uri, additionalHeaders)); + } +}; + + + +void WadoTest() +{ + std::auto_ptr<WadoRetrieveJob> job(new WadoRetrieveJob("self")); + //job->AddResource("studies/2.16.840.1.113669.632.20.1211.10000315526"); // VIX + job->AddResource("studies/1.3.51.0.1.1.192.168.29.133.1688840.1688819"); // Cardiac + + //OrthancPlugins::OrthancJob::Submit(job.release(), 0); + + Json::Value result; + OrthancPlugins::OrthancJob::SubmitAndWait(result, job.release(), 0); + std::cout << result.toStyledString() << std::endl; +}