Mercurial > hg > orthanc
diff OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h @ 5130:f2dcdbe05884
ResourceModification jobs can now use multiple threads
author | Alain Mazy <am@osimis.io> |
---|---|
date | Thu, 05 Jan 2023 17:24:43 +0100 |
parents | |
children | 6aa41d86b948 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h Thu Jan 05 17:24:43 2023 +0100 @@ -0,0 +1,158 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2022 Osimis S.A., Belgium + * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../../../OrthancFramework/Sources/Compatibility.h" // For ORTHANC_OVERRIDE +#include "../../../OrthancFramework/Sources/JobsEngine/IJob.h" +#include "../../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h" + +#include <set> +#include <boost/thread/recursive_mutex.hpp> +#include <boost/thread.hpp> + +namespace Orthanc +{ + class ServerContext; + + // This class is a threaded version of SetOfInstancesJob merged with CleaningInstancesJob + class ORTHANC_PUBLIC ThreadedSetOfInstancesJob : public IJob + { + public: + enum ThreadedJobStep // cannot use "Step" since there is a method with this name ! + { + ThreadedJobStep_BeforeStart, + ThreadedJobStep_ProcessingInstances, + ThreadedJobStep_PostProcessingInstances, + ThreadedJobStep_Cleanup, + ThreadedJobStep_Done + }; + + private: + std::set<std::string> failedInstances_; + std::set<std::string> parentResources_; + + size_t processedInstancesCount_; + SharedMessageQueue instancesToProcess_; + std::vector<boost::shared_ptr<boost::thread> > instancesWorkers_; + std::set<std::string> instances_; + + bool hasPostProcessing_; // final step before "KeepSource" cleanup + bool started_; + bool stopRequested_; + bool permissive_; + ThreadedJobStep currentStep_; + std::string description_; + size_t workersCount_; + + ServerContext& context_; + bool keepSource_; + protected: + mutable boost::recursive_mutex mutex_; + + public: + ThreadedSetOfInstancesJob(ServerContext& context, + bool hasTrailingStep, + bool keepSource, + size_t workersCount); + + explicit ThreadedSetOfInstancesJob(ServerContext& context, + const Json::Value& source, + bool hasTrailingStep, + bool defaultKeepSource); + + virtual ~ThreadedSetOfInstancesJob(); + + protected: + virtual bool HandleInstance(const std::string& instance) = 0; + + virtual void PostProcessInstances() {} + + void InitWorkers(size_t workersCount); + + void StopWorkers(); + + void WaitWorkersComplete(); + + static void InstanceWorkerThread(ThreadedSetOfInstancesJob* that); + + const std::string& GetInstance(size_t index) const; + + bool HasPostProcessingStep() const; + + bool HasCleanupStep() const; + + public: + + ThreadedJobStep GetCurrentStep() const; + + void SetDescription(const std::string& description); + + const std::string& GetDescription() const; + + void SetKeepSource(bool keep); + + void GetInstances(std::set<std::string>& target) const; + + void GetFailedInstances(std::set<std::string>& target) const; + + size_t GetInstancesCount() const; + + void AddInstances(const std::list<std::string>& instances); + + void AddParentResource(const std::string &resource); + + bool IsPermissive() const; + + void SetPermissive(bool permissive); + + virtual void Reset() ORTHANC_OVERRIDE; + + virtual void Start() ORTHANC_OVERRIDE; + + virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE; + + virtual float GetProgress() ORTHANC_OVERRIDE; + + bool IsStarted() const; + + virtual JobStepResult Step(const std::string& jobId) ORTHANC_OVERRIDE; + + virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE; + + virtual bool Serialize(Json::Value& target) ORTHANC_OVERRIDE; + + virtual bool GetOutput(std::string& output, + MimeType& mime, + std::string& filename, + const std::string& key) ORTHANC_OVERRIDE; + + bool IsFailedInstance(const std::string& instance) const; + + ServerContext& GetContext() const + { + return context_; + } + + }; +}