changeset 302:ebd8a1cd97da refactoring

StowClientJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 14 Jun 2019 17:52:17 +0200
parents fd257a30e4fe
children eb4ac27f0768
files Plugin/DicomWebClient.cpp Plugin/DicomWebServers.cpp Plugin/DicomWebServers.h
diffstat 3 files changed, 418 insertions(+), 149 deletions(-) [+]
line wrap: on
line diff
--- a/Plugin/DicomWebClient.cpp	Fri Jun 14 15:21:34 2019 +0200
+++ b/Plugin/DicomWebClient.cpp	Fri Jun 14 17:52:17 2019 +0200
@@ -27,12 +27,14 @@
 #include <list>
 #include <set>
 #include <boost/lexical_cast.hpp>
-#include <boost/algorithm/string/predicate.hpp>
 
 #include <Core/ChunkedBuffer.h>
 #include <Core/Toolbox.h>
 
 
+#include <boost/thread.hpp>
+
+
 static void AddInstance(std::list<std::string>& target,
                         const Json::Value& instance)
 {
@@ -104,14 +106,57 @@
 
 
 
+static void CheckStowAnswer(const Json::Value& response,
+                            const std::string& serverName,
+                            size_t instancesCount)
+{
+  if (response.type() != Json::objectValue ||
+      !response.isMember("00081199"))
+  {
+    throw Orthanc::OrthancException(
+      Orthanc::ErrorCode_NetworkProtocol,
+      "Unable to parse STOW-RS JSON response from DICOMweb server " + serverName);
+  }
+
+  size_t size;
+  if (!GetSequenceSize(size, response, "00081199", true, serverName) ||
+      size != instancesCount)
+  {
+    throw Orthanc::OrthancException(
+      Orthanc::ErrorCode_NetworkProtocol,
+      "The STOW-RS server was only able to receive " + 
+      boost::lexical_cast<std::string>(size) + " instances out of " +
+      boost::lexical_cast<std::string>(instancesCount));
+  }
+
+  if (GetSequenceSize(size, response, "00081198", false, serverName) &&
+      size != 0)
+  {
+    throw Orthanc::OrthancException(
+      Orthanc::ErrorCode_NetworkProtocol,
+      "The response from the STOW-RS server contains " + 
+      boost::lexical_cast<std::string>(size) + 
+      " items in its Failed SOP Sequence (0008,1198) tag");
+  }
+
+  if (GetSequenceSize(size, response, "0008119A", false, serverName) &&
+      size != 0)
+  {
+    throw Orthanc::OrthancException(
+      Orthanc::ErrorCode_NetworkProtocol,
+      "The response from the STOW-RS server contains " + 
+      boost::lexical_cast<std::string>(size) + 
+      " items in its Other Failures Sequence (0008,119A) tag");
+  }
+}
+
+
 static void ParseStowRequest(std::list<std::string>& instances /* out */,
                              std::map<std::string, std::string>& httpHeaders /* out */,
-                             std::map<std::string, std::string>& queryArguments /* out */,
                              const OrthancPluginHttpRequest* request /* in */)
 {
   static const char* RESOURCES = "Resources";
   static const char* HTTP_HEADERS = "HttpHeaders";
-  static const char* QUERY_ARGUMENTS = "Arguments";
 
   Json::Value body;
   Json::Reader reader;
@@ -128,7 +173,6 @@
       "\" containing an array of resources to be sent");
   }
 
-  OrthancPlugins::ParseAssociativeArray(queryArguments, body, QUERY_ARGUMENTS);
   OrthancPlugins::ParseAssociativeArray(httpHeaders, body, HTTP_HEADERS);
 
   Json::Value& resources = body[RESOURCES];
