comparison UnitTestsSources/MultiThreading.cpp @ 765:fc97f762834c lua-scripting

scheduler
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 23 Apr 2014 16:53:59 +0200
parents b2a62f22fbe8
children 476febedc516
comparison
equal deleted inserted replaced
762:45b16f67259c 765:fc97f762834c
1 #include "gtest/gtest.h" 1 #include "gtest/gtest.h"
2 #include <glog/logging.h>
2 3
3 #include "../Core/OrthancException.h" 4 #include "../Core/OrthancException.h"
4 #include "../Core/Toolbox.h" 5 #include "../Core/Toolbox.h"
5 #include "../Core/MultiThreading/ArrayFilledByThreads.h" 6 #include "../Core/MultiThreading/ArrayFilledByThreads.h"
6 #include "../Core/MultiThreading/Locker.h" 7 #include "../Core/MultiThreading/Locker.h"
208 209
209 { 210 {
210 Locker locker3(lock.ForWriter()); 211 Locker locker3(lock.ForWriter());
211 } 212 }
212 } 213 }
214
215
216
217
218
219
220
221 #include "../Core/ICommand.h"
222 #include "../Core/Toolbox.h"
223 #include "../Core/Uuid.h"
224 #include "../Core/MultiThreading/SharedMessageQueue.h"
225 #include <boost/lexical_cast.hpp>
226
227
228 namespace Orthanc
229 {
// Ordered sequence of strings: this is the only kind of data that
// is exchanged between filters.
typedef std::list<std::string> ListOfStrings;


// Interface of one processing step of a job. A filter reads the
// strings in "inputs" and may append its results to "outputs".
// "FilterWrapper::Execute()" treats a "false" return value as the
// failure of the filter, and forwards "outputs" to the connected
// filters otherwise.
class IServerFilter
{
public:
  virtual ~IServerFilter()
  {
    // Virtual destructor: filters are deleted through this interface
  }

  virtual bool Apply(ListOfStrings& outputs,
                     const ListOfStrings& inputs) = 0;
};
242
243
244 class Sink : public IServerFilter
245 {
246 private:
247 ListOfStrings& target_;
248
249 public:
250 Sink(ListOfStrings& target) : target_(target)
251 {
252 }
253
254 virtual bool Apply(ListOfStrings& outputs,
255 const ListOfStrings& inputs)
256 {
257 for (ListOfStrings::const_iterator
258 it = inputs.begin(); it != inputs.end(); it++)
259 {
260 target_.push_back(*it);
261 }
262
263 return true;
264 }
265 };
266
267
268
// Callback interface through which a "FilterWrapper" reports the
// outcome of its execution. The argument is the identifier of the
// job the filter belongs to.
class IServerFilterListener
{
public:
  virtual ~IServerFilterListener()
  {
  }

  // One filter of the job "jobId" has run successfully
  virtual void SignalSuccess(const std::string& jobId) = 0;

  // One filter of the job "jobId" has failed
  virtual void SignalFailure(const std::string& jobId) = 0;
};
280
281
282 class FilterWrapper : public IDynamicObject
283 {
284 private:
285 IServerFilterListener *listener_;
286 IServerFilter *filter_;
287 std::string jobId_;
288 ListOfStrings inputs_;
289 std::list<FilterWrapper*> next_;
290
291 public:
292 FilterWrapper(IServerFilter *filter,
293 const std::string& jobId) :
294 listener_(NULL),
295 filter_(filter),
296 jobId_(jobId)
297 {
298 if (filter_ == NULL)
299 {
300 throw OrthancException(ErrorCode_ParameterOutOfRange);
301 }
302 }
303
304 virtual ~FilterWrapper()
305 {
306 if (filter_ != NULL)
307 {
308 delete filter_;
309 }
310 }
311
312 void SetListener(IServerFilterListener& listener)
313 {
314 listener_ = &listener;
315 }
316
317 const std::string& GetJobId() const
318 {
319 return jobId_;
320 }
321
322 void AddInput(const std::string& input)
323 {
324 inputs_.push_back(input);
325 }
326
327 bool Execute()
328 {
329 ListOfStrings outputs;
330 if (!filter_->Apply(outputs, inputs_))
331 {
332 if (listener_)
333 {
334 listener_->SignalFailure(jobId_);
335 }
336
337 return true;
338 }
339
340 for (std::list<FilterWrapper*>::iterator
341 it = next_.begin(); it != next_.end(); it++)
342 {
343 for (ListOfStrings::const_iterator
344 output = outputs.begin(); output != outputs.end(); output++)
345 {
346 (*it)->AddInput(*output);
347 }
348 }
349
350 if (listener_)
351 {
352 listener_->SignalSuccess(jobId_);
353 }
354
355 return true;
356 }
357
358 void ConnectNext(FilterWrapper& filter)
359 {
360 next_.push_back(&filter);
361 }
362
363 const std::list<FilterWrapper*>& GetNextFilters() const
364 {
365 return next_;
366 }
367 };
368
369
// State of a job that is tracked by "ServerScheduler::SubmitAndWait()"
enum ServerJobStatus
{
  ServerJobStatus_Running = 1,   // Some filters have not been processed yet
  ServerJobStatus_Success = 2,   // All the filters have succeeded
  ServerJobStatus_Failure = 3    // The job has finished with at least one failure
};
376
377
378 class ServerJob
379 {
380 friend class ServerScheduler;
381
382 private:
383 std::list<FilterWrapper*> filters_;
384 std::string jobId_;
385 bool submitted_;
386 std::string description_;
387
388
389 void CheckOrdering()
390 {
391 std::map<FilterWrapper*, unsigned int> index;
392
393 unsigned int count = 0;
394 for (std::list<FilterWrapper*>::const_iterator
395 it = filters_.begin(); it != filters_.end(); it++)
396 {
397 index[*it] = count++;
398 }
399
400 for (std::list<FilterWrapper*>::const_iterator
401 it = filters_.begin(); it != filters_.end(); it++)
402 {
403 const std::list<FilterWrapper*>& nextFilters = (*it)->GetNextFilters();
404
405 for (std::list<FilterWrapper*>::const_iterator
406 next = nextFilters.begin(); next != nextFilters.end(); next++)
407 {
408 if (index.find(*next) == index.end() ||
409 index[*next] <= index[*it])
410 {
411 // You must reorder your calls to "ServerJob::AddFilter"
412 throw OrthancException("Bad ordering of filters in a job");
413 }
414 }
415 }
416 }
417
418
419 size_t Submit(SharedMessageQueue& target,
420 IServerFilterListener& listener)
421 {
422 if (submitted_)
423 {
424 // This job has already been submitted
425 throw OrthancException(ErrorCode_BadSequenceOfCalls);
426 }
427
428 CheckOrdering();
429
430 size_t size = filters_.size();
431
432 for (std::list<FilterWrapper*>::iterator
433 it = filters_.begin(); it != filters_.end(); it++)
434 {
435 (*it)->SetListener(listener);
436 target.Enqueue(*it);
437 }
438
439 filters_.clear();
440 submitted_ = true;
441
442 return size;
443 }
444
445 public:
446 ServerJob()
447 {
448 jobId_ = Toolbox::GenerateUuid();
449 submitted_ = false;
450 description_ = "no description";
451 }
452
453 ~ServerJob()
454 {
455 for (std::list<FilterWrapper*>::iterator
456 it = filters_.begin(); it != filters_.end(); it++)
457 {
458 delete *it;
459 }
460 }
461
462 const std::string& GetId() const
463 {
464 return jobId_;
465 }
466
467 void SetDescription(const char* description)
468 {
469 description_ = description;
470 }
471
472 const std::string& GetDescription() const
473 {
474 return description_;
475 }
476
477 FilterWrapper& AddFilter(IServerFilter* filter)
478 {
479 if (submitted_)
480 {
481 throw OrthancException(ErrorCode_BadSequenceOfCalls);
482 }
483
484 filters_.push_back(new FilterWrapper(filter, jobId_));
485
486 return *filters_.back();
487 }
488 };
489
490
491 class ServerScheduler : public IServerFilterListener
492 {
493 private:
494 struct JobInfo
495 {
496 bool watched_;
497 bool cancel_;
498 size_t size_;
499 size_t success_;
500 size_t failures_;
501 std::string description_;
502 };
503
504 typedef std::map<std::string, JobInfo> Jobs;
505
506 boost::mutex mutex_;
507 boost::condition_variable jobFinished_;
508 Jobs jobs_;
509 SharedMessageQueue queue_;
510 bool finish_;
511 boost::thread worker_;
512 std::map<std::string, ServerJobStatus> watchedJobStatus_;
513
514 JobInfo& GetJobInfo(const std::string& jobId)
515 {
516 Jobs::iterator info = jobs_.find(jobId);
517
518 if (info == jobs_.end())
519 {
520 throw OrthancException(ErrorCode_InternalError);
521 }
522
523 return info->second;
524 }
525
526 virtual void SignalSuccess(const std::string& jobId)
527 {
528 boost::mutex::scoped_lock lock(mutex_);
529
530 JobInfo& info = GetJobInfo(jobId);
531 info.success_++;
532
533 assert(info.failures_ == 0);
534
535 if (info.success_ >= info.size_)
536 {
537 if (info.watched_)
538 {
539 watchedJobStatus_[jobId] = ServerJobStatus_Success;
540 jobFinished_.notify_all();
541 }
542
543 LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
544 jobs_.erase(jobId);
545 }
546 }
547
548 virtual void SignalFailure(const std::string& jobId)
549 {
550 boost::mutex::scoped_lock lock(mutex_);
551
552 JobInfo& info = GetJobInfo(jobId);
553 info.failures_++;
554
555 if (info.success_ + info.failures_ >= info.size_)
556 {
557 if (info.watched_)
558 {
559 watchedJobStatus_[jobId] = ServerJobStatus_Failure;
560 jobFinished_.notify_all();
561 }
562
563 LOG(ERROR) << "Job has failed (" << info.description_ << ")";
564 jobs_.erase(jobId);
565 }
566 }
567
568 static void Worker(ServerScheduler* that)
569 {
570 static const int32_t TIMEOUT = 100;
571
572 while (!that->finish_)
573 {
574 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
575 if (object.get() != NULL)
576 {
577 FilterWrapper& filter = dynamic_cast<FilterWrapper&>(*object);
578
579 // Skip the execution of this filter if its parent job has
580 // previously failed.
581 bool jobHasFailed;
582 {
583 boost::mutex::scoped_lock lock(that->mutex_);
584 JobInfo& info = that->GetJobInfo(filter.GetJobId());
585 jobHasFailed = (info.failures_ > 0 || info.cancel_);
586 }
587
588 if (jobHasFailed)
589 {
590 that->SignalFailure(filter.GetJobId());
591 }
592 else
593 {
594 filter.Execute();
595 }
596 }
597 }
598 }
599
600 void SubmitInternal(ServerJob& job,
601 bool watched)
602 {
603 boost::mutex::scoped_lock lock(mutex_);
604
605 JobInfo info;
606 info.size_ = job.Submit(queue_, *this);
607 info.cancel_ = false;
608 info.success_ = 0;
609 info.failures_ = 0;
610 info.description_ = job.GetDescription();
611 info.watched_ = watched;
612
613 assert(info.size_ > 0);
614
615 if (watched)
616 {
617 watchedJobStatus_[job.GetId()] = ServerJobStatus_Running;
618 }
619
620 jobs_[job.GetId()] = info;
621
622 LOG(INFO) << "New job submitted (" << job.description_ << ")";
623 }
624
625 public:
626 ServerScheduler()
627 {
628 finish_ = false;
629 worker_ = boost::thread(Worker, this);
630 }
631
632 ~ServerScheduler()
633 {
634 finish_ = true;
635 worker_.join();
636 }
637
638 void Submit(ServerJob& job)
639 {
640 if (job.filters_.empty())
641 {
642 return;
643 }
644
645 SubmitInternal(job, false);
646 }
647
648 bool SubmitAndWait(ListOfStrings& outputs,
649 ServerJob& job)
650 {
651 std::string jobId = job.GetId();
652
653 outputs.clear();
654
655 if (job.filters_.empty())
656 {
657 return true;
658 }
659
660 // Add a sink filter to collect all the results of the filters
661 // that have no next filter.
662 FilterWrapper& sink = job.AddFilter(new Sink(outputs));
663
664 for (std::list<FilterWrapper*>::iterator
665 it = job.filters_.begin(); it != job.filters_.end(); it++)
666 {
667 if ((*it) != &sink &&
668 (*it)->GetNextFilters().size() == 0)
669 {
670 (*it)->ConnectNext(sink);
671 }
672 }
673
674 // Submit the job
675 SubmitInternal(job, true);
676
677 // Wait for the job to complete (either success or failure)
678 ServerJobStatus status;
679
680 {
681 boost::mutex::scoped_lock lock(mutex_);
682
683 while (watchedJobStatus_[jobId] == ServerJobStatus_Running)
684 {
685 jobFinished_.wait(lock);
686 }
687
688 status = watchedJobStatus_[jobId];
689 watchedJobStatus_.erase(jobId);
690 }
691
692 return (status == ServerJobStatus_Success);
693 }
694
695
696 bool IsRunning(const std::string& jobId)
697 {
698 boost::mutex::scoped_lock lock(mutex_);
699 return jobs_.find(jobId) != jobs_.end();
700 }
701
702
703 void Cancel(const std::string& jobId)
704 {
705 boost::mutex::scoped_lock lock(mutex_);
706
707 Jobs::iterator job = jobs_.find(jobId);
708
709 if (job != jobs_.end())
710 {
711 job->second.cancel_ = true;
712 LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")";
713 }
714 }
715
716
717 // Returns a number between 0 and 1
718 float GetProgress(const std::string& jobId)
719 {
720 boost::mutex::scoped_lock lock(mutex_);
721
722 Jobs::iterator job = jobs_.find(jobId);
723
724 if (job == jobs_.end() ||
725 job->second.size_ == 0 /* should never happen */)
726 {
727 // This job is not running
728 return 1;
729 }
730
731 float n = static_cast<float>(job->second.failures_ + job->second.success_);
732 float d = static_cast<float>(job->second.size_);
733 return n / d;
734 }
735
736 bool IsRunning(const ServerJob& job)
737 {
738 return IsRunning(job.GetId());
739 }
740
741 void Cancel(const ServerJob& job)
742 {
743 Cancel(job.GetId());
744 }
745
746 float GetProgress(const ServerJob& job)
747 {
748 return GetProgress(job);
749 }
750 };
751
752 }
753
754
755
756 class Tutu : public IServerFilter
757 {
758 private:
759 int factor_;
760
761 public:
762 Tutu(int f) : factor_(f)
763 {
764 }
765
766 virtual bool Apply(ListOfStrings& outputs,
767 const ListOfStrings& inputs)
768 {
769 for (ListOfStrings::const_iterator
770 it = inputs.begin(); it != inputs.end(); it++)
771 {
772 int a = boost::lexical_cast<int>(*it);
773 int b = factor_ * a;
774
775 printf("%d * %d = %d\n", a, factor_, b);
776
777 //if (a == 84) { printf("BREAK\n"); return false; }
778
779 outputs.push_back(boost::lexical_cast<std::string>(b));
780 }
781
782 Toolbox::USleep(1000000);
783
784 return true;
785 }
786 };
787
788 TEST(Toto, Toto)
789 {
790 ServerScheduler scheduler;
791
792 ServerJob job;
793 FilterWrapper& f2 = job.AddFilter(new Tutu(2));
794 FilterWrapper& f3 = job.AddFilter(new Tutu(3));
795 FilterWrapper& f4 = job.AddFilter(new Tutu(4));
796 f2.AddInput(boost::lexical_cast<std::string>(42));
797 //f3.AddInput(boost::lexical_cast<std::string>(42));
798 //f4.AddInput(boost::lexical_cast<std::string>(42));
799 f2.ConnectNext(f3);
800 f3.ConnectNext(f4);
801
802 job.SetDescription("tutu");
803
804 //scheduler.Submit(job);
805
806 ListOfStrings l;
807 scheduler.SubmitAndWait(l, job);
808
809 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
810 {
811 printf("** %s\n", i->c_str());
812 }
813
814 //Toolbox::ServerBarrier();
815 Toolbox::USleep(30000);
816 }