Mercurial > hg > orthanc
annotate UnitTestsSources/MultiThreading.cpp @ 775:d3ba35466225 lua-scripting
integration mainline -> lua-scripting
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 30 Apr 2014 15:35:10 +0200 |
parents | a64ca424e0e2 3f946e5c3802 |
children | 9ae0bb3f188b |
rev | line source |
---|---|
723 | 1 #include "gtest/gtest.h" |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
2 #include <glog/logging.h> |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
3 |
723 | 4 #include "../Core/OrthancException.h" |
5 #include "../Core/Toolbox.h" | |
6 #include "../Core/MultiThreading/ArrayFilledByThreads.h" | |
760 | 7 #include "../Core/MultiThreading/Locker.h" |
8 #include "../Core/MultiThreading/Mutex.h" | |
9 #include "../Core/MultiThreading/ReaderWriterLock.h" | |
723 | 10 #include "../Core/MultiThreading/ThreadedCommandProcessor.h" |
11 | |
12 using namespace Orthanc; | |
13 | |
14 namespace | |
15 { | |
16 class DynamicInteger : public ICommand | |
17 { | |
18 private: | |
19 int value_; | |
20 std::set<int>& target_; | |
21 | |
22 public: | |
23 DynamicInteger(int value, std::set<int>& target) : | |
24 value_(value), target_(target) | |
25 { | |
26 } | |
27 | |
28 int GetValue() const | |
29 { | |
30 return value_; | |
31 } | |
32 | |
33 virtual bool Execute() | |
34 { | |
35 static boost::mutex mutex; | |
36 boost::mutex::scoped_lock lock(mutex); | |
37 target_.insert(value_); | |
38 return true; | |
39 } | |
40 }; | |
41 | |
42 class MyFiller : public ArrayFilledByThreads::IFiller | |
43 { | |
44 private: | |
45 int size_; | |
46 unsigned int created_; | |
47 std::set<int> set_; | |
48 | |
49 public: | |
50 MyFiller(int size) : size_(size), created_(0) | |
51 { | |
52 } | |
53 | |
54 virtual size_t GetFillerSize() | |
55 { | |
56 return size_; | |
57 } | |
58 | |
59 virtual IDynamicObject* GetFillerItem(size_t index) | |
60 { | |
61 static boost::mutex mutex; | |
62 boost::mutex::scoped_lock lock(mutex); | |
63 created_++; | |
64 return new DynamicInteger(index * 2, set_); | |
65 } | |
66 | |
67 unsigned int GetCreatedCount() const | |
68 { | |
69 return created_; | |
70 } | |
71 | |
72 std::set<int> GetSet() | |
73 { | |
74 return set_; | |
75 } | |
76 }; | |
77 } | |
78 | |
79 | |
80 | |
81 | |
82 TEST(MultiThreading, SharedMessageQueueBasic) | |
83 { | |
84 std::set<int> s; | |
85 | |
86 SharedMessageQueue q; | |
87 ASSERT_TRUE(q.WaitEmpty(0)); | |
88 q.Enqueue(new DynamicInteger(10, s)); | |
89 ASSERT_FALSE(q.WaitEmpty(1)); | |
90 q.Enqueue(new DynamicInteger(20, s)); | |
91 q.Enqueue(new DynamicInteger(30, s)); | |
92 q.Enqueue(new DynamicInteger(40, s)); | |
93 | |
94 std::auto_ptr<DynamicInteger> i; | |
95 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); | |
96 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue()); | |
97 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); | |
98 ASSERT_FALSE(q.WaitEmpty(1)); | |
99 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue()); | |
100 ASSERT_TRUE(q.WaitEmpty(0)); | |
101 ASSERT_EQ(NULL, q.Dequeue(1)); | |
102 } | |
103 | |
104 | |
105 TEST(MultiThreading, SharedMessageQueueClean) | |
106 { | |
107 std::set<int> s; | |
108 | |
109 try | |
110 { | |
111 SharedMessageQueue q; | |
112 q.Enqueue(new DynamicInteger(10, s)); | |
113 q.Enqueue(new DynamicInteger(20, s)); | |
114 throw OrthancException("Nope"); | |
115 } | |
116 catch (OrthancException&) | |
117 { | |
118 } | |
119 } | |
120 | |
121 | |
122 TEST(MultiThreading, ArrayFilledByThreadEmpty) | |
123 { | |
124 MyFiller f(0); | |
125 ArrayFilledByThreads a(f); | |
126 a.SetThreadCount(1); | |
127 ASSERT_EQ(0, a.GetSize()); | |
128 } | |
129 | |
130 | |
131 TEST(MultiThreading, ArrayFilledByThread1) | |
132 { | |
133 MyFiller f(100); | |
134 ArrayFilledByThreads a(f); | |
135 a.SetThreadCount(1); | |
136 ASSERT_EQ(100, a.GetSize()); | |
137 for (size_t i = 0; i < a.GetSize(); i++) | |
138 { | |
139 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
140 } | |
141 } | |
142 | |
143 | |
144 TEST(MultiThreading, ArrayFilledByThread4) | |
145 { | |
146 MyFiller f(100); | |
147 ArrayFilledByThreads a(f); | |
148 a.SetThreadCount(4); | |
149 ASSERT_EQ(100, a.GetSize()); | |
150 for (size_t i = 0; i < a.GetSize(); i++) | |
151 { | |
152 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
153 } | |
154 | |
155 ASSERT_EQ(100u, f.GetCreatedCount()); | |
156 | |
157 a.Invalidate(); | |
158 | |
159 ASSERT_EQ(100, a.GetSize()); | |
160 ASSERT_EQ(200u, f.GetCreatedCount()); | |
161 ASSERT_EQ(4u, a.GetThreadCount()); | |
162 ASSERT_TRUE(f.GetSet().empty()); | |
163 | |
164 for (size_t i = 0; i < a.GetSize(); i++) | |
165 { | |
166 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
167 } | |
168 } | |
169 | |
170 | |
171 TEST(MultiThreading, CommandProcessor) | |
172 { | |
173 ThreadedCommandProcessor p(4); | |
174 | |
175 std::set<int> s; | |
176 | |
177 for (size_t i = 0; i < 100; i++) | |
178 { | |
179 p.Post(new DynamicInteger(i * 2, s)); | |
180 } | |
181 | |
182 p.Join(); | |
183 | |
184 for (size_t i = 0; i < 200; i++) | |
185 { | |
186 if (i % 2) | |
187 ASSERT_TRUE(s.find(i) == s.end()); | |
188 else | |
189 ASSERT_TRUE(s.find(i) != s.end()); | |
190 } | |
191 } | |
760 | 192 |
193 | |
194 TEST(MultiThreading, Mutex) | |
195 { | |
196 Mutex mutex; | |
197 Locker locker(mutex); | |
198 } | |
199 | |
200 | |
201 TEST(MultiThreading, ReaderWriterLock) | |
202 { | |
203 ReaderWriterLock lock; | |
204 | |
205 { | |
206 Locker locker1(lock.ForReader()); | |
207 Locker locker2(lock.ForReader()); | |
208 } | |
209 | |
210 { | |
211 Locker locker3(lock.ForWriter()); | |
212 } | |
213 } | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
214 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
215 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
216 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
217 #include "../OrthancServer/DicomProtocol/ReusableDicomUserConnection.h" |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
218 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
219 TEST(ReusableDicomUserConnection, DISABLED_Basic) |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
220 { |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
221 ReusableDicomUserConnection c; |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
222 c.SetMillisecondsBeforeClose(200); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
223 printf("START\n"); fflush(stdout); |
775
d3ba35466225
integration mainline -> lua-scripting
Sebastien Jodogne <s.jodogne@gmail.com>
diff
changeset
|
224 |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
225 { |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
226 ReusableDicomUserConnection::Connection cc(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
227 cc.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281"); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
228 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
229 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
230 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
231 Toolbox::USleep(1000000); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
232 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
233 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
234 { |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
235 ReusableDicomUserConnection::Connection cc(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
236 cc.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676277"); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
237 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
238 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
239 Toolbox::ServerBarrier(); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
240 printf("DONE\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
241 } |
765 | 242 |
243 | |
244 | |
245 #include "../Core/ICommand.h" | |
246 #include "../Core/Toolbox.h" | |
247 #include "../Core/Uuid.h" | |
248 #include "../Core/MultiThreading/SharedMessageQueue.h" | |
249 #include <boost/lexical_cast.hpp> | |
250 | |
251 | |
252 namespace Orthanc | |
253 { | |
254 typedef std::list<std::string> ListOfStrings; | |
255 | |
256 class IServerFilter | |
257 { | |
258 public: | |
259 virtual ~IServerFilter() | |
260 { | |
261 } | |
262 | |
263 virtual bool Apply(ListOfStrings& outputs, | |
264 const ListOfStrings& inputs) = 0; | |
265 }; | |
266 | |
267 | |
268 class Sink : public IServerFilter | |
269 { | |
270 private: | |
271 ListOfStrings& target_; | |
272 | |
273 public: | |
274 Sink(ListOfStrings& target) : target_(target) | |
275 { | |
276 } | |
277 | |
278 virtual bool Apply(ListOfStrings& outputs, | |
279 const ListOfStrings& inputs) | |
280 { | |
281 for (ListOfStrings::const_iterator | |
282 it = inputs.begin(); it != inputs.end(); it++) | |
283 { | |
284 target_.push_back(*it); | |
285 } | |
286 | |
287 return true; | |
288 } | |
289 }; | |
290 | |
291 | |
292 | |
293 class IServerFilterListener | |
294 { | |
295 public: | |
296 virtual ~IServerFilterListener() | |
297 { | |
298 } | |
299 | |
300 virtual void SignalSuccess(const std::string& jobId) = 0; | |
301 | |
302 virtual void SignalFailure(const std::string& jobId) = 0; | |
303 }; | |
304 | |
305 | |
306 class FilterWrapper : public IDynamicObject | |
307 { | |
768 | 308 friend class ServerScheduler; |
309 | |
765 | 310 private: |
311 IServerFilter *filter_; | |
312 std::string jobId_; | |
313 ListOfStrings inputs_; | |
314 std::list<FilterWrapper*> next_; | |
315 | |
768 | 316 bool Execute(IServerFilterListener& listener) |
317 { | |
318 ListOfStrings outputs; | |
319 if (!filter_->Apply(outputs, inputs_)) | |
320 { | |
321 listener.SignalFailure(jobId_); | |
322 return true; | |
323 } | |
324 | |
325 for (std::list<FilterWrapper*>::iterator | |
326 it = next_.begin(); it != next_.end(); it++) | |
327 { | |
328 for (ListOfStrings::const_iterator | |
329 output = outputs.begin(); output != outputs.end(); output++) | |
330 { | |
331 (*it)->AddInput(*output); | |
332 } | |
333 } | |
334 | |
335 listener.SignalSuccess(jobId_); | |
336 return true; | |
337 } | |
338 | |
339 | |
765 | 340 public: |
341 FilterWrapper(IServerFilter *filter, | |
342 const std::string& jobId) : | |
343 filter_(filter), | |
344 jobId_(jobId) | |
345 { | |
346 if (filter_ == NULL) | |
347 { | |
348 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
349 } | |
350 } | |
351 | |
352 virtual ~FilterWrapper() | |
353 { | |
354 if (filter_ != NULL) | |
355 { | |
356 delete filter_; | |
357 } | |
358 } | |
359 | |
360 const std::string& GetJobId() const | |
361 { | |
362 return jobId_; | |
363 } | |
364 | |
365 void AddInput(const std::string& input) | |
366 { | |
367 inputs_.push_back(input); | |
368 } | |
369 | |
370 void ConnectNext(FilterWrapper& filter) | |
371 { | |
372 next_.push_back(&filter); | |
373 } | |
374 | |
375 const std::list<FilterWrapper*>& GetNextFilters() const | |
376 { | |
377 return next_; | |
378 } | |
379 }; | |
380 | |
381 | |
382 class ServerJob | |
383 { | |
384 friend class ServerScheduler; | |
385 | |
386 private: | |
387 std::list<FilterWrapper*> filters_; | |
388 std::string jobId_; | |
389 bool submitted_; | |
390 std::string description_; | |
391 | |
392 | |
393 void CheckOrdering() | |
394 { | |
395 std::map<FilterWrapper*, unsigned int> index; | |
396 | |
397 unsigned int count = 0; | |
398 for (std::list<FilterWrapper*>::const_iterator | |
399 it = filters_.begin(); it != filters_.end(); it++) | |
400 { | |
401 index[*it] = count++; | |
402 } | |
403 | |
404 for (std::list<FilterWrapper*>::const_iterator | |
405 it = filters_.begin(); it != filters_.end(); it++) | |
406 { | |
407 const std::list<FilterWrapper*>& nextFilters = (*it)->GetNextFilters(); | |
408 | |
409 for (std::list<FilterWrapper*>::const_iterator | |
410 next = nextFilters.begin(); next != nextFilters.end(); next++) | |
411 { | |
412 if (index.find(*next) == index.end() || | |
413 index[*next] <= index[*it]) | |
414 { | |
415 // You must reorder your calls to "ServerJob::AddFilter" | |
416 throw OrthancException("Bad ordering of filters in a job"); | |
417 } | |
418 } | |
419 } | |
420 } | |
421 | |
422 | |
423 size_t Submit(SharedMessageQueue& target, | |
424 IServerFilterListener& listener) | |
425 { | |
426 if (submitted_) | |
427 { | |
428 // This job has already been submitted | |
429 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
430 } | |
431 | |
432 CheckOrdering(); | |
433 | |
434 size_t size = filters_.size(); | |
435 | |
436 for (std::list<FilterWrapper*>::iterator | |
437 it = filters_.begin(); it != filters_.end(); it++) | |
438 { | |
439 target.Enqueue(*it); | |
440 } | |
441 | |
442 filters_.clear(); | |
443 submitted_ = true; | |
444 | |
445 return size; | |
446 } | |
447 | |
448 public: | |
449 ServerJob() | |
450 { | |
451 jobId_ = Toolbox::GenerateUuid(); | |
452 submitted_ = false; | |
453 description_ = "no description"; | |
454 } | |
455 | |
456 ~ServerJob() | |
457 { | |
458 for (std::list<FilterWrapper*>::iterator | |
459 it = filters_.begin(); it != filters_.end(); it++) | |
460 { | |
461 delete *it; | |
462 } | |
463 } | |
464 | |
465 const std::string& GetId() const | |
466 { | |
467 return jobId_; | |
468 } | |
469 | |
470 void SetDescription(const char* description) | |
471 { | |
472 description_ = description; | |
473 } | |
474 | |
475 const std::string& GetDescription() const | |
476 { | |
477 return description_; | |
478 } | |
479 | |
480 FilterWrapper& AddFilter(IServerFilter* filter) | |
481 { | |
482 if (submitted_) | |
483 { | |
484 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
485 } | |
486 | |
487 filters_.push_back(new FilterWrapper(filter, jobId_)); | |
488 | |
489 return *filters_.back(); | |
490 } | |
491 }; | |
492 | |
493 | |
494 class ServerScheduler : public IServerFilterListener | |
495 { | |
496 private: | |
497 struct JobInfo | |
498 { | |
499 bool watched_; | |
500 bool cancel_; | |
501 size_t size_; | |
502 size_t success_; | |
503 size_t failures_; | |
504 std::string description_; | |
505 }; | |
506 | |
768 | 507 enum JobStatus |
508 { | |
509 JobStatus_Running = 1, | |
510 JobStatus_Success = 2, | |
511 JobStatus_Failure = 3 | |
512 }; | |
513 | |
765 | 514 typedef std::map<std::string, JobInfo> Jobs; |
515 | |
516 boost::mutex mutex_; | |
517 boost::condition_variable jobFinished_; | |
518 Jobs jobs_; | |
519 SharedMessageQueue queue_; | |
520 bool finish_; | |
521 boost::thread worker_; | |
768 | 522 std::map<std::string, JobStatus> watchedJobStatus_; |
765 | 523 |
524 JobInfo& GetJobInfo(const std::string& jobId) | |
525 { | |
526 Jobs::iterator info = jobs_.find(jobId); | |
527 | |
528 if (info == jobs_.end()) | |
529 { | |
530 throw OrthancException(ErrorCode_InternalError); | |
531 } | |
532 | |
533 return info->second; | |
534 } | |
535 | |
536 virtual void SignalSuccess(const std::string& jobId) | |
537 { | |
538 boost::mutex::scoped_lock lock(mutex_); | |
539 | |
540 JobInfo& info = GetJobInfo(jobId); | |
541 info.success_++; | |
542 | |
543 assert(info.failures_ == 0); | |
544 | |
545 if (info.success_ >= info.size_) | |
546 { | |
547 if (info.watched_) | |
548 { | |
768 | 549 watchedJobStatus_[jobId] = JobStatus_Success; |
765 | 550 jobFinished_.notify_all(); |
551 } | |
552 | |
553 LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; | |
554 jobs_.erase(jobId); | |
555 } | |
556 } | |
557 | |
558 virtual void SignalFailure(const std::string& jobId) | |
559 { | |
560 boost::mutex::scoped_lock lock(mutex_); | |
561 | |
562 JobInfo& info = GetJobInfo(jobId); | |
563 info.failures_++; | |
564 | |
565 if (info.success_ + info.failures_ >= info.size_) | |
566 { | |
567 if (info.watched_) | |
568 { | |
768 | 569 watchedJobStatus_[jobId] = JobStatus_Failure; |
765 | 570 jobFinished_.notify_all(); |
571 } | |
572 | |
573 LOG(ERROR) << "Job has failed (" << info.description_ << ")"; | |
574 jobs_.erase(jobId); | |
575 } | |
576 } | |
577 | |
578 static void Worker(ServerScheduler* that) | |
579 { | |
580 static const int32_t TIMEOUT = 100; | |
581 | |
582 while (!that->finish_) | |
583 { | |
584 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); | |
585 if (object.get() != NULL) | |
586 { | |
587 FilterWrapper& filter = dynamic_cast<FilterWrapper&>(*object); | |
588 | |
589 // Skip the execution of this filter if its parent job has | |
590 // previously failed. | |
591 bool jobHasFailed; | |
592 { | |
593 boost::mutex::scoped_lock lock(that->mutex_); | |
594 JobInfo& info = that->GetJobInfo(filter.GetJobId()); | |
595 jobHasFailed = (info.failures_ > 0 || info.cancel_); | |
596 } | |
597 | |
598 if (jobHasFailed) | |
599 { | |
600 that->SignalFailure(filter.GetJobId()); | |
601 } | |
602 else | |
603 { | |
768 | 604 filter.Execute(*that); |
765 | 605 } |
606 } | |
607 } | |
608 } | |
609 | |
610 void SubmitInternal(ServerJob& job, | |
611 bool watched) | |
612 { | |
613 boost::mutex::scoped_lock lock(mutex_); | |
614 | |
615 JobInfo info; | |
616 info.size_ = job.Submit(queue_, *this); | |
617 info.cancel_ = false; | |
618 info.success_ = 0; | |
619 info.failures_ = 0; | |
620 info.description_ = job.GetDescription(); | |
621 info.watched_ = watched; | |
622 | |
623 assert(info.size_ > 0); | |
624 | |
625 if (watched) | |
626 { | |
768 | 627 watchedJobStatus_[job.GetId()] = JobStatus_Running; |
765 | 628 } |
629 | |
630 jobs_[job.GetId()] = info; | |
631 | |
632 LOG(INFO) << "New job submitted (" << job.description_ << ")"; | |
633 } | |
634 | |
635 public: | |
636 ServerScheduler() | |
637 { | |
638 finish_ = false; | |
639 worker_ = boost::thread(Worker, this); | |
640 } | |
641 | |
642 ~ServerScheduler() | |
643 { | |
644 finish_ = true; | |
645 worker_.join(); | |
646 } | |
647 | |
648 void Submit(ServerJob& job) | |
649 { | |
650 if (job.filters_.empty()) | |
651 { | |
652 return; | |
653 } | |
654 | |
655 SubmitInternal(job, false); | |
656 } | |
657 | |
658 bool SubmitAndWait(ListOfStrings& outputs, | |
659 ServerJob& job) | |
660 { | |
661 std::string jobId = job.GetId(); | |
662 | |
663 outputs.clear(); | |
664 | |
665 if (job.filters_.empty()) | |
666 { | |
667 return true; | |
668 } | |
669 | |
670 // Add a sink filter to collect all the results of the filters | |
671 // that have no next filter. | |
672 FilterWrapper& sink = job.AddFilter(new Sink(outputs)); | |
673 | |
674 for (std::list<FilterWrapper*>::iterator | |
675 it = job.filters_.begin(); it != job.filters_.end(); it++) | |
676 { | |
677 if ((*it) != &sink && | |
678 (*it)->GetNextFilters().size() == 0) | |
679 { | |
680 (*it)->ConnectNext(sink); | |
681 } | |
682 } | |
683 | |
684 // Submit the job | |
685 SubmitInternal(job, true); | |
686 | |
687 // Wait for the job to complete (either success or failure) | |
768 | 688 JobStatus status; |
765 | 689 |
690 { | |
691 boost::mutex::scoped_lock lock(mutex_); | |
768 | 692 |
693 assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); | |
765 | 694 |
768 | 695 while (watchedJobStatus_[jobId] == JobStatus_Running) |
765 | 696 { |
697 jobFinished_.wait(lock); | |
698 } | |
699 | |
700 status = watchedJobStatus_[jobId]; | |
701 watchedJobStatus_.erase(jobId); | |
702 } | |
703 | |
768 | 704 return (status == JobStatus_Success); |
765 | 705 } |
706 | |
707 | |
708 bool IsRunning(const std::string& jobId) | |
709 { | |
710 boost::mutex::scoped_lock lock(mutex_); | |
711 return jobs_.find(jobId) != jobs_.end(); | |
712 } | |
713 | |
714 | |
715 void Cancel(const std::string& jobId) | |
716 { | |
717 boost::mutex::scoped_lock lock(mutex_); | |
718 | |
719 Jobs::iterator job = jobs_.find(jobId); | |
720 | |
721 if (job != jobs_.end()) | |
722 { | |
723 job->second.cancel_ = true; | |
724 LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; | |
725 } | |
726 } | |
727 | |
728 | |
729 // Returns a number between 0 and 1 | |
730 float GetProgress(const std::string& jobId) | |
731 { | |
732 boost::mutex::scoped_lock lock(mutex_); | |
733 | |
734 Jobs::iterator job = jobs_.find(jobId); | |
735 | |
736 if (job == jobs_.end() || | |
737 job->second.size_ == 0 /* should never happen */) | |
738 { | |
739 // This job is not running | |
740 return 1; | |
741 } | |
742 | |
768 | 743 if (job->second.failures_ != 0) |
744 { | |
745 return 1; | |
746 } | |
747 | |
748 if (job->second.size_ == 1) | |
749 { | |
750 return job->second.success_; | |
751 } | |
752 | |
753 return (static_cast<float>(job->second.success_) / | |
754 static_cast<float>(job->second.size_ - 1)); | |
765 | 755 } |
756 | |
757 bool IsRunning(const ServerJob& job) | |
758 { | |
759 return IsRunning(job.GetId()); | |
760 } | |
761 | |
762 void Cancel(const ServerJob& job) | |
763 { | |
764 Cancel(job.GetId()); | |
765 } | |
766 | |
767 float GetProgress(const ServerJob& job) | |
768 { | |
769 return GetProgress(job); | |
770 } | |
768 | 771 |
772 void GetListOfJobs(ListOfStrings& jobs) | |
773 { | |
774 boost::mutex::scoped_lock lock(mutex_); | |
775 | |
776 jobs.clear(); | |
777 | |
778 for (Jobs::const_iterator | |
779 it = jobs_.begin(); it != jobs_.end(); it++) | |
780 { | |
781 jobs.push_back(it->first); | |
782 } | |
783 } | |
765 | 784 }; |
785 | |
786 } | |
787 | |
788 | |
789 | |
790 class Tutu : public IServerFilter | |
791 { | |
792 private: | |
793 int factor_; | |
794 | |
795 public: | |
796 Tutu(int f) : factor_(f) | |
797 { | |
798 } | |
799 | |
800 virtual bool Apply(ListOfStrings& outputs, | |
801 const ListOfStrings& inputs) | |
802 { | |
803 for (ListOfStrings::const_iterator | |
804 it = inputs.begin(); it != inputs.end(); it++) | |
805 { | |
806 int a = boost::lexical_cast<int>(*it); | |
807 int b = factor_ * a; | |
808 | |
809 printf("%d * %d = %d\n", a, factor_, b); | |
810 | |
811 //if (a == 84) { printf("BREAK\n"); return false; } | |
812 | |
813 outputs.push_back(boost::lexical_cast<std::string>(b)); | |
814 } | |
815 | |
816 Toolbox::USleep(1000000); | |
817 | |
818 return true; | |
819 } | |
820 }; | |
821 | |
768 | 822 |
770 | 823 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) |
768 | 824 { |
825 #if 1 | |
770 | 826 while (!(*done)) |
768 | 827 { |
828 ListOfStrings l; | |
829 s->GetListOfJobs(l); | |
830 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
831 printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i)); | |
832 Toolbox::USleep(100000); | |
833 } | |
834 #else | |
835 ListOfStrings l; | |
836 s->GetListOfJobs(l); | |
837 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
838 printf(">> %s\n", i->c_str()); | |
839 Toolbox::USleep(1500000); | |
840 s->Cancel(*j); | |
841 Toolbox::USleep(1000000); | |
842 s->GetListOfJobs(l); | |
843 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
844 printf(">> %s\n", i->c_str()); | |
845 #endif | |
846 } | |
847 | |
848 | |
765 | 849 TEST(Toto, Toto) |
850 { | |
851 ServerScheduler scheduler; | |
852 | |
853 ServerJob job; | |
854 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); | |
855 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); | |
856 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); | |
768 | 857 FilterWrapper& f5 = job.AddFilter(new Tutu(5)); |
765 | 858 f2.AddInput(boost::lexical_cast<std::string>(42)); |
859 //f3.AddInput(boost::lexical_cast<std::string>(42)); | |
860 //f4.AddInput(boost::lexical_cast<std::string>(42)); | |
861 f2.ConnectNext(f3); | |
862 f3.ConnectNext(f4); | |
768 | 863 f4.ConnectNext(f5); |
765 | 864 |
865 job.SetDescription("tutu"); | |
866 | |
770 | 867 bool done = false; |
868 boost::thread t(Tata, &scheduler, &job, &done); | |
768 | 869 |
870 | |
765 | 871 //scheduler.Submit(job); |
872 | |
873 ListOfStrings l; | |
874 scheduler.SubmitAndWait(l, job); | |
875 | |
876 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
877 { | |
878 printf("** %s\n", i->c_str()); | |
879 } | |
880 | |
881 //Toolbox::ServerBarrier(); | |
768 | 882 //Toolbox::USleep(3000000); |
883 | |
770 | 884 done = true; |
885 t.join(); | |
765 | 886 } |