@@ -181,17 +225,16 @@
 
 static void SendStowChunks(const Orthanc::WebServiceParameters& server,
                            const std::map<std::string, std::string>& httpHeaders,
-                           const std::map<std::string, std::string>& queryArguments,
                            const std::string& boundary,
                            Orthanc::ChunkedBuffer& chunks,
-                           size_t& countInstances,
+                           size_t& instancesCount,
                            bool force)
 {
   unsigned int maxInstances = OrthancPlugins::Configuration::GetUnsignedIntegerValue("StowMaxInstances", 10);
   size_t maxSize = static_cast<size_t>(OrthancPlugins::Configuration::GetUnsignedIntegerValue("StowMaxSize", 10)) * 1024 * 1024;
 
-  if ((force && countInstances > 0) ||
-      (maxInstances != 0 && countInstances >= maxInstances) ||
+  if ((force && instancesCount > 0) ||
+      (maxInstances != 0 && instancesCount >= maxInstances) ||
       (maxSize != 0 && chunks.GetNumBytes() >= maxSize))
   {
     chunks.AddChunk("\r\n--" + boundary + "--\r\n");
@@ -202,11 +245,8 @@
     OrthancPlugins::MemoryBuffer answerBody;
     std::map<std::string, std::string> answerHeaders;
 
-    std::string uri;
-    OrthancPlugins::UriEncode(uri, "studies", queryArguments);
-
     OrthancPlugins::CallServer(answerBody, answerHeaders, server, OrthancPluginHttpMethod_Post,
-                               httpHeaders, uri, body);
+                               httpHeaders, "studies", body);
 
     Json::Value response;
     Json::Reader reader;
@@ -214,49 +254,300 @@
                                 reinterpret_cast<const char*>((*answerBody)->data) + (*answerBody)->size, response);
     answerBody.Clear();
 
-    if (!success ||
-        response.type() != Json::objectValue ||
-        !response.isMember("00081199"))
+    if (!success)
     {
       throw Orthanc::OrthancException(
         Orthanc::ErrorCode_NetworkProtocol,
         "Unable to parse STOW-RS JSON response from DICOMweb server " + server.GetUrl());
     }
-
-    size_t size;
-    if (!GetSequenceSize(size, response, "00081199", true, server.GetUrl()) ||
-        size != countInstances)
+    else
     {
-      throw Orthanc::OrthancException(
-        Orthanc::ErrorCode_NetworkProtocol,
-        "The STOW-RS server was only able to receive " + 
-        boost::lexical_cast<std::string>(size) + " instances out of " +
-        boost::lexical_cast<std::string>(countInstances));
+      CheckStowAnswer(response, server.GetUrl(), instancesCount);
+    }
+
+    instancesCount = 0;
+  }
+}
+
+
+
+class StowClientJob : public OrthancPlugins::OrthancJob
+{
+private:
+  enum State
+  {
+    State_Running,
+    State_Paused,
+    State_Error,
+    State_Done
+  };
+
+
+  boost::mutex                               mutex_;
+  std::string                                serverName_;
+  std::vector<std::string>                   instances_;
+  OrthancPlugins::HttpClient::HttpHeaders    headers_;
+  std::string                                boundary_;
+
+
+  std::auto_ptr<boost::thread>  worker_;
+
+
+  State   state_;
+  size_t  position_;
+  Json::Value    content_;
+
+
+  bool ReadNextInstance(std::string& dicom)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (state_ != State_Running)
+    {
+      return false;
+    }
+
+    while (position_ < instances_.size())
+    {
+      size_t i = position_++;
+
+      if (OrthancPlugins::RestApiGetString(dicom, "/instances/" + instances_[i] + "/file", false))
+      {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+
+  class RequestBody : public OrthancPlugins::HttpClient::IRequestBody
+  {
+  private:
+    StowClientJob&  that_;
+    std::string     boundary_;
+    bool            done_;
+
+  public:
+    RequestBody(StowClientJob& that) :
+      that_(that),
+      boundary_(that.boundary_),
+      done_(false)
+    {
     }
 
-    if (GetSequenceSize(size, response, "00081198", false, server.GetUrl()) &&
-        size != 0)
+    virtual bool ReadNextChunk(std::string& chunk)
+    {
+      if (done_)
+      {
+        return false;
+      }
+      else
+      {
+        std::string dicom;
+
+        if (that_.ReadNextInstance(dicom))
+        {
+          chunk = ("--" + boundary_ + "\r\n" +
+                   "Content-Type: application/dicom\r\n" +
+                   "Content-Length: " + boost::lexical_cast<std::string>(dicom.size()) + 
+                   "\r\n\r\n" + dicom + "\r\n");
+        }
+        else
+        {
+          done_ = true;
+          chunk = ("--" + boundary_ + "--");
+        }
+
+        //boost::this_thread::sleep(boost::posix_time::seconds(1));
+
+        return true;
+      }
+    }
+  };
+
+
+  static void Worker(StowClientJob* that)
+  {
+    try
     {
-      throw Orthanc::OrthancException(
-        Orthanc::ErrorCode_NetworkProtocol,
-        "The response from the STOW-RS server contains " + 
-        boost::lexical_cast<std::string>(size) + 
-        " items in its Failed SOP Sequence (0008,1198) tag");
+      std::string serverName;
+      size_t startPosition;
+
+      // The lifetime of "body" should be larger than "client"
+      std::auto_ptr<RequestBody> body;
+      std::auto_ptr<OrthancPlugins::HttpClient> client;
+
+      {
+        boost::mutex::scoped_lock lock(that->mutex_);
+        serverName = that->serverName_;
+        startPosition = that->position_;
+
+        body.reset(new RequestBody(*that));
+
+        client.reset(new OrthancPlugins::HttpClient);
+        OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(*client, that->serverName_, "/studies");
+        client->SetMethod(OrthancPluginHttpMethod_Post);
+        client->AddHeaders(that->headers_);
+      }
+
+      OrthancPlugins::HttpClient::HttpHeaders answerHeaders;
+      Json::Value answerBody;
+
+      client->SetBody(*body);
+      client->Execute(answerHeaders, answerBody);
+
+      size_t endPosition;
+
+      {
+        boost::mutex::scoped_lock lock(that->mutex_);
+        endPosition = that->position_;
+      }
+
+      CheckStowAnswer(answerBody, serverName, endPosition - startPosition);
+    }
+    catch (Orthanc::OrthancException& e)
+    {
+      {
+        boost::mutex::scoped_lock lock(that->mutex_);
+        LOG(ERROR) << "Error in STOW-RS client job to server " << that->serverName_ << ": " << e.What();
+        that->state_ = State_Error;
+      }
+
+      that->SetContent("Error", e.What());
+    }
+  }
+
+
+  void SetContent(const std::string& key,
+                  const std::string& value)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    content_[key] = value;
+    UpdateContent(content_);
+  }
+
+
+  void StopWorker()
+  {
+    if (worker_.get() != NULL)
+    {
+      if (worker_->joinable())
+      {
+        worker_->join();
+      }
+
+      worker_.reset();
+    }
+  }
+
+  
+public:
+  StowClientJob(const std::string& serverName,
+                const std::list<std::string>& instances,
+                const OrthancPlugins::HttpClient::HttpHeaders& headers) :
+    OrthancJob("DicomWebStowClient"),
+    serverName_(serverName),
+    headers_(headers),
+    state_(State_Running),
+    position_(0),
+    content_(Json::objectValue)
+  {
+    instances_.reserve(instances.size());
+
+    for (std::list<std::string>::const_iterator
+           it = instances.begin(); it != instances.end(); ++it)
+    {
+      instances_.push_back(*it);
     }
 
-    if (GetSequenceSize(size, response, "0008119A", false, server.GetUrl()) &&
-        size != 0)
     {
-      throw Orthanc::OrthancException(
-        Orthanc::ErrorCode_NetworkProtocol,
-        "The response from the STOW-RS server contains " + 
-        boost::lexical_cast<std::string>(size) + 
-        " items in its Other Failures Sequence (0008,119A) tag");
+      OrthancPlugins::OrthancString tmp;
+      tmp.Assign(OrthancPluginGenerateUuid(OrthancPlugins::GetGlobalContext()));
+      tmp.ToString(boundary_);
+    }
+
+    boundary_ = (boundary_ + "-" + boundary_);  // Make the boundary longer
+
+    headers_["Accept"] = "application/dicom+json";
+    headers_["Expect"] = "";
+    headers_["Content-Type"] = "multipart/related; type=\"application/dicom\"; boundary=" + boundary_;
+  }
+
+
+  virtual OrthancPluginJobStepStatus Step()
+  {
+    State state;
+
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (state_ == State_Paused)
+      {
+        state_ = State_Running;
+      }
+
+      UpdateProgress(instances_.empty() ? 1 :
+                     static_cast<float>(position_) / static_cast<float>(instances_.size()));
+
+      if (position_ == instances_.size() &&
+          state_ == State_Running)
+      {
+        state_ = State_Done;
+      }
+
+      state = state_;
     }
 
-    countInstances = 0;
+    switch (state)
+    {
+      case State_Done:
+        StopWorker();
+        return (state_ == State_Done ? 
+                OrthancPluginJobStepStatus_Success :
+                OrthancPluginJobStepStatus_Failure);
+
+      case State_Error:
+        StopWorker();
+        return OrthancPluginJobStepStatus_Failure;
+
+      case State_Running:
+        if (worker_.get() == NULL)
+        {
+          worker_.reset(new boost::thread(Worker, this));
+        }
+        
+        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+        
+        return OrthancPluginJobStepStatus_Continue;
+
+      default:
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+    }
   }
-}
+
+
+  virtual void Stop(OrthancPluginJobStopReason reason)
+  {
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      state_ = State_Paused;
+    }
+
+    StopWorker();
+  }
+
+    
+  virtual void Reset()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    position_ = 0;
+    state_ = State_Running;
+    content_ = Json::objectValue;
+    ClearContent();
+  }
+};
+
 
 
 void StowClient(OrthancPluginRestOutput* output,
@@ -276,41 +567,45 @@
     return;
   }
 
