comparison UnitTestsSources/MultiThreading.cpp @ 768:476febedc516 lua-scripting

improvements
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 23 Apr 2014 17:23:18 +0200
parents fc97f762834c
children a64ca424e0e2
comparison
equal deleted inserted replaced
765:fc97f762834c 768:476febedc516
279 }; 279 };
280 280
281 281
282 class FilterWrapper : public IDynamicObject 282 class FilterWrapper : public IDynamicObject
283 { 283 {
284 friend class ServerScheduler;
285
284 private: 286 private:
285 IServerFilterListener *listener_;
286 IServerFilter *filter_; 287 IServerFilter *filter_;
287 std::string jobId_; 288 std::string jobId_;
288 ListOfStrings inputs_; 289 ListOfStrings inputs_;
289 std::list<FilterWrapper*> next_; 290 std::list<FilterWrapper*> next_;
290 291
291 public: 292 bool Execute(IServerFilterListener& listener)
292 FilterWrapper(IServerFilter *filter,
293 const std::string& jobId) :
294 listener_(NULL),
295 filter_(filter),
296 jobId_(jobId)
297 {
298 if (filter_ == NULL)
299 {
300 throw OrthancException(ErrorCode_ParameterOutOfRange);
301 }
302 }
303
304 virtual ~FilterWrapper()
305 {
306 if (filter_ != NULL)
307 {
308 delete filter_;
309 }
310 }
311
312 void SetListener(IServerFilterListener& listener)
313 {
314 listener_ = &listener;
315 }
316
317 const std::string& GetJobId() const
318 {
319 return jobId_;
320 }
321
322 void AddInput(const std::string& input)
323 {
324 inputs_.push_back(input);
325 }
326
327 bool Execute()
328 { 293 {
329 ListOfStrings outputs; 294 ListOfStrings outputs;
330 if (!filter_->Apply(outputs, inputs_)) 295 if (!filter_->Apply(outputs, inputs_))
331 { 296 {
332 if (listener_) 297 listener.SignalFailure(jobId_);
333 {
334 listener_->SignalFailure(jobId_);
335 }
336
337 return true; 298 return true;
338 } 299 }
339 300
340 for (std::list<FilterWrapper*>::iterator 301 for (std::list<FilterWrapper*>::iterator
341 it = next_.begin(); it != next_.end(); it++) 302 it = next_.begin(); it != next_.end(); it++)
345 { 306 {
346 (*it)->AddInput(*output); 307 (*it)->AddInput(*output);
347 } 308 }
348 } 309 }
349 310
350 if (listener_) 311 listener.SignalSuccess(jobId_);
351 {
352 listener_->SignalSuccess(jobId_);
353 }
354
355 return true; 312 return true;
356 } 313 }
357 314
315
316 public:
317 FilterWrapper(IServerFilter *filter,
318 const std::string& jobId) :
319 filter_(filter),
320 jobId_(jobId)
321 {
322 if (filter_ == NULL)
323 {
324 throw OrthancException(ErrorCode_ParameterOutOfRange);
325 }
326 }
327
328 virtual ~FilterWrapper()
329 {
330 if (filter_ != NULL)
331 {
332 delete filter_;
333 }
334 }
335
336 const std::string& GetJobId() const
337 {
338 return jobId_;
339 }
340
341 void AddInput(const std::string& input)
342 {
343 inputs_.push_back(input);
344 }
345
358 void ConnectNext(FilterWrapper& filter) 346 void ConnectNext(FilterWrapper& filter)
359 { 347 {
360 next_.push_back(&filter); 348 next_.push_back(&filter);
361 } 349 }
362 350
363 const std::list<FilterWrapper*>& GetNextFilters() const 351 const std::list<FilterWrapper*>& GetNextFilters() const
364 { 352 {
365 return next_; 353 return next_;
366 } 354 }
367 };
368
369
370 enum ServerJobStatus
371 {
372 ServerJobStatus_Running = 1,
373 ServerJobStatus_Success = 2,
374 ServerJobStatus_Failure = 3
375 }; 355 };
376 356
377 357
378 class ServerJob 358 class ServerJob
379 { 359 {
430 size_t size = filters_.size(); 410 size_t size = filters_.size();
431 411
432 for (std::list<FilterWrapper*>::iterator 412 for (std::list<FilterWrapper*>::iterator
433 it = filters_.begin(); it != filters_.end(); it++) 413 it = filters_.begin(); it != filters_.end(); it++)
434 { 414 {
435 (*it)->SetListener(listener);
436 target.Enqueue(*it); 415 target.Enqueue(*it);
437 } 416 }
438 417
439 filters_.clear(); 418 filters_.clear();
440 submitted_ = true; 419 submitted_ = true;
499 size_t success_; 478 size_t success_;
500 size_t failures_; 479 size_t failures_;
501 std::string description_; 480 std::string description_;
502 }; 481 };
503 482
483 enum JobStatus
484 {
485 JobStatus_Running = 1,
486 JobStatus_Success = 2,
487 JobStatus_Failure = 3
488 };
489
504 typedef std::map<std::string, JobInfo> Jobs; 490 typedef std::map<std::string, JobInfo> Jobs;
505 491
506 boost::mutex mutex_; 492 boost::mutex mutex_;
507 boost::condition_variable jobFinished_; 493 boost::condition_variable jobFinished_;
508 Jobs jobs_; 494 Jobs jobs_;
509 SharedMessageQueue queue_; 495 SharedMessageQueue queue_;
510 bool finish_; 496 bool finish_;
511 boost::thread worker_; 497 boost::thread worker_;
512 std::map<std::string, ServerJobStatus> watchedJobStatus_; 498 std::map<std::string, JobStatus> watchedJobStatus_;
513 499
514 JobInfo& GetJobInfo(const std::string& jobId) 500 JobInfo& GetJobInfo(const std::string& jobId)
515 { 501 {
516 Jobs::iterator info = jobs_.find(jobId); 502 Jobs::iterator info = jobs_.find(jobId);
517 503
534 520
535 if (info.success_ >= info.size_) 521 if (info.success_ >= info.size_)
536 { 522 {
537 if (info.watched_) 523 if (info.watched_)
538 { 524 {
539 watchedJobStatus_[jobId] = ServerJobStatus_Success; 525 watchedJobStatus_[jobId] = JobStatus_Success;
540 jobFinished_.notify_all(); 526 jobFinished_.notify_all();
541 } 527 }
542 528
543 LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; 529 LOG(INFO) << "Job successfully finished (" << info.description_ << ")";
544 jobs_.erase(jobId); 530 jobs_.erase(jobId);
554 540
555 if (info.success_ + info.failures_ >= info.size_) 541 if (info.success_ + info.failures_ >= info.size_)
556 { 542 {
557 if (info.watched_) 543 if (info.watched_)
558 { 544 {
559 watchedJobStatus_[jobId] = ServerJobStatus_Failure; 545 watchedJobStatus_[jobId] = JobStatus_Failure;
560 jobFinished_.notify_all(); 546 jobFinished_.notify_all();
561 } 547 }
562 548
563 LOG(ERROR) << "Job has failed (" << info.description_ << ")"; 549 LOG(ERROR) << "Job has failed (" << info.description_ << ")";
564 jobs_.erase(jobId); 550 jobs_.erase(jobId);
589 { 575 {
590 that->SignalFailure(filter.GetJobId()); 576 that->SignalFailure(filter.GetJobId());
591 } 577 }
592 else 578 else
593 { 579 {
594 filter.Execute(); 580 filter.Execute(*that);
595 } 581 }
596 } 582 }
597 } 583 }
598 } 584 }
599 585
612 598
613 assert(info.size_ > 0); 599 assert(info.size_ > 0);
614 600
615 if (watched) 601 if (watched)
616 { 602 {
617 watchedJobStatus_[job.GetId()] = ServerJobStatus_Running; 603 watchedJobStatus_[job.GetId()] = JobStatus_Running;
618 } 604 }
619 605
620 jobs_[job.GetId()] = info; 606 jobs_[job.GetId()] = info;
621 607
622 LOG(INFO) << "New job submitted (" << job.description_ << ")"; 608 LOG(INFO) << "New job submitted (" << job.description_ << ")";
673 659
674 // Submit the job 660 // Submit the job
675 SubmitInternal(job, true); 661 SubmitInternal(job, true);
676 662
677 // Wait for the job to complete (either success or failure) 663 // Wait for the job to complete (either success or failure)
678 ServerJobStatus status; 664 JobStatus status;
679 665
680 { 666 {
681 boost::mutex::scoped_lock lock(mutex_); 667 boost::mutex::scoped_lock lock(mutex_);
668
669 assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
682 670
683 while (watchedJobStatus_[jobId] == ServerJobStatus_Running) 671 while (watchedJobStatus_[jobId] == JobStatus_Running)
684 { 672 {
685 jobFinished_.wait(lock); 673 jobFinished_.wait(lock);
686 } 674 }
687 675
688 status = watchedJobStatus_[jobId]; 676 status = watchedJobStatus_[jobId];
689 watchedJobStatus_.erase(jobId); 677 watchedJobStatus_.erase(jobId);
690 } 678 }
691 679
692 return (status == ServerJobStatus_Success); 680 return (status == JobStatus_Success);
693 } 681 }
694 682
695 683
696 bool IsRunning(const std::string& jobId) 684 bool IsRunning(const std::string& jobId)
697 { 685 {
726 { 714 {
727 // This job is not running 715 // This job is not running
728 return 1; 716 return 1;
729 } 717 }
730 718
731 float n = static_cast<float>(job->second.failures_ + job->second.success_); 719 if (job->second.failures_ != 0)
732 float d = static_cast<float>(job->second.size_); 720 {
733 return n / d; 721 return 1;
722 }
723
724 if (job->second.size_ == 1)
725 {
726 return job->second.success_;
727 }
728
729 return (static_cast<float>(job->second.success_) /
730 static_cast<float>(job->second.size_ - 1));
734 } 731 }
735 732
736 bool IsRunning(const ServerJob& job) 733 bool IsRunning(const ServerJob& job)
737 { 734 {
738 return IsRunning(job.GetId()); 735 return IsRunning(job.GetId());
744 } 741 }
745 742
746 float GetProgress(const ServerJob& job) 743 float GetProgress(const ServerJob& job)
747 { 744 {
748 return GetProgress(job); 745 return GetProgress(job);
746 }
747
748 void GetListOfJobs(ListOfStrings& jobs)
749 {
750 boost::mutex::scoped_lock lock(mutex_);
751
752 jobs.clear();
753
754 for (Jobs::const_iterator
755 it = jobs_.begin(); it != jobs_.end(); it++)
756 {
757 jobs.push_back(it->first);
758 }
749 } 759 }
750 }; 760 };
751 761
752 } 762 }
753 763
782 Toolbox::USleep(1000000); 792 Toolbox::USleep(1000000);
783 793
784 return true; 794 return true;
785 } 795 }
786 }; 796 };
797
798
799 static void Tata(ServerScheduler* s, ServerJob* j)
800 {
801 #if 1
802 for (;;)
803 {
804 ListOfStrings l;
805 s->GetListOfJobs(l);
806 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
807 printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i));
808 Toolbox::USleep(100000);
809 }
810 #else
811 ListOfStrings l;
812 s->GetListOfJobs(l);
813 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
814 printf(">> %s\n", i->c_str());
815 Toolbox::USleep(1500000);
816 s->Cancel(*j);
817 Toolbox::USleep(1000000);
818 s->GetListOfJobs(l);
819 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
820 printf(">> %s\n", i->c_str());
821 #endif
822 }
823
787 824
788 TEST(Toto, Toto) 825 TEST(Toto, Toto)
789 { 826 {
790 ServerScheduler scheduler; 827 ServerScheduler scheduler;
791 828
792 ServerJob job; 829 ServerJob job;
793 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); 830 FilterWrapper& f2 = job.AddFilter(new Tutu(2));
794 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); 831 FilterWrapper& f3 = job.AddFilter(new Tutu(3));
795 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); 832 FilterWrapper& f4 = job.AddFilter(new Tutu(4));
833 FilterWrapper& f5 = job.AddFilter(new Tutu(5));
796 f2.AddInput(boost::lexical_cast<std::string>(42)); 834 f2.AddInput(boost::lexical_cast<std::string>(42));
797 //f3.AddInput(boost::lexical_cast<std::string>(42)); 835 //f3.AddInput(boost::lexical_cast<std::string>(42));
798 //f4.AddInput(boost::lexical_cast<std::string>(42)); 836 //f4.AddInput(boost::lexical_cast<std::string>(42));
799 f2.ConnectNext(f3); 837 f2.ConnectNext(f3);
800 f3.ConnectNext(f4); 838 f3.ConnectNext(f4);
839 f4.ConnectNext(f5);
801 840
802 job.SetDescription("tutu"); 841 job.SetDescription("tutu");
842
843 boost::thread t(Tata, &scheduler, &job);
844
803 845
804 //scheduler.Submit(job); 846 //scheduler.Submit(job);
805 847
806 ListOfStrings l; 848 ListOfStrings l;
807 scheduler.SubmitAndWait(l, job); 849 scheduler.SubmitAndWait(l, job);
810 { 852 {
811 printf("** %s\n", i->c_str()); 853 printf("** %s\n", i->c_str());
812 } 854 }
813 855
814 //Toolbox::ServerBarrier(); 856 //Toolbox::ServerBarrier();
815 Toolbox::USleep(30000); 857 //Toolbox::USleep(3000000);
816 } 858
859 //t.join();
860 }