changeset 2583:1b6a6d80b6f2 jobs

OrthancPeerStoreJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 14 May 2018 20:43:16 +0200
parents b3da733d984c
children 38b5045f2bff
files Core/JobsEngine/IJob.h Core/JobsEngine/JobsRegistry.cpp OrthancExplorer/explorer.js OrthancServer/OrthancRestApi/OrthancRestModalities.cpp UnitTestsSources/MultiThreadingTests.cpp
diffstat 5 files changed, 249 insertions(+), 154 deletions(-) [+]
line wrap: on
line diff
--- a/Core/JobsEngine/IJob.h	Fri May 11 17:58:06 2018 +0200
+++ b/Core/JobsEngine/IJob.h	Mon May 14 20:43:16 2018 +0200
@@ -52,7 +52,10 @@
     
     virtual JobStepResult* ExecuteStep() = 0;
 
-    virtual void ReleaseResources() = 0;   // For pausing jobs
+    // Method called once the job is resubmitted after a failure
+    virtual void SignalResubmit() = 0;
+
+    virtual void ReleaseResources() = 0;   // For pausing/canceling jobs
 
     virtual float GetProgress() = 0;
 
--- a/Core/JobsEngine/JobsRegistry.cpp	Fri May 11 17:58:06 2018 +0200
+++ b/Core/JobsEngine/JobsRegistry.cpp	Mon May 14 20:43:16 2018 +0200
@@ -773,6 +773,8 @@
     }
     else
     {
+      found->second->GetJob().SignalResubmit();
+      
       bool ok = false;
       for (CompletedJobs::iterator it = completedJobs_.begin(); 
            it != completedJobs_.end(); ++it)
--- a/OrthancExplorer/explorer.js	Fri May 11 17:58:06 2018 +0200
+++ b/OrthancExplorer/explorer.js	Mon May 14 20:43:16 2018 +0200
@@ -1151,6 +1151,22 @@
       var target = $('#all-jobs');
       $('li', target).remove();
 
+      var running = $('<li>')
+          .attr('data-role', 'list-divider')
+          .text('Currently running');
+
+      var pending = $('<li>')
+          .attr('data-role', 'list-divider')
+          .text('Pending jobs');
+
+      var inactive = $('<li>')
+          .attr('data-role', 'list-divider')
+          .text('Inactive jobs');
+
+      target.append(running);
+      target.append(pending);
+      target.append(inactive);
+
       jobs.map(function(job) {
         var li = $('<li>');
         var item = $('<a>');
@@ -1164,7 +1180,16 @@
         AddJobDateField(item, 'Creation time: ', job.CreationTime);
         AddJobDateField(item, 'Completion time: ', job.CompletionTime);
         AddJobDateField(item, 'ETA: ', job.EstimatedTimeOfArrival);
-        target.append(li);
+
+        if (job.State == 'Running') {
+          AddJobField(item, 'Progress: ', job.Progress);
+          li.insertAfter(running);
+        } else if (job.State == 'Pending' ||
+                   job.State == 'Paused') {
+          li.insertAfter(pending);
+        } else {
+          li.insertAfter(inactive);
+        }
       });
 
       target.listview('refresh');
@@ -1210,8 +1235,14 @@
                       .text('Detailed information'));
 
         var block = $('<li>');
