Mercurial > hg > orthanc
annotate UnitTestsSources/MultiThreading.cpp @ 783:c9cdd53a6b31 lua-scripting
main scheduler added to the server context
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 30 Apr 2014 18:36:20 +0200 |
parents | f0ac3a53ccf2 |
children | 394a19d44f9d |
rev | line source |
---|---|
723 | 1 #include "gtest/gtest.h" |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
2 #include <glog/logging.h> |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
3 |
781 | 4 #include "../OrthancServer/Scheduler/ServerScheduler.h" |
5 | |
723 | 6 #include "../Core/OrthancException.h" |
7 #include "../Core/Toolbox.h" | |
8 #include "../Core/MultiThreading/ArrayFilledByThreads.h" | |
760 | 9 #include "../Core/MultiThreading/Locker.h" |
10 #include "../Core/MultiThreading/Mutex.h" | |
11 #include "../Core/MultiThreading/ReaderWriterLock.h" | |
723 | 12 #include "../Core/MultiThreading/ThreadedCommandProcessor.h" |
13 | |
14 using namespace Orthanc; | |
15 | |
16 namespace | |
17 { | |
18 class DynamicInteger : public ICommand | |
19 { | |
20 private: | |
21 int value_; | |
22 std::set<int>& target_; | |
23 | |
24 public: | |
25 DynamicInteger(int value, std::set<int>& target) : | |
26 value_(value), target_(target) | |
27 { | |
28 } | |
29 | |
30 int GetValue() const | |
31 { | |
32 return value_; | |
33 } | |
34 | |
35 virtual bool Execute() | |
36 { | |
37 static boost::mutex mutex; | |
38 boost::mutex::scoped_lock lock(mutex); | |
39 target_.insert(value_); | |
40 return true; | |
41 } | |
42 }; | |
43 | |
44 class MyFiller : public ArrayFilledByThreads::IFiller | |
45 { | |
46 private: | |
47 int size_; | |
48 unsigned int created_; | |
49 std::set<int> set_; | |
50 | |
51 public: | |
52 MyFiller(int size) : size_(size), created_(0) | |
53 { | |
54 } | |
55 | |
56 virtual size_t GetFillerSize() | |
57 { | |
58 return size_; | |
59 } | |
60 | |
61 virtual IDynamicObject* GetFillerItem(size_t index) | |
62 { | |
63 static boost::mutex mutex; | |
64 boost::mutex::scoped_lock lock(mutex); | |
65 created_++; | |
66 return new DynamicInteger(index * 2, set_); | |
67 } | |
68 | |
69 unsigned int GetCreatedCount() const | |
70 { | |
71 return created_; | |
72 } | |
73 | |
74 std::set<int> GetSet() | |
75 { | |
76 return set_; | |
77 } | |
78 }; | |
79 } | |
80 | |
81 | |
82 | |
83 | |
84 TEST(MultiThreading, SharedMessageQueueBasic) | |
85 { | |
86 std::set<int> s; | |
87 | |
88 SharedMessageQueue q; | |
89 ASSERT_TRUE(q.WaitEmpty(0)); | |
90 q.Enqueue(new DynamicInteger(10, s)); | |
91 ASSERT_FALSE(q.WaitEmpty(1)); | |
92 q.Enqueue(new DynamicInteger(20, s)); | |
93 q.Enqueue(new DynamicInteger(30, s)); | |
94 q.Enqueue(new DynamicInteger(40, s)); | |
95 | |
96 std::auto_ptr<DynamicInteger> i; | |
97 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); | |
98 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue()); | |
99 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); | |
100 ASSERT_FALSE(q.WaitEmpty(1)); | |
101 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue()); | |
102 ASSERT_TRUE(q.WaitEmpty(0)); | |
103 ASSERT_EQ(NULL, q.Dequeue(1)); | |
104 } | |
105 | |
106 | |
107 TEST(MultiThreading, SharedMessageQueueClean) | |
108 { | |
109 std::set<int> s; | |
110 | |
111 try | |
112 { | |
113 SharedMessageQueue q; | |
114 q.Enqueue(new DynamicInteger(10, s)); | |
115 q.Enqueue(new DynamicInteger(20, s)); | |
116 throw OrthancException("Nope"); | |
117 } | |
118 catch (OrthancException&) | |
119 { | |
120 } | |
121 } | |
122 | |
123 | |
124 TEST(MultiThreading, ArrayFilledByThreadEmpty) | |
125 { | |
126 MyFiller f(0); | |
127 ArrayFilledByThreads a(f); | |
128 a.SetThreadCount(1); | |
129 ASSERT_EQ(0, a.GetSize()); | |
130 } | |
131 | |
132 | |
133 TEST(MultiThreading, ArrayFilledByThread1) | |
134 { | |
135 MyFiller f(100); | |
136 ArrayFilledByThreads a(f); | |
137 a.SetThreadCount(1); | |
138 ASSERT_EQ(100, a.GetSize()); | |
139 for (size_t i = 0; i < a.GetSize(); i++) | |
140 { | |
141 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
142 } | |
143 } | |
144 | |
145 | |
146 TEST(MultiThreading, ArrayFilledByThread4) | |
147 { | |
148 MyFiller f(100); | |
149 ArrayFilledByThreads a(f); | |
150 a.SetThreadCount(4); | |
151 ASSERT_EQ(100, a.GetSize()); | |
152 for (size_t i = 0; i < a.GetSize(); i++) | |
153 { | |
154 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
155 } | |
156 | |
157 ASSERT_EQ(100u, f.GetCreatedCount()); | |
158 | |
159 a.Invalidate(); | |
160 | |
161 ASSERT_EQ(100, a.GetSize()); | |
162 ASSERT_EQ(200u, f.GetCreatedCount()); | |
163 ASSERT_EQ(4u, a.GetThreadCount()); | |
164 ASSERT_TRUE(f.GetSet().empty()); | |
165 | |
166 for (size_t i = 0; i < a.GetSize(); i++) | |
167 { | |
168 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
169 } | |
170 } | |
171 | |
172 | |
173 TEST(MultiThreading, CommandProcessor) | |
174 { | |
175 ThreadedCommandProcessor p(4); | |
176 | |
177 std::set<int> s; | |
178 | |
179 for (size_t i = 0; i < 100; i++) | |
180 { | |
181 p.Post(new DynamicInteger(i * 2, s)); | |
182 } | |
183 | |
184 p.Join(); | |
185 | |
186 for (size_t i = 0; i < 200; i++) | |
187 { | |
188 if (i % 2) | |
189 ASSERT_TRUE(s.find(i) == s.end()); | |
190 else | |
191 ASSERT_TRUE(s.find(i) != s.end()); | |
192 } | |
193 } | |
760 | 194 |
195 | |
196 TEST(MultiThreading, Mutex) | |
197 { | |
198 Mutex mutex; | |
199 Locker locker(mutex); | |
200 } | |
201 | |
202 | |
203 TEST(MultiThreading, ReaderWriterLock) | |
204 { | |
205 ReaderWriterLock lock; | |
206 | |
207 { | |
208 Locker locker1(lock.ForReader()); | |
209 Locker locker2(lock.ForReader()); | |
210 } | |
211 | |
212 { | |
213 Locker locker3(lock.ForWriter()); | |
214 } | |
215 } | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
216 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
217 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
218 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
219 #include "../OrthancServer/DicomProtocol/ReusableDicomUserConnection.h" |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
220 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
221 TEST(ReusableDicomUserConnection, DISABLED_Basic) |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
222 { |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
223 ReusableDicomUserConnection c; |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
224 c.SetMillisecondsBeforeClose(200); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
225 printf("START\n"); fflush(stdout); |
775
d3ba35466225
integration mainline -> lua-scripting
Sebastien Jodogne <s.jodogne@gmail.com>
diff
changeset
|
226 |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
227 { |
776 | 228 ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
229 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281"); | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
230 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
231 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
232 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
233 Toolbox::USleep(1000000); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
234 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
235 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
236 { |
776 | 237 ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
238 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676277"); | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
239 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
240 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
241 Toolbox::ServerBarrier(); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
242 printf("DONE\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
243 } |
765 | 244 |
245 | |
246 | |
247 class Tutu : public IServerFilter | |
248 { | |
249 private: | |
250 int factor_; | |
251 | |
252 public: | |
253 Tutu(int f) : factor_(f) | |
254 { | |
255 } | |
256 | |
257 virtual bool Apply(ListOfStrings& outputs, | |
258 const ListOfStrings& inputs) | |
259 { | |
260 for (ListOfStrings::const_iterator | |
261 it = inputs.begin(); it != inputs.end(); it++) | |
262 { | |
263 int a = boost::lexical_cast<int>(*it); | |
264 int b = factor_ * a; | |
265 | |
266 printf("%d * %d = %d\n", a, factor_, b); | |
267 | |
268 //if (a == 84) { printf("BREAK\n"); return false; } | |
269 | |
270 outputs.push_back(boost::lexical_cast<std::string>(b)); | |
271 } | |
272 | |
273 Toolbox::USleep(1000000); | |
274 | |
275 return true; | |
276 } | |
779 | 277 |
278 virtual bool SendOutputsToSink() const | |
279 { | |
280 return true; | |
281 } | |
765 | 282 }; |
283 | |
768 | 284 |
770 | 285 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) |
768 | 286 { |
779 | 287 typedef IServerFilter::ListOfStrings ListOfStrings; |
288 | |
768 | 289 #if 1 |
770 | 290 while (!(*done)) |
768 | 291 { |
292 ListOfStrings l; | |
293 s->GetListOfJobs(l); | |
294 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
295 printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i)); | |
296 Toolbox::USleep(100000); | |
297 } | |
298 #else | |
299 ListOfStrings l; | |
300 s->GetListOfJobs(l); | |
301 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
302 printf(">> %s\n", i->c_str()); | |
303 Toolbox::USleep(1500000); | |
304 s->Cancel(*j); | |
305 Toolbox::USleep(1000000); | |
306 s->GetListOfJobs(l); | |
307 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
308 printf(">> %s\n", i->c_str()); | |
309 #endif | |
310 } | |
311 | |
312 | |
765 | 313 TEST(Toto, Toto) |
314 { | |
315 ServerScheduler scheduler; | |
316 | |
317 ServerJob job; | |
779 | 318 ServerFilterInstance& f2 = job.AddFilter(new Tutu(2)); |
319 ServerFilterInstance& f3 = job.AddFilter(new Tutu(3)); | |
320 ServerFilterInstance& f4 = job.AddFilter(new Tutu(4)); | |
321 ServerFilterInstance& f5 = job.AddFilter(new Tutu(5)); | |
765 | 322 f2.AddInput(boost::lexical_cast<std::string>(42)); |
323 //f3.AddInput(boost::lexical_cast<std::string>(42)); | |
324 //f4.AddInput(boost::lexical_cast<std::string>(42)); | |
325 f2.ConnectNext(f3); | |
326 f3.ConnectNext(f4); | |
768 | 327 f4.ConnectNext(f5); |
765 | 328 |
329 job.SetDescription("tutu"); | |
330 | |
770 | 331 bool done = false; |
332 boost::thread t(Tata, &scheduler, &job, &done); | |
768 | 333 |
334 | |
765 | 335 //scheduler.Submit(job); |
336 | |
779 | 337 IServerFilter::ListOfStrings l; |
765 | 338 scheduler.SubmitAndWait(l, job); |
339 | |
779 | 340 for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++) |
765 | 341 { |
342 printf("** %s\n", i->c_str()); | |
343 } | |
344 | |
345 //Toolbox::ServerBarrier(); | |
768 | 346 //Toolbox::USleep(3000000); |
347 | |
770 | 348 done = true; |
349 t.join(); | |
765 | 350 } |