-  Orthanc::WebServiceParameters server(OrthancPlugins::DicomWebServers::GetInstance().GetServer(request->groups[0]));
+  std::string serverName(request->groups[0]);
+
+#if 1
+  std::list<std::string> instances;
+  std::map<std::string, std::string> httpHeaders;
+  ParseStowRequest(instances, httpHeaders, request);
 
+  OrthancPlugins::LogInfo("Sending " + boost::lexical_cast<std::string>(instances.size()) +
+                          " instances using STOW-RS to DICOMweb server: " + serverName);
+
+  OrthancPlugins::OrthancJob::Submit(new StowClientJob(serverName, instances, httpHeaders),
+                                     0 /* TODO priority, synchronous */);
+
+#else
   std::string boundary;
-
+  
   {
-    char* uuid = OrthancPluginGenerateUuid(context);
-    try
-    {
-      boundary.assign(uuid);
-    }
-    catch (...)
-    {
-      OrthancPluginFreeString(context, uuid);
-      throw Orthanc::OrthancException(Orthanc::ErrorCode_NotEnoughMemory);
-    }
-
-    OrthancPluginFreeString(context, uuid);
+    OrthancPlugins::OrthancString tmp;
+    tmp.Assign(OrthancPluginGenerateUuid(context));
+    tmp.ToString(boundary);
   }
 
