comparison OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h @ 5130:f2dcdbe05884

ResourceModification jobs can now use multiple threads
author Alain Mazy <am@osimis.io>
date Thu, 05 Jan 2023 17:24:43 +0100
parents
children 6aa41d86b948
comparison
equal deleted inserted replaced
5128:ede035d48b8e 5130:f2dcdbe05884
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2022 Osimis S.A., Belgium
6 * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
7 *
8 * This program is free software: you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public License
10 * as published by the Free Software Foundation, either version 3 of
11 * the License, or (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this program. If not, see
20 * <http://www.gnu.org/licenses/>.
21 **/
22
23
24 #pragma once
25
26 #include "../../../OrthancFramework/Sources/Compatibility.h" // For ORTHANC_OVERRIDE
27 #include "../../../OrthancFramework/Sources/JobsEngine/IJob.h"
28 #include "../../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h"
29
30 #include <set>
31 #include <boost/thread/recursive_mutex.hpp>
32 #include <boost/thread.hpp>
33
34 namespace Orthanc
35 {
36 class ServerContext;
37
38 // This class is a threaded version of SetOfInstancesJob merged with CleaningInstancesJob. It derives from IJob and is abstract: subclasses must implement HandleInstance().
39 class ORTHANC_PUBLIC ThreadedSetOfInstancesJob : public IJob
40 {
41 public:
42 enum ThreadedJobStep // cannot use "Step" since there is a method with this name !
43 { // NOTE(review): the names suggest that the steps are traversed in declaration order -- confirm in the .cpp
44 ThreadedJobStep_BeforeStart,
45 ThreadedJobStep_ProcessingInstances,
46 ThreadedJobStep_PostProcessingInstances,
47 ThreadedJobStep_Cleanup,
48 ThreadedJobStep_Done
49 };
50 // Private state. Only GetContext() and PostProcessInstances() have a body in this header, so the usage notes below are assumptions to be confirmed in the .cpp
51 private:
52 std::set<std::string> failedInstances_; // presumably what GetFailedInstances() and IsFailedInstance() report -- confirm
53 std::set<std::string> parentResources_; // presumably filled by AddParentResource() -- confirm
54 // NOTE(review): std::vector, std::list and std::string are used in this class, but this header only includes <set> and the Boost thread headers directly
55 size_t processedInstancesCount_; // NOTE(review): presumably the counter behind GetProgress() -- confirm
56 SharedMessageQueue instancesToProcess_; // NOTE(review): presumably the work queue consumed by the worker threads -- confirm
57 std::vector<boost::shared_ptr<boost::thread> > instancesWorkers_; // thread handles; presumably one per worker created by InitWorkers() -- confirm
58 std::set<std::string> instances_; // std::set: the identifiers are unique and sorted, not kept in insertion order
59
60 bool hasPostProcessing_; // final step before "KeepSource" cleanup
61 bool started_; // presumably the value returned by IsStarted() -- confirm
62 bool stopRequested_; // NOTE(review): plain bool; if the worker threads read it, every access must hold mutex_ -- confirm
63 bool permissive_; // presumably accessed through IsPermissive() and SetPermissive() -- confirm
64 ThreadedJobStep currentStep_; // presumably the value returned by GetCurrentStep() -- confirm
65 std::string description_; // presumably accessed through GetDescription() and SetDescription() -- confirm
66 size_t workersCount_; // presumably the number of worker threads requested at construction -- confirm
67
68 ServerContext& context_; // reference member: the ServerContext must outlive this job; returned by GetContext()
69 bool keepSource_; // presumably updated by SetKeepSource() -- confirm
70 protected: // the mutex is the only data member that subclasses can access
71 mutable boost::recursive_mutex mutex_; // recursive: one thread may lock it several times; mutable: const methods may lock it
72
73 public: // constructors and destructor
74 ThreadedSetOfInstancesJob(ServerContext& context,
75 bool hasTrailingStep, // NOTE(review): presumably initializes hasPostProcessing_ despite the different name -- confirm
76 bool keepSource,
77 size_t workersCount);
78 // NOTE(review): this overload takes a Json::Value; presumably it rebuilds a job from the output of Serialize() -- confirm
79 explicit ThreadedSetOfInstancesJob(ServerContext& context,
80 const Json::Value& source,
81 bool hasTrailingStep,
82 bool defaultKeepSource); // NOTE(review): the name suggests a fallback used when "source" carries no value -- confirm
83
84 virtual ~ThreadedSetOfInstancesJob(); // NOTE(review): presumably stops and joins the worker threads -- confirm
85 // Hooks to be provided by subclasses, followed by helpers related to the worker threads
86 protected:
87 virtual bool HandleInstance(const std::string& instance) = 0; // pure virtual; NOTE(review): meaning of the returned bool (presumably success) and the calling thread are to be confirmed
88
89 virtual void PostProcessInstances() {} // optional hook: the default implementation does nothing
90
91 void InitWorkers(size_t workersCount);
92
93 void StopWorkers();
94
95 void WaitWorkersComplete();
96
97 static void InstanceWorkerThread(ThreadedSetOfInstancesJob* that); // static, receives the job as "that"; presumably the entry point of each worker thread -- confirm
98
99 const std::string& GetInstance(size_t index) const; // NOTE(review): instances_ is a std::set, which has no random access -- confirm how "index" is resolved
100
101 bool HasPostProcessingStep() const; // presumably reflects hasPostProcessing_ -- confirm
102
103 bool HasCleanupStep() const; // NOTE(review): presumably derived from keepSource_ -- confirm
104
105 public:
106 // Accessors and configuration
107 ThreadedJobStep GetCurrentStep() const;
108
109 void SetDescription(const std::string& description);
110
111 const std::string& GetDescription() const; // NOTE(review): returns a reference; once returned it is not protected by mutex_ against a concurrent SetDescription() -- confirm callers
112
113 void SetKeepSource(bool keep);
114
115 void GetInstances(std::set<std::string>& target) const; // the result is delivered through the "target" argument
116
117 void GetFailedInstances(std::set<std::string>& target) const; // the result is delivered through the "target" argument
118
119 size_t GetInstancesCount() const;
120
121 void AddInstances(const std::list<std::string>& instances);
122
123 void AddParentResource(const std::string &resource);
124
125 bool IsPermissive() const;
126
127 void SetPermissive(bool permissive);
128 // The methods marked ORTHANC_OVERRIDE below override virtual methods inherited from the base class
129 virtual void Reset() ORTHANC_OVERRIDE;
130
131 virtual void Start() ORTHANC_OVERRIDE;
132
133 virtual void Stop(JobStopReason reason) ORTHANC_OVERRIDE;
134
135 virtual float GetProgress() ORTHANC_OVERRIDE; // not const, unlike the other getters of this class
136
137 bool IsStarted() const;
138
139 virtual JobStepResult Step(const std::string& jobId) ORTHANC_OVERRIDE;
140
141 virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE;
142
143 virtual bool Serialize(Json::Value& target) ORTHANC_OVERRIDE;
144
145 virtual bool GetOutput(std::string& output,
146 MimeType& mime,
147 std::string& filename,
148 const std::string& key) ORTHANC_OVERRIDE;
149
150 bool IsFailedInstance(const std::string& instance) const;
151 // Returns the ServerContext reference stored in context_
152 ServerContext& GetContext() const
153 {
154 return context_;
155 }
156
157 };
158 }