-        for (var i in job.PublicContent) {
-          AddJobField(block, i + ': ', JSON.stringify(job.PublicContent[i]));
+
+        for (var item in job.PublicContent) {
+          var value = job.PublicContent[item];
+          if (typeof value !== 'string') {
+            value = JSON.stringify(value);
+          }
+          
+          AddJobField(block, item + ': ', value);
         }
 
         target.append(block);
@@ -1251,6 +1282,7 @@
     url: '../jobs/' + $.mobile.pageData.uuid + '/' + action,
     type: 'POST',
     async: false,
+    cache: false,
     complete: function(s) {
       window.location.reload();
     }
--- a/OrthancServer/OrthancRestApi/OrthancRestModalities.cpp	Fri May 11 17:58:06 2018 +0200
+++ b/OrthancServer/OrthancRestApi/OrthancRestModalities.cpp	Mon May 14 20:43:16 2018 +0200
@@ -48,16 +48,22 @@
 
 namespace Orthanc
 {
-  class InstancesIteratorJob : public IJob
+  class SetOfInstancesJob : public IJob
   {
   private:
     bool                      started_;
     std::vector<std::string>  instances_;
+    bool                      permissive_;
     size_t                    position_;
+    std::set<std::string>     failedInstances_;
+
+  protected:
+    virtual bool HandleInstance(const std::string& instance) = 0;
 
   public:
-    InstancesIteratorJob() :
+    SetOfInstancesJob() :
       started_(false),
+      permissive_(false),
       position_(0)
     {
     }
@@ -90,7 +96,37 @@
         instances_.push_back(instance);
       }
     }
-    
+
+    bool IsPermissive() const
+    {
+      return permissive_;
+    }
+
+    void SetPermissive(bool permissive)
+    {
+      if (IsStarted())
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+      else
+      {
+        permissive_ = permissive;
+      }
+    }
+
+    virtual void SignalResubmit()
+    {
+      if (started_)
+      {
+        position_ = 0;
+        failedInstances_.clear();
+      }
+      else
+      {
+        throw OrthancException(ErrorCode_BadSequenceOfCalls);
+      }
+    }
+
     virtual void Start()
     {
       started_ = true;
@@ -142,22 +178,98 @@
         return instances_[position_];
       }      
     }
+
+
+    const std::vector<std::string>& GetInstances() const
+    {
+      return instances_;
+    }
+
+  
+    const std::set<std::string>& GetFailedInstances() const
+    {
+      return failedInstances_;
+    }
+
+  
+    virtual JobStepResult* ExecuteStep()
+    {
+      if (IsDone())
+      {
+        return new JobStepResult(JobStepCode_Failure);
+      }
+
+      bool ok;
+      
+      try
+      {
+        ok = HandleInstance(GetCurrentInstance());
+      }
+      catch (OrthancException& e)
+      {
+        ok = false;
+      }
+
+      if (!ok)
+      {
+        if (permissive_)
+        {
+          failedInstances_.insert(GetCurrentInstance());
+        }
+        else
+        {
+          return new JobStepResult(JobStepCode_Failure);
+        }
+      }
+
+      Next();
+
+      if (IsDone())
+      {
+        return new JobStepResult(JobStepCode_Success);
+      }
+      else
+      {
+        return new JobStepResult(JobStepCode_Continue);
+      }
+    }
+
+    virtual void GetInternalContent(Json::Value& value)
+    {
+      Json::Value v = Json::arrayValue;
+      
+      for (size_t i = 0; i < instances_.size(); i++)
+      {
+        v.append(instances_[i]);
+      }
+
+      value["Instances"] = v;
+
+      
+      v = Json::arrayValue;
+
+      for (std::set<std::string>::const_iterator it = failedInstances_.begin();
+           it != failedInstances_.end(); ++it)
+      {
+        v.append(*it);
+      }
+      
+      value["FailedInstances"] = v;
+    }
   };
 
 
-  class StoreScuJob : public InstancesIteratorJob
+  class DicomStoreJob : public SetOfInstancesJob
   {
   private:
     ServerContext&                      context_;
     std::string                         localAet_;
     RemoteModalityParameters            remote_;
-    bool                                permissive_;
     std::string                         moveOriginatorAet_;
     uint16_t                            moveOriginatorId_;
     std::auto_ptr<DicomUserConnection>  connection_;
-    std::set<std::string>               failedInstances_;
 
-    void CreateConnection()
+    void OpenConnection()
     {
       if (connection_.get() == NULL)
       {
@@ -166,32 +278,40 @@
         connection_->SetRemoteModality(remote_);
       }
     }
+
+  protected:
+    virtual bool HandleInstance(const std::string& instance)
+    {
+      OpenConnection();
+
+      LOG(INFO) << "Sending instance " << instance << " to modality \"" 
+                << remote_.GetApplicationEntityTitle() << "\"";
+
+      std::string dicom;
+      context_.ReadDicom(dicom, GetCurrentInstance());
+
+      if (HasMoveOriginator())
+      {
+        connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_);
+      }
+      else
+      {
+        connection_->Store(dicom);
+      }
+
+      boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+
+      return true;
+    }
     
   public:
