comparison Core/JobsEngine/JobsRegistry.h @ 4024:1d2b31fc782f more-changes

new 'changes': JobSubmitted, JobSuccess, JobFailure
author Alain Mazy <alain@mazy.be>
date Tue, 09 Jun 2020 12:20:42 +0200
parents a2e4edc7b9aa
children
comparison
equal deleted inserted replaced
4023:cbdf62468d77 4024:1d2b31fc782f
18 * modify file(s) with this exception, you may extend this exception to 18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If 19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your 20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files 21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here. 22 * in the program, then also delete it here.
23 * 23 *
24 * This program is distributed in the hope that it will be useful, but 24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of 25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details. 27 * General Public License for more details.
28 * 28 *
67 67
68 virtual void SignalJobSuccess(const std::string& jobId) = 0; 68 virtual void SignalJobSuccess(const std::string& jobId) = 0;
69 69
70 virtual void SignalJobFailure(const std::string& jobId) = 0; 70 virtual void SignalJobFailure(const std::string& jobId) = 0;
71 }; 71 };
72 72
73 private: 73 private:
74 enum CompletedReason 74 enum CompletedReason
75 { 75 {
76 CompletedReason_Success, 76 CompletedReason_Success,
77 CompletedReason_Failure, 77 CompletedReason_Failure,
78 CompletedReason_Canceled 78 CompletedReason_Canceled
79 }; 79 };
80 80
81 class JobHandler; 81 class JobHandler;
82 82
83 struct PriorityComparator 83 struct PriorityComparator
84 { 84 {
85 bool operator() (JobHandler*& a, 85 bool operator() (JobHandler*& a,
86 JobHandler*& b) const; 86 JobHandler*& b) const;
87 }; 87 };
88 88
89 typedef std::set<IObserver*> Observers;
90 typedef std::map<std::string, JobHandler*> JobsIndex; 89 typedef std::map<std::string, JobHandler*> JobsIndex;
91 typedef std::list<JobHandler*> CompletedJobs; 90 typedef std::list<JobHandler*> CompletedJobs;
92 typedef std::set<JobHandler*> RetryJobs; 91 typedef std::set<JobHandler*> RetryJobs;
93 typedef std::priority_queue<JobHandler*, 92 typedef std::priority_queue<JobHandler*,
94 std::vector<JobHandler*>, // Could be a "std::deque" 93 std::vector<JobHandler*>, // Could be a "std::deque"
95 PriorityComparator> PendingJobs; 94 PriorityComparator> PendingJobs;
96 95
97 boost::mutex mutex_; 96 boost::mutex mutex_;
98 JobsIndex jobsIndex_; 97 JobsIndex jobsIndex_;
102 101
103 boost::condition_variable pendingJobAvailable_; 102 boost::condition_variable pendingJobAvailable_;
104 boost::condition_variable someJobComplete_; 103 boost::condition_variable someJobComplete_;
105 size_t maxCompletedJobs_; 104 size_t maxCompletedJobs_;
106 105
107 boost::shared_mutex observersMutex_; 106 IObserver* observer_;
108 Observers observers_;
109 107
110 108
111 #ifndef NDEBUG 109 #ifndef NDEBUG
112 bool IsPendingJob(const JobHandler& job) const; 110 bool IsPendingJob(const JobHandler& job) const;
113 111
114 bool IsCompletedJob(JobHandler& job) const; 112 bool IsCompletedJob(JobHandler& job) const;
115 113
116 bool IsRetryJob(JobHandler& job) const; 114 bool IsRetryJob(JobHandler& job) const;
117 #endif 115 #endif
118 116
119 void CheckInvariants() const; 117 void CheckInvariants() const;
120 118
121 void ForgetOldCompletedJobs(); 119 void ForgetOldCompletedJobs();
122 120
123 void SetCompletedJob(JobHandler& job, 121 void SetCompletedJob(JobHandler& job,
124 bool success); 122 bool success);
125 123
126 void MarkRunningAsCompleted(JobHandler& job, 124 void MarkRunningAsCompleted(JobHandler& job,
127 CompletedReason reason); 125 CompletedReason reason);
128 126
129 void MarkRunningAsRetry(JobHandler& job, 127 void MarkRunningAsRetry(JobHandler& job,
130 unsigned int timeout); 128 unsigned int timeout);
131 129
132 void MarkRunningAsPaused(JobHandler& job); 130 void MarkRunningAsPaused(JobHandler& job);
133 131
134 bool GetStateInternal(JobState& state, 132 bool GetStateInternal(JobState& state,
135 const std::string& id); 133 const std::string& id);
136 134
137 void RemovePendingJob(const std::string& id); 135 void RemovePendingJob(const std::string& id);
138 136
139 void RemoveRetryJob(JobHandler* handler); 137 void RemoveRetryJob(JobHandler* handler);
140 138
141 void SubmitInternal(std::string& id, 139 void SubmitInternal(std::string& id,
142 JobHandler* handler); 140 JobHandler* handler);
143 141
144 public: 142 public:
145 JobsRegistry(size_t maxCompletedJobs) : 143 JobsRegistry(size_t maxCompletedJobs) :
146 maxCompletedJobs_(maxCompletedJobs) 144 maxCompletedJobs_(maxCompletedJobs),
145 observer_(NULL)
147 { 146 {
148 } 147 }
149 148
150 JobsRegistry(IJobUnserializer& unserializer, 149 JobsRegistry(IJobUnserializer& unserializer,
151 const Json::Value& s, 150 const Json::Value& s,
152 size_t maxCompletedJobs); 151 size_t maxCompletedJobs);
153 152
154 ~JobsRegistry(); 153 ~JobsRegistry();
155 154
156 void SetMaxCompletedJobs(size_t i); 155 void SetMaxCompletedJobs(size_t i);
157 156
158 size_t GetMaxCompletedJobs(); 157 size_t GetMaxCompletedJobs();
159 158
160 void ListJobs(std::set<std::string>& target); 159 void ListJobs(std::set<std::string>& target);
161 160
162 bool GetJobInfo(JobInfo& target, 161 bool GetJobInfo(JobInfo& target,
166 MimeType& mime, 165 MimeType& mime,
167 const std::string& job, 166 const std::string& job,
168 const std::string& key); 167 const std::string& key);
169 168
170 void Serialize(Json::Value& target); 169 void Serialize(Json::Value& target);
171 170
172 void Submit(std::string& id, 171 void Submit(std::string& id,
173 IJob* job, // Takes ownership 172 IJob* job, // Takes ownership
174 int priority); 173 int priority);
175 174
176 void Submit(IJob* job, // Takes ownership 175 void Submit(IJob* job, // Takes ownership
177 int priority); 176 int priority);
178 177
179 void SubmitAndWait(Json::Value& successContent, 178 void SubmitAndWait(Json::Value& successContent,
180 IJob* job, // Takes ownership 179 IJob* job, // Takes ownership
181 int priority); 180 int priority);
182 181
183 bool SetPriority(const std::string& id, 182 bool SetPriority(const std::string& id,
184 int priority); 183 int priority);
185 184
186 bool Pause(const std::string& id); 185 bool Pause(const std::string& id);
187 186
188 bool Resume(const std::string& id); 187 bool Resume(const std::string& id);
189 188
190 bool Resubmit(const std::string& id); 189 bool Resubmit(const std::string& id);
191 190
192 bool Cancel(const std::string& id); 191 bool Cancel(const std::string& id);
193 192
194 void ScheduleRetries(); 193 void ScheduleRetries();
195 194
196 bool GetState(JobState& state, 195 bool GetState(JobState& state,
197 const std::string& id); 196 const std::string& id);
198 197
199 void AddObserver(IObserver& observer); 198 void SetObserver(IObserver& observer);
200 199
201 void ResetObserver(IObserver& observer); 200 void ResetObserver();
202 201
203 void GetStatistics(unsigned int& pending, 202 void GetStatistics(unsigned int& pending,
204 unsigned int& running, 203 unsigned int& running,
205 unsigned int& success, 204 unsigned int& success,
206 unsigned int& errors); 205 unsigned int& errors);
218 std::string id_; 217 std::string id_;
219 int priority_; 218 int priority_;
220 JobState targetState_; 219 JobState targetState_;
221 unsigned int targetRetryTimeout_; 220 unsigned int targetRetryTimeout_;
222 bool canceled_; 221 bool canceled_;
223 222
224 public: 223 public:
225 RunningJob(JobsRegistry& registry, 224 RunningJob(JobsRegistry& registry,
226 unsigned int timeout); 225 unsigned int timeout);
227 226
228 ~RunningJob(); 227 ~RunningJob();