Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreading.cpp @ 768:476febedc516 lua-scripting
improvements
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 23 Apr 2014 17:23:18 +0200 |
parents | fc97f762834c |
children | a64ca424e0e2 |
comparison
equal
deleted
inserted
replaced
765:fc97f762834c | 768:476febedc516 |
---|---|
279 }; | 279 }; |
280 | 280 |
281 | 281 |
282 class FilterWrapper : public IDynamicObject | 282 class FilterWrapper : public IDynamicObject |
283 { | 283 { |
284 friend class ServerScheduler; | |
285 | |
284 private: | 286 private: |
285 IServerFilterListener *listener_; | |
286 IServerFilter *filter_; | 287 IServerFilter *filter_; |
287 std::string jobId_; | 288 std::string jobId_; |
288 ListOfStrings inputs_; | 289 ListOfStrings inputs_; |
289 std::list<FilterWrapper*> next_; | 290 std::list<FilterWrapper*> next_; |
290 | 291 |
291 public: | 292 bool Execute(IServerFilterListener& listener) |
292 FilterWrapper(IServerFilter *filter, | |
293 const std::string& jobId) : | |
294 listener_(NULL), | |
295 filter_(filter), | |
296 jobId_(jobId) | |
297 { | |
298 if (filter_ == NULL) | |
299 { | |
300 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
301 } | |
302 } | |
303 | |
304 virtual ~FilterWrapper() | |
305 { | |
306 if (filter_ != NULL) | |
307 { | |
308 delete filter_; | |
309 } | |
310 } | |
311 | |
312 void SetListener(IServerFilterListener& listener) | |
313 { | |
314 listener_ = &listener; | |
315 } | |
316 | |
317 const std::string& GetJobId() const | |
318 { | |
319 return jobId_; | |
320 } | |
321 | |
322 void AddInput(const std::string& input) | |
323 { | |
324 inputs_.push_back(input); | |
325 } | |
326 | |
327 bool Execute() | |
328 { | 293 { |
329 ListOfStrings outputs; | 294 ListOfStrings outputs; |
330 if (!filter_->Apply(outputs, inputs_)) | 295 if (!filter_->Apply(outputs, inputs_)) |
331 { | 296 { |
332 if (listener_) | 297 listener.SignalFailure(jobId_); |
333 { | |
334 listener_->SignalFailure(jobId_); | |
335 } | |
336 | |
337 return true; | 298 return true; |
338 } | 299 } |
339 | 300 |
340 for (std::list<FilterWrapper*>::iterator | 301 for (std::list<FilterWrapper*>::iterator |
341 it = next_.begin(); it != next_.end(); it++) | 302 it = next_.begin(); it != next_.end(); it++) |
345 { | 306 { |
346 (*it)->AddInput(*output); | 307 (*it)->AddInput(*output); |
347 } | 308 } |
348 } | 309 } |
349 | 310 |
350 if (listener_) | 311 listener.SignalSuccess(jobId_); |
351 { | |
352 listener_->SignalSuccess(jobId_); | |
353 } | |
354 | |
355 return true; | 312 return true; |
356 } | 313 } |
357 | 314 |
315 | |
316 public: | |
317 FilterWrapper(IServerFilter *filter, | |
318 const std::string& jobId) : | |
319 filter_(filter), | |
320 jobId_(jobId) | |
321 { | |
322 if (filter_ == NULL) | |
323 { | |
324 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
325 } | |
326 } | |
327 | |
328 virtual ~FilterWrapper() | |
329 { | |
330 if (filter_ != NULL) | |
331 { | |
332 delete filter_; | |
333 } | |
334 } | |
335 | |
336 const std::string& GetJobId() const | |
337 { | |
338 return jobId_; | |
339 } | |
340 | |
341 void AddInput(const std::string& input) | |
342 { | |
343 inputs_.push_back(input); | |
344 } | |
345 | |
358 void ConnectNext(FilterWrapper& filter) | 346 void ConnectNext(FilterWrapper& filter) |
359 { | 347 { |
360 next_.push_back(&filter); | 348 next_.push_back(&filter); |
361 } | 349 } |
362 | 350 |
363 const std::list<FilterWrapper*>& GetNextFilters() const | 351 const std::list<FilterWrapper*>& GetNextFilters() const |
364 { | 352 { |
365 return next_; | 353 return next_; |
366 } | 354 } |
367 }; | |
368 | |
369 | |
370 enum ServerJobStatus | |
371 { | |
372 ServerJobStatus_Running = 1, | |
373 ServerJobStatus_Success = 2, | |
374 ServerJobStatus_Failure = 3 | |
375 }; | 355 }; |
376 | 356 |
377 | 357 |
378 class ServerJob | 358 class ServerJob |
379 { | 359 { |
430 size_t size = filters_.size(); | 410 size_t size = filters_.size(); |
431 | 411 |
432 for (std::list<FilterWrapper*>::iterator | 412 for (std::list<FilterWrapper*>::iterator |
433 it = filters_.begin(); it != filters_.end(); it++) | 413 it = filters_.begin(); it != filters_.end(); it++) |
434 { | 414 { |
435 (*it)->SetListener(listener); | |
436 target.Enqueue(*it); | 415 target.Enqueue(*it); |
437 } | 416 } |
438 | 417 |
439 filters_.clear(); | 418 filters_.clear(); |
440 submitted_ = true; | 419 submitted_ = true; |
499 size_t success_; | 478 size_t success_; |
500 size_t failures_; | 479 size_t failures_; |
501 std::string description_; | 480 std::string description_; |
502 }; | 481 }; |
503 | 482 |
483 enum JobStatus | |
484 { | |
485 JobStatus_Running = 1, | |
486 JobStatus_Success = 2, | |
487 JobStatus_Failure = 3 | |
488 }; | |
489 | |
504 typedef std::map<std::string, JobInfo> Jobs; | 490 typedef std::map<std::string, JobInfo> Jobs; |
505 | 491 |
506 boost::mutex mutex_; | 492 boost::mutex mutex_; |
507 boost::condition_variable jobFinished_; | 493 boost::condition_variable jobFinished_; |
508 Jobs jobs_; | 494 Jobs jobs_; |
509 SharedMessageQueue queue_; | 495 SharedMessageQueue queue_; |
510 bool finish_; | 496 bool finish_; |
511 boost::thread worker_; | 497 boost::thread worker_; |
512 std::map<std::string, ServerJobStatus> watchedJobStatus_; | 498 std::map<std::string, JobStatus> watchedJobStatus_; |
513 | 499 |
514 JobInfo& GetJobInfo(const std::string& jobId) | 500 JobInfo& GetJobInfo(const std::string& jobId) |
515 { | 501 { |
516 Jobs::iterator info = jobs_.find(jobId); | 502 Jobs::iterator info = jobs_.find(jobId); |
517 | 503 |
534 | 520 |
535 if (info.success_ >= info.size_) | 521 if (info.success_ >= info.size_) |
536 { | 522 { |
537 if (info.watched_) | 523 if (info.watched_) |
538 { | 524 { |
539 watchedJobStatus_[jobId] = ServerJobStatus_Success; | 525 watchedJobStatus_[jobId] = JobStatus_Success; |
540 jobFinished_.notify_all(); | 526 jobFinished_.notify_all(); |
541 } | 527 } |
542 | 528 |
543 LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; | 529 LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; |
544 jobs_.erase(jobId); | 530 jobs_.erase(jobId); |
554 | 540 |
555 if (info.success_ + info.failures_ >= info.size_) | 541 if (info.success_ + info.failures_ >= info.size_) |
556 { | 542 { |
557 if (info.watched_) | 543 if (info.watched_) |
558 { | 544 { |
559 watchedJobStatus_[jobId] = ServerJobStatus_Failure; | 545 watchedJobStatus_[jobId] = JobStatus_Failure; |
560 jobFinished_.notify_all(); | 546 jobFinished_.notify_all(); |
561 } | 547 } |
562 | 548 |
563 LOG(ERROR) << "Job has failed (" << info.description_ << ")"; | 549 LOG(ERROR) << "Job has failed (" << info.description_ << ")"; |
564 jobs_.erase(jobId); | 550 jobs_.erase(jobId); |
589 { | 575 { |
590 that->SignalFailure(filter.GetJobId()); | 576 that->SignalFailure(filter.GetJobId()); |
591 } | 577 } |
592 else | 578 else |
593 { | 579 { |
594 filter.Execute(); | 580 filter.Execute(*that); |
595 } | 581 } |
596 } | 582 } |
597 } | 583 } |
598 } | 584 } |
599 | 585 |
612 | 598 |
613 assert(info.size_ > 0); | 599 assert(info.size_ > 0); |
614 | 600 |
615 if (watched) | 601 if (watched) |
616 { | 602 { |
617 watchedJobStatus_[job.GetId()] = ServerJobStatus_Running; | 603 watchedJobStatus_[job.GetId()] = JobStatus_Running; |
618 } | 604 } |
619 | 605 |
620 jobs_[job.GetId()] = info; | 606 jobs_[job.GetId()] = info; |
621 | 607 |
622 LOG(INFO) << "New job submitted (" << job.description_ << ")"; | 608 LOG(INFO) << "New job submitted (" << job.description_ << ")"; |
673 | 659 |
674 // Submit the job | 660 // Submit the job |
675 SubmitInternal(job, true); | 661 SubmitInternal(job, true); |
676 | 662 |
677 // Wait for the job to complete (either success or failure) | 663 // Wait for the job to complete (either success or failure) |
678 ServerJobStatus status; | 664 JobStatus status; |
679 | 665 |
680 { | 666 { |
681 boost::mutex::scoped_lock lock(mutex_); | 667 boost::mutex::scoped_lock lock(mutex_); |
668 | |
669 assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); | |
682 | 670 |
683 while (watchedJobStatus_[jobId] == ServerJobStatus_Running) | 671 while (watchedJobStatus_[jobId] == JobStatus_Running) |
684 { | 672 { |
685 jobFinished_.wait(lock); | 673 jobFinished_.wait(lock); |
686 } | 674 } |
687 | 675 |
688 status = watchedJobStatus_[jobId]; | 676 status = watchedJobStatus_[jobId]; |
689 watchedJobStatus_.erase(jobId); | 677 watchedJobStatus_.erase(jobId); |
690 } | 678 } |
691 | 679 |
692 return (status == ServerJobStatus_Success); | 680 return (status == JobStatus_Success); |
693 } | 681 } |
694 | 682 |
695 | 683 |
696 bool IsRunning(const std::string& jobId) | 684 bool IsRunning(const std::string& jobId) |
697 { | 685 { |
726 { | 714 { |
727 // This job is not running | 715 // This job is not running |
728 return 1; | 716 return 1; |
729 } | 717 } |
730 | 718 |
731 float n = static_cast<float>(job->second.failures_ + job->second.success_); | 719 if (job->second.failures_ != 0) |
732 float d = static_cast<float>(job->second.size_); | 720 { |
733 return n / d; | 721 return 1; |
722 } | |
723 | |
724 if (job->second.size_ == 1) | |
725 { | |
726 return job->second.success_; | |
727 } | |
728 | |
729 return (static_cast<float>(job->second.success_) / | |
730 static_cast<float>(job->second.size_ - 1)); | |
734 } | 731 } |
735 | 732 |
736 bool IsRunning(const ServerJob& job) | 733 bool IsRunning(const ServerJob& job) |
737 { | 734 { |
738 return IsRunning(job.GetId()); | 735 return IsRunning(job.GetId()); |
744 } | 741 } |
745 | 742 |
746 float GetProgress(const ServerJob& job) | 743 float GetProgress(const ServerJob& job) |
747 { | 744 { |
748 return GetProgress(job); | 745 return GetProgress(job); |
746 } | |
747 | |
748 void GetListOfJobs(ListOfStrings& jobs) | |
749 { | |
750 boost::mutex::scoped_lock lock(mutex_); | |
751 | |
752 jobs.clear(); | |
753 | |
754 for (Jobs::const_iterator | |
755 it = jobs_.begin(); it != jobs_.end(); it++) | |
756 { | |
757 jobs.push_back(it->first); | |
758 } | |
749 } | 759 } |
750 }; | 760 }; |
751 | 761 |
752 } | 762 } |
753 | 763 |
782 Toolbox::USleep(1000000); | 792 Toolbox::USleep(1000000); |
783 | 793 |
784 return true; | 794 return true; |
785 } | 795 } |
786 }; | 796 }; |
797 | |
798 | |
799 static void Tata(ServerScheduler* s, ServerJob* j) | |
800 { | |
801 #if 1 | |
802 for (;;) | |
803 { | |
804 ListOfStrings l; | |
805 s->GetListOfJobs(l); | |
806 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
807 printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i)); | |
808 Toolbox::USleep(100000); | |
809 } | |
810 #else | |
811 ListOfStrings l; | |
812 s->GetListOfJobs(l); | |
813 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
814 printf(">> %s\n", i->c_str()); | |
815 Toolbox::USleep(1500000); | |
816 s->Cancel(*j); | |
817 Toolbox::USleep(1000000); | |
818 s->GetListOfJobs(l); | |
819 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
820 printf(">> %s\n", i->c_str()); | |
821 #endif | |
822 } | |
823 | |
787 | 824 |
788 TEST(Toto, Toto) | 825 TEST(Toto, Toto) |
789 { | 826 { |
790 ServerScheduler scheduler; | 827 ServerScheduler scheduler; |
791 | 828 |
792 ServerJob job; | 829 ServerJob job; |
793 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); | 830 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); |
794 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); | 831 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); |
795 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); | 832 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); |
833 FilterWrapper& f5 = job.AddFilter(new Tutu(5)); | |
796 f2.AddInput(boost::lexical_cast<std::string>(42)); | 834 f2.AddInput(boost::lexical_cast<std::string>(42)); |
797 //f3.AddInput(boost::lexical_cast<std::string>(42)); | 835 //f3.AddInput(boost::lexical_cast<std::string>(42)); |
798 //f4.AddInput(boost::lexical_cast<std::string>(42)); | 836 //f4.AddInput(boost::lexical_cast<std::string>(42)); |
799 f2.ConnectNext(f3); | 837 f2.ConnectNext(f3); |
800 f3.ConnectNext(f4); | 838 f3.ConnectNext(f4); |
839 f4.ConnectNext(f5); | |
801 | 840 |
802 job.SetDescription("tutu"); | 841 job.SetDescription("tutu"); |
842 | |
843 boost::thread t(Tata, &scheduler, &job); | |
844 | |
803 | 845 |
804 //scheduler.Submit(job); | 846 //scheduler.Submit(job); |
805 | 847 |
806 ListOfStrings l; | 848 ListOfStrings l; |
807 scheduler.SubmitAndWait(l, job); | 849 scheduler.SubmitAndWait(l, job); |
810 { | 852 { |
811 printf("** %s\n", i->c_str()); | 853 printf("** %s\n", i->c_str()); |
812 } | 854 } |
813 | 855 |
814 //Toolbox::ServerBarrier(); | 856 //Toolbox::ServerBarrier(); |
815 Toolbox::USleep(30000); | 857 //Toolbox::USleep(3000000); |
816 } | 858 |
859 //t.join(); | |
860 } |