-    StoreScuJob(ServerContext& context) :
+    DicomStoreJob(ServerContext& context) :
       context_(context),
       localAet_("ORTHANC"),
-      permissive_(false),
       moveOriginatorId_(0)  // By default, not a C-MOVE
     {
     }
 
-    void AddResource(const std::string& publicId)
-    {
-      typedef std::list<std::string> Instances;
-
-      Instances instances;
-      context_.GetIndex().GetChildInstances(instances, publicId);
-
-      Reserve(GetInstancesCount() + instances.size());
-      
-      for (Instances::const_iterator it = instances.begin();
-           it != instances.end(); ++it)
-      {
-        AddInstance(*it);
-      }
-    }
-
     const std::string& GetLocalAet() const
     {
       return localAet_;
@@ -226,23 +346,6 @@
       }
     }
 
-    bool IsPermissive() const
-    {
-      return permissive_;
-    }
-
-    void SetPermissive(bool permissive)
-    {
-      if (IsStarted())
-      {
-        throw OrthancException(ErrorCode_BadSequenceOfCalls);
-      }
-      else
-      {
-        permissive_ = permissive;
-      }
-    }
-
     bool HasMoveOriginator() const
     {
       return moveOriginatorId_ != 0;
@@ -291,63 +394,6 @@
       }
     }
 
-    virtual JobStepResult* ExecuteStep()
-    {
-      if (IsDone())
-      {
-        return new JobStepResult(JobStepCode_Success);
-      }
-
-      CreateConnection();
-
-      bool ok = false;
-      
-      try
-      {
-        std::string dicom;
-        context_.ReadDicom(dicom, GetCurrentInstance());
-
-        if (HasMoveOriginator())
-        {
-          connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_);
-        }
-        else
-        {
-          connection_->Store(dicom);
-        }
-
-        boost::this_thread::sleep(boost::posix_time::milliseconds(300));
-
-        ok = true;
-      }
-      catch (OrthancException& e)
-      {
-      }
-
-      if (!ok)
-      {
-        if (permissive_)
-        {
-          failedInstances_.insert(GetCurrentInstance());
-        }
-        else
-        {
-          return new JobStepResult(JobStepCode_Failure);
-        }
-      }
-
-      Next();
-
-      if (IsDone())
-      {
-        return new JobStepResult(JobStepCode_Success);
-      }
-      else
-      {
-        return new JobStepResult(JobStepCode_Continue);
-      }
-    }
-
     virtual void ReleaseResources()   // For pausing jobs
     {
       connection_.reset(NULL);
@@ -355,7 +401,7 @@
 
     virtual void GetJobType(std::string& target)
     {
-      target = "C-Store";
+      target = "DicomStore";
     }
 
     virtual void GetPublicContent(Json::Value& value)
@@ -369,19 +415,66 @@
         value["MoveOriginatorID"] = GetMoveOriginatorId();
       }
 
