Mercurial > hg > orthanc
annotate UnitTestsSources/MultiThreading.cpp @ 779:76eb563f08f0 lua-scripting
improvements
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 30 Apr 2014 18:10:16 +0200 |
parents | 9ae0bb3f188b |
children | f0ac3a53ccf2 |
rev | line source |
---|---|
723 | 1 #include "gtest/gtest.h" |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
2 #include <glog/logging.h> |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
3 |
723 | 4 #include "../Core/OrthancException.h" |
5 #include "../Core/Toolbox.h" | |
6 #include "../Core/MultiThreading/ArrayFilledByThreads.h" | |
760 | 7 #include "../Core/MultiThreading/Locker.h" |
8 #include "../Core/MultiThreading/Mutex.h" | |
9 #include "../Core/MultiThreading/ReaderWriterLock.h" | |
723 | 10 #include "../Core/MultiThreading/ThreadedCommandProcessor.h" |
11 | |
12 using namespace Orthanc; | |
13 | |
14 namespace | |
15 { | |
16 class DynamicInteger : public ICommand | |
17 { | |
18 private: | |
19 int value_; | |
20 std::set<int>& target_; | |
21 | |
22 public: | |
23 DynamicInteger(int value, std::set<int>& target) : | |
24 value_(value), target_(target) | |
25 { | |
26 } | |
27 | |
28 int GetValue() const | |
29 { | |
30 return value_; | |
31 } | |
32 | |
33 virtual bool Execute() | |
34 { | |
35 static boost::mutex mutex; | |
36 boost::mutex::scoped_lock lock(mutex); | |
37 target_.insert(value_); | |
38 return true; | |
39 } | |
40 }; | |
41 | |
42 class MyFiller : public ArrayFilledByThreads::IFiller | |
43 { | |
44 private: | |
45 int size_; | |
46 unsigned int created_; | |
47 std::set<int> set_; | |
48 | |
49 public: | |
50 MyFiller(int size) : size_(size), created_(0) | |
51 { | |
52 } | |
53 | |
54 virtual size_t GetFillerSize() | |
55 { | |
56 return size_; | |
57 } | |
58 | |
59 virtual IDynamicObject* GetFillerItem(size_t index) | |
60 { | |
61 static boost::mutex mutex; | |
62 boost::mutex::scoped_lock lock(mutex); | |
63 created_++; | |
64 return new DynamicInteger(index * 2, set_); | |
65 } | |
66 | |
67 unsigned int GetCreatedCount() const | |
68 { | |
69 return created_; | |
70 } | |
71 | |
72 std::set<int> GetSet() | |
73 { | |
74 return set_; | |
75 } | |
76 }; | |
77 } | |
78 | |
79 | |
80 | |
81 | |
82 TEST(MultiThreading, SharedMessageQueueBasic) | |
83 { | |
84 std::set<int> s; | |
85 | |
86 SharedMessageQueue q; | |
87 ASSERT_TRUE(q.WaitEmpty(0)); | |
88 q.Enqueue(new DynamicInteger(10, s)); | |
89 ASSERT_FALSE(q.WaitEmpty(1)); | |
90 q.Enqueue(new DynamicInteger(20, s)); | |
91 q.Enqueue(new DynamicInteger(30, s)); | |
92 q.Enqueue(new DynamicInteger(40, s)); | |
93 | |
94 std::auto_ptr<DynamicInteger> i; | |
95 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); | |
96 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue()); | |
97 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); | |
98 ASSERT_FALSE(q.WaitEmpty(1)); | |
99 i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue()); | |
100 ASSERT_TRUE(q.WaitEmpty(0)); | |
101 ASSERT_EQ(NULL, q.Dequeue(1)); | |
102 } | |
103 | |
104 | |
105 TEST(MultiThreading, SharedMessageQueueClean) | |
106 { | |
107 std::set<int> s; | |
108 | |
109 try | |
110 { | |
111 SharedMessageQueue q; | |
112 q.Enqueue(new DynamicInteger(10, s)); | |
113 q.Enqueue(new DynamicInteger(20, s)); | |
114 throw OrthancException("Nope"); | |
115 } | |
116 catch (OrthancException&) | |
117 { | |
118 } | |
119 } | |
120 | |
121 | |
122 TEST(MultiThreading, ArrayFilledByThreadEmpty) | |
123 { | |
124 MyFiller f(0); | |
125 ArrayFilledByThreads a(f); | |
126 a.SetThreadCount(1); | |
127 ASSERT_EQ(0, a.GetSize()); | |
128 } | |
129 | |
130 | |
131 TEST(MultiThreading, ArrayFilledByThread1) | |
132 { | |
133 MyFiller f(100); | |
134 ArrayFilledByThreads a(f); | |
135 a.SetThreadCount(1); | |
136 ASSERT_EQ(100, a.GetSize()); | |
137 for (size_t i = 0; i < a.GetSize(); i++) | |
138 { | |
139 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
140 } | |
141 } | |
142 | |
143 | |
144 TEST(MultiThreading, ArrayFilledByThread4) | |
145 { | |
146 MyFiller f(100); | |
147 ArrayFilledByThreads a(f); | |
148 a.SetThreadCount(4); | |
149 ASSERT_EQ(100, a.GetSize()); | |
150 for (size_t i = 0; i < a.GetSize(); i++) | |
151 { | |
152 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
153 } | |
154 | |
155 ASSERT_EQ(100u, f.GetCreatedCount()); | |
156 | |
157 a.Invalidate(); | |
158 | |
159 ASSERT_EQ(100, a.GetSize()); | |
160 ASSERT_EQ(200u, f.GetCreatedCount()); | |
161 ASSERT_EQ(4u, a.GetThreadCount()); | |
162 ASSERT_TRUE(f.GetSet().empty()); | |
163 | |
164 for (size_t i = 0; i < a.GetSize(); i++) | |
165 { | |
166 ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue()); | |
167 } | |
168 } | |
169 | |
170 | |
171 TEST(MultiThreading, CommandProcessor) | |
172 { | |
173 ThreadedCommandProcessor p(4); | |
174 | |
175 std::set<int> s; | |
176 | |
177 for (size_t i = 0; i < 100; i++) | |
178 { | |
179 p.Post(new DynamicInteger(i * 2, s)); | |
180 } | |
181 | |
182 p.Join(); | |
183 | |
184 for (size_t i = 0; i < 200; i++) | |
185 { | |
186 if (i % 2) | |
187 ASSERT_TRUE(s.find(i) == s.end()); | |
188 else | |
189 ASSERT_TRUE(s.find(i) != s.end()); | |
190 } | |
191 } | |
760 | 192 |
193 | |
194 TEST(MultiThreading, Mutex) | |
195 { | |
196 Mutex mutex; | |
197 Locker locker(mutex); | |
198 } | |
199 | |
200 | |
201 TEST(MultiThreading, ReaderWriterLock) | |
202 { | |
203 ReaderWriterLock lock; | |
204 | |
205 { | |
206 Locker locker1(lock.ForReader()); | |
207 Locker locker2(lock.ForReader()); | |
208 } | |
209 | |
210 { | |
211 Locker locker3(lock.ForWriter()); | |
212 } | |
213 } | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
214 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
215 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
216 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
217 #include "../OrthancServer/DicomProtocol/ReusableDicomUserConnection.h" |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
218 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
219 TEST(ReusableDicomUserConnection, DISABLED_Basic) |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
220 { |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
221 ReusableDicomUserConnection c; |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
222 c.SetMillisecondsBeforeClose(200); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
223 printf("START\n"); fflush(stdout); |
775
d3ba35466225
integration mainline -> lua-scripting
Sebastien Jodogne <s.jodogne@gmail.com>
diff
changeset
|
224 |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
225 { |
776 | 226 ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
227 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281"); | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
228 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
229 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
230 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
231 Toolbox::USleep(1000000); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
232 printf("**\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
233 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
234 { |
776 | 235 ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); |
236 lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676277"); | |
769
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
237 } |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
238 |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
239 Toolbox::ServerBarrier(); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
240 printf("DONE\n"); fflush(stdout); |
3f946e5c3802
ReusableDicomUserConnection
Sebastien Jodogne <s.jodogne@gmail.com>
parents:
760
diff
changeset
|
241 } |
765 | 242 |
243 | |
244 | |
245 #include "../Core/ICommand.h" | |
246 #include "../Core/Toolbox.h" | |
247 #include "../Core/Uuid.h" | |
248 #include "../Core/MultiThreading/SharedMessageQueue.h" | |
249 #include <boost/lexical_cast.hpp> | |
250 | |
251 | |
252 namespace Orthanc | |
253 { | |
// Interface of one processing step ("filter") of a job. A filter
// consumes a list of strings and produces another list of strings.
class IServerFilter
{
public:
  typedef std::list<std::string> ListOfStrings;

  virtual ~IServerFilter() {}

  // Tells whether the outputs of this filter must be collected by the
  // sink that "ServerScheduler::SubmitAndWait()" appends to the job.
  // This is only consulted for filters that have no next filter.
  virtual bool SendOutputsToSink() const = 0;

  // Runs the filter: reads "inputs" and fills "outputs". Returning
  // "false" reports a failure of the filter.
  virtual bool Apply(ListOfStrings& outputs,
                     const ListOfStrings& inputs) = 0;
};
268 | |
269 | |
270 class Sink : public IServerFilter | |
271 { | |
272 private: | |
273 ListOfStrings& target_; | |
274 | |
275 public: | |
276 Sink(ListOfStrings& target) : target_(target) | |
277 { | |
278 } | |
279 | |
779 | 280 virtual bool SendOutputsToSink() const |
281 { | |
282 return false; | |
283 } | |
284 | |
765 | 285 virtual bool Apply(ListOfStrings& outputs, |
286 const ListOfStrings& inputs) | |
287 { | |
288 for (ListOfStrings::const_iterator | |
289 it = inputs.begin(); it != inputs.end(); it++) | |
290 { | |
291 target_.push_back(*it); | |
292 } | |
293 | |
294 return true; | |
295 } | |
296 }; | |
297 | |
298 | |
299 | |
// Observer notified about the outcome of each filter of a job. Both
// callbacks receive the identifier of the job that the filter belongs to.
class IServerFilterListener
{
public:
  virtual ~IServerFilterListener() {}

  // One filter of the job "jobId" has failed.
  virtual void SignalFailure(const std::string& jobId) = 0;

  // One filter of the job "jobId" has succeeded.
  virtual void SignalSuccess(const std::string& jobId) = 0;
};
311 | |
312 | |
779 | 313 class ServerFilterInstance : public IDynamicObject |
765 | 314 { |
768 | 315 friend class ServerScheduler; |
316 | |
765 | 317 private: |
779 | 318 typedef IServerFilter::ListOfStrings ListOfStrings; |
319 | |
765 | 320 IServerFilter *filter_; |
321 std::string jobId_; | |
322 ListOfStrings inputs_; | |
779 | 323 std::list<ServerFilterInstance*> next_; |
765 | 324 |
768 | 325 bool Execute(IServerFilterListener& listener) |
326 { | |
327 ListOfStrings outputs; | |
328 if (!filter_->Apply(outputs, inputs_)) | |
329 { | |
330 listener.SignalFailure(jobId_); | |
331 return true; | |
332 } | |
333 | |
779 | 334 for (std::list<ServerFilterInstance*>::iterator |
768 | 335 it = next_.begin(); it != next_.end(); it++) |
336 { | |
337 for (ListOfStrings::const_iterator | |
338 output = outputs.begin(); output != outputs.end(); output++) | |
339 { | |
340 (*it)->AddInput(*output); | |
341 } | |
342 } | |
343 | |
344 listener.SignalSuccess(jobId_); | |
345 return true; | |
346 } | |
347 | |
348 | |
765 | 349 public: |
779 | 350 ServerFilterInstance(IServerFilter *filter, |
351 const std::string& jobId) : | |
765 | 352 filter_(filter), |
353 jobId_(jobId) | |
354 { | |
355 if (filter_ == NULL) | |
356 { | |
357 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
358 } | |
359 } | |
360 | |
779 | 361 virtual ~ServerFilterInstance() |
765 | 362 { |
363 if (filter_ != NULL) | |
364 { | |
365 delete filter_; | |
366 } | |
367 } | |
368 | |
369 const std::string& GetJobId() const | |
370 { | |
371 return jobId_; | |
372 } | |
373 | |
374 void AddInput(const std::string& input) | |
375 { | |
376 inputs_.push_back(input); | |
377 } | |
378 | |
779 | 379 void ConnectNext(ServerFilterInstance& filter) |
765 | 380 { |
381 next_.push_back(&filter); | |
382 } | |
383 | |
779 | 384 const std::list<ServerFilterInstance*>& GetNextFilters() const |
765 | 385 { |
386 return next_; | |
387 } | |
779 | 388 |
389 IServerFilter& GetFilter() const | |
390 { | |
391 return *filter_; | |
392 } | |
765 | 393 }; |
394 | |
395 | |
396 class ServerJob | |
397 { | |
398 friend class ServerScheduler; | |
399 | |
400 private: | |
779 | 401 std::list<ServerFilterInstance*> filters_; |
765 | 402 std::string jobId_; |
403 bool submitted_; | |
404 std::string description_; | |
405 | |
406 | |
407 void CheckOrdering() | |
408 { | |
779 | 409 std::map<ServerFilterInstance*, unsigned int> index; |
765 | 410 |
411 unsigned int count = 0; | |
779 | 412 for (std::list<ServerFilterInstance*>::const_iterator |
765 | 413 it = filters_.begin(); it != filters_.end(); it++) |
414 { | |
415 index[*it] = count++; | |
416 } | |
417 | |
779 | 418 for (std::list<ServerFilterInstance*>::const_iterator |
765 | 419 it = filters_.begin(); it != filters_.end(); it++) |
420 { | |
779 | 421 const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters(); |
765 | 422 |
779 | 423 for (std::list<ServerFilterInstance*>::const_iterator |
765 | 424 next = nextFilters.begin(); next != nextFilters.end(); next++) |
425 { | |
426 if (index.find(*next) == index.end() || | |
427 index[*next] <= index[*it]) | |
428 { | |
429 // You must reorder your calls to "ServerJob::AddFilter" | |
430 throw OrthancException("Bad ordering of filters in a job"); | |
431 } | |
432 } | |
433 } | |
434 } | |
435 | |
436 | |
437 size_t Submit(SharedMessageQueue& target, | |
438 IServerFilterListener& listener) | |
439 { | |
440 if (submitted_) | |
441 { | |
442 // This job has already been submitted | |
443 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
444 } | |
445 | |
446 CheckOrdering(); | |
447 | |
448 size_t size = filters_.size(); | |
449 | |
779 | 450 for (std::list<ServerFilterInstance*>::iterator |
765 | 451 it = filters_.begin(); it != filters_.end(); it++) |
452 { | |
453 target.Enqueue(*it); | |
454 } | |
455 | |
456 filters_.clear(); | |
457 submitted_ = true; | |
458 | |
459 return size; | |
460 } | |
461 | |
462 public: | |
463 ServerJob() | |
464 { | |
465 jobId_ = Toolbox::GenerateUuid(); | |
466 submitted_ = false; | |
467 description_ = "no description"; | |
468 } | |
469 | |
470 ~ServerJob() | |
471 { | |
779 | 472 for (std::list<ServerFilterInstance*>::iterator |
765 | 473 it = filters_.begin(); it != filters_.end(); it++) |
474 { | |
475 delete *it; | |
476 } | |
477 } | |
478 | |
479 const std::string& GetId() const | |
480 { | |
481 return jobId_; | |
482 } | |
483 | |
484 void SetDescription(const char* description) | |
485 { | |
486 description_ = description; | |
487 } | |
488 | |
489 const std::string& GetDescription() const | |
490 { | |
491 return description_; | |
492 } | |
493 | |
779 | 494 ServerFilterInstance& AddFilter(IServerFilter* filter) |
765 | 495 { |
496 if (submitted_) | |
497 { | |
498 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
499 } | |
500 | |
779 | 501 filters_.push_back(new ServerFilterInstance(filter, jobId_)); |
765 | 502 |
503 return *filters_.back(); | |
504 } | |
505 }; | |
506 | |
507 | |
508 class ServerScheduler : public IServerFilterListener | |
509 { | |
510 private: | |
511 struct JobInfo | |
512 { | |
513 bool watched_; | |
514 bool cancel_; | |
515 size_t size_; | |
516 size_t success_; | |
517 size_t failures_; | |
518 std::string description_; | |
519 }; | |
520 | |
768 | 521 enum JobStatus |
522 { | |
523 JobStatus_Running = 1, | |
524 JobStatus_Success = 2, | |
525 JobStatus_Failure = 3 | |
526 }; | |
527 | |
779 | 528 typedef IServerFilter::ListOfStrings ListOfStrings; |
765 | 529 typedef std::map<std::string, JobInfo> Jobs; |
530 | |
531 boost::mutex mutex_; | |
532 boost::condition_variable jobFinished_; | |
533 Jobs jobs_; | |
534 SharedMessageQueue queue_; | |
535 bool finish_; | |
536 boost::thread worker_; | |
768 | 537 std::map<std::string, JobStatus> watchedJobStatus_; |
765 | 538 |
539 JobInfo& GetJobInfo(const std::string& jobId) | |
540 { | |
541 Jobs::iterator info = jobs_.find(jobId); | |
542 | |
543 if (info == jobs_.end()) | |
544 { | |
545 throw OrthancException(ErrorCode_InternalError); | |
546 } | |
547 | |
548 return info->second; | |
549 } | |
550 | |
551 virtual void SignalSuccess(const std::string& jobId) | |
552 { | |
553 boost::mutex::scoped_lock lock(mutex_); | |
554 | |
555 JobInfo& info = GetJobInfo(jobId); | |
556 info.success_++; | |
557 | |
558 assert(info.failures_ == 0); | |
559 | |
560 if (info.success_ >= info.size_) | |
561 { | |
562 if (info.watched_) | |
563 { | |
768 | 564 watchedJobStatus_[jobId] = JobStatus_Success; |
765 | 565 jobFinished_.notify_all(); |
566 } | |
567 | |
568 LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; | |
569 jobs_.erase(jobId); | |
570 } | |
571 } | |
572 | |
573 virtual void SignalFailure(const std::string& jobId) | |
574 { | |
575 boost::mutex::scoped_lock lock(mutex_); | |
576 | |
577 JobInfo& info = GetJobInfo(jobId); | |
578 info.failures_++; | |
579 | |
580 if (info.success_ + info.failures_ >= info.size_) | |
581 { | |
582 if (info.watched_) | |
583 { | |
768 | 584 watchedJobStatus_[jobId] = JobStatus_Failure; |
765 | 585 jobFinished_.notify_all(); |
586 } | |
587 | |
588 LOG(ERROR) << "Job has failed (" << info.description_ << ")"; | |
589 jobs_.erase(jobId); | |
590 } | |
591 } | |
592 | |
593 static void Worker(ServerScheduler* that) | |
594 { | |
595 static const int32_t TIMEOUT = 100; | |
596 | |
597 while (!that->finish_) | |
598 { | |
599 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); | |
600 if (object.get() != NULL) | |
601 { | |
779 | 602 ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object); |
765 | 603 |
604 // Skip the execution of this filter if its parent job has | |
605 // previously failed. | |
606 bool jobHasFailed; | |
607 { | |
608 boost::mutex::scoped_lock lock(that->mutex_); | |
609 JobInfo& info = that->GetJobInfo(filter.GetJobId()); | |
610 jobHasFailed = (info.failures_ > 0 || info.cancel_); | |
611 } | |
612 | |
613 if (jobHasFailed) | |
614 { | |
615 that->SignalFailure(filter.GetJobId()); | |
616 } | |
617 else | |
618 { | |
768 | 619 filter.Execute(*that); |
765 | 620 } |
621 } | |
622 } | |
623 } | |
624 | |
625 void SubmitInternal(ServerJob& job, | |
626 bool watched) | |
627 { | |
628 boost::mutex::scoped_lock lock(mutex_); | |
629 | |
630 JobInfo info; | |
631 info.size_ = job.Submit(queue_, *this); | |
632 info.cancel_ = false; | |
633 info.success_ = 0; | |
634 info.failures_ = 0; | |
635 info.description_ = job.GetDescription(); | |
636 info.watched_ = watched; | |
637 | |
638 assert(info.size_ > 0); | |
639 | |
640 if (watched) | |
641 { | |
768 | 642 watchedJobStatus_[job.GetId()] = JobStatus_Running; |
765 | 643 } |
644 | |
645 jobs_[job.GetId()] = info; | |
646 | |
647 LOG(INFO) << "New job submitted (" << job.description_ << ")"; | |
648 } | |
649 | |
650 public: | |
651 ServerScheduler() | |
652 { | |
653 finish_ = false; | |
654 worker_ = boost::thread(Worker, this); | |
655 } | |
656 | |
657 ~ServerScheduler() | |
658 { | |
659 finish_ = true; | |
660 worker_.join(); | |
661 } | |
662 | |
663 void Submit(ServerJob& job) | |
664 { | |
665 if (job.filters_.empty()) | |
666 { | |
667 return; | |
668 } | |
669 | |
670 SubmitInternal(job, false); | |
671 } | |
672 | |
673 bool SubmitAndWait(ListOfStrings& outputs, | |
674 ServerJob& job) | |
675 { | |
676 std::string jobId = job.GetId(); | |
677 | |
678 outputs.clear(); | |
679 | |
680 if (job.filters_.empty()) | |
681 { | |
682 return true; | |
683 } | |
684 | |
685 // Add a sink filter to collect all the results of the filters | |
686 // that have no next filter. | |
779 | 687 ServerFilterInstance& sink = job.AddFilter(new Sink(outputs)); |
765 | 688 |
779 | 689 for (std::list<ServerFilterInstance*>::iterator |
765 | 690 it = job.filters_.begin(); it != job.filters_.end(); it++) |
691 { | |
692 if ((*it) != &sink && | |
779 | 693 (*it)->GetNextFilters().size() == 0 && |
694 (*it)->GetFilter().SendOutputsToSink()) | |
765 | 695 { |
696 (*it)->ConnectNext(sink); | |
697 } | |
698 } | |
699 | |
700 // Submit the job | |
701 SubmitInternal(job, true); | |
702 | |
703 // Wait for the job to complete (either success or failure) | |
768 | 704 JobStatus status; |
765 | 705 |
706 { | |
707 boost::mutex::scoped_lock lock(mutex_); | |
768 | 708 |
709 assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); | |
765 | 710 |
768 | 711 while (watchedJobStatus_[jobId] == JobStatus_Running) |
765 | 712 { |
713 jobFinished_.wait(lock); | |
714 } | |
715 | |
716 status = watchedJobStatus_[jobId]; | |
717 watchedJobStatus_.erase(jobId); | |
718 } | |
719 | |
768 | 720 return (status == JobStatus_Success); |
765 | 721 } |
722 | |
723 | |
724 bool IsRunning(const std::string& jobId) | |
725 { | |
726 boost::mutex::scoped_lock lock(mutex_); | |
727 return jobs_.find(jobId) != jobs_.end(); | |
728 } | |
729 | |
730 | |
731 void Cancel(const std::string& jobId) | |
732 { | |
733 boost::mutex::scoped_lock lock(mutex_); | |
734 | |
735 Jobs::iterator job = jobs_.find(jobId); | |
736 | |
737 if (job != jobs_.end()) | |
738 { | |
739 job->second.cancel_ = true; | |
740 LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; | |
741 } | |
742 } | |
743 | |
744 | |
745 // Returns a number between 0 and 1 | |
746 float GetProgress(const std::string& jobId) | |
747 { | |
748 boost::mutex::scoped_lock lock(mutex_); | |
749 | |
750 Jobs::iterator job = jobs_.find(jobId); | |
751 | |
752 if (job == jobs_.end() || | |
753 job->second.size_ == 0 /* should never happen */) | |
754 { | |
755 // This job is not running | |
756 return 1; | |
757 } | |
758 | |
768 | 759 if (job->second.failures_ != 0) |
760 { | |
761 return 1; | |
762 } | |
763 | |
764 if (job->second.size_ == 1) | |
765 { | |
766 return job->second.success_; | |
767 } | |
768 | |
769 return (static_cast<float>(job->second.success_) / | |
770 static_cast<float>(job->second.size_ - 1)); | |
765 | 771 } |
772 | |
773 bool IsRunning(const ServerJob& job) | |
774 { | |
775 return IsRunning(job.GetId()); | |
776 } | |
777 | |
778 void Cancel(const ServerJob& job) | |
779 { | |
780 Cancel(job.GetId()); | |
781 } | |
782 | |
783 float GetProgress(const ServerJob& job) | |
784 { | |
785 return GetProgress(job); | |
786 } | |
768 | 787 |
788 void GetListOfJobs(ListOfStrings& jobs) | |
789 { | |
790 boost::mutex::scoped_lock lock(mutex_); | |
791 | |
792 jobs.clear(); | |
793 | |
794 for (Jobs::const_iterator | |
795 it = jobs_.begin(); it != jobs_.end(); it++) | |
796 { | |
797 jobs.push_back(it->first); | |
798 } | |
799 } | |
765 | 800 }; |
801 | |
802 } | |
803 | |
804 | |
805 | |
806 class Tutu : public IServerFilter | |
807 { | |
808 private: | |
809 int factor_; | |
810 | |
811 public: | |
812 Tutu(int f) : factor_(f) | |
813 { | |
814 } | |
815 | |
816 virtual bool Apply(ListOfStrings& outputs, | |
817 const ListOfStrings& inputs) | |
818 { | |
819 for (ListOfStrings::const_iterator | |
820 it = inputs.begin(); it != inputs.end(); it++) | |
821 { | |
822 int a = boost::lexical_cast<int>(*it); | |
823 int b = factor_ * a; | |
824 | |
825 printf("%d * %d = %d\n", a, factor_, b); | |
826 | |
827 //if (a == 84) { printf("BREAK\n"); return false; } | |
828 | |
829 outputs.push_back(boost::lexical_cast<std::string>(b)); | |
830 } | |
831 | |
832 Toolbox::USleep(1000000); | |
833 | |
834 return true; | |
835 } | |
779 | 836 |
837 virtual bool SendOutputsToSink() const | |
838 { | |
839 return true; | |
840 } | |
765 | 841 }; |
842 | |
768 | 843 |
770 | 844 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) |
768 | 845 { |
779 | 846 typedef IServerFilter::ListOfStrings ListOfStrings; |
847 | |
768 | 848 #if 1 |
770 | 849 while (!(*done)) |
768 | 850 { |
851 ListOfStrings l; | |
852 s->GetListOfJobs(l); | |
853 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
854 printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i)); | |
855 Toolbox::USleep(100000); | |
856 } | |
857 #else | |
858 ListOfStrings l; | |
859 s->GetListOfJobs(l); | |
860 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
861 printf(">> %s\n", i->c_str()); | |
862 Toolbox::USleep(1500000); | |
863 s->Cancel(*j); | |
864 Toolbox::USleep(1000000); | |
865 s->GetListOfJobs(l); | |
866 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
867 printf(">> %s\n", i->c_str()); | |
868 #endif | |
869 } | |
870 | |
871 | |
765 | 872 TEST(Toto, Toto) |
873 { | |
874 ServerScheduler scheduler; | |
875 | |
876 ServerJob job; | |
779 | 877 ServerFilterInstance& f2 = job.AddFilter(new Tutu(2)); |
878 ServerFilterInstance& f3 = job.AddFilter(new Tutu(3)); | |
879 ServerFilterInstance& f4 = job.AddFilter(new Tutu(4)); | |
880 ServerFilterInstance& f5 = job.AddFilter(new Tutu(5)); | |
765 | 881 f2.AddInput(boost::lexical_cast<std::string>(42)); |
882 //f3.AddInput(boost::lexical_cast<std::string>(42)); | |
883 //f4.AddInput(boost::lexical_cast<std::string>(42)); | |
884 f2.ConnectNext(f3); | |
885 f3.ConnectNext(f4); | |
768 | 886 f4.ConnectNext(f5); |
765 | 887 |
888 job.SetDescription("tutu"); | |
889 | |
770 | 890 bool done = false; |
891 boost::thread t(Tata, &scheduler, &job, &done); | |
768 | 892 |
893 | |
765 | 894 //scheduler.Submit(job); |
895 | |
779 | 896 IServerFilter::ListOfStrings l; |
765 | 897 scheduler.SubmitAndWait(l, job); |
898 | |
779 | 899 for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++) |
765 | 900 { |
901 printf("** %s\n", i->c_str()); | |
902 } | |
903 | |
904 //Toolbox::ServerBarrier(); | |
768 | 905 //Toolbox::USleep(3000000); |
906 | |
770 | 907 done = true; |
908 t.join(); | |
765 | 909 } |