changeset 333:bc903f260e38

refactoring of StowClientJob using SingleFunctionJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 21 Jun 2019 17:16:50 +0200
parents bff4b45cbf1f
children b995fc9156de
files Plugin/DicomWebClient.cpp
diffstat 1 files changed, 120 insertions(+), 158 deletions(-) [+]
line wrap: on
line diff
--- a/Plugin/DicomWebClient.cpp	Fri Jun 21 16:12:42 2019 +0200
+++ b/Plugin/DicomWebClient.cpp	Fri Jun 21 17:16:50 2019 +0200
@@ -64,7 +64,7 @@
                      unsigned int maxPosition)
     {
       boost::mutex::scoped_lock lock(that_.mutex_);
-      
+
       if (maxPosition == 0 || 
           position > maxPosition)
       {
@@ -175,8 +175,6 @@
   static void Worker(SingleFunctionJob* job,
                      IFunctionFactory* factory)
   {
-    printf("=================================> STARTING\n");
-      
     assert(job != NULL && factory != NULL);
 
     JobContext context(*job);
@@ -247,8 +245,6 @@
 
   virtual OrthancPluginJobStepStatus Step()
   {
-    printf("=================================> STEP\n");
-          
     if (factory_ == NULL)
     {
       throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
@@ -283,8 +279,6 @@
 
   virtual void Stop(OrthancPluginJobStopReason reason)
   {
-    printf("=================================> STOP %d\n", (int) reason);
-          
     if (factory_ == NULL)
     {
       return;
@@ -312,8 +306,6 @@
 
   virtual void Reset()
   {
-    printf("=================================> RESET\n");
-          
     boost::mutex::scoped_lock lock(mutex_);
 
     assert(worker_.get() == NULL);
@@ -564,48 +556,55 @@
 }
 
 
-class StowClientJob : public OrthancPlugins::OrthancJob
+class StowClientJob :
+  public SingleFunctionJob,
+  private SingleFunctionJob::IFunctionFactory
 {
 private:
-  enum State
+  enum Action
   {
-    State_Running,
-    State_Stopped,
-    State_Error,
-    State_Done
+    Action_None,
+    Action_Pause,
+    Action_Cancel
   };
-
+  
+  boost::mutex                             mutex_;
+  std::string                              serverName_;
+  std::vector<std::string>                 instances_;
+  OrthancPlugins::HttpClient::HttpHeaders  headers_;
+  std::string                              boundary_;
+  size_t                                   position_;
+  Action                                   action_;
+  size_t                                   networkSize_;
+  bool                                     debug_;
 
-  boost::mutex                               mutex_;
-  std::string                                serverName_;
-  std::vector<std::string>                   instances_;
-  OrthancPlugins::HttpClient::HttpHeaders    headers_;
-  std::string                                boundary_;
-
-
-  std::auto_ptr<boost::thread>  worker_;
-
-
-  State   state_;
-  size_t  position_;
-  Json::Value    content_;
-
-
-  bool ReadNextInstance(std::string& dicom)
+  bool ReadNextInstance(std::string& dicom,
+                        JobContext& context)
   {
     boost::mutex::scoped_lock lock(mutex_);
 
-    if (state_ != State_Running)
+    if (action_ != Action_None)
     {
       return false;
     }
 
     while (position_ < instances_.size())
     {
+      context.SetProgress(position_, instances_.size());
+      
       size_t i = position_++;
 
+      if (debug_)
+      {
+        boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+      }
+
       if (OrthancPlugins::RestApiGetString(dicom, "/instances/" + instances_[i] + "/file", false))
       {
+        networkSize_ += dicom.size();
+        context.SetContent("NetworkSizeMB", boost::lexical_cast<std::string>
+                           (networkSize_ / static_cast<uint64_t>(1024 * 1024)));
+
         return true;
       }
     }
@@ -618,12 +617,15 @@
   {
   private:
     StowClientJob&  that_;
+    JobContext&     context_;
     std::string     boundary_;
     bool            done_;
 
   public:
-    RequestBody(StowClientJob& that) :
+    RequestBody(StowClientJob& that,
+                JobContext& context) :
       that_(that),
+      context_(context),
       boundary_(that.boundary_),
       done_(false)
     {
@@ -633,13 +635,14 @@
     {
       if (done_)
       {
+        context_.SetProgress(1, 1);
         return false;
       }
       else
       {
         std::string dicom;
 
-        if (that_.ReadNextInstance(dicom))
+        if (that_.ReadNextInstance(dicom, context_))
         {
           chunk = ("--" + boundary_ + "\r\n" +
                    "Content-Type: application/dicom\r\n" +
@@ -660,11 +663,19 @@
   };
 
 
-  static void Worker(StowClientJob* that)
+  class F : public IFunction
   {
-    try
+  private:
+    StowClientJob& that_;
+
+  public:
+    F(StowClientJob& that) :
+      that_(that)
     {
-      std::string serverName;
+    }
+
+    virtual void Execute(JobContext& context)
+    {
       size_t startPosition;
 
       // The lifetime of "body" should be larger than "client"
@@ -672,16 +683,16 @@
       std::auto_ptr<OrthancPlugins::HttpClient> client;
 
       {
-        boost::mutex::scoped_lock lock(that->mutex_);
-        serverName = that->serverName_;
-        startPosition = that->position_;
-
-        body.reset(new RequestBody(*that));
+        boost::mutex::scoped_lock lock(that_.mutex_);
+        context.SetContent("InstancesCount", boost::lexical_cast<std::string>(that_.instances_.size()));
+        
+        startPosition = that_.position_;        
+        body.reset(new RequestBody(that_, context));
 
         client.reset(new OrthancPlugins::HttpClient);
-        OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(*client, that->serverName_, "/studies");
+        OrthancPlugins::DicomWebServers::GetInstance().ConfigureHttpClient(*client, that_.serverName_, "/studies");
         client->SetMethod(OrthancPluginHttpMethod_Post);
-        client->AddHeaders(that->headers_);
+        client->AddHeaders(that_.headers_);
       }
 
       OrthancPlugins::HttpClient::HttpHeaders answerHeaders;
@@ -693,45 +704,37 @@
       size_t endPosition;
 
       {
-        boost::mutex::scoped_lock lock(that->mutex_);
-        endPosition = that->position_;
+        boost::mutex::scoped_lock lock(that_.mutex_);
+        endPosition = that_.position_;
+        CheckStowAnswer(answerBody, that_.serverName_, endPosition - startPosition);
+
+        if (that_.action_ == Action_Cancel)
+        {
+          that_.position_ = 0;
+        }
       }
-
-      CheckStowAnswer(answerBody, serverName, endPosition - startPosition);
     }
-    catch (Orthanc::OrthancException& e)
-    {
-      {
-        boost::mutex::scoped_lock lock(that->mutex_);
-        LOG(ERROR) << "Error in STOW-RS client job to server " << that->serverName_ << ": " << e.What();
-        that->state_ = State_Error;
-      }
+  };
+  
 
-      that->SetContent("Error", e.What());
-    }
+  virtual void CancelFunction()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    action_ = Action_Cancel;
+  }
+  
+
+  virtual void PauseFunction()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    action_ = Action_Pause;
   }
 
-
-  void SetContent(const std::string& key,
-                  const std::string& value)
+  
+  virtual IFunction* CreateFunction()
   {
-    boost::mutex::scoped_lock lock(mutex_);
-    content_[key] = value;
-    UpdateContent(content_);
-  }
-
-
-  void StopWorker()
-  {
-    if (worker_.get() != NULL)
-    {
-      if (worker_->joinable())
-      {
-        worker_->join();
-      }
-
-      worker_.reset();
-    }
+    action_ = Action_None;
+    return new F(*this);
   }
 
   
@@ -739,13 +742,16 @@
   StowClientJob(const std::string& serverName,
                 const std::list<std::string>& instances,
                 const OrthancPlugins::HttpClient::HttpHeaders& headers) :
-    OrthancJob("DicomWebStowClient"),
+    SingleFunctionJob("DicomWebStowClient"),
     serverName_(serverName),
     headers_(headers),
-    state_(State_Running),
     position_(0),
-    content_(Json::objectValue)
+    action_(Action_None),
+    networkSize_(0),
+    debug_(false)
   {
+    SetFactory(*this);
+
     instances_.reserve(instances.size());
 
     for (std::list<std::string>::const_iterator
@@ -767,77 +773,9 @@
     headers_["Content-Type"] = "multipart/related; type=\"application/dicom\"; boundary=" + boundary_;
   }
 
-
-  virtual OrthancPluginJobStepStatus Step()
+  void SetDebug(bool debug)
   {
-    State state;
-
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-
-      if (state_ == State_Stopped)
-      {
-        state_ = State_Running;
-      }
-
-      UpdateProgress(instances_.empty() ? 1 :
-                     static_cast<float>(position_) / static_cast<float>(instances_.size()));
-
-      if (position_ == instances_.size() &&
-          state_ == State_Running)
-      {
-        state_ = State_Done;
-      }
-
-      state = state_;
-    }
-
-    switch (state)
-    {
-      case State_Done:
-        StopWorker();
-        return (state_ == State_Done ? 
-                OrthancPluginJobStepStatus_Success :
-                OrthancPluginJobStepStatus_Failure);
-
-      case State_Error:
-        StopWorker();
-        return OrthancPluginJobStepStatus_Failure;
-
-      case State_Running:
-        if (worker_.get() == NULL)
-        {
-          worker_.reset(new boost::thread(Worker, this));
-        }
-        
-        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
-        
-        return OrthancPluginJobStepStatus_Continue;
-
-      default:
-        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
-    }
-  }
-
-
-  virtual void Stop(OrthancPluginJobStopReason reason)
-  {
-    {
-      boost::mutex::scoped_lock lock(mutex_);
-      state_ = State_Stopped;
-    }
-
-    StopWorker();
-  }
-
-    
-  virtual void Reset()
-  {
-    boost::mutex::scoped_lock lock(mutex_);
-    position_ = 0;
-    state_ = State_Running;
-    content_ = Json::objectValue;
-    ClearContent();
+    debug_ = debug;
   }
 };
 
@@ -872,8 +810,16 @@
   OrthancPlugins::LogInfo("Sending " + boost::lexical_cast<std::string>(instances.size()) +
                           " instances using STOW-RS to DICOMweb server: " + serverName);
 
+  std::auto_ptr<StowClientJob> job(new StowClientJob(serverName, instances, httpHeaders));
+
+  bool debug;
+  if (OrthancPlugins::LookupBooleanValue(debug, body, "Debug"))
+  {
+    job->SetDebug(debug);
+  }
+  
   Json::Value answer;
-  SubmitJob(output, new StowClientJob(serverName, instances, httpHeaders), body, 
+  SubmitJob(output, job.release(), body, 
             true /* synchronous by default, for compatibility with <= 0.6 */);
 }
 
@@ -1011,8 +957,6 @@
                           const void* part,
                           size_t size)
   {
-    printf("  part %d\n", size);
-    
     std::string contentType;
     if (!Orthanc::MultipartStreamReader::GetMainContentType(contentType, headers))
     {
@@ -1052,7 +996,7 @@
 
     if (debug_)
     {
-      boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+      boost::this_thread::sleep(boost::posix_time::milliseconds(50));
     }
   }
 
@@ -1254,7 +1198,7 @@
   std::list<std::string>  retrievedInstances_;
   std::auto_ptr<WadoRetrieveAnswer>  answer_;
   uint64_t                networkSize_;
-
+  bool                    debug_;
 
   bool SetupNextResource(OrthancPlugins::HttpClient& client,
                          JobContext& context)
@@ -1271,7 +1215,7 @@
       context.SetProgress(position_, resources_.size());
 
       answer_.reset(new WadoRetrieveAnswer);
-      answer_->SetDebug(true);   // TODO
+      answer_->SetDebug(debug_);
 
       const Resource* resource = resources_[position_++];
       if (resource == NULL)
@@ -1343,7 +1287,8 @@
     serverName_(serverName),
     position_(0),
     stopped_(false),
-    networkSize_(0)
+    networkSize_(0),
+    debug_(false)
   {
     SetFactory(*this);
   }
@@ -1359,6 +1304,11 @@
     }
   }
 
+  void SetDebug(bool debug)
+  {
+    debug_ = debug;
+  }
+
   void AddResource(const std::string uri)
   {
     resources_.push_back(new Resource(uri));
@@ -1403,6 +1353,12 @@
   std::auto_ptr<WadoRetrieveJob>  job(new WadoRetrieveJob(serverName));
   job->AddResourceFromRequest(body);
 
+  bool debug;
+  if (OrthancPlugins::LookupBooleanValue(debug, body, "Debug"))
+  {
+    job->SetDebug(debug);
+  }
+
   SubmitJob(output, job.release(), body, false /* asynchronous by default */);
 }
 
@@ -1499,6 +1455,12 @@
     job->AddResource(uri, additionalHeaders);
   }
 
+  bool debug;
+  if (OrthancPlugins::LookupBooleanValue(debug, body, "Debug"))
+  {
+    job->SetDebug(debug);
+  }
+
   SubmitJob(output, job.release(), body, 
             true /* synchronous by default, for compatibility with <= 0.6 */);
 }