-      Json::Value v = Json::arrayValue;
-      for (std::set<std::string>::const_iterator it = failedInstances_.begin();
-           it != failedInstances_.end(); ++it)
+      value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size());
+      value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size());
+    }
+  };
+
+
+  class OrthancPeerStoreJob : public SetOfInstancesJob
+  {
+  private:
+    ServerContext&             context_;
+    WebServiceParameters       peer_;
+    std::auto_ptr<HttpClient>  client_;
+
+  protected:
+    virtual bool HandleInstance(const std::string& instance)
+    {
+      if (client_.get() == NULL)
       {
-        v.append(*it);
+        client_.reset(new HttpClient(peer_, "instances"));
+        client_->SetMethod(HttpMethod_Post);
       }
+      
+      LOG(INFO) << "Sending instance " << instance << " to peer \"" 
+                << peer_.GetUrl() << "\"";
 
-      value["FailedInstances"] = v;
+      context_.ReadDicom(client_->GetBody(), GetCurrentInstance());
+
+      std::string answer;
+      return client_->Apply(answer);
+    }
+    
+  public:
+    OrthancPeerStoreJob(ServerContext& context) :
+      context_(context)
+    {
     }
 
-    virtual void GetInternalContent(Json::Value& value)
+    const WebServiceParameters& GetPeer() const
+    {
+      return peer_;
+    }
+
+    virtual void ReleaseResources()   // For pausing jobs
+    {
+      client_.reset(NULL);
+    }
+
+    virtual void GetJobType(std::string& target)
     {
-      // TODO
+      target = "OrthancPeerStore";
+    }
+
+    virtual void GetPublicContent(Json::Value& value)
+    {
+      Json::Value v;
+      peer_.ToJson(v);
+      value["Peer"] = v;
+        
+      value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size());
+      value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size());
     }
   };
 }
@@ -1036,25 +1129,20 @@
     int moveOriginatorID = Toolbox::GetJsonIntegerField(request, "MoveOriginatorID", 0 /* By default, not a C-MOVE */);
     int priority = Toolbox::GetJsonIntegerField(request, "Priority", 0);
 
-    if (moveOriginatorID < 0 || 
-        moveOriginatorID >= 65536)
-    {
-      throw OrthancException(ErrorCode_ParameterOutOfRange);
-    }
-    
     RemoteModalityParameters p = Configuration::GetModalityUsingSymbolicName(remote);
 
-#if 1
-    std::auto_ptr<StoreScuJob> job(new StoreScuJob(context));
+    std::auto_ptr<DicomStoreJob> job(new DicomStoreJob(context));
     job->SetLocalAet(localAet);
     job->SetRemoteModality(p);
     job->SetPermissive(permissive);
 
     if (moveOriginatorID != 0)
     {
-      job->SetMoveOriginator(moveOriginatorAET, static_cast<uint16_t>(moveOriginatorID));
+      job->SetMoveOriginator(moveOriginatorAET, moveOriginatorID);
     }
 
+    job->Reserve(instances.size());
+    
     for (std::list<std::string>::const_iterator 
            it = instances.begin(); it != instances.end(); ++it)
     {
@@ -1080,40 +1168,6 @@
     {
       call.GetOutput().SignalError(HttpStatus_500_InternalServerError);
     }
-    
-#else
-    ServerJob job;
-    for (std::list<std::string>::const_iterator 
-           it = instances.begin(); it != instances.end(); ++it)
-    {
-      std::auto_ptr<StoreScuCommand> command(new StoreScuCommand(context, localAet, p, permissive));
-
-      if (moveOriginatorID != 0)
-      {
-        command->SetMoveOriginator(moveOriginatorAET, static_cast<uint16_t>(moveOriginatorID));
-      }
-
-      job.AddCommand(command.release()).AddInput(*it);
-    }
-
-    job.SetDescription("HTTP request: Store-SCU to peer \"" + remote + "\"");
-
-    if (asynchronous)
-    {
-      // Asynchronous mode: Submit the job, but don't wait for its completion
-      context.GetScheduler().Submit(job);
-      call.GetOutput().AnswerBuffer("{}", "application/json");
-    }
-    else if (context.GetScheduler().SubmitAndWait(job))
-    {
-      // Synchronous mode: We have submitted and waited for completion
-      call.GetOutput().AnswerBuffer("{}", "application/json");
-    }
-    else
-    {
-      call.GetOutput().SignalError(HttpStatus_500_InternalServerError);
-    }
-#endif
   }
 
 
--- a/UnitTestsSources/MultiThreadingTests.cpp	Fri May 11 17:58:06 2018 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Mon May 14 20:43:16 2018 +0200
@@ -266,6 +266,10 @@
   virtual void Start()
   {
   }
+
+  virtual void SignalResubmit()
+  {
+  }
     
   virtual JobStepResult* ExecuteStep()
   {