-  std::string mime = "multipart/related; type=\"application/dicom\"; boundary=" + boundary;
+  boundary = (boundary + "-" + boundary);
 
-  std::map<std::string, std::string> queryArguments;
   std::map<std::string, std::string> httpHeaders;
   httpHeaders["Accept"] = "application/dicom+json";
   httpHeaders["Expect"] = "";
-  httpHeaders["Content-Type"] = mime;
+  httpHeaders["Content-Type"] = "multipart/related; type=\"application/dicom\"; boundary=" + boundary;
 
   std::list<std::string> instances;
-  ParseStowRequest(instances, httpHeaders, queryArguments, request);
+  ParseStowRequest(instances, httpHeaders, request);
 
   OrthancPlugins::LogInfo("Sending " + boost::lexical_cast<std::string>(instances.size()) +
-                          " instances using STOW-RS to DICOMweb server: " + server.GetUrl());
+                          " instances using STOW-RS to DICOMweb server: " + serverName);
+
+  Orthanc::WebServiceParameters server(OrthancPlugins::DicomWebServers::GetInstance().GetServer(serverName));
 
   Orthanc::ChunkedBuffer chunks;
-  size_t countInstances = 0;
+  size_t instancesCount = 0;
 
   for (std::list<std::string>::const_iterator it = instances.begin(); it != instances.end(); ++it)
   {
@@ -322,13 +617,14 @@
                       "Content-Length: " + boost::lexical_cast<std::string>(dicom.GetSize()) +
                       "\r\n\r\n");
       chunks.AddChunk(dicom.GetData(), dicom.GetSize());
