Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreading.cpp @ 765:fc97f762834c lua-scripting
scheduler
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 23 Apr 2014 16:53:59 +0200 |
parents | b2a62f22fbe8 |
children | 476febedc516 |
comparison
equal
deleted
inserted
replaced
762:45b16f67259c | 765:fc97f762834c |
---|---|
1 #include "gtest/gtest.h" | 1 #include "gtest/gtest.h" |
2 #include <glog/logging.h> | |
2 | 3 |
3 #include "../Core/OrthancException.h" | 4 #include "../Core/OrthancException.h" |
4 #include "../Core/Toolbox.h" | 5 #include "../Core/Toolbox.h" |
5 #include "../Core/MultiThreading/ArrayFilledByThreads.h" | 6 #include "../Core/MultiThreading/ArrayFilledByThreads.h" |
6 #include "../Core/MultiThreading/Locker.h" | 7 #include "../Core/MultiThreading/Locker.h" |
208 | 209 |
209 { | 210 { |
210 Locker locker3(lock.ForWriter()); | 211 Locker locker3(lock.ForWriter()); |
211 } | 212 } |
212 } | 213 } |
214 | |
215 | |
216 | |
217 | |
218 | |
219 | |
220 | |
221 #include "../Core/ICommand.h" | |
222 #include "../Core/Toolbox.h" | |
223 #include "../Core/Uuid.h" | |
224 #include "../Core/MultiThreading/SharedMessageQueue.h" | |
225 #include <boost/lexical_cast.hpp> | |
226 | |
227 | |
228 namespace Orthanc | |
229 { | |
// Ordered collection of strings; filters receive one such list as
// input and fill another one as output.
typedef std::list<std::string> ListOfStrings;

/**
 * Abstract processing step of a job. Implementations read "inputs"
 * and may append results to "outputs".
 **/
class IServerFilter
{
public:
  // Virtual destructor: filters are destroyed through base pointers
  // (see FilterWrapper, which deletes an "IServerFilter*").
  virtual ~IServerFilter() {}

  // Returns "true" on success. FilterWrapper::Execute() reports a
  // "false" result as a failure of the enclosing job.
  virtual bool Apply(ListOfStrings& outputs,
                     const ListOfStrings& inputs) = 0;
};
242 | |
243 | |
244 class Sink : public IServerFilter | |
245 { | |
246 private: | |
247 ListOfStrings& target_; | |
248 | |
249 public: | |
250 Sink(ListOfStrings& target) : target_(target) | |
251 { | |
252 } | |
253 | |
254 virtual bool Apply(ListOfStrings& outputs, | |
255 const ListOfStrings& inputs) | |
256 { | |
257 for (ListOfStrings::const_iterator | |
258 it = inputs.begin(); it != inputs.end(); it++) | |
259 { | |
260 target_.push_back(*it); | |
261 } | |
262 | |
263 return true; | |
264 } | |
265 }; | |
266 | |
267 | |
268 | |
/**
 * Callback interface through which the outcome of one filter is
 * reported, identified by the ID of the job the filter belongs to.
 **/
class IServerFilterListener
{
public:
  virtual ~IServerFilterListener() {}

  // Called once a filter of job "jobId" has been applied successfully.
  virtual void SignalSuccess(const std::string& jobId) = 0;

  // Called once a filter of job "jobId" has failed.
  virtual void SignalFailure(const std::string& jobId) = 0;
};
280 | |
281 | |
282 class FilterWrapper : public IDynamicObject | |
283 { | |
284 private: | |
285 IServerFilterListener *listener_; | |
286 IServerFilter *filter_; | |
287 std::string jobId_; | |
288 ListOfStrings inputs_; | |
289 std::list<FilterWrapper*> next_; | |
290 | |
291 public: | |
292 FilterWrapper(IServerFilter *filter, | |
293 const std::string& jobId) : | |
294 listener_(NULL), | |
295 filter_(filter), | |
296 jobId_(jobId) | |
297 { | |
298 if (filter_ == NULL) | |
299 { | |
300 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
301 } | |
302 } | |
303 | |
304 virtual ~FilterWrapper() | |
305 { | |
306 if (filter_ != NULL) | |
307 { | |
308 delete filter_; | |
309 } | |
310 } | |
311 | |
312 void SetListener(IServerFilterListener& listener) | |
313 { | |
314 listener_ = &listener; | |
315 } | |
316 | |
317 const std::string& GetJobId() const | |
318 { | |
319 return jobId_; | |
320 } | |
321 | |
322 void AddInput(const std::string& input) | |
323 { | |
324 inputs_.push_back(input); | |
325 } | |
326 | |
327 bool Execute() | |
328 { | |
329 ListOfStrings outputs; | |
330 if (!filter_->Apply(outputs, inputs_)) | |
331 { | |
332 if (listener_) | |
333 { | |
334 listener_->SignalFailure(jobId_); | |
335 } | |
336 | |
337 return true; | |
338 } | |
339 | |
340 for (std::list<FilterWrapper*>::iterator | |
341 it = next_.begin(); it != next_.end(); it++) | |
342 { | |
343 for (ListOfStrings::const_iterator | |
344 output = outputs.begin(); output != outputs.end(); output++) | |
345 { | |
346 (*it)->AddInput(*output); | |
347 } | |
348 } | |
349 | |
350 if (listener_) | |
351 { | |
352 listener_->SignalSuccess(jobId_); | |
353 } | |
354 | |
355 return true; | |
356 } | |
357 | |
358 void ConnectNext(FilterWrapper& filter) | |
359 { | |
360 next_.push_back(&filter); | |
361 } | |
362 | |
363 const std::list<FilterWrapper*>& GetNextFilters() const | |
364 { | |
365 return next_; | |
366 } | |
367 }; | |
368 | |
369 | |
// State of a watched job, as tracked by ServerScheduler.
enum ServerJobStatus
{
  // Recorded when a watched job is submitted
  ServerJobStatus_Running = 1,

  // Recorded when all the filters of the job have succeeded
  ServerJobStatus_Success = 2,

  // Recorded when the job completes with at least one failed filter
  ServerJobStatus_Failure = 3
};
376 | |
377 | |
378 class ServerJob | |
379 { | |
380 friend class ServerScheduler; | |
381 | |
382 private: | |
383 std::list<FilterWrapper*> filters_; | |
384 std::string jobId_; | |
385 bool submitted_; | |
386 std::string description_; | |
387 | |
388 | |
389 void CheckOrdering() | |
390 { | |
391 std::map<FilterWrapper*, unsigned int> index; | |
392 | |
393 unsigned int count = 0; | |
394 for (std::list<FilterWrapper*>::const_iterator | |
395 it = filters_.begin(); it != filters_.end(); it++) | |
396 { | |
397 index[*it] = count++; | |
398 } | |
399 | |
400 for (std::list<FilterWrapper*>::const_iterator | |
401 it = filters_.begin(); it != filters_.end(); it++) | |
402 { | |
403 const std::list<FilterWrapper*>& nextFilters = (*it)->GetNextFilters(); | |
404 | |
405 for (std::list<FilterWrapper*>::const_iterator | |
406 next = nextFilters.begin(); next != nextFilters.end(); next++) | |
407 { | |
408 if (index.find(*next) == index.end() || | |
409 index[*next] <= index[*it]) | |
410 { | |
411 // You must reorder your calls to "ServerJob::AddFilter" | |
412 throw OrthancException("Bad ordering of filters in a job"); | |
413 } | |
414 } | |
415 } | |
416 } | |
417 | |
418 | |
419 size_t Submit(SharedMessageQueue& target, | |
420 IServerFilterListener& listener) | |
421 { | |
422 if (submitted_) | |
423 { | |
424 // This job has already been submitted | |
425 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
426 } | |
427 | |
428 CheckOrdering(); | |
429 | |
430 size_t size = filters_.size(); | |
431 | |
432 for (std::list<FilterWrapper*>::iterator | |
433 it = filters_.begin(); it != filters_.end(); it++) | |
434 { | |
435 (*it)->SetListener(listener); | |
436 target.Enqueue(*it); | |
437 } | |
438 | |
439 filters_.clear(); | |
440 submitted_ = true; | |
441 | |
442 return size; | |
443 } | |
444 | |
445 public: | |
446 ServerJob() | |
447 { | |
448 jobId_ = Toolbox::GenerateUuid(); | |
449 submitted_ = false; | |
450 description_ = "no description"; | |
451 } | |
452 | |
453 ~ServerJob() | |
454 { | |
455 for (std::list<FilterWrapper*>::iterator | |
456 it = filters_.begin(); it != filters_.end(); it++) | |
457 { | |
458 delete *it; | |
459 } | |
460 } | |
461 | |
462 const std::string& GetId() const | |
463 { | |
464 return jobId_; | |
465 } | |
466 | |
467 void SetDescription(const char* description) | |
468 { | |
469 description_ = description; | |
470 } | |
471 | |
472 const std::string& GetDescription() const | |
473 { | |
474 return description_; | |
475 } | |
476 | |
477 FilterWrapper& AddFilter(IServerFilter* filter) | |
478 { | |
479 if (submitted_) | |
480 { | |
481 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
482 } | |
483 | |
484 filters_.push_back(new FilterWrapper(filter, jobId_)); | |
485 | |
486 return *filters_.back(); | |
487 } | |
488 }; | |
489 | |
490 | |
491 class ServerScheduler : public IServerFilterListener | |
492 { | |
493 private: | |
494 struct JobInfo | |
495 { | |
496 bool watched_; | |
497 bool cancel_; | |
498 size_t size_; | |
499 size_t success_; | |
500 size_t failures_; | |
501 std::string description_; | |
502 }; | |
503 | |
504 typedef std::map<std::string, JobInfo> Jobs; | |
505 | |
506 boost::mutex mutex_; | |
507 boost::condition_variable jobFinished_; | |
508 Jobs jobs_; | |
509 SharedMessageQueue queue_; | |
510 bool finish_; | |
511 boost::thread worker_; | |
512 std::map<std::string, ServerJobStatus> watchedJobStatus_; | |
513 | |
514 JobInfo& GetJobInfo(const std::string& jobId) | |
515 { | |
516 Jobs::iterator info = jobs_.find(jobId); | |
517 | |
518 if (info == jobs_.end()) | |
519 { | |
520 throw OrthancException(ErrorCode_InternalError); | |
521 } | |
522 | |
523 return info->second; | |
524 } | |
525 | |
526 virtual void SignalSuccess(const std::string& jobId) | |
527 { | |
528 boost::mutex::scoped_lock lock(mutex_); | |
529 | |
530 JobInfo& info = GetJobInfo(jobId); | |
531 info.success_++; | |
532 | |
533 assert(info.failures_ == 0); | |
534 | |
535 if (info.success_ >= info.size_) | |
536 { | |
537 if (info.watched_) | |
538 { | |
539 watchedJobStatus_[jobId] = ServerJobStatus_Success; | |
540 jobFinished_.notify_all(); | |
541 } | |
542 | |
543 LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; | |
544 jobs_.erase(jobId); | |
545 } | |
546 } | |
547 | |
548 virtual void SignalFailure(const std::string& jobId) | |
549 { | |
550 boost::mutex::scoped_lock lock(mutex_); | |
551 | |
552 JobInfo& info = GetJobInfo(jobId); | |
553 info.failures_++; | |
554 | |
555 if (info.success_ + info.failures_ >= info.size_) | |
556 { | |
557 if (info.watched_) | |
558 { | |
559 watchedJobStatus_[jobId] = ServerJobStatus_Failure; | |
560 jobFinished_.notify_all(); | |
561 } | |
562 | |
563 LOG(ERROR) << "Job has failed (" << info.description_ << ")"; | |
564 jobs_.erase(jobId); | |
565 } | |
566 } | |
567 | |
568 static void Worker(ServerScheduler* that) | |
569 { | |
570 static const int32_t TIMEOUT = 100; | |
571 | |
572 while (!that->finish_) | |
573 { | |
574 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); | |
575 if (object.get() != NULL) | |
576 { | |
577 FilterWrapper& filter = dynamic_cast<FilterWrapper&>(*object); | |
578 | |
579 // Skip the execution of this filter if its parent job has | |
580 // previously failed. | |
581 bool jobHasFailed; | |
582 { | |
583 boost::mutex::scoped_lock lock(that->mutex_); | |
584 JobInfo& info = that->GetJobInfo(filter.GetJobId()); | |
585 jobHasFailed = (info.failures_ > 0 || info.cancel_); | |
586 } | |
587 | |
588 if (jobHasFailed) | |
589 { | |
590 that->SignalFailure(filter.GetJobId()); | |
591 } | |
592 else | |
593 { | |
594 filter.Execute(); | |
595 } | |
596 } | |
597 } | |
598 } | |
599 | |
600 void SubmitInternal(ServerJob& job, | |
601 bool watched) | |
602 { | |
603 boost::mutex::scoped_lock lock(mutex_); | |
604 | |
605 JobInfo info; | |
606 info.size_ = job.Submit(queue_, *this); | |
607 info.cancel_ = false; | |
608 info.success_ = 0; | |
609 info.failures_ = 0; | |
610 info.description_ = job.GetDescription(); | |
611 info.watched_ = watched; | |
612 | |
613 assert(info.size_ > 0); | |
614 | |
615 if (watched) | |
616 { | |
617 watchedJobStatus_[job.GetId()] = ServerJobStatus_Running; | |
618 } | |
619 | |
620 jobs_[job.GetId()] = info; | |
621 | |
622 LOG(INFO) << "New job submitted (" << job.description_ << ")"; | |
623 } | |
624 | |
625 public: | |
626 ServerScheduler() | |
627 { | |
628 finish_ = false; | |
629 worker_ = boost::thread(Worker, this); | |
630 } | |
631 | |
632 ~ServerScheduler() | |
633 { | |
634 finish_ = true; | |
635 worker_.join(); | |
636 } | |
637 | |
638 void Submit(ServerJob& job) | |
639 { | |
640 if (job.filters_.empty()) | |
641 { | |
642 return; | |
643 } | |
644 | |
645 SubmitInternal(job, false); | |
646 } | |
647 | |
648 bool SubmitAndWait(ListOfStrings& outputs, | |
649 ServerJob& job) | |
650 { | |
651 std::string jobId = job.GetId(); | |
652 | |
653 outputs.clear(); | |
654 | |
655 if (job.filters_.empty()) | |
656 { | |
657 return true; | |
658 } | |
659 | |
660 // Add a sink filter to collect all the results of the filters | |
661 // that have no next filter. | |
662 FilterWrapper& sink = job.AddFilter(new Sink(outputs)); | |
663 | |
664 for (std::list<FilterWrapper*>::iterator | |
665 it = job.filters_.begin(); it != job.filters_.end(); it++) | |
666 { | |
667 if ((*it) != &sink && | |
668 (*it)->GetNextFilters().size() == 0) | |
669 { | |
670 (*it)->ConnectNext(sink); | |
671 } | |
672 } | |
673 | |
674 // Submit the job | |
675 SubmitInternal(job, true); | |
676 | |
677 // Wait for the job to complete (either success or failure) | |
678 ServerJobStatus status; | |
679 | |
680 { | |
681 boost::mutex::scoped_lock lock(mutex_); | |
682 | |
683 while (watchedJobStatus_[jobId] == ServerJobStatus_Running) | |
684 { | |
685 jobFinished_.wait(lock); | |
686 } | |
687 | |
688 status = watchedJobStatus_[jobId]; | |
689 watchedJobStatus_.erase(jobId); | |
690 } | |
691 | |
692 return (status == ServerJobStatus_Success); | |
693 } | |
694 | |
695 | |
696 bool IsRunning(const std::string& jobId) | |
697 { | |
698 boost::mutex::scoped_lock lock(mutex_); | |
699 return jobs_.find(jobId) != jobs_.end(); | |
700 } | |
701 | |
702 | |
703 void Cancel(const std::string& jobId) | |
704 { | |
705 boost::mutex::scoped_lock lock(mutex_); | |
706 | |
707 Jobs::iterator job = jobs_.find(jobId); | |
708 | |
709 if (job != jobs_.end()) | |
710 { | |
711 job->second.cancel_ = true; | |
712 LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; | |
713 } | |
714 } | |
715 | |
716 | |
717 // Returns a number between 0 and 1 | |
718 float GetProgress(const std::string& jobId) | |
719 { | |
720 boost::mutex::scoped_lock lock(mutex_); | |
721 | |
722 Jobs::iterator job = jobs_.find(jobId); | |
723 | |
724 if (job == jobs_.end() || | |
725 job->second.size_ == 0 /* should never happen */) | |
726 { | |
727 // This job is not running | |
728 return 1; | |
729 } | |
730 | |
731 float n = static_cast<float>(job->second.failures_ + job->second.success_); | |
732 float d = static_cast<float>(job->second.size_); | |
733 return n / d; | |
734 } | |
735 | |
736 bool IsRunning(const ServerJob& job) | |
737 { | |
738 return IsRunning(job.GetId()); | |
739 } | |
740 | |
741 void Cancel(const ServerJob& job) | |
742 { | |
743 Cancel(job.GetId()); | |
744 } | |
745 | |
746 float GetProgress(const ServerJob& job) | |
747 { | |
748 return GetProgress(job); | |
749 } | |
750 }; | |
751 | |
752 } | |
753 | |
754 | |
755 | |
756 class Tutu : public IServerFilter | |
757 { | |
758 private: | |
759 int factor_; | |
760 | |
761 public: | |
762 Tutu(int f) : factor_(f) | |
763 { | |
764 } | |
765 | |
766 virtual bool Apply(ListOfStrings& outputs, | |
767 const ListOfStrings& inputs) | |
768 { | |
769 for (ListOfStrings::const_iterator | |
770 it = inputs.begin(); it != inputs.end(); it++) | |
771 { | |
772 int a = boost::lexical_cast<int>(*it); | |
773 int b = factor_ * a; | |
774 | |
775 printf("%d * %d = %d\n", a, factor_, b); | |
776 | |
777 //if (a == 84) { printf("BREAK\n"); return false; } | |
778 | |
779 outputs.push_back(boost::lexical_cast<std::string>(b)); | |
780 } | |
781 | |
782 Toolbox::USleep(1000000); | |
783 | |
784 return true; | |
785 } | |
786 }; | |
787 | |
788 TEST(Toto, Toto) | |
789 { | |
790 ServerScheduler scheduler; | |
791 | |
792 ServerJob job; | |
793 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); | |
794 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); | |
795 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); | |
796 f2.AddInput(boost::lexical_cast<std::string>(42)); | |
797 //f3.AddInput(boost::lexical_cast<std::string>(42)); | |
798 //f4.AddInput(boost::lexical_cast<std::string>(42)); | |
799 f2.ConnectNext(f3); | |
800 f3.ConnectNext(f4); | |
801 | |
802 job.SetDescription("tutu"); | |
803 | |
804 //scheduler.Submit(job); | |
805 | |
806 ListOfStrings l; | |
807 scheduler.SubmitAndWait(l, job); | |
808 | |
809 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | |
810 { | |
811 printf("** %s\n", i->c_str()); | |
812 } | |
813 | |
814 //Toolbox::ServerBarrier(); | |
815 Toolbox::USleep(30000); | |
816 } |