Mercurial > hg > orthanc
annotate UnitTestsSources/MultiThreadingTests.cpp @ 2558:57f81b988713 jobs
cont
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 03 May 2018 15:24:33 +0200 |
parents | b4516a6f214b |
children | 9b7680dee75d |
rev | line source |
---|---|
827
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
1 /** |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
2 * Orthanc - A Lightweight, RESTful DICOM Store |
1900 | 3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics |
1288
6e7e5ed91c2d
upgrade to year 2015
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1009
diff
changeset
|
4 * Department, University Hospital of Liege, Belgium |
2447
878b59270859
upgrade to year 2018
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2382
diff
changeset
|
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium |
827
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
6 * |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
7 * This program is free software: you can redistribute it and/or |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
8 * modify it under the terms of the GNU General Public License as |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
9 * published by the Free Software Foundation, either version 3 of the |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
10 * License, or (at your option) any later version. |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
11 * |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
12 * In addition, as a special exception, the copyright holders of this |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
13 * program give permission to link the code of its release with the |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
15 * that use the same license as the "OpenSSL" library), and distribute |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
16 * the linked executables. You must obey the GNU General Public License |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
17 * in all respects for all of the code used other than "OpenSSL". If you |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
18 * modify file(s) with this exception, you may extend this exception to |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
19 * your version of the file(s), but you are not obligated to do so. If |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
20 * you do not wish to do so, delete this exception statement from your |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
21 * version. If you delete this exception statement from all source files |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
22 * in the program, then also delete it here. |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
23 * |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
24 * This program is distributed in the hope that it will be useful, but |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
25 * WITHOUT ANY WARRANTY; without even the implied warranty of |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
27 * General Public License for more details. |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
28 * |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
29 * You should have received a copy of the GNU General Public License |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
30 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
31 **/ |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
32 |
3d6f9b7d0add
precompiled headers in unit tests
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
776
diff
changeset
|
33 |
831
84513f2ee1f3
pch for unit tests and server
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
827
diff
changeset
|
34 #include "PrecompiledHeadersUnitTests.h" |
723 | 35 #include "gtest/gtest.h" |
36 | |
781 | 37 #include "../OrthancServer/Scheduler/ServerScheduler.h" |
723 | 38 #include "../Core/OrthancException.h" |
2143
fd5875662670
creation of namespace SystemToolbox
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2140
diff
changeset
|
39 #include "../Core/SystemToolbox.h" |
723 | 40 #include "../Core/Toolbox.h" |
760 | 41 #include "../Core/MultiThreading/Locker.h" |
42 #include "../Core/MultiThreading/Mutex.h" | |
43 #include "../Core/MultiThreading/ReaderWriterLock.h" | |
723 | 44 |
45 using namespace Orthanc; | |
46 | |
47 namespace | |
48 { | |
1396
ac4efabeb80c
Migration of the orthanc-client as a separate project
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1304
diff
changeset
|
49 class DynamicInteger : public IDynamicObject |
723 | 50 { |
51 private: | |
52 int value_; | |
53 std::set<int>& target_; | |
54 | |
55 public: | |
56 DynamicInteger(int value, std::set<int>& target) : | |
57 value_(value), target_(target) | |
58 { | |
59 } | |
60 | |
61 int GetValue() const | |
62 { | |
63 return value_; | |
64 } | |
65 }; | |
66 } | |
67 | |
68 | |
69 TEST(MultiThreading, SharedMessageQueueBasic) | |
70 { | |
71 std::set<int> s; | |
72 | |
73 SharedMessageQueue q; | |
74 ASSERT_TRUE(q.WaitEmpty(0)); | |
75 q.Enqueue(new DynamicInteger(10, s)); | |
76 ASSERT_FALSE(q.WaitEmpty(1)); | |
77 q.Enqueue(new DynamicInteger(20, s)); | |
78 q.Enqueue(new DynamicInteger(30, s)); | |
79 q.Enqueue(new DynamicInteger(40, s)); | |
80 | |
81 std::auto_ptr<DynamicInteger> i; | |
82 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); | |
83 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue()); | |
84 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); | |
85 ASSERT_FALSE(q.WaitEmpty(1)); | |
86 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue()); | |
87 ASSERT_TRUE(q.WaitEmpty(0)); | |
88 ASSERT_EQ(NULL, q.Dequeue(1)); | |
89 } | |
90 | |
91 | |
92 TEST(MultiThreading, SharedMessageQueueClean) | |
93 { | |
94 std::set<int> s; | |
95 | |
96 try | |
97 { | |
98 SharedMessageQueue q; | |
99 q.Enqueue(new DynamicInteger(10, s)); | |
100 q.Enqueue(new DynamicInteger(20, s)); | |
1583
9ea3d082b064
got rid of custom exceptions
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1582
diff
changeset
|
101 throw OrthancException(ErrorCode_InternalError); |
723 | 102 } |
103 catch (OrthancException&) | |
104 { | |
105 } | |
106 } | |
107 | |
108 | |
760 | 109 TEST(MultiThreading, Mutex) |
110 { | |
111 Mutex mutex; | |
112 Locker locker(mutex); | |
113 } | |
114 | |
115 | |
116 TEST(MultiThreading, ReaderWriterLock) | |
117 { | |
118 ReaderWriterLock lock; | |
119 | |
120 { | |
121 Locker locker1(lock.ForReader()); | |
122 Locker locker2(lock.ForReader()); | |
123 } | |
124 | |
125 { | |
126 Locker locker3(lock.ForWriter()); | |
127 } | |
128 } | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
129 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
130 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
131 |
2382
7284093111b0
big reorganization to cleanly separate framework vs. server
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2244
diff
changeset
|
132 #include "../Core/DicomNetworking/ReusableDicomUserConnection.h" |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
133 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
134 TEST(ReusableDicomUserConnection, DISABLED_Basic) |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
135 { |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
136 ReusableDicomUserConnection c; |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
137 c.SetMillisecondsBeforeClose(200); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
138 printf("START\n"); fflush(stdout); |
775
d3ba35466225
integration mainline -> lua-scripting
Sebastien Jodogne <s.jodogne@gmail.com>
diff
changeset
|
139 |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
140 { |
1427
d710ea64f0fd
Custom setting of the local AET during C-Store SCU (both in Lua and in the REST API)
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1396
diff
changeset
|
141 RemoteModalityParameters remote("STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
d710ea64f0fd
Custom setting of the local AET during C-Store SCU (both in Lua and in the REST API)
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1396
diff
changeset
|
142 ReusableDicomUserConnection::Locker lock(c, "ORTHANC", remote); |
2222
21713ce8717b
Fix handling of Move Originator AET and ID in C-MOVE SCP
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2143
diff
changeset
|
143 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281"); |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
144 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
145 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
146 printf("**\n"); fflush(stdout); |
2242
4e8e0ad2001c
move USleep() in SystemToolbox
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2222
diff
changeset
|
147 SystemToolbox::USleep(1000000); |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
148 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
149 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
150 { |
1427
d710ea64f0fd
Custom setting of the local AET during C-Store SCU (both in Lua and in the REST API)
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1396
diff
changeset
|
151 RemoteModalityParameters remote("STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
d710ea64f0fd
Custom setting of the local AET during C-Store SCU (both in Lua and in the REST API)
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1396
diff
changeset
|
152 ReusableDicomUserConnection::Locker lock(c, "ORTHANC", remote); |
2222
21713ce8717b
Fix handling of Move Originator AET and ID in C-MOVE SCP
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2143
diff
changeset
|
153 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676277"); |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
154 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
155 |
2140 | 156 SystemToolbox::ServerBarrier(); |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
157 printf("DONE\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
158 } |
765 | 159 |
160 | |
161 | |
1000
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
162 class Tutu : public IServerCommand |
765 | 163 { |
164 private: | |
165 int factor_; | |
166 | |
167 public: | |
168 Tutu(int f) : factor_(f) | |
169 { | |
170 } | |
171 | |
172 virtual bool Apply(ListOfStrings& outputs, | |
173 const ListOfStrings& inputs) | |
174 { | |
175 for (ListOfStrings::const_iterator | |
1304 | 176 it = inputs.begin(); it != inputs.end(); ++it) |
765 | 177 { |
178 int a = boost::lexical_cast<int>(*it); | |
179 int b = factor_ * a; | |
180 | |
181 printf("%d * %d = %d\n", a, factor_, b); | |
182 | |
183 //if (a == 84) { printf("BREAK\n"); return false; } | |
184 | |
185 outputs.push_back(boost::lexical_cast<std::string>(b)); | |
186 } | |
187 | |
2242
4e8e0ad2001c
move USleep() in SystemToolbox
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2222
diff
changeset
|
188 SystemToolbox::USleep(30000); |
765 | 189 |
190 return true; | |
191 } | |
192 }; | |
193 | |
768 | 194 |
770 | 195 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) |
768 | 196 { |
1000
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
197 typedef IServerCommand::ListOfStrings ListOfStrings; |
779 | 198 |
770 | 199 while (!(*done)) |
768 | 200 { |
201 ListOfStrings l; | |
202 s->GetListOfJobs(l); | |
1304 | 203 for (ListOfStrings::iterator it = l.begin(); it != l.end(); ++it) |
204 { | |
205 printf(">> %s: %0.1f\n", it->c_str(), 100.0f * s->GetProgress(*it)); | |
206 } | |
2242
4e8e0ad2001c
move USleep() in SystemToolbox
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2222
diff
changeset
|
207 SystemToolbox::USleep(3000); |
768 | 208 } |
209 } | |
210 | |
211 | |
1009
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
212 TEST(MultiThreading, ServerScheduler) |
765 | 213 { |
995
8c67382f44a7
limit number of jobs in the scheduler
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
994
diff
changeset
|
214 ServerScheduler scheduler(10); |
765 | 215 |
216 ServerJob job; | |
1000
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
217 ServerCommandInstance& f2 = job.AddCommand(new Tutu(2)); |
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
218 ServerCommandInstance& f3 = job.AddCommand(new Tutu(3)); |
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
219 ServerCommandInstance& f4 = job.AddCommand(new Tutu(4)); |
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
220 ServerCommandInstance& f5 = job.AddCommand(new Tutu(5)); |
765 | 221 f2.AddInput(boost::lexical_cast<std::string>(42)); |
222 //f3.AddInput(boost::lexical_cast<std::string>(42)); | |
223 //f4.AddInput(boost::lexical_cast<std::string>(42)); | |
1009
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
224 f2.ConnectOutput(f3); |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
225 f3.ConnectOutput(f4); |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
226 f4.ConnectOutput(f5); |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
227 |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
228 f3.SetConnectedToSink(true); |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
229 f5.SetConnectedToSink(true); |
765 | 230 |
231 job.SetDescription("tutu"); | |
232 | |
770 | 233 bool done = false; |
234 boost::thread t(Tata, &scheduler, &job, &done); | |
768 | 235 |
236 | |
765 | 237 //scheduler.Submit(job); |
238 | |
1000
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
239 IServerCommand::ListOfStrings l; |
765 | 240 scheduler.SubmitAndWait(l, job); |
241 | |
1492 | 242 ASSERT_EQ(2u, l.size()); |
1009
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
243 ASSERT_EQ(42 * 2 * 3, boost::lexical_cast<int>(l.front())); |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
244 ASSERT_EQ(42 * 2 * 3 * 4 * 5, boost::lexical_cast<int>(l.back())); |
26642cecd36d
clearer job interface
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1005
diff
changeset
|
245 |
1000
13e230bbd882
rename filter to command
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
995
diff
changeset
|
246 for (IServerCommand::ListOfStrings::iterator i = l.begin(); i != l.end(); i++) |
765 | 247 { |
248 printf("** %s\n", i->c_str()); | |
249 } | |
250 | |
2140 | 251 //SystemToolbox::ServerBarrier(); |
2242
4e8e0ad2001c
move USleep() in SystemToolbox
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
2222
diff
changeset
|
252 //SystemToolbox::USleep(3000000); |
768 | 253 |
1453
c0bdc47165ef
code to warn about possible threading problems
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1427
diff
changeset
|
254 scheduler.Stop(); |
c0bdc47165ef
code to warn about possible threading problems
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1427
diff
changeset
|
255 |
770 | 256 done = true; |
1453
c0bdc47165ef
code to warn about possible threading problems
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1427
diff
changeset
|
257 if (t.joinable()) |
c0bdc47165ef
code to warn about possible threading problems
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1427
diff
changeset
|
258 { |
c0bdc47165ef
code to warn about possible threading problems
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1427
diff
changeset
|
259 t.join(); |
c0bdc47165ef
code to warn about possible threading problems
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
1427
diff
changeset
|
260 } |
765 | 261 } |
2556 | 262 |
263 | |
264 | |
265 | |
266 | |
267 #if !defined(ORTHANC_SANDBOXED) | |
268 # error The macro ORTHANC_SANDBOXED must be defined | |
269 #endif | |
270 | |
271 #if ORTHANC_SANDBOXED == 1 | |
272 # error The job engine cannot be used in sandboxed environments | |
273 #endif | |
274 | |
2557 | 275 #include "../Core/Logging.h" |
276 | |
2556 | 277 #include <boost/date_time/posix_time/posix_time.hpp> |
2557 | 278 #include <queue> |
2556 | 279 |
280 namespace Orthanc | |
281 { | |
// Lifecycle states of a job. The numeric values are those that the
// compiler assigns implicitly to an enumeration starting at zero.
enum JobState
{
  JobState_Pending = 0,   // Initial state of a newly created "JobHandler"
  JobState_Running = 1,
  JobState_Success = 2,
  JobState_Failure = 3,
  JobState_Paused  = 4,
  JobState_Retry   = 5    // Must be entered through "JobHandler::SetRetryState()"
};
291 | |
// Status reported by a "JobStepResult". The numeric values are those
// that the compiler assigns implicitly to an enumeration starting at
// zero.
enum JobStepStatus
{
  JobStepStatus_Success  = 0,
  JobStepStatus_Failure  = 1,
  JobStepStatus_Continue = 2,
  JobStepStatus_Retry    = 3    // Status carried by "RetryResult"
};
299 | |
300 | |
2557 | 301 class JobStepResult |
2556 | 302 { |
303 private: | |
304 JobStepStatus status_; | |
305 | |
306 public: | |
2557 | 307 explicit JobStepResult(JobStepStatus status) : |
308 status_(status) | |
2556 | 309 { |
310 } | |
311 | |
2557 | 312 virtual ~JobStepResult() |
2556 | 313 { |
314 } | |
315 | |
316 JobStepStatus GetStatus() const | |
317 { | |
318 return status_; | |
319 } | |
320 }; | |
321 | |
322 | |
2557 | 323 class RetryResult : public JobStepResult |
2556 | 324 { |
325 private: | |
326 unsigned int timeout_; // Retry after "timeout_" milliseconds | |
327 | |
328 public: | |
329 RetryResult(unsigned int timeout) : | |
2557 | 330 JobStepResult(JobStepStatus_Retry), |
331 timeout_(timeout) | |
2556 | 332 { |
333 } | |
334 | |
335 unsigned int GetTimeout() const | |
336 { | |
337 return timeout_; | |
338 } | |
339 }; | |
340 | |
341 | |
342 class IJob : public boost::noncopyable | |
343 { | |
344 public: | |
345 virtual ~IJob() | |
346 { | |
347 } | |
348 | |
2557 | 349 virtual JobStepResult* ExecuteStep() = 0; |
2556 | 350 |
351 virtual void ReleaseResources() = 0; // For pausing jobs | |
352 | |
353 virtual float GetProgress() = 0; | |
354 | |
355 virtual void FormatStatus(Json::Value& value) = 0; | |
356 }; | |
357 | |
358 | |
2557 | 359 class JobHandler : public boost::noncopyable |
360 { | |
361 private: | |
362 std::string id_; | |
363 JobState state_; | |
364 std::auto_ptr<IJob> job_; | |
365 int priority_; // "+inf()" means highest priority | |
366 boost::posix_time::ptime creationTime_; | |
367 boost::posix_time::ptime lastUpdateTime_; | |
368 boost::posix_time::ptime retryTime_; | |
369 uint64_t runtime_; // In milliseconds | |
370 bool pauseScheduled_; | |
371 | |
372 void SetStateInternal(JobState state) | |
373 { | |
374 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
375 | |
376 if (state_ == JobState_Running) | |
377 { | |
378 runtime_ += (now - lastUpdateTime_).total_milliseconds(); | |
379 } | |
380 | |
381 state_ = state; | |
382 lastUpdateTime_ = now; | |
383 pauseScheduled_ = false; | |
384 } | |
385 | |
386 public: | |
387 JobHandler(IJob* job, | |
388 int priority) : | |
389 id_(Toolbox::GenerateUuid()), | |
390 state_(JobState_Pending), | |
391 job_(job), | |
392 priority_(priority), | |
393 creationTime_(boost::posix_time::microsec_clock::universal_time()), | |
394 lastUpdateTime_(creationTime_), | |
395 runtime_(0), | |
396 pauseScheduled_(false) | |
397 { | |
398 if (job == NULL) | |
399 { | |
400 throw OrthancException(ErrorCode_NullPointer); | |
401 } | |
402 } | |
403 | |
404 const std::string& GetId() const | |
405 { | |
406 return id_; | |
407 } | |
408 | |
409 IJob& GetJob() const | |
410 { | |
411 assert(job_.get() != NULL); | |
412 return *job_; | |
413 } | |
414 | |
415 void SetPriority(int priority) | |
416 { | |
417 priority_ = priority; | |
418 } | |
419 | |
420 int GetPriority() const | |
421 { | |
422 return priority_; | |
423 } | |
424 | |
425 JobState GetState() const | |
426 { | |
427 return state_; | |
428 } | |
429 | |
430 void SetState(JobState state) | |
431 { | |
432 if (state == JobState_Retry) | |
433 { | |
434 // Use "SetRetryState()" | |
435 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
436 } | |
437 else | |
438 { | |
439 SetStateInternal(state); | |
440 } | |
441 } | |
442 | |
443 void SetRetryState(unsigned int timeout) | |
444 { | |
445 if (state_ == JobState_Running) | |
446 { | |
447 SetStateInternal(JobState_Retry); | |
448 retryTime_ = (boost::posix_time::microsec_clock::universal_time() + | |
449 boost::posix_time::milliseconds(timeout)); | |
450 } | |
451 else | |
452 { | |
453 // Only valid for running jobs | |
454 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
455 } | |
456 } | |
457 | |
458 void SchedulePause() | |
459 { | |
460 if (state_ == JobState_Running) | |
461 { | |
462 pauseScheduled_ = true; | |
463 } | |
464 else | |
465 { | |
466 // Only valid for running jobs | |
467 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
468 } | |
469 } | |
470 | |
471 bool IsPauseScheduled() | |
472 { | |
473 return pauseScheduled_; | |
474 } | |
475 | |
476 bool IsRetryReady(const boost::posix_time::ptime& now) const | |
477 { | |
478 if (state_ != JobState_Retry) | |
479 { | |
480 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
481 } | |
482 else | |
483 { | |
2558 | 484 return retryTime_ <= now; |
2557 | 485 } |
486 } | |
487 }; | |
488 | |
489 | |
2558 | 490 class JobsRegistry : public boost::noncopyable |
2556 | 491 { |
492 private: | |
2557 | 493 struct PriorityComparator |
2556 | 494 { |
2557 | 495 bool operator() (JobHandler*& a, |
496 JobHandler*& b) const | |
497 { | |
498 return a->GetPriority() < b->GetPriority(); | |
499 } | |
500 }; | |
501 | |
502 typedef std::map<std::string, JobHandler*> JobsIndex; | |
503 typedef std::list<const JobHandler*> CompletedJobs; | |
504 typedef std::set<JobHandler*> RetryJobs; | |
505 typedef std::priority_queue<JobHandler*, | |
506 std::vector<JobHandler*>, // Could be a "std::deque" | |
507 PriorityComparator> PendingJobs; | |
508 | |
2558 | 509 boost::mutex mutex_; |
510 JobsIndex jobsIndex_; | |
511 PendingJobs pendingJobs_; | |
512 CompletedJobs completedJobs_; | |
513 RetryJobs retryJobs_; | |
2557 | 514 |
2558 | 515 boost::condition_variable pendingJobAvailable_; |
516 size_t maxCompletedJobs_; | |
2557 | 517 |
518 | |
519 #ifndef NDEBUG | |
2558 | 520 bool IsPendingJob(const JobHandler& job) const |
521 { | |
522 PendingJobs copy = pendingJobs_; | |
523 while (!copy.empty()) | |
524 { | |
525 if (copy.top() == &job) | |
526 { | |
527 return true; | |
528 } | |
529 | |
530 copy.pop(); | |
531 } | |
532 | |
533 return false; | |
534 } | |
535 | |
536 bool IsCompletedJob(const JobHandler& job) const | |
537 { | |
538 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
539 it != completedJobs_.end(); ++it) | |
540 { | |
541 if (*it == &job) | |
542 { | |
543 return true; | |
544 } | |
545 } | |
546 | |
547 return false; | |
548 } | |
549 | |
550 bool IsRetryJob(JobHandler& job) const | |
551 { | |
552 return retryJobs_.find(&job) != retryJobs_.end(); | |
553 } | |
554 #endif | |
555 | |
556 | |
557 void CheckInvariants() | |
558 { | |
559 #ifndef NDEBUG | |
2557 | 560 { |
561 PendingJobs copy = pendingJobs_; | |
562 while (!copy.empty()) | |
563 { | |
2558 | 564 assert(copy.top()->GetState() == JobState_Pending); |
2557 | 565 copy.pop(); |
566 } | |
2558 | 567 } |
2557 | 568 |
2558 | 569 assert(completedJobs_.size() <= maxCompletedJobs_); |
570 | |
571 for (CompletedJobs::const_iterator it = completedJobs_.begin(); | |
572 it != completedJobs_.end(); ++it) | |
573 { | |
574 assert((*it)->GetState() == JobState_Success || | |
575 (*it)->GetState() == JobState_Failure); | |
576 } | |
577 | |
578 for (RetryJobs::const_iterator it = retryJobs_.begin(); | |
579 it != retryJobs_.end(); ++it) | |
580 { | |
581 assert((*it)->GetState() == JobState_Retry); | |
2557 | 582 } |
583 | |
2558 | 584 for (JobsIndex::iterator it = jobsIndex_.begin(); |
585 it != jobsIndex_.end(); ++it) | |
2557 | 586 { |
2558 | 587 JobHandler& job = *it->second; |
588 | |
589 assert(job.GetId() == it->first); | |
590 | |
591 switch (job.GetState()) | |
2556 | 592 { |
2558 | 593 case JobState_Pending: |
594 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job)); | |
595 break; | |
596 | |
597 case JobState_Success: | |
598 case JobState_Failure: | |
599 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job)); | |
600 break; | |
601 | |
602 case JobState_Retry: | |
603 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
604 break; | |
605 | |
606 case JobState_Running: | |
607 case JobState_Paused: | |
608 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job)); | |
609 break; | |
2557 | 610 |
2558 | 611 default: |
612 throw OrthancException(ErrorCode_InternalError); | |
613 } | |
2557 | 614 } |
615 #endif | |
2558 | 616 } |
2557 | 617 |
618 | |
2558 | 619 void ForgetOldCompletedJobs() |
620 { | |
621 if (maxCompletedJobs_ != 0) | |
2557 | 622 { |
2558 | 623 while (completedJobs_.size() > maxCompletedJobs_) |
2557 | 624 { |
2558 | 625 assert(completedJobs_.front() != NULL); |
626 | |
627 std::string id = completedJobs_.front()->GetId(); | |
628 assert(jobsIndex_.find(id) != jobsIndex_.end()); | |
2557 | 629 |
2558 | 630 jobsIndex_.erase(id); |
631 delete(completedJobs_.front()); | |
632 completedJobs_.pop_front(); | |
633 } | |
634 } | |
635 } | |
636 | |
2557 | 637 |
2558 | 638 void MarkRunningAsCompleted(JobHandler& job, |
639 bool success) | |
640 { | |
641 LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") | |
642 << ": " << job.GetId(); | |
2557 | 643 |
2558 | 644 boost::mutex::scoped_lock lock(mutex_); |
645 CheckInvariants(); | |
646 assert(job.GetState() == JobState_Running); | |
647 | |
648 job.SetState(success ? JobState_Success : JobState_Failure); | |
649 | |
650 completedJobs_.push_back(&job); | |
651 ForgetOldCompletedJobs(); | |
652 | |
653 CheckInvariants(); | |
654 } | |
2557 | 655 |
656 | |
2558 | 657 void MarkRunningAsRetry(JobHandler& job, |
658 unsigned int timeout) | |
659 { | |
660 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId(); | |
661 | |
662 boost::mutex::scoped_lock lock(mutex_); | |
663 CheckInvariants(); | |
664 | |
665 assert(job.GetState() == JobState_Running && | |
666 retryJobs_.find(&job) == retryJobs_.end()); | |
667 | |
668 retryJobs_.insert(&job); | |
669 job.SetRetryState(timeout); | |
670 | |
671 CheckInvariants(); | |
672 } | |
673 | |
674 | |
675 void MarkRunningAsPaused(JobHandler& job) | |
676 { | |
677 LOG(INFO) << "Job paused: " << job.GetId(); | |
2557 | 678 |
2558 | 679 boost::mutex::scoped_lock lock(mutex_); |
680 CheckInvariants(); | |
681 assert(job.GetState() == JobState_Running); | |
682 | |
683 job.SetState(JobState_Paused); | |
2557 | 684 |
2558 | 685 CheckInvariants(); |
686 } | |
687 | |
688 | |
689 JobHandler* WaitPendingJob(unsigned int timeout) | |
690 { | |
691 boost::mutex::scoped_lock lock(mutex_); | |
2557 | 692 |
2558 | 693 while (pendingJobs_.empty()) |
694 { | |
695 if (timeout == 0) | |
696 { | |
697 pendingJobAvailable_.wait(lock); | |
698 } | |
699 else | |
700 { | |
701 bool success = pendingJobAvailable_.timed_wait | |
702 (lock, boost::posix_time::milliseconds(timeout)); | |
703 if (!success) | |
704 { | |
705 return NULL; | |
2557 | 706 } |
2556 | 707 } |
2557 | 708 } |
709 | |
2558 | 710 JobHandler* job = pendingJobs_.top(); |
711 pendingJobs_.pop(); | |
712 | |
713 job->SetState(JobState_Running); | |
714 return job; | |
715 } | |
2557 | 716 |
717 | |
2558 | 718 public: |
719 JobsRegistry() : | |
720 maxCompletedJobs_(10) | |
721 { | |
722 } | |
2557 | 723 |
724 | |
2558 | 725 ~JobsRegistry() |
726 { | |
727 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it) | |
2557 | 728 { |
2558 | 729 assert(it->second != NULL); |
730 delete it->second; | |
731 } | |
732 } | |
733 | |
734 | |
735 void SetMaxCompletedJobs(size_t i) | |
736 { | |
737 boost::mutex::scoped_lock lock(mutex_); | |
738 CheckInvariants(); | |
2557 | 739 |
2558 | 740 maxCompletedJobs_ = i; |
741 ForgetOldCompletedJobs(); | |
742 | |
743 CheckInvariants(); | |
744 } | |
745 | |
2557 | 746 |
2558 | 747 void ListJobs(std::set<std::string>& target) |
748 { | |
749 boost::mutex::scoped_lock lock(mutex_); | |
750 CheckInvariants(); | |
2557 | 751 |
2558 | 752 for (JobsIndex::const_iterator it = jobsIndex_.begin(); |
753 it != jobsIndex_.end(); ++it) | |
754 { | |
755 target.insert(it->first); | |
2557 | 756 } |
2558 | 757 } |
2557 | 758 |
759 | |
2558 | 760 void Submit(std::string& id, |
761 IJob* job, // Takes ownership | |
762 int priority) | |
763 { | |
764 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); | |
765 | |
766 boost::mutex::scoped_lock lock(mutex_); | |
767 CheckInvariants(); | |
768 | |
769 id = handler->GetId(); | |
770 | |
771 pendingJobs_.push(handler.get()); | |
772 pendingJobAvailable_.notify_one(); | |
2557 | 773 |
2558 | 774 jobsIndex_.insert(std::make_pair(id, handler.release())); |
775 | |
776 LOG(INFO) << "New job submitted: " << id; | |
777 | |
778 CheckInvariants(); | |
779 } | |
2557 | 780 |
2558 | 781 |
782 void Submit(IJob* job, // Takes ownership | |
783 int priority) | |
784 { | |
785 std::string id; | |
786 Submit(id, job, priority); | |
787 } | |
2557 | 788 |
789 | |
2558 | 790 void SetPriority(const std::string& id, |
791 int priority) | |
792 { | |
793 LOG(INFO) << "Changing priority to " << priority << " for job: " << id; | |
2557 | 794 |
2558 | 795 boost::mutex::scoped_lock lock(mutex_); |
796 CheckInvariants(); | |
797 | |
798 JobsIndex::iterator found = jobsIndex_.find(id); | |
2557 | 799 |
2558 | 800 if (found == jobsIndex_.end()) |
801 { | |
802 LOG(WARNING) << "Unknown job: " << id; | |
2557 | 803 } |
2558 | 804 else |
805 { | |
806 found->second->SetPriority(priority); | |
2556 | 807 |
2558 | 808 if (found->second->GetState() == JobState_Pending) |
809 { | |
810 // If the job is pending, we need to reconstruct the | |
811 // priority queue, as the heap condition has changed | |
2557 | 812 |
2558 | 813 PendingJobs copy; |
814 std::swap(copy, pendingJobs_); | |
815 | |
816 assert(pendingJobs_.empty()); | |
817 while (!copy.empty()) | |
818 { | |
819 pendingJobs_.push(copy.top()); | |
820 copy.pop(); | |
821 } | |
2557 | 822 } |
823 } | |
824 | |
2558 | 825 CheckInvariants(); |
826 } | |
2557 | 827 |
828 | |
2558 | 829 void Pause(const std::string& id) |
830 { | |
831 LOG(INFO) << "Pausing job: " << id; | |
2557 | 832 |
2558 | 833 boost::mutex::scoped_lock lock(mutex_); |
834 CheckInvariants(); | |
2557 | 835 |
2558 | 836 JobsIndex::iterator found = jobsIndex_.find(id); |
2557 | 837 |
2558 | 838 if (found == jobsIndex_.end()) |
2557 | 839 { |
2558 | 840 LOG(WARNING) << "Unknown job: " << id; |
2557 | 841 } |
2558 | 842 else |
2556 | 843 { |
2558 | 844 switch (found->second->GetState()) |
2557 | 845 { |
2558 | 846 case JobState_Pending: |
2557 | 847 { |
848 // If the job is pending, we need to reconstruct the | |
2558 | 849 // priority queue to remove it |
2557 | 850 PendingJobs copy; |
851 std::swap(copy, pendingJobs_); | |
852 | |
853 assert(pendingJobs_.empty()); | |
854 while (!copy.empty()) | |
855 { | |
2558 | 856 if (copy.top()->GetId() != id) |
857 { | |
858 pendingJobs_.push(copy.top()); | |
859 } | |
860 | |
2557 | 861 copy.pop(); |
862 } | |
2558 | 863 |
864 found->second->SetState(JobState_Paused); | |
865 | |
866 break; | |
2557 | 867 } |
2558 | 868 |
869 case JobState_Retry: | |
870 { | |
871 RetryJobs::iterator item = retryJobs_.find(found->second); | |
872 assert(item != retryJobs_.end()); | |
873 retryJobs_.erase(item); | |
874 | |
875 found->second->SetState(JobState_Paused); | |
876 | |
877 break; | |
878 } | |
879 | |
880 case JobState_Paused: | |
881 case JobState_Success: | |
882 case JobState_Failure: | |
883 // Nothing to be done | |
884 break; | |
885 | |
886 case JobState_Running: | |
887 found->second->SchedulePause(); | |
888 break; | |
889 | |
890 default: | |
891 throw OrthancException(ErrorCode_InternalError); | |
2557 | 892 } |
2556 | 893 } |
894 | |
2558 | 895 CheckInvariants(); |
896 } | |
2556 | 897 |
2557 | 898 |
2558 | 899 void Resume(const std::string& id) |
900 { | |
901 LOG(INFO) << "Resuming job: " << id; | |
2557 | 902 |
2558 | 903 boost::mutex::scoped_lock lock(mutex_); |
904 CheckInvariants(); | |
905 | |
906 JobsIndex::iterator found = jobsIndex_.find(id); | |
2557 | 907 |
2558 | 908 if (found == jobsIndex_.end()) |
909 { | |
910 LOG(WARNING) << "Unknown job: " << id; | |
911 } | |
912 else if (found->second->GetState() != JobState_Paused) | |
913 { | |
914 LOG(WARNING) << "Cannot resume a job that is not paused: " << id; | |
915 } | |
916 else | |
917 { | |
918 found->second->SetState(JobState_Pending); | |
919 pendingJobs_.push(found->second); | |
920 pendingJobAvailable_.notify_one(); | |
921 } | |
2557 | 922 |
2558 | 923 CheckInvariants(); |
924 } | |
2556 | 925 |
2557 | 926 |
2558 | 927 void Resubmit(const std::string& id) |
928 { | |
929 LOG(INFO) << "Resubmitting failed job: " << id; | |
2557 | 930 |
2558 | 931 boost::mutex::scoped_lock lock(mutex_); |
932 CheckInvariants(); | |
933 | |
934 JobsIndex::iterator found = jobsIndex_.find(id); | |
2557 | 935 |
2558 | 936 if (found == jobsIndex_.end()) |
937 { | |
938 LOG(WARNING) << "Unknown job: " << id; | |
2556 | 939 } |
2558 | 940 else if (found->second->GetState() != JobState_Failure) |
941 { | |
942 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id; | |
943 } | |
944 else | |
2557 | 945 { |
2558 | 946 bool ok = false; |
947 for (CompletedJobs::iterator it = completedJobs_.begin(); | |
948 it != completedJobs_.end(); ++it) | |
2557 | 949 { |
2558 | 950 if (*it == found->second) |
2557 | 951 { |
2558 | 952 ok = true; |
953 completedJobs_.erase(it); | |
954 break; | |
2557 | 955 } |
956 } | |
957 | |
2558 | 958 assert(ok); |
959 | |
960 found->second->SetState(JobState_Pending); | |
961 pendingJobs_.push(found->second); | |
962 pendingJobAvailable_.notify_one(); | |
2557 | 963 } |
964 | |
2558 | 965 CheckInvariants(); |
966 } | |
967 | |
2557 | 968 |
2558 | 969 void ScheduleRetries() |
970 { | |
971 boost::mutex::scoped_lock lock(mutex_); | |
972 CheckInvariants(); | |
973 | |
974 RetryJobs copy; | |
975 std::swap(copy, retryJobs_); | |
976 | |
977 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); | |
978 | |
979 assert(retryJobs_.empty()); | |
980 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it) | |
2557 | 981 { |
2558 | 982 if ((*it)->IsRetryReady(now)) |
2557 | 983 { |
2558 | 984 LOG(INFO) << "Retrying job: " << (*it)->GetId(); |
985 (*it)->SetState(JobState_Pending); | |
986 pendingJobs_.push(*it); | |
987 pendingJobAvailable_.notify_one(); | |
2557 | 988 } |
989 else | |
990 { | |
2558 | 991 retryJobs_.insert(*it); |
2557 | 992 } |
993 } | |
994 | |
2558 | 995 CheckInvariants(); |
996 } | |
2557 | 997 |
2558 | 998 |
999 bool GetState(JobState& state, | |
1000 const std::string& id) | |
1001 { | |
1002 boost::mutex::scoped_lock lock(mutex_); | |
1003 CheckInvariants(); | |
1004 | |
1005 JobsIndex::const_iterator it = jobsIndex_.find(id); | |
1006 if (it == jobsIndex_.end()) | |
2556 | 1007 { |
2558 | 1008 return false; |
1009 } | |
1010 else | |
1011 { | |
1012 state = it->second->GetState(); | |
1013 return true; | |
1014 } | |
1015 } | |
1016 | |
1017 | |
1018 class RunningJob : public boost::noncopyable | |
1019 { | |
1020 private: | |
1021 JobsRegistry& that_; | |
1022 JobHandler* handler_; | |
1023 JobState targetState_; | |
1024 unsigned int retryTimeout_; | |
2556 | 1025 |
2558 | 1026 public: |
1027 RunningJob(JobsRegistry& that, | |
1028 unsigned int timeout) : | |
1029 that_(that), | |
1030 handler_(NULL), | |
1031 targetState_(JobState_Failure), | |
1032 retryTimeout_(0) | |
1033 { | |
1034 handler_ = that_.WaitPendingJob(timeout); | |
1035 } | |
2557 | 1036 |
2558 | 1037 ~RunningJob() |
1038 { | |
1039 if (IsValid()) | |
2556 | 1040 { |
2558 | 1041 switch (targetState_) |
2557 | 1042 { |
2558 | 1043 case JobState_Failure: |
1044 that_.MarkRunningAsCompleted(*handler_, false); | |
1045 break; | |
2557 | 1046 |
2558 | 1047 case JobState_Success: |
1048 that_.MarkRunningAsCompleted(*handler_, true); | |
1049 break; | |
2557 | 1050 |
2558 | 1051 case JobState_Paused: |
1052 that_.MarkRunningAsPaused(*handler_); | |
1053 break; | |
2557 | 1054 |
2558 | 1055 case JobState_Retry: |
1056 that_.MarkRunningAsRetry(*handler_, retryTimeout_); | |
1057 break; | |
2557 | 1058 |
2558 | 1059 default: |
1060 assert(0); | |
2557 | 1061 } |
2556 | 1062 } |
2558 | 1063 } |
2556 | 1064 |
2558 | 1065 bool IsValid() const |
1066 { | |
1067 return handler_ != NULL; | |
1068 } | |
1069 | |
1070 const std::string& GetId() const | |
1071 { | |
1072 if (IsValid()) | |
2556 | 1073 { |
2558 | 1074 return handler_->GetId(); |
2556 | 1075 } |
2558 | 1076 else |
2557 | 1077 { |
2558 | 1078 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
1079 } | |
1080 } | |
1081 | |
1082 int GetPriority() const | |
1083 { | |
1084 if (IsValid()) | |
1085 { | |
1086 return handler_->GetPriority(); | |
1087 } | |
1088 else | |
1089 { | |
1090 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1091 } | |
1092 } | |
1093 | |
1094 bool IsPauseScheduled() | |
1095 { | |
1096 if (!IsValid()) | |
1097 { | |
1098 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
2557 | 1099 } |
1100 | |
2558 | 1101 boost::mutex::scoped_lock lock(that_.mutex_); |
1102 that_.CheckInvariants(); | |
1103 assert(handler_->GetState() == JobState_Running); | |
1104 | |
1105 return handler_->IsPauseScheduled(); | |
1106 } | |
1107 | |
1108 IJob& GetJob() | |
1109 { | |
1110 if (!IsValid()) | |
2557 | 1111 { |
2558 | 1112 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
2557 | 1113 } |
1114 | |
2558 | 1115 boost::mutex::scoped_lock lock(that_.mutex_); |
1116 that_.CheckInvariants(); | |
1117 assert(handler_->GetState() == JobState_Running); | |
2557 | 1118 |
2558 | 1119 return handler_->GetJob(); |
1120 } | |
2557 | 1121 |
2558 | 1122 void MarkSuccess() |
1123 { | |
1124 if (!IsValid()) | |
2557 | 1125 { |
2558 | 1126 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
2557 | 1127 } |
1128 | |
2558 | 1129 targetState_ = JobState_Success; |
1130 } | |
2557 | 1131 |
2558 | 1132 void MarkFailure() |
1133 { | |
1134 if (!IsValid()) | |
2557 | 1135 { |
2558 | 1136 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
2557 | 1137 } |
1138 | |
2558 | 1139 targetState_ = JobState_Failure; |
1140 } | |
1141 | |
1142 void SchedulePause() | |
1143 { | |
1144 if (!IsValid()) | |
2557 | 1145 { |
2558 | 1146 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
2557 | 1147 } |
1148 | |
2558 | 1149 targetState_ = JobState_Paused; |
1150 } | |
1151 | |
1152 void MarkRetry(unsigned int timeout) | |
1153 { | |
1154 if (!IsValid()) | |
2557 | 1155 { |
2558 | 1156 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
1157 } | |
2557 | 1158 |
2558 | 1159 targetState_ = JobState_Retry; |
1160 retryTimeout_ = timeout; | |
1161 } | |
2556 | 1162 }; |
2558 | 1163 }; |
2556 | 1164 } |
2557 | 1165 |
1166 | |
1167 | |
1168 class DummyJob : public Orthanc::IJob | |
1169 { | |
1170 private: | |
1171 JobStepResult result_; | |
1172 | |
1173 public: | |
1174 DummyJob() : | |
1175 result_(Orthanc::JobStepStatus_Success) | |
1176 { | |
1177 } | |
1178 | |
1179 explicit DummyJob(JobStepResult result) : | |
1180 result_(result) | |
1181 { | |
1182 } | |
1183 | |
1184 virtual JobStepResult* ExecuteStep() | |
1185 { | |
1186 return new JobStepResult(result_); | |
1187 } | |
1188 | |
1189 virtual void ReleaseResources() | |
1190 { | |
1191 } | |
1192 | |
1193 virtual float GetProgress() | |
1194 { | |
1195 return 0; | |
1196 } | |
1197 | |
1198 virtual void FormatStatus(Json::Value& value) | |
1199 { | |
1200 } | |
1201 }; | |
1202 | |
1203 | |
2558 | 1204 static bool CheckState(Orthanc::JobsRegistry& registry, |
2557 | 1205 const std::string& id, |
1206 Orthanc::JobState state) | |
1207 { | |
1208 Orthanc::JobState s; | |
2558 | 1209 if (registry.GetState(s, id)) |
2557 | 1210 { |
1211 return state == s; | |
1212 } | |
1213 else | |
1214 { | |
1215 return false; | |
1216 } | |
1217 } | |
1218 | |
1219 | |
2558 | 1220 TEST(JobsRegistry, Priority) |
2557 | 1221 { |
2558 | 1222 JobsRegistry registry; |
2557 | 1223 |
1224 std::string i1, i2, i3, i4; | |
2558 | 1225 registry.Submit(i1, new DummyJob(), 10); |
1226 registry.Submit(i2, new DummyJob(), 30); | |
1227 registry.Submit(i3, new DummyJob(), 20); | |
1228 registry.Submit(i4, new DummyJob(), 5); | |
2557 | 1229 |
2558 | 1230 registry.SetMaxCompletedJobs(2); |
2557 | 1231 |
1232 std::set<std::string> id; | |
2558 | 1233 registry.ListJobs(id); |
2557 | 1234 |
1235 ASSERT_EQ(4u, id.size()); | |
1236 ASSERT_TRUE(id.find(i1) != id.end()); | |
1237 ASSERT_TRUE(id.find(i2) != id.end()); | |
1238 ASSERT_TRUE(id.find(i3) != id.end()); | |
1239 ASSERT_TRUE(id.find(i4) != id.end()); | |
1240 | |
2558 | 1241 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending)); |
2557 | 1242 |
1243 { | |
2558 | 1244 JobsRegistry::RunningJob job(registry, 0); |
2557 | 1245 ASSERT_TRUE(job.IsValid()); |
1246 ASSERT_EQ(30, job.GetPriority()); | |
1247 ASSERT_EQ(i2, job.GetId()); | |
1248 | |
2558 | 1249 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running)); |
2557 | 1250 } |
1251 | |
2558 | 1252 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Failure)); |
1253 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Pending)); | |
2557 | 1254 |
1255 { | |
2558 | 1256 JobsRegistry::RunningJob job(registry, 0); |
2557 | 1257 ASSERT_TRUE(job.IsValid()); |
1258 ASSERT_EQ(20, job.GetPriority()); | |
1259 ASSERT_EQ(i3, job.GetId()); | |
1260 | |
1261 job.MarkSuccess(); | |
1262 | |
2558 | 1263 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Running)); |
2557 | 1264 } |
1265 | |
2558 | 1266 ASSERT_TRUE(CheckState(registry, i3, Orthanc::JobState_Success)); |
2557 | 1267 |
1268 { | |
2558 | 1269 JobsRegistry::RunningJob job(registry, 0); |
2557 | 1270 ASSERT_TRUE(job.IsValid()); |
1271 ASSERT_EQ(10, job.GetPriority()); | |
1272 ASSERT_EQ(i1, job.GetId()); | |
1273 } | |
1274 | |
1275 { | |
2558 | 1276 JobsRegistry::RunningJob job(registry, 0); |
2557 | 1277 ASSERT_TRUE(job.IsValid()); |
1278 ASSERT_EQ(5, job.GetPriority()); | |
1279 ASSERT_EQ(i4, job.GetId()); | |
1280 } | |
1281 | |
1282 { | |
2558 | 1283 JobsRegistry::RunningJob job(registry, 1); |
2557 | 1284 ASSERT_FALSE(job.IsValid()); |
1285 } | |
1286 | |
1287 Orthanc::JobState s; | |
2558 | 1288 ASSERT_TRUE(registry.GetState(s, i1)); |
1289 ASSERT_FALSE(registry.GetState(s, i2)); // Removed because oldest | |
1290 ASSERT_FALSE(registry.GetState(s, i3)); // Removed because second oldest | |
1291 ASSERT_TRUE(registry.GetState(s, i4)); | |
2557 | 1292 |
2558 | 1293 registry.SetMaxCompletedJobs(1); // (*) |
1294 ASSERT_FALSE(registry.GetState(s, i1)); // Just discarded by (*) | |
1295 ASSERT_TRUE(registry.GetState(s, i4)); | |
2557 | 1296 } |
1297 | |
1298 | |
2558 | 1299 TEST(JobsRegistry, Simultaneous) |
2557 | 1300 { |
2558 | 1301 JobsRegistry registry; |
1302 | |
1303 std::string i1, i2; | |
1304 registry.Submit(i1, new DummyJob(), 20); | |
1305 registry.Submit(i2, new DummyJob(), 10); | |
1306 | |
1307 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Pending)); | |
1308 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Pending)); | |
1309 | |
1310 { | |
1311 JobsRegistry::RunningJob job1(registry, 0); | |
1312 JobsRegistry::RunningJob job2(registry, 0); | |
1313 | |
1314 ASSERT_TRUE(job1.IsValid()); | |
1315 ASSERT_TRUE(job2.IsValid()); | |
1316 | |
1317 job1.MarkFailure(); | |
1318 job2.MarkSuccess(); | |
1319 | |
1320 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Running)); | |
1321 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Running)); | |
1322 } | |
1323 | |
1324 ASSERT_TRUE(CheckState(registry, i1, Orthanc::JobState_Failure)); | |
1325 ASSERT_TRUE(CheckState(registry, i2, Orthanc::JobState_Success)); | |
1326 } | |
1327 | |
1328 | |
1329 TEST(JobsRegistry, Resubmit) | |
1330 { | |
1331 JobsRegistry registry; | |
2557 | 1332 |
1333 std::string id; | |
2558 | 1334 registry.Submit(id, new DummyJob(), 10); |
2557 | 1335 |
2558 | 1336 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); |
2557 | 1337 |
2558 | 1338 registry.Resubmit(id); |
1339 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
2557 | 1340 |
1341 { | |
2558 | 1342 JobsRegistry::RunningJob job(registry, 0); |
2557 | 1343 ASSERT_TRUE(job.IsValid()); |
1344 job.MarkFailure(); | |
1345 | |
2558 | 1346 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); |
2557 | 1347 |
2558 | 1348 registry.Resubmit(id); |
1349 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
2557 | 1350 } |
1351 | |
2558 | 1352 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Failure)); |
2557 | 1353 |
2558 | 1354 registry.Resubmit(id); |
1355 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
2557 | 1356 |
1357 { | |
2558 | 1358 JobsRegistry::RunningJob job(registry, 0); |
2557 | 1359 ASSERT_TRUE(job.IsValid()); |
1360 ASSERT_EQ(id, job.GetId()); | |
1361 | |
1362 job.MarkSuccess(); | |
2558 | 1363 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); |
1364 } | |
1365 | |
1366 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1367 | |
1368 registry.Resubmit(id); | |
1369 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1370 } | |
1371 | |
1372 | |
1373 TEST(JobsRegistry, Retry) | |
1374 { | |
1375 JobsRegistry registry; | |
1376 | |
1377 std::string id; | |
1378 registry.Submit(id, new DummyJob(), 10); | |
1379 | |
1380 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1381 | |
1382 { | |
1383 JobsRegistry::RunningJob job(registry, 0); | |
1384 ASSERT_TRUE(job.IsValid()); | |
1385 job.MarkRetry(0); | |
1386 | |
1387 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1388 } | |
1389 | |
1390 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); | |
1391 | |
1392 registry.Resubmit(id); | |
1393 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); | |
1394 | |
1395 registry.ScheduleRetries(); | |
1396 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1397 | |
1398 { | |
1399 JobsRegistry::RunningJob job(registry, 0); | |
1400 ASSERT_TRUE(job.IsValid()); | |
1401 job.MarkSuccess(); | |
1402 | |
1403 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
2557 | 1404 } |
1405 | |
2558 | 1406 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); |
1407 } | |
1408 | |
1409 | |
1410 TEST(JobsRegistry, PausePending) | |
1411 { | |
1412 JobsRegistry registry; | |
1413 | |
1414 std::string id; | |
1415 registry.Submit(id, new DummyJob(), 10); | |
1416 | |
1417 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1418 | |
1419 registry.Pause(id); | |
1420 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1421 | |
1422 registry.Pause(id); | |
1423 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1424 | |
1425 registry.Resubmit(id); | |
1426 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1427 | |
1428 registry.Resume(id); | |
1429 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
2557 | 1430 } |
2558 | 1431 |
1432 | |
1433 TEST(JobsRegistry, PauseRunning) | |
1434 { | |
1435 JobsRegistry registry; | |
1436 | |
1437 std::string id; | |
1438 registry.Submit(id, new DummyJob(), 10); | |
1439 | |
1440 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1441 | |
1442 { | |
1443 JobsRegistry::RunningJob job(registry, 0); | |
1444 ASSERT_TRUE(job.IsValid()); | |
1445 | |
1446 registry.Resubmit(id); | |
1447 job.SchedulePause(); | |
1448 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1449 } | |
1450 | |
1451 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1452 | |
1453 registry.Resubmit(id); | |
1454 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1455 | |
1456 registry.Resume(id); | |
1457 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1458 | |
1459 { | |
1460 JobsRegistry::RunningJob job(registry, 0); | |
1461 ASSERT_TRUE(job.IsValid()); | |
1462 | |
1463 job.MarkSuccess(); | |
1464 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1465 } | |
1466 | |
1467 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1468 } | |
1469 | |
1470 | |
1471 TEST(JobsRegistry, PauseRetry) | |
1472 { | |
1473 JobsRegistry registry; | |
1474 | |
1475 std::string id; | |
1476 registry.Submit(id, new DummyJob(), 10); | |
1477 | |
1478 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1479 | |
1480 { | |
1481 JobsRegistry::RunningJob job(registry, 0); | |
1482 ASSERT_TRUE(job.IsValid()); | |
1483 | |
1484 job.MarkRetry(0); | |
1485 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1486 } | |
1487 | |
1488 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Retry)); | |
1489 | |
1490 registry.Pause(id); | |
1491 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Paused)); | |
1492 | |
1493 registry.Resume(id); | |
1494 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Pending)); | |
1495 | |
1496 { | |
1497 JobsRegistry::RunningJob job(registry, 0); | |
1498 ASSERT_TRUE(job.IsValid()); | |
1499 | |
1500 job.MarkSuccess(); | |
1501 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); | |
1502 } | |
1503 | |
1504 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); | |
1505 } |