-      countInstances ++;
+      instancesCount ++;
 
-      SendStowChunks(server, httpHeaders, queryArguments, boundary, chunks, countInstances, false);
+      SendStowChunks(server, httpHeaders, boundary, chunks, instancesCount, false);
     }
   }
 
-  SendStowChunks(server, httpHeaders, queryArguments, boundary, chunks, countInstances, true);
+  SendStowChunks(server, httpHeaders, boundary, chunks, instancesCount, true);
+#endif
 
   std::string answer = "{}\n";
   OrthancPluginAnswerBuffer(context, output, answer.c_str(), answer.size(), "application/json");
@@ -362,48 +658,6 @@
 }
 
 
-static std::string RemoveMultipleSlashes(const std::string& source)
-{
-  std::string target;
-  target.reserve(source.size());
-
-  size_t prefix = 0;
-  
-  if (boost::starts_with(source, "https://"))
-  {
-    prefix = 8;
-  }
-  else if (boost::starts_with(source, "http://"))
-  {
-    prefix = 7;
-  }
-
-  for (size_t i = 0; i < prefix; i++)
-  {
-    target.push_back(source[i]);
-  }
-
-  bool isLastSlash = false;
-
-  for (size_t i = prefix; i < source.size(); i++)
-  {
-    if (source[i] == '/')
-    {
-      if (!isLastSlash)
-      {
-        target.push_back('/');
-        isLastSlash = true;
-      }
-    }
-    else
-    {
-      target.push_back(source[i]);
-      isLastSlash = false;
-    }
-  }
-
-  return target;
-}
 
 
 void GetFromServer(OrthancPluginRestOutput* output,
@@ -422,9 +676,6 @@
     return;
   }
 
-  Orthanc::WebServiceParameters server(
-    OrthancPlugins::DicomWebServers::GetInstance().GetServer(request->groups[0]));
-
   std::string tmp;
   Json::Value body;
   Json::Reader reader;
@@ -442,20 +693,14 @@
   OrthancPlugins::ParseAssociativeArray(getArguments, body, GET_ARGUMENTS);
 
   std::string uri;
-  OrthancPlugins::UriEncode(uri, tmp, getArguments);
+  OrthancPlugins::DicomWebServers::UriEncode(uri, tmp, getArguments);
 
   OrthancPlugins::HttpClient client;
-  client.SetUrl(RemoveMultipleSlashes(server.GetUrl() + "/" + uri));
-  client.SetHeaders(server.GetHttpHeaders());
+  OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(client, request->groups[0], uri);
 
   std::map<std::string, std::string> additionalHeaders;
   OrthancPlugins::ParseAssociativeArray(additionalHeaders, body, HTTP_HEADERS);
-
-  for (std::map<std::string, std::string>::const_iterator
-         it = additionalHeaders.begin(); it != additionalHeaders.end(); ++it)
-  {
-    client.AddHeader(it->first, it->second);
-  }
+  client.AddHeaders(additionalHeaders);
   
   std::map<std::string, std::string> answerHeaders;
   std::string answer;
@@ -540,7 +785,7 @@
   }
 
   std::string uri;
