comparison Core/JobsEngine/JobsEngine.cpp @ 2569:2af17cd5eb1f jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 15:37:20 +0200
parents
children 2e879c796ec7
comparison
equal deleted inserted replaced
2568:a46094602346 2569:2af17cd5eb1f
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation, either version 3 of the
10 * License, or (at your option) any later version.
11 *
12 * In addition, as a special exception, the copyright holders of this
13 * program give permission to link the code of its release with the
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it
15 * that use the same license as the "OpenSSL" library), and distribute
16 * the linked executables. You must obey the GNU General Public License
17 * in all respects for all of the code used other than "OpenSSL". If you
18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here.
23 *
24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 **/
32
33
34 #include "../PrecompiledHeaders.h"
35 #include "JobsEngine.h"
36
37 #include "JobStepRetry.h"
38
39 #include "../Logging.h"
40 #include "../OrthancException.h"
41
42 namespace Orthanc
43 {
44 bool JobsEngine::ExecuteStep(JobsRegistry::RunningJob& running,
45 size_t workerIndex)
46 {
47 assert(running.IsValid());
48
49 LOG(INFO) << "Executing job with priority " << running.GetPriority()
50 << " in worker thread " << workerIndex << ": " << running.GetId();
51
52 if (running.IsPauseScheduled())
53 {
54 running.GetJob().ReleaseResources();
55 running.MarkPause();
56 return false;
57 }
58
59 std::auto_ptr<JobStepResult> result;
60
61 {
62 try
63 {
64 result.reset(running.GetJob().ExecuteStep());
65
66 if (result->GetCode() == JobStepCode_Failure)
67 {
68 running.UpdateStatus(ErrorCode_InternalError);
69 }
70 else
71 {
72 running.UpdateStatus(ErrorCode_Success);
73 }
74 }
75 catch (OrthancException& e)
76 {
77 running.UpdateStatus(e.GetErrorCode());
78 }
79 catch (boost::bad_lexical_cast&)
80 {
81 running.UpdateStatus(ErrorCode_BadFileFormat);
82 }
83 catch (...)
84 {
85 running.UpdateStatus(ErrorCode_InternalError);
86 }
87
88 if (result.get() == NULL)
89 {
90 result.reset(new JobStepResult(JobStepCode_Failure));
91 }
92 }
93
94 switch (result->GetCode())
95 {
96 case JobStepCode_Success:
97 running.MarkSuccess();
98 return false;
99
100 case JobStepCode_Failure:
101 running.MarkFailure();
102 return false;
103
104 case JobStepCode_Retry:
105 running.MarkRetry(dynamic_cast<JobStepRetry&>(*result).GetTimeout());
106 return false;
107
108 case JobStepCode_Continue:
109 return true;
110
111 default:
112 throw OrthancException(ErrorCode_InternalError);
113 }
114 }
115
116
117 void JobsEngine::RetryHandler(JobsEngine* engine)
118 {
119 assert(engine != NULL);
120
121 for (;;)
122 {
123 boost::this_thread::sleep(boost::posix_time::milliseconds(200));
124
125 {
126 boost::mutex::scoped_lock lock(engine->stateMutex_);
127
128 if (engine->state_ != State_Running)
129 {
130 return;
131 }
132 }
133
134 engine->GetRegistry().ScheduleRetries();
135 }
136 }
137
138
139 void JobsEngine::Worker(JobsEngine* engine,
140 size_t workerIndex)
141 {
142 assert(engine != NULL);
143
144 LOG(INFO) << "Worker thread " << workerIndex << " has started";
145
146 for (;;)
147 {
148 {
149 boost::mutex::scoped_lock lock(engine->stateMutex_);
150
151 if (engine->state_ != State_Running)
152 {
153 return;
154 }
155 }
156
157 JobsRegistry::RunningJob running(engine->GetRegistry(), 100);
158
159 if (running.IsValid())
160 {
161 for (;;)
162 {
163 if (!engine->ExecuteStep(running, workerIndex))
164 {
165 break;
166 }
167 }
168 }
169 }
170 }
171
172
173 JobsEngine::JobsEngine() :
174 state_(State_Setup),
175 workers_(1)
176 {
177 }
178
179
180 JobsEngine::~JobsEngine()
181 {
182 if (state_ != State_Setup &&
183 state_ != State_Done)
184 {
185 LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!";
186 Stop();
187 }
188 }
189
190
191 void JobsEngine::SetWorkersCount(size_t count)
192 {
193 if (count == 0)
194 {
195 throw OrthancException(ErrorCode_ParameterOutOfRange);
196 }
197
198 boost::mutex::scoped_lock lock(stateMutex_);
199
200 if (state_ != State_Setup)
201 {
202 // Can only be invoked before calling "Start()"
203 throw OrthancException(ErrorCode_BadSequenceOfCalls);
204 }
205
206 workers_.resize(count);
207 }
208
209
210 void JobsEngine::Start()
211 {
212 boost::mutex::scoped_lock lock(stateMutex_);
213
214 if (state_ != State_Setup)
215 {
216 throw OrthancException(ErrorCode_BadSequenceOfCalls);
217 }
218
219 retryHandler_ = boost::thread(RetryHandler, this);
220
221 for (size_t i = 0; i < workers_.size(); i++)
222 {
223 workers_[i] = boost::thread(Worker, this, i);
224 }
225
226 state_ = State_Running;
227
228 LOG(WARNING) << "The jobs engine has started";
229 }
230
231
232 void JobsEngine::Stop()
233 {
234 {
235 boost::mutex::scoped_lock lock(stateMutex_);
236
237 if (state_ != State_Running)
238 {
239 return;
240 }
241
242 state_ = State_Stopping;
243 }
244
245 LOG(INFO) << "Stopping the jobs engine";
246
247 if (retryHandler_.joinable())
248 {
249 retryHandler_.join();
250 }
251
252 for (size_t i = 0; i < workers_.size(); i++)
253 {
254 if (workers_[i].joinable())
255 {
256 workers_[i].join();
257 }
258 }
259
260 {
261 boost::mutex::scoped_lock lock(stateMutex_);
262 state_ = State_Done;
263 }
264
265 LOG(WARNING) << "The jobs engine has stopped";
266 }
267 }