diff OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h @ 5130:f2dcdbe05884

ResourceModification jobs can now use multiple threads
author Alain Mazy <am@osimis.io>
date Thu, 05 Jan 2023 17:24:43 +0100
parents
children 6aa41d86b948
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h	Thu Jan 05 17:24:43 2023 +0100
@@ -0,0 +1,158 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2022 Osimis S.A., Belgium
+ * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "../../../OrthancFramework/Sources/Compatibility.h"  // For ORTHANC_OVERRIDE
+#include "../../../OrthancFramework/Sources/JobsEngine/IJob.h"
+#include "../../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h"
+
+#include <set>
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread.hpp>
+
+namespace Orthanc
+{
+  class ServerContext;
+
+  // This class is a threaded version of SetOfInstancesJob merged with CleaningInstancesJob
+  class ORTHANC_PUBLIC ThreadedSetOfInstancesJob : public IJob
+  {
+  public:
+    enum ThreadedJobStep  // cannot use "Step" since there is a method with this name !
+    {
+      ThreadedJobStep_BeforeStart,
+      ThreadedJobStep_ProcessingInstances,
+      ThreadedJobStep_PostProcessingInstances,
+      ThreadedJobStep_Cleanup,
+      ThreadedJobStep_Done
+    };
+
+  private:
+    std::set<std::string>  failedInstances_;
+    std::set<std::string>  parentResources_;
+    
+    size_t                              processedInstancesCount_;
+    SharedMessageQueue                  instancesToProcess_;
+    std::vector<boost::shared_ptr<boost::thread> >         instancesWorkers_;
+    std::set<std::string>               instances_;
+
+    bool                    hasPostProcessing_;  // final step before "KeepSource" cleanup
+    bool                    started_;
+    bool                    stopRequested_;
+    bool                    permissive_;
+    ThreadedJobStep         currentStep_;
+    std::string             description_;
+    size_t                  workersCount_;
+
+    ServerContext&          context_;
+    bool                    keepSource_;
+  protected:
+    mutable boost::recursive_mutex mutex_;
+
+  public:
+    ThreadedSetOfInstancesJob(ServerContext& context,
+                              bool hasTrailingStep,
+                              bool keepSource,
+                              size_t workersCount);
+
+    explicit ThreadedSetOfInstancesJob(ServerContext& context,
+                                       const Json::Value& source,
+                                       bool hasTrailingStep,
+                                       bool defaultKeepSource);
+
+    virtual ~ThreadedSetOfInstancesJob();
+
+  protected:
+    virtual bool HandleInstance(const std::string& instance) = 0;
+
+    virtual void PostProcessInstances() {}
+
+    void InitWorkers(size_t workersCount);
+
+    void StopWorkers();
+
+    void WaitWorkersComplete();
+
+    static void InstanceWorkerThread(ThreadedSetOfInstancesJob* that);
+
+    const std::string& GetInstance(size_t index) const;
+
+    bool HasPostProcessingStep() const;
+
+    bool HasCleanupStep() const;
+
+  public:
+
+    ThreadedJobStep GetCurrentStep() const;
+
+    void SetDescription(const std::string& description);
+
+    const std::string& GetDescription() const;
+
+    void SetKeepSource(bool keep);
+
+    void GetInstances(std::set<std::string>& target) const;
+
+    void GetFailedInstances(std::set<std::string>& target) const;
+
+    size_t GetInstancesCount() const;
+
+    void AddInstances(const std::list<std::string>& instances);
+
+    void AddParentResource(const std::string &resource);
+
+    bool IsPermissive() const;
+
+    void SetPermissive(bool permissive);
+
+    virtual void Reset() ORTHANC_OVERRIDE;
+    
+    virtual void Start() ORTHANC_OVERRIDE;
+    
+    virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE;
+
+    virtual float GetProgress() ORTHANC_OVERRIDE;
+
+    bool IsStarted() const;
+
+    virtual JobStepResult Step(const std::string& jobId) ORTHANC_OVERRIDE;
+    
+    virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE;
+    
+    virtual bool Serialize(Json::Value& target) ORTHANC_OVERRIDE;
+
+    virtual bool GetOutput(std::string& output,
+                           MimeType& mime,
+                           std::string& filename,
+                           const std::string& key) ORTHANC_OVERRIDE;
+
+    bool IsFailedInstance(const std::string& instance) const;
+
+    ServerContext& GetContext() const
+    {
+      return context_;
+    }
+
+  };
+}