-  OrthancPlugins::UriEncode(uri, tmpUri, getArguments);
+  OrthancPlugins::DicomWebServers::UriEncode(uri, tmpUri, getArguments);
 
   OrthancPlugins::MemoryBuffer answerBody;
   std::map<std::string, std::string> answerHeaders;
--- a/Plugin/DicomWebServers.cpp	Fri Jun 14 15:21:34 2019 +0200
+++ b/Plugin/DicomWebServers.cpp	Fri Jun 14 17:52:17 2019 +0200
@@ -25,6 +25,8 @@
 
 #include <Core/Toolbox.h>
 
+#include <boost/algorithm/string/predicate.hpp>
+
 namespace OrthancPlugins
 {
   void DicomWebServers::Clear()
@@ -116,36 +118,58 @@
   }
 
 
+  static std::string RemoveMultipleSlashes(const std::string& source)
+  {
+    std::string target;
+    target.reserve(source.size());
+
+    size_t prefix = 0;
+  
+    if (boost::starts_with(source, "https://"))
+    {
+      prefix = 8;
+    }
+    else if (boost::starts_with(source, "http://"))
+    {
+      prefix = 7;
+    }
+
+    for (size_t i = 0; i < prefix; i++)
+    {
+      target.push_back(source[i]);
+    }
+
+    bool isLastSlash = false;
+
+    for (size_t i = prefix; i < source.size(); i++)
+    {
+      if (source[i] == '/')
+      {
+        if (!isLastSlash)
+        {
+          target.push_back('/');
+          isLastSlash = true;
+        }
+      }
+      else
+      {
+        target.push_back(source[i]);
+        isLastSlash = false;
+      }
+    }
+
+    return target;
+  }
+
+
   void DicomWebServers::ConfigureHttpClient(HttpClient& client,
                                             const std::string& name,
                                             const std::string& uri)
   {
-    boost::mutex::scoped_lock lock(mutex_);
     const Orthanc::WebServiceParameters parameters = GetServer(name);
 
-    std::string url = parameters.GetUrl();
-    
-    if (url.empty() ||
-        url[url.size() - 1] != '/')
-    {
-      url += '/';
-    }
-    
-    std::string normalizedUri = uri;
-    while (!normalizedUri.empty() &&
-           normalizedUri[0] == '/')
-    {
-      normalizedUri = normalizedUri.substr(1);
-    }
-    
-    client.SetUrl(url + normalizedUri);
-
-    for (Orthanc::WebServiceParameters::Dictionary::const_iterator
-           it = parameters.GetHttpHeaders().begin();
-         it != parameters.GetHttpHeaders().end(); ++it)
-    {
-      client.AddHeader(it->first, it->second);
-    }
+    client.SetUrl(RemoveMultipleSlashes(parameters.GetUrl() + "/" + uri));
+    client.SetHeaders(parameters.GetHttpHeaders());
 
     if (!parameters.GetUsername().empty())
     {
@@ -329,9 +353,9 @@
   }
 
 
-  void UriEncode(std::string& uri,
-                 const std::string& resource,
-                 const std::map<std::string, std::string>& getArguments)
+  void DicomWebServers::UriEncode(std::string& uri,
+                                  const std::string& resource,
+                                  const std::map<std::string, std::string>& getArguments)
   {
     if (resource.find('?') != std::string::npos)
     {
--- a/Plugin/DicomWebServers.h	Fri Jun 14 15:21:34 2019 +0200
+++ b/Plugin/DicomWebServers.h	Fri Jun 14 17:52:17 2019 +0200
@@ -45,6 +45,10 @@
     }
 
   public:
+    static void UriEncode(std::string& uri,
+                          const std::string& resource,
+                          const std::map<std::string, std::string>& getArguments);
+
     void Load(const Json::Value& configuration);
 
     ~DicomWebServers()
@@ -76,8 +80,4 @@
                   const std::map<std::string, std::string>& httpHeaders,
                   const std::string& uri,
                   const std::string& body);
-
-  void UriEncode(std::string& uri,
-                 const std::string& resource,
-                 const std::map<std::string, std::string>& getArguments);
 }