# HG changeset patch # User Sebastien Jodogne # Date 1560527537 -7200 # Node ID ebd8a1cd97daf273327aa162533dd25c2daa740b # Parent fd257a30e4feb923cbbebec6df6a76f2473d76f6 StowClientJob diff -r fd257a30e4fe -r ebd8a1cd97da Plugin/DicomWebClient.cpp --- a/Plugin/DicomWebClient.cpp Fri Jun 14 15:21:34 2019 +0200 +++ b/Plugin/DicomWebClient.cpp Fri Jun 14 17:52:17 2019 +0200 @@ -27,12 +27,14 @@ #include #include #include -#include #include #include +#include + + static void AddInstance(std::list& target, const Json::Value& instance) { @@ -104,14 +106,57 @@ +static void CheckStowAnswer(const Json::Value& response, + const std::string& serverName, + size_t instancesCount) +{ + if (response.type() != Json::objectValue || + !response.isMember("00081199")) + { + throw Orthanc::OrthancException( + Orthanc::ErrorCode_NetworkProtocol, + "Unable to parse STOW-RS JSON response from DICOMweb server " + serverName); + } + + size_t size; + if (!GetSequenceSize(size, response, "00081199", true, serverName) || + size != instancesCount) + { + throw Orthanc::OrthancException( + Orthanc::ErrorCode_NetworkProtocol, + "The STOW-RS server was only able to receive " + + boost::lexical_cast(size) + " instances out of " + + boost::lexical_cast(instancesCount)); + } + + if (GetSequenceSize(size, response, "00081198", false, serverName) && + size != 0) + { + throw Orthanc::OrthancException( + Orthanc::ErrorCode_NetworkProtocol, + "The response from the STOW-RS server contains " + + boost::lexical_cast(size) + + " items in its Failed SOP Sequence (0008,1198) tag"); + } + + if (GetSequenceSize(size, response, "0008119A", false, serverName) && + size != 0) + { + throw Orthanc::OrthancException( + Orthanc::ErrorCode_NetworkProtocol, + "The response from the STOW-RS server contains " + + boost::lexical_cast(size) + + " items in its Other Failures Sequence (0008,119A) tag"); + } +} + + static void ParseStowRequest(std::list& instances /* out */, std::map& httpHeaders /* out */, - std::map& queryArguments /* out */, const OrthancPluginHttpRequest* request /* in */) { static const char* RESOURCES = "Resources"; static const char* HTTP_HEADERS = "HttpHeaders"; - static const char* QUERY_ARGUMENTS = "Arguments"; Json::Value body; Json::Reader reader; @@ -128,7 +173,6 @@ "\" containing an array of resources to be sent"); } - OrthancPlugins::ParseAssociativeArray(queryArguments, body, QUERY_ARGUMENTS); OrthancPlugins::ParseAssociativeArray(httpHeaders, body, HTTP_HEADERS); Json::Value& resources = body[RESOURCES]; @@ -181,17 +225,16 @@ static void SendStowChunks(const Orthanc::WebServiceParameters& server, const std::map& httpHeaders, - const std::map& queryArguments, const std::string& boundary, Orthanc::ChunkedBuffer& chunks, - size_t& countInstances, + size_t& instancesCount, bool force) { unsigned int maxInstances = OrthancPlugins::Configuration::GetUnsignedIntegerValue("StowMaxInstances", 10); size_t maxSize = static_cast(OrthancPlugins::Configuration::GetUnsignedIntegerValue("StowMaxSize", 10)) * 1024 * 1024; - if ((force && countInstances > 0) || - (maxInstances != 0 && countInstances >= maxInstances) || + if ((force && instancesCount > 0) || + (maxInstances != 0 && instancesCount >= maxInstances) || (maxSize != 0 && chunks.GetNumBytes() >= maxSize)) { chunks.AddChunk("\r\n--" + boundary + "--\r\n"); @@ -202,11 +245,8 @@ OrthancPlugins::MemoryBuffer answerBody; std::map answerHeaders; - std::string uri; - OrthancPlugins::UriEncode(uri, "studies", queryArguments); - OrthancPlugins::CallServer(answerBody, answerHeaders, server, OrthancPluginHttpMethod_Post, - httpHeaders, uri, body); + httpHeaders, "studies", body); Json::Value response; Json::Reader reader; @@ -214,49 +254,300 @@ reinterpret_cast((*answerBody)->data) + (*answerBody)->size, response); answerBody.Clear(); - if (!success || - response.type() != Json::objectValue || - !response.isMember("00081199")) + if (!success) { throw Orthanc::OrthancException( Orthanc::ErrorCode_NetworkProtocol, "Unable to parse STOW-RS JSON response from DICOMweb server " + server.GetUrl()); } - - size_t size; - if (!GetSequenceSize(size, response, "00081199", true, server.GetUrl()) || - size != countInstances) + else { - throw Orthanc::OrthancException( - Orthanc::ErrorCode_NetworkProtocol, - "The STOW-RS server was only able to receive " + - boost::lexical_cast(size) + " instances out of " + - boost::lexical_cast(countInstances)); + CheckStowAnswer(response, server.GetUrl(), instancesCount); + } + + instancesCount = 0; + } +} + + + +class StowClientJob : public OrthancPlugins::OrthancJob +{ +private: + enum State + { + State_Running, + State_Paused, + State_Error, + State_Done + }; + + + boost::mutex mutex_; + std::string serverName_; + std::vector instances_; + OrthancPlugins::HttpClient::HttpHeaders headers_; + std::string boundary_; + + + std::auto_ptr worker_; + + + State state_; + size_t position_; + Json::Value content_; + + + bool ReadNextInstance(std::string& dicom) + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ != State_Running) + { + return false; + } + + while (position_ < instances_.size()) + { + size_t i = position_++; + + if (OrthancPlugins::RestApiGetString(dicom, "/instances/" + instances_[i] + "/file", false)) + { + return true; + } + } + + return false; + } + + + class RequestBody : public OrthancPlugins::HttpClient::IRequestBody + { + private: + StowClientJob& that_; + std::string boundary_; + bool done_; + + public: + RequestBody(StowClientJob& that) : + that_(that), + boundary_(that.boundary_), + done_(false) + { } - if (GetSequenceSize(size, response, "00081198", false, server.GetUrl()) && - size != 0) + virtual bool ReadNextChunk(std::string& chunk) + { + if (done_) + { + return false; + } + else + { + std::string dicom; + + if (that_.ReadNextInstance(dicom)) + { + chunk = ("--" + boundary_ + "\r\n" + + "Content-Type: application/dicom\r\n" + + "Content-Length: " + boost::lexical_cast(dicom.size()) + + "\r\n\r\n" + dicom + "\r\n"); + } + else + { + done_ = true; + chunk = ("--" + boundary_ + "--"); + } + + //boost::this_thread::sleep(boost::posix_time::seconds(1)); + + return true; + } + } + }; + + + static void Worker(StowClientJob* that) + { + try { - throw Orthanc::OrthancException( - Orthanc::ErrorCode_NetworkProtocol, - "The response from the STOW-RS server contains " + - boost::lexical_cast(size) + - " items in its Failed SOP Sequence (0008,1198) tag"); + std::string serverName; + size_t startPosition; + + // The lifetime of "body" should be larger than "client" + std::auto_ptr body; + std::auto_ptr client; + + { + boost::mutex::scoped_lock lock(that->mutex_); + serverName = that->serverName_; + startPosition = that->position_; + + body.reset(new RequestBody(*that)); + + client.reset(new OrthancPlugins::HttpClient); + OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(*client, that->serverName_, "/studies"); + client->SetMethod(OrthancPluginHttpMethod_Post); + client->AddHeaders(that->headers_); + } + + OrthancPlugins::HttpClient::HttpHeaders answerHeaders; + Json::Value answerBody; + + client->SetBody(*body); + client->Execute(answerHeaders, answerBody); + + size_t endPosition; + + { + boost::mutex::scoped_lock lock(that->mutex_); + endPosition = that->position_; + } + + CheckStowAnswer(answerBody, serverName, endPosition - startPosition); + } + catch (Orthanc::OrthancException& e) + { + { + boost::mutex::scoped_lock lock(that->mutex_); + LOG(ERROR) << "Error in STOW-RS client job to server " << that->serverName_ << ": " << e.What(); + that->state_ = State_Error; + } + + that->SetContent("Error", e.What()); + } + } + + + void SetContent(const std::string& key, + const std::string& value) + { + boost::mutex::scoped_lock lock(mutex_); + content_[key] = value; + UpdateContent(content_); + } + + + void StopWorker() + { + if (worker_.get() != NULL) + { + if (worker_->joinable()) + { + worker_->join(); + } + + worker_.reset(); + } + } + + +public: + StowClientJob(const std::string& serverName, + const std::list& instances, + const OrthancPlugins::HttpClient::HttpHeaders& headers) : + OrthancJob("DicomWebStowClient"), + serverName_(serverName), + headers_(headers), + state_(State_Running), + position_(0), + content_(Json::objectValue) + { + instances_.reserve(instances.size()); + + for (std::list::const_iterator + it = instances.begin(); it != instances.end(); ++it) + { + instances_.push_back(*it); } - if (GetSequenceSize(size, response, "0008119A", false, server.GetUrl()) && - size != 0) { - throw Orthanc::OrthancException( - Orthanc::ErrorCode_NetworkProtocol, - "The response from the STOW-RS server contains " + - boost::lexical_cast(size) + - " items in its Other Failures Sequence (0008,119A) tag"); + OrthancPlugins::OrthancString tmp; + tmp.Assign(OrthancPluginGenerateUuid(OrthancPlugins::GetGlobalContext())); + tmp.ToString(boundary_); + } + + boundary_ = (boundary_ + "-" + boundary_); // Make the boundary longer + + headers_["Accept"] = "application/dicom+json"; + headers_["Expect"] = ""; + headers_["Content-Type"] = "multipart/related; type=\"application/dicom\"; boundary=" + boundary_; + } + + + virtual OrthancPluginJobStepStatus Step() + { + State state; + + { + boost::mutex::scoped_lock lock(mutex_); + + if (state_ == State_Paused) + { + state_ = State_Running; + } + + UpdateProgress(instances_.empty() ? 1 : + static_cast(position_) / static_cast(instances_.size())); + + if (position_ == instances_.size() && + state_ == State_Running) + { + state_ = State_Done; + } + + state = state_; } - countInstances = 0; + switch (state) + { + case State_Done: + StopWorker(); + return (state_ == State_Done ? + OrthancPluginJobStepStatus_Success : + OrthancPluginJobStepStatus_Failure); + + case State_Error: + StopWorker(); + return OrthancPluginJobStepStatus_Failure; + + case State_Running: + if (worker_.get() == NULL) + { + worker_.reset(new boost::thread(Worker, this)); + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + return OrthancPluginJobStepStatus_Continue; + + default: + throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); + } } -} + + + virtual void Stop(OrthancPluginJobStopReason reason) + { + { + boost::mutex::scoped_lock lock(mutex_); + state_ = State_Paused; + } + + StopWorker(); + } + + + virtual void Reset() + { + boost::mutex::scoped_lock lock(mutex_); + position_ = 0; + state_ = State_Running; + content_ = Json::objectValue; + ClearContent(); + } +}; + void StowClient(OrthancPluginRestOutput* output, @@ -276,41 +567,45 @@ return; } - Orthanc::WebServiceParameters server(OrthancPlugins::DicomWebServers::GetInstance().GetServer(request->groups[0])); + std::string serverName(request->groups[0]); + +#if 1 + std::list instances; + std::map httpHeaders; + ParseStowRequest(instances, httpHeaders, request); + OrthancPlugins::LogInfo("Sending " + boost::lexical_cast(instances.size()) + + " instances using STOW-RS to DICOMweb server: " + serverName); + + OrthancPlugins::OrthancJob::Submit(new StowClientJob(serverName, instances, httpHeaders), + 0 /* TODO priority, synchronous */); + +#else std::string boundary; - + { - char* uuid = OrthancPluginGenerateUuid(context); - try - { - boundary.assign(uuid); - } - catch (...) - { - OrthancPluginFreeString(context, uuid); - throw Orthanc::OrthancException(Orthanc::ErrorCode_NotEnoughMemory); - } - - OrthancPluginFreeString(context, uuid); + OrthancPlugins::OrthancString tmp; + tmp.Assign(OrthancPluginGenerateUuid(context)); + tmp.ToString(boundary); } - std::string mime = "multipart/related; type=\"application/dicom\"; boundary=" + boundary; + boundary = (boundary + "-" + boundary); - std::map queryArguments; std::map httpHeaders; httpHeaders["Accept"] = "application/dicom+json"; httpHeaders["Expect"] = ""; - httpHeaders["Content-Type"] = mime; + httpHeaders["Content-Type"] = "multipart/related; type=\"application/dicom\"; boundary=" + boundary; std::list instances; - ParseStowRequest(instances, httpHeaders, queryArguments, request); + ParseStowRequest(instances, httpHeaders, request); OrthancPlugins::LogInfo("Sending " + boost::lexical_cast(instances.size()) + - " instances using STOW-RS to DICOMweb server: " + server.GetUrl()); + " instances using STOW-RS to DICOMweb server: " + serverName); + + Orthanc::WebServiceParameters server(OrthancPlugins::DicomWebServers::GetInstance().GetServer(serverName)); Orthanc::ChunkedBuffer chunks; - size_t countInstances = 0; + size_t instancesCount = 0; for (std::list::const_iterator it = instances.begin(); it != instances.end(); ++it) { @@ -322,13 +617,14 @@ "Content-Length: " + boost::lexical_cast(dicom.GetSize()) + "\r\n\r\n"); chunks.AddChunk(dicom.GetData(), dicom.GetSize()); - countInstances ++; + instancesCount ++; - SendStowChunks(server, httpHeaders, queryArguments, boundary, chunks, countInstances, false); + SendStowChunks(server, httpHeaders, boundary, chunks, instancesCount, false); } } - SendStowChunks(server, httpHeaders, queryArguments, boundary, chunks, countInstances, true); + SendStowChunks(server, httpHeaders, boundary, chunks, instancesCount, true); +#endif std::string answer = "{}\n"; OrthancPluginAnswerBuffer(context, output, answer.c_str(), answer.size(), "application/json"); @@ -362,48 +658,6 @@ } -static std::string RemoveMultipleSlashes(const std::string& source) -{ - std::string target; - target.reserve(source.size()); - - size_t prefix = 0; - - if (boost::starts_with(source, "https://")) - { - prefix = 8; - } - else if (boost::starts_with(source, "http://")) - { - prefix = 7; - } - - for (size_t i = 0; i < prefix; i++) - { - target.push_back(source[i]); - } - - bool isLastSlash = false; - - for (size_t i = prefix; i < source.size(); i++) - { - if (source[i] == '/') - { - if (!isLastSlash) - { - target.push_back('/'); - isLastSlash = true; - } - } - else - { - target.push_back(source[i]); - isLastSlash = false; - } - } - - return target; -} void GetFromServer(OrthancPluginRestOutput* output, @@ -422,9 +676,6 @@ return; } - Orthanc::WebServiceParameters server( - OrthancPlugins::DicomWebServers::GetInstance().GetServer(request->groups[0])); - std::string tmp; Json::Value body; Json::Reader reader; @@ -442,20 +693,14 @@ OrthancPlugins::ParseAssociativeArray(getArguments, body, GET_ARGUMENTS); std::string uri; - OrthancPlugins::UriEncode(uri, tmp, getArguments); + OrthancPlugins::DicomWebServers::UriEncode(uri, tmp, getArguments); OrthancPlugins::HttpClient client; - client.SetUrl(RemoveMultipleSlashes(server.GetUrl() + "/" + uri)); - client.SetHeaders(server.GetHttpHeaders()); + OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(client, request->groups[0], uri); std::map additionalHeaders; OrthancPlugins::ParseAssociativeArray(additionalHeaders, body, HTTP_HEADERS); - - for (std::map::const_iterator - it = additionalHeaders.begin(); it != additionalHeaders.end(); ++it) - { - client.AddHeader(it->first, it->second); - } + client.AddHeaders(additionalHeaders); std::map answerHeaders; std::string answer; @@ -540,7 +785,7 @@ } std::string uri; - OrthancPlugins::UriEncode(uri, tmpUri, getArguments); + OrthancPlugins::DicomWebServers::UriEncode(uri, tmpUri, getArguments); OrthancPlugins::MemoryBuffer answerBody; std::map answerHeaders; diff -r fd257a30e4fe -r ebd8a1cd97da Plugin/DicomWebServers.cpp --- a/Plugin/DicomWebServers.cpp Fri Jun 14 15:21:34 2019 +0200 +++ b/Plugin/DicomWebServers.cpp Fri Jun 14 17:52:17 2019 +0200 @@ -25,6 +25,8 @@ #include +#include + namespace OrthancPlugins { void DicomWebServers::Clear() @@ -116,36 +118,58 @@ } + static std::string RemoveMultipleSlashes(const std::string& source) + { + std::string target; + target.reserve(source.size()); + + size_t prefix = 0; + + if (boost::starts_with(source, "https://")) + { + prefix = 8; + } + else if (boost::starts_with(source, "http://")) + { + prefix = 7; + } + + for (size_t i = 0; i < prefix; i++) + { + target.push_back(source[i]); + } + + bool isLastSlash = false; + + for (size_t i = prefix; i < source.size(); i++) + { + if (source[i] == '/') + { + if (!isLastSlash) + { + target.push_back('/'); + isLastSlash = true; + } + } + else + { + target.push_back(source[i]); + isLastSlash = false; + } + } + + return target; + } + + void DicomWebServers::ConfigureHttpClient(HttpClient& client, const std::string& name, const std::string& uri) { - boost::mutex::scoped_lock lock(mutex_); const Orthanc::WebServiceParameters parameters = GetServer(name); - std::string url = parameters.GetUrl(); - - if (url.empty() || - url[url.size() - 1] != '/') - { - url += '/'; - } - - std::string normalizedUri = uri; - while (!normalizedUri.empty() && - normalizedUri[0] == '/') - { - normalizedUri = normalizedUri.substr(1); - } - - client.SetUrl(url + normalizedUri); - - for (Orthanc::WebServiceParameters::Dictionary::const_iterator - it = parameters.GetHttpHeaders().begin(); - it != parameters.GetHttpHeaders().end(); ++it) - { - client.AddHeader(it->first, it->second); - } + client.SetUrl(RemoveMultipleSlashes(parameters.GetUrl() + "/" + uri)); + client.SetHeaders(parameters.GetHttpHeaders()); if (!parameters.GetUsername().empty()) { @@ -329,9 +353,9 @@ } - void UriEncode(std::string& uri, - const std::string& resource, - const std::map& getArguments) + void DicomWebServers::UriEncode(std::string& uri, + const std::string& resource, + const std::map& getArguments) { if (resource.find('?') != std::string::npos) { diff -r fd257a30e4fe -r ebd8a1cd97da Plugin/DicomWebServers.h --- a/Plugin/DicomWebServers.h Fri Jun 14 15:21:34 2019 +0200 +++ b/Plugin/DicomWebServers.h Fri Jun 14 17:52:17 2019 +0200 @@ -45,6 +45,10 @@ } public: + static void UriEncode(std::string& uri, + const std::string& resource, + const std::map& getArguments); + void Load(const Json::Value& configuration); ~DicomWebServers() @@ -76,8 +80,4 @@ const std::map& httpHeaders, const std::string& uri, const std::string& body); - - void UriEncode(std::string& uri, - const std::string& resource, - const std::map& getArguments); }