changeset 317:ffe1c92c73c7 refactoring

WadoRetrieveJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 20 Jun 2019 18:37:20 +0200
parents 9313f6157eef
children 1f948ba78c5d
files Plugin/DicomWebClient.cpp Plugin/DicomWebClient.h Plugin/Plugin.cpp
diffstat 3 files changed, 584 insertions(+), 35 deletions(-) [+]
line wrap: on
line diff
--- a/Plugin/DicomWebClient.cpp	Thu Jun 20 12:55:26 2019 +0200
+++ b/Plugin/DicomWebClient.cpp	Thu Jun 20 18:37:20 2019 +0200
@@ -31,6 +31,7 @@
 #include <Core/HttpServer/MultipartStreamReader.h>
 #include <Core/ChunkedBuffer.h>
 #include <Core/Toolbox.h>
+#include <Plugins/Samples/Common/OrthancPluginCppWrapper.h>
 
 
 #include <boost/thread.hpp>
@@ -284,7 +285,7 @@
   enum State
   {
     State_Running,
-    State_Paused,
+    State_Stopped,
     State_Error,
     State_Done
   };
@@ -489,7 +490,7 @@
     {
       boost::mutex::scoped_lock lock(mutex_);
 
-      if (state_ == State_Paused)
+      if (state_ == State_Stopped)
       {
         state_ = State_Running;
       }
@@ -538,7 +539,7 @@
   {
     {
       boost::mutex::scoped_lock lock(mutex_);
-      state_ = State_Paused;
+      state_ = State_Stopped;
     }
 
     StopWorker();
@@ -673,13 +674,35 @@
 
 
 
-static void ConfigureGetFromServer(OrthancPlugins::HttpClient& client,
-                                   const OrthancPluginHttpRequest* request)
+static void ParseGetFromServer(std::string& uri,
+                               std::map<std::string, std::string>& additionalHeaders,
+                               const Json::Value& resource)
 {
   static const char* URI = "Uri";
   static const char* HTTP_HEADERS = "HttpHeaders";
   static const char* GET_ARGUMENTS = "Arguments";
 
+  std::string tmp;
+  if (resource.type() != Json::objectValue ||
+      !GetStringValue(tmp, resource, URI))
+  {
+    throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat,
+                                    "A request to the DICOMweb client must provide a JSON object "
+                                    "with the field \"Uri\" containing the URI of interest");
+  }
+
+  std::map<std::string, std::string> getArguments;
+  OrthancPlugins::ParseAssociativeArray(getArguments, resource, GET_ARGUMENTS); 
+  OrthancPlugins::DicomWebServers::UriEncode(uri, tmp, getArguments);
+
+  OrthancPlugins::ParseAssociativeArray(additionalHeaders, resource, HTTP_HEADERS);
+}
+
+
+
+static void ConfigureGetFromServer(OrthancPlugins::HttpClient& client,
+                                   const OrthancPluginHttpRequest* request)
+{
   if (request->method != OrthancPluginHttpMethod_Post)
   {
     throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
@@ -688,25 +711,11 @@
   Json::Value body;
   OrthancPlugins::ParseJsonBody(body, request);
 
-  std::string tmp;
-  if (body.type() != Json::objectValue ||
-      !GetStringValue(tmp, body, URI))
-  {
-    throw Orthanc::OrthancException(Orthanc::ErrorCode_BadFileFormat,
-                                    "A request to the DICOMweb client must provide a JSON object "
-                                    "with the field \"Uri\" containing the URI of interest");
-  }
-
-  std::map<std::string, std::string> getArguments;
-  OrthancPlugins::ParseAssociativeArray(getArguments, body, GET_ARGUMENTS);
-
   std::string uri;
-  OrthancPlugins::DicomWebServers::UriEncode(uri, tmp, getArguments);
+  std::map<std::string, std::string> additionalHeaders;
+  ParseGetFromServer(uri, additionalHeaders, body);
 
   OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(client, request->groups[0], uri);
-
-  std::map<std::string, std::string> additionalHeaders;
-  OrthancPlugins::ParseAssociativeArray(additionalHeaders, body, HTTP_HEADERS);
   client.AddHeaders(additionalHeaders);
 }
 
@@ -1042,13 +1051,14 @@
   {
     State_Headers,
     State_Body,
-    State_Stopped
+    State_Canceled
   };
   
   boost::mutex                                   mutex_;
   State                                          state_;
   std::list<std::string>                         instances_;
   std::auto_ptr<Orthanc::MultipartStreamReader>  reader_;
+  uint64_t                                       networkSize_;
 
   virtual void HandlePart(const Orthanc::MultipartStreamReader::HttpHeaders& headers,
                           const void* part,
@@ -1095,7 +1105,8 @@
 
 public:
   WadoRetrieveAnswer() :
-    state_(State_Headers)
+    state_(State_Headers),
+    networkSize_(0)
   {
   }
 
@@ -1107,7 +1118,7 @@
   {
     boost::mutex::scoped_lock lock(mutex_);
 
-    if (state_ != State_Stopped &&
+    if (state_ != State_Canceled &&
         reader_.get() != NULL)
     {
       reader_->CloseStream();
@@ -1119,7 +1130,7 @@
   {
     boost::mutex::scoped_lock lock(mutex_);
 
-    if (state_ == State_Stopped)
+    if (state_ == State_Canceled)
     {
       return;
     }
@@ -1163,20 +1174,21 @@
   {
     boost::mutex::scoped_lock lock(mutex_);
 
-    if (state_ == State_Stopped)
+    if (state_ == State_Canceled)
     {
-      return;
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_CanceledJob);
     }
-
-    if (reader_.get() == NULL)
+    else if (reader_.get() == NULL)
     {
       throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol,
                                       "No Content-Type provided by the remote WADO-RS server");
     }
-
-    state_ = State_Body;
-
-    reader_->AddChunk(data, size);
+    else
+    {
+      state_ = State_Body;
+      networkSize_ += size;
+      reader_->AddChunk(data, size);
+    }
   }
 
   void GetReceivedInstances(std::list<std::string>& target)
@@ -1185,11 +1197,17 @@
     target = instances_;
   }
 
-  void Stop()
+  void Cancel()
   {
     boost::mutex::scoped_lock lock(mutex_);
+    LOG(ERROR) << "A WADO-RS retrieve job has been canceled, expect \"Error in the network protocol\" errors";
+    state_ = State_Canceled;
+  }
 
-    state_ = State_Stopped;
+  uint64_t GetNetworkSize()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    return networkSize_;
   }
 };
 
@@ -1207,6 +1225,7 @@
   // Do some loop
   {
     WadoRetrieveAnswer answer;
+    answer.Cancel();
     client.Execute(answer);
     answer.Close();
 
@@ -1223,3 +1242,524 @@
                             tmp.c_str(), tmp.size(), "application/json");
 
 }
+
+
+
+
+
+#include <Core/IDynamicObject.h>
+
+
+class SingleFunctionJob : public OrthancPlugins::OrthancJob
+{
+public:
+  class JobContext : public boost::noncopyable
+  {
+  private:
+    SingleFunctionJob&  that_;
+
+  public:
+    JobContext(SingleFunctionJob& that) :
+      that_(that)
+    {
+    }
+
+    void SetContent(const std::string& key,
+                    const std::string& value)
+    {
+      that_.SetContent(key, value);
+    }
+
+    void SetProgress(unsigned int position,
+                     unsigned int maxPosition)
+    {
+      boost::mutex::scoped_lock lock(that_.mutex_);
+      
+      if (maxPosition == 0 || 
+          position > maxPosition)
+      {
+        that_.UpdateProgress(1);
+      }
+      else
+      {
+        that_.UpdateProgress(static_cast<float>(position) / static_cast<float>(maxPosition));
+      }
+    }
+  };
+
+
+  class IFunction : public boost::noncopyable
+  {
+  public:
+    virtual ~IFunction()
+    {
+    }
+
+    // Must return "true" if the job has completed with success, or
+    // "false" if the job has been canceled. Pausing the job
+    // corresponds to canceling it.
+    virtual bool Execute(JobContext& context) = 0;
+  };
+
+
+  class IFunctionFactory : public boost::noncopyable
+  {
+  public:
+    virtual ~IFunctionFactory()
+    {
+    }
+
+    // WARNING: "CancelFunction()" will be invoked while "Execute()"
+    // is running. Mutex is probably necessary.
+    virtual void CancelFunction() = 0;
+
+    // Only called when no function is running, to deal with
+    // "Resubmit()" after job cancelation/failure.
+    virtual void ResetFunction() = 0;
+
+    virtual IFunction* CreateFunction() = 0;
+  };
+
+
+protected:
+  void SetFactory(IFunctionFactory& factory)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (state_ != State_Setup)
+    {
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
+    }
+    else
+    {
+      factory_ = &factory;
+    }
+  }
+  
+
+private:
+  enum State
+  {
+    State_Setup,
+    State_Running,
+    State_Success,
+    State_Failure
+  };
+
+  boost::mutex                  mutex_;
+  State                         state_;  // Can only be modified by the "Worker()" function
+  std::auto_ptr<boost::thread>  worker_;
+  Json::Value                   content_;
+  IFunctionFactory*             factory_;
+
+  void JoinWorker()
+  {
+    assert(factory_ != NULL);
+
+    if (worker_.get() != NULL)
+    {
+      if (worker_->joinable())
+      {
+        worker_->join();
+      }
+
+      worker_.reset();
+    }
+  }
+
+  void StartWorker()
+  {
+    assert(factory_ != NULL);
+
+    if (worker_.get() == NULL &&
+        factory_ != NULL)
+    {
+      worker_.reset(new boost::thread(Worker, this, factory_));
+    }
+  }
+
+  void SetContent(const std::string& key,
+                  const std::string& value)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    content_[key] = value;
+    UpdateContent(content_);
+  }
+
+  static void Worker(SingleFunctionJob* job,
+                     IFunctionFactory* factory)
+  {
+    assert(job != NULL && factory != NULL);
+
+    JobContext context(*job);
+
+    {
+      boost::mutex::scoped_lock lock(job->mutex_);
+      job->state_ = State_Running;
+    }
+
+    try
+    {
+      std::auto_ptr<IFunction> function(factory->CreateFunction());
+      bool success = function->Execute(context);
+
+      {
+        boost::mutex::scoped_lock lock(job->mutex_);
+        job->state_ = (success ? State_Success : State_Failure);
+        if (success)
+        {
+          job->UpdateProgress(1);
+        }
+      }
+    }
+    catch (Orthanc::OrthancException& e)
+    {
+      LOG(ERROR) << "Error in a job: " << e.What();
+
+      {
+        boost::mutex::scoped_lock lock(job->mutex_);
+        job->state_ = State_Failure;
+        job->content_["FunctionErrorCode"] = e.GetErrorCode();
+        job->content_["FunctionErrorDescription"] = e.What();
+        if (e.HasDetails())
+        {
+          job->content_["FunctionErrorDetails"] = e.GetDetails();
+        }
+        job->UpdateContent(job->content_);
+      }
+    }
+  }  
+
+public:
+  SingleFunctionJob(const std::string& jobName) :
+    OrthancJob(jobName),
+    state_(State_Setup),
+    content_(Json::objectValue),
+    factory_(NULL)
+  {
+  }
+
+  virtual ~SingleFunctionJob()
+  {
+    if (worker_.get() != NULL)
+    {
+      LOG(ERROR) << "Classes deriving from SingleFunctionJob must "
+                 << "explicitly call Finalize() in their destructor";
+
+      try
+      {
+        JoinWorker();
+      }
+      catch (Orthanc::OrthancException&)
+      {
+      }
+    }
+  }
+
+  void Finalize()
+  {
+    try
+    {
+      if (factory_ != NULL)
+      {
+        factory_->CancelFunction();
+        JoinWorker();
+      }
+    }
+    catch (Orthanc::OrthancException&)
+    {
+    }
+  }
+
+  virtual OrthancPluginJobStepStatus Step()
+  {
+    if (factory_ == NULL)
+    {
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
+    }
+
+    State state;
+
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+      state = state_;
+    }
+
+    switch (state)
+    {
+      case State_Setup:
+        StartWorker();
+        break;
+
+      case State_Running:
+        break;
+
+      case State_Success:
+        JoinWorker();
+        return OrthancPluginJobStepStatus_Success;
+
+      case State_Failure:
+        JoinWorker();
+        return OrthancPluginJobStepStatus_Failure;
+
+      default:
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+    }
+
+    boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+    return OrthancPluginJobStepStatus_Continue;
+  }
+
+  virtual void Stop(OrthancPluginJobStopReason reason)
+  {
+    if (factory_ == NULL)
+    {
+      return;
+    }
+
+    if (reason == OrthancPluginJobStopReason_Paused ||
+        reason == OrthancPluginJobStopReason_Canceled)
+    {
+      factory_->CancelFunction();
+    }
+
+    JoinWorker();
+
+    if (reason == OrthancPluginJobStopReason_Paused)
+    {
+      // This type of job cannot be paused: Reset under the hood
+      Reset();
+    }
+  }
+
+  virtual void Reset()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (factory_ != NULL)
+    {
+      factory_->ResetFunction();
+    }
+
+    state_ = State_Setup;    
+
+    content_ = Json::objectValue;
+    ClearContent();
+  }
+};
+
+
+
+class WadoRetrieveJob : 
+  public SingleFunctionJob,
+  private SingleFunctionJob::IFunctionFactory
+{
+private:
+  class Resource : public boost::noncopyable
+  {
+  private:
+    std::string                        uri_;
+    std::map<std::string, std::string> additionalHeaders_;
+
+  public:
+    Resource(const std::string& uri) :
+      uri_(uri)
+    {
+    }
+
+    Resource(const std::string& uri,
+             const std::map<std::string, std::string>& additionalHeaders) :
+      uri_(uri),
+      additionalHeaders_(additionalHeaders)
+    {
+    }
+
+    const std::string& GetUri() const
+    {
+      return uri_;
+    }
+
+    const std::map<std::string, std::string>& GetAdditionalHeaders() const
+    {
+      return additionalHeaders_;
+    }
+  };
+
+
+  enum Status
+  {
+    Status_Done,
+    Status_Canceled,
+    Status_Continue
+  };
+
+
+  class F : public IFunction
+  {
+  private:
+    WadoRetrieveJob&   that_;
+
+  public:
+    F(WadoRetrieveJob& that) :
+    that_(that)
+    {
+    }
+
+    virtual bool Execute(JobContext& context)
+    {
+      for (;;)
+      {
+        OrthancPlugins::HttpClient client;
+
+        switch (that_.SetupNextResource(client))
+        {
+          case Status_Continue:
+            client.Execute(*that_.answer_);
+            that_.CloseResource(context);
+            break;
+
+          case Status_Canceled:
+            return false;
+
+          case Status_Done:
+            return true;
+        }
+      }
+    }
+  };
+
+
+  boost::mutex            mutex_;
+  std::string             serverName_;
+  size_t                  position_;
+  std::vector<Resource*>  resources_;
+  bool                    canceled_;
+  std::list<std::string>  retrievedInstances_;
+  std::auto_ptr<WadoRetrieveAnswer>  answer_;
+  uint64_t                networkSize_;
+
+
+  Status SetupNextResource(OrthancPlugins::HttpClient& client)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (canceled_)
+    {
+      return Status_Canceled;
+    }
+    else if (position_ == resources_.size())
+    {
+      return Status_Done;
+    }
+    else
+    {
+      answer_.reset(new WadoRetrieveAnswer);
+
+      const Resource* resource = resources_[position_++];
+      if (resource == NULL)
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
+      }
+
+      OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient
+        (client, serverName_, resource->GetUri());
+      client.AddHeaders(resource->GetAdditionalHeaders());
+
+      return Status_Continue;
+    }
+  }
+
+
+  void CloseResource(JobContext& context)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    answer_->Close();
+
+    std::list<std::string> instances;
+    answer_->GetReceivedInstances(instances);
+    networkSize_ += answer_->GetNetworkSize();
+
+    answer_.reset();
+
+    retrievedInstances_.splice(retrievedInstances_.end(), instances);
+
+    context.SetProgress(position_, resources_.size());
+    context.SetContent("NetworkUsageMB", boost::lexical_cast<std::string>(networkSize_ / (1024llu * 1024llu)));
+    context.SetContent("ReceivedInstancesCount", boost::lexical_cast<std::string>(retrievedInstances_.size()));
+  }
+
+
+  virtual void CancelFunction()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    canceled_ = true;
+    if (answer_.get() != NULL)
+    {
+      answer_->Cancel();
+    }
+  }
+
+  virtual void ResetFunction()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    canceled_ = false;
+    position_ = 0;
+    retrievedInstances_.clear();
+  }
+
+  virtual IFunction* CreateFunction()
+  {
+    return new F(*this);
+  }
+
+public:
+  WadoRetrieveJob(const std::string& serverName) :
+    SingleFunctionJob("DicomWebWadoRetrieveClient"),
+    serverName_(serverName),
+    position_(0),
+    canceled_(false),
+    networkSize_(0)
+  {
+    SetFactory(*this);
+  }
+
+  virtual ~WadoRetrieveJob()
+  {
+    SingleFunctionJob::Finalize();
+
+    for (size_t i = 0; i < resources_.size(); i++)
+    {
+      assert(resources_[i] != NULL);
+      delete resources_[i];
+    }
+  }
+
+  void AddResource(const std::string uri)
+  {
+    resources_.push_back(new Resource(uri));
+  }
+
+  void AddResourceFromRequest(const Json::Value& resource)
+  {
+    std::string uri;
+    std::map<std::string, std::string> additionalHeaders;
+    ParseGetFromServer(uri, additionalHeaders, resource);
+
+    resources_.push_back(new Resource(uri, additionalHeaders));
+  }
+};
+
+
+
+void WadoTest()
+{
+  std::auto_ptr<WadoRetrieveJob>  job(new WadoRetrieveJob("self"));
+  //job->AddResource("studies/2.16.840.1.113669.632.20.1211.10000315526");  // VIX
+  job->AddResource("studies/1.3.51.0.1.1.192.168.29.133.1688840.1688819");  // Cardiac
+
+  //OrthancPlugins::OrthancJob::Submit(job.release(), 0);
+
+  Json::Value result;
+  OrthancPlugins::OrthancJob::SubmitAndWait(result, job.release(), 0);
+  std::cout << result.toStyledString() << std::endl;
+}
--- a/Plugin/DicomWebClient.h	Thu Jun 20 12:55:26 2019 +0200
+++ b/Plugin/DicomWebClient.h	Thu Jun 20 18:37:20 2019 +0200
@@ -43,3 +43,7 @@
 void WadoRetrieveClient(OrthancPluginRestOutput* output,
                         const char* url,
                         const OrthancPluginHttpRequest* request);
+
+
+// TODO => remove
+void WadoTest();
--- a/Plugin/Plugin.cpp	Thu Jun 20 12:55:26 2019 +0200
+++ b/Plugin/Plugin.cpp	Thu Jun 20 18:37:20 2019 +0200
@@ -502,6 +502,11 @@
   {
     try
     {
+#if 1
+      WadoTest();
+#endif
+
+
 #if 0
       {
         StowClientBody stow;