comparison UnitTestsSources/MultiThreadingTests.cpp @ 994:b3d4f8a30324 lua-scripting

integration mainline->lua-scripting
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 02 Jul 2014 14:42:49 +0200
parents UnitTestsSources/MultiThreading.cpp@394a19d44f9d UnitTestsSources/MultiThreading.cpp@dfc076546821
children 8c67382f44a7
comparison
equal deleted inserted replaced
954:a91e7b4080d1 994:b3d4f8a30324
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege,
4 * Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation, either version 3 of the
9 * License, or (at your option) any later version.
10 *
11 * In addition, as a special exception, the copyright holders of this
12 * program give permission to link the code of its release with the
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it
14 * that use the same license as the "OpenSSL" library), and distribute
15 * the linked executables. You must obey the GNU General Public License
16 * in all respects for all of the code used other than "OpenSSL". If you
17 * modify file(s) with this exception, you may extend this exception to
18 * your version of the file(s), but you are not obligated to do so. If
19 * you do not wish to do so, delete this exception statement from your
20 * version. If you delete this exception statement from all source files
21 * in the program, then also delete it here.
22 *
23 * This program is distributed in the hope that it will be useful, but
24 * WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 * General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License
29 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 **/
31
32
33 #include "PrecompiledHeadersUnitTests.h"
34 #include "gtest/gtest.h"
35 #include <glog/logging.h>
36
37 #include "../OrthancServer/Scheduler/ServerScheduler.h"
38
39 #include "../Core/OrthancException.h"
40 #include "../Core/Toolbox.h"
41 #include "../Core/MultiThreading/ArrayFilledByThreads.h"
42 #include "../Core/MultiThreading/Locker.h"
43 #include "../Core/MultiThreading/Mutex.h"
44 #include "../Core/MultiThreading/ReaderWriterLock.h"
45 #include "../Core/MultiThreading/ThreadedCommandProcessor.h"
46
47 using namespace Orthanc;
48
49 namespace
50 {
51 class DynamicInteger : public ICommand
52 {
53 private:
54 int value_;
55 std::set<int>& target_;
56
57 public:
58 DynamicInteger(int value, std::set<int>& target) :
59 value_(value), target_(target)
60 {
61 }
62
63 int GetValue() const
64 {
65 return value_;
66 }
67
68 virtual bool Execute()
69 {
70 static boost::mutex mutex;
71 boost::mutex::scoped_lock lock(mutex);
72 target_.insert(value_);
73 return true;
74 }
75 };
76
77 class MyFiller : public ArrayFilledByThreads::IFiller
78 {
79 private:
80 int size_;
81 unsigned int created_;
82 std::set<int> set_;
83
84 public:
85 MyFiller(int size) : size_(size), created_(0)
86 {
87 }
88
89 virtual size_t GetFillerSize()
90 {
91 return size_;
92 }
93
94 virtual IDynamicObject* GetFillerItem(size_t index)
95 {
96 static boost::mutex mutex;
97 boost::mutex::scoped_lock lock(mutex);
98 created_++;
99 return new DynamicInteger(index * 2, set_);
100 }
101
102 unsigned int GetCreatedCount() const
103 {
104 return created_;
105 }
106
107 std::set<int> GetSet()
108 {
109 return set_;
110 }
111 };
112 }
113
114
115
116
117 TEST(MultiThreading, SharedMessageQueueBasic)
118 {
119 std::set<int> s;
120
121 SharedMessageQueue q;
122 ASSERT_TRUE(q.WaitEmpty(0));
123 q.Enqueue(new DynamicInteger(10, s));
124 ASSERT_FALSE(q.WaitEmpty(1));
125 q.Enqueue(new DynamicInteger(20, s));
126 q.Enqueue(new DynamicInteger(30, s));
127 q.Enqueue(new DynamicInteger(40, s));
128
129 std::auto_ptr<DynamicInteger> i;
130 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
131 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue());
132 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
133 ASSERT_FALSE(q.WaitEmpty(1));
134 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue());
135 ASSERT_TRUE(q.WaitEmpty(0));
136 ASSERT_EQ(NULL, q.Dequeue(1));
137 }
138
139
140 TEST(MultiThreading, SharedMessageQueueClean)
141 {
142 std::set<int> s;
143
144 try
145 {
146 SharedMessageQueue q;
147 q.Enqueue(new DynamicInteger(10, s));
148 q.Enqueue(new DynamicInteger(20, s));
149 throw OrthancException("Nope");
150 }
151 catch (OrthancException&)
152 {
153 }
154 }
155
156
157 TEST(MultiThreading, ArrayFilledByThreadEmpty)
158 {
159 MyFiller f(0);
160 ArrayFilledByThreads a(f);
161 a.SetThreadCount(1);
162 ASSERT_EQ(0, a.GetSize());
163 }
164
165
166 TEST(MultiThreading, ArrayFilledByThread1)
167 {
168 MyFiller f(100);
169 ArrayFilledByThreads a(f);
170 a.SetThreadCount(1);
171 ASSERT_EQ(100, a.GetSize());
172 for (size_t i = 0; i < a.GetSize(); i++)
173 {
174 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue());
175 }
176 }
177
178
179 TEST(MultiThreading, ArrayFilledByThread4)
180 {
181 MyFiller f(100);
182 ArrayFilledByThreads a(f);
183 a.SetThreadCount(4);
184 ASSERT_EQ(100, a.GetSize());
185 for (size_t i = 0; i < a.GetSize(); i++)
186 {
187 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue());
188 }
189
190 ASSERT_EQ(100u, f.GetCreatedCount());
191
192 a.Invalidate();
193
194 ASSERT_EQ(100, a.GetSize());
195 ASSERT_EQ(200u, f.GetCreatedCount());
196 ASSERT_EQ(4u, a.GetThreadCount());
197 ASSERT_TRUE(f.GetSet().empty());
198
199 for (size_t i = 0; i < a.GetSize(); i++)
200 {
201 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue());
202 }
203 }
204
205
206 TEST(MultiThreading, CommandProcessor)
207 {
208 ThreadedCommandProcessor p(4);
209
210 std::set<int> s;
211
212 for (size_t i = 0; i < 100; i++)
213 {
214 p.Post(new DynamicInteger(i * 2, s));
215 }
216
217 p.Join();
218
219 for (size_t i = 0; i < 200; i++)
220 {
221 if (i % 2)
222 ASSERT_TRUE(s.find(i) == s.end());
223 else
224 ASSERT_TRUE(s.find(i) != s.end());
225 }
226 }
227
228
229 TEST(MultiThreading, Mutex)
230 {
231 Mutex mutex;
232 Locker locker(mutex);
233 }
234
235
236 TEST(MultiThreading, ReaderWriterLock)
237 {
238 ReaderWriterLock lock;
239
240 {
241 Locker locker1(lock.ForReader());
242 Locker locker2(lock.ForReader());
243 }
244
245 {
246 Locker locker3(lock.ForWriter());
247 }
248 }
249
250
251
252 #include "../OrthancServer/DicomProtocol/ReusableDicomUserConnection.h"
253
254 TEST(ReusableDicomUserConnection, DISABLED_Basic)
255 {
256 ReusableDicomUserConnection c;
257 c.SetMillisecondsBeforeClose(200);
258 printf("START\n"); fflush(stdout);
259
260 {
261 ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic);
262 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281");
263 }
264
265 printf("**\n"); fflush(stdout);
266 Toolbox::USleep(1000000);
267 printf("**\n"); fflush(stdout);
268
269 {
270 ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic);
271 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676277");
272 }
273
274 Toolbox::ServerBarrier();
275 printf("DONE\n"); fflush(stdout);
276 }
277
278
279
280 class Tutu : public IServerFilter
281 {
282 private:
283 int factor_;
284
285 public:
286 Tutu(int f) : factor_(f)
287 {
288 }
289
290 virtual bool Apply(ListOfStrings& outputs,
291 const ListOfStrings& inputs)
292 {
293 for (ListOfStrings::const_iterator
294 it = inputs.begin(); it != inputs.end(); it++)
295 {
296 int a = boost::lexical_cast<int>(*it);
297 int b = factor_ * a;
298
299 printf("%d * %d = %d\n", a, factor_, b);
300
301 //if (a == 84) { printf("BREAK\n"); return false; }
302
303 outputs.push_back(boost::lexical_cast<std::string>(b));
304 }
305
306 Toolbox::USleep(1000000);
307
308 return true;
309 }
310
311 virtual bool SendOutputsToSink() const
312 {
313 return true;
314 }
315 };
316
317
318 static void Tata(ServerScheduler* s, ServerJob* j, bool* done)
319 {
320 typedef IServerFilter::ListOfStrings ListOfStrings;
321
322 #if 1
323 while (!(*done))
324 {
325 ListOfStrings l;
326 s->GetListOfJobs(l);
327 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
328 printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i));
329 Toolbox::USleep(100000);
330 }
331 #else
332 ListOfStrings l;
333 s->GetListOfJobs(l);
334 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
335 printf(">> %s\n", i->c_str());
336 Toolbox::USleep(1500000);
337 s->Cancel(*j);
338 Toolbox::USleep(1000000);
339 s->GetListOfJobs(l);
340 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
341 printf(">> %s\n", i->c_str());
342 #endif
343 }
344
345
346 TEST(Toto, Toto)
347 {
348 ServerScheduler scheduler;
349
350 ServerJob job;
351 ServerFilterInstance& f2 = job.AddFilter(new Tutu(2));
352 ServerFilterInstance& f3 = job.AddFilter(new Tutu(3));
353 ServerFilterInstance& f4 = job.AddFilter(new Tutu(4));
354 ServerFilterInstance& f5 = job.AddFilter(new Tutu(5));
355 f2.AddInput(boost::lexical_cast<std::string>(42));
356 //f3.AddInput(boost::lexical_cast<std::string>(42));
357 //f4.AddInput(boost::lexical_cast<std::string>(42));
358 f2.ConnectNext(f3);
359 f3.ConnectNext(f4);
360 f4.ConnectNext(f5);
361
362 job.SetDescription("tutu");
363
364 bool done = false;
365 boost::thread t(Tata, &scheduler, &job, &done);
366
367
368 //scheduler.Submit(job);
369
370 IServerFilter::ListOfStrings l;
371 scheduler.SubmitAndWait(l, job);
372
373 for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
374 {
375 printf("** %s\n", i->c_str());
376 }
377
378 //Toolbox::ServerBarrier();
379 //Toolbox::USleep(3000000);
380
381 done = true;
382 t.join();
383 }