Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreading.cpp @ 779:76eb563f08f0 lua-scripting
improvements
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 30 Apr 2014 18:10:16 +0200 |
parents | 9ae0bb3f188b |
children | f0ac3a53ccf2 |
comparison
equal
deleted
inserted
replaced
777:9ae0bb3f188b | 779:76eb563f08f0 |
---|---|
249 #include <boost/lexical_cast.hpp> | 249 #include <boost/lexical_cast.hpp> |
250 | 250 |
251 | 251 |
252 namespace Orthanc | 252 namespace Orthanc |
253 { | 253 { |
254 typedef std::list<std::string> ListOfStrings; | |
255 | |
256 class IServerFilter | 254 class IServerFilter |
257 { | 255 { |
258 public: | 256 public: |
257 typedef std::list<std::string> ListOfStrings; | |
258 | |
259 virtual ~IServerFilter() | 259 virtual ~IServerFilter() |
260 { | 260 { |
261 } | 261 } |
262 | 262 |
263 virtual bool Apply(ListOfStrings& outputs, | 263 virtual bool Apply(ListOfStrings& outputs, |
264 const ListOfStrings& inputs) = 0; | 264 const ListOfStrings& inputs) = 0; |
265 | |
266 virtual bool SendOutputsToSink() const = 0; | |
265 }; | 267 }; |
266 | 268 |
267 | 269 |
268 class Sink : public IServerFilter | 270 class Sink : public IServerFilter |
269 { | 271 { |
271 ListOfStrings& target_; | 273 ListOfStrings& target_; |
272 | 274 |
273 public: | 275 public: |
274 Sink(ListOfStrings& target) : target_(target) | 276 Sink(ListOfStrings& target) : target_(target) |
275 { | 277 { |
278 } | |
279 | |
280 virtual bool SendOutputsToSink() const | |
281 { | |
282 return false; | |
276 } | 283 } |
277 | 284 |
278 virtual bool Apply(ListOfStrings& outputs, | 285 virtual bool Apply(ListOfStrings& outputs, |
279 const ListOfStrings& inputs) | 286 const ListOfStrings& inputs) |
280 { | 287 { |
301 | 308 |
302 virtual void SignalFailure(const std::string& jobId) = 0; | 309 virtual void SignalFailure(const std::string& jobId) = 0; |
303 }; | 310 }; |
304 | 311 |
305 | 312 |
306 class FilterWrapper : public IDynamicObject | 313 class ServerFilterInstance : public IDynamicObject |
307 { | 314 { |
308 friend class ServerScheduler; | 315 friend class ServerScheduler; |
309 | 316 |
310 private: | 317 private: |
318 typedef IServerFilter::ListOfStrings ListOfStrings; | |
319 | |
311 IServerFilter *filter_; | 320 IServerFilter *filter_; |
312 std::string jobId_; | 321 std::string jobId_; |
313 ListOfStrings inputs_; | 322 ListOfStrings inputs_; |
314 std::list<FilterWrapper*> next_; | 323 std::list<ServerFilterInstance*> next_; |
315 | 324 |
316 bool Execute(IServerFilterListener& listener) | 325 bool Execute(IServerFilterListener& listener) |
317 { | 326 { |
318 ListOfStrings outputs; | 327 ListOfStrings outputs; |
319 if (!filter_->Apply(outputs, inputs_)) | 328 if (!filter_->Apply(outputs, inputs_)) |
320 { | 329 { |
321 listener.SignalFailure(jobId_); | 330 listener.SignalFailure(jobId_); |
322 return true; | 331 return true; |
323 } | 332 } |
324 | 333 |
325 for (std::list<FilterWrapper*>::iterator | 334 for (std::list<ServerFilterInstance*>::iterator |
326 it = next_.begin(); it != next_.end(); it++) | 335 it = next_.begin(); it != next_.end(); it++) |
327 { | 336 { |
328 for (ListOfStrings::const_iterator | 337 for (ListOfStrings::const_iterator |
329 output = outputs.begin(); output != outputs.end(); output++) | 338 output = outputs.begin(); output != outputs.end(); output++) |
330 { | 339 { |
336 return true; | 345 return true; |
337 } | 346 } |
338 | 347 |
339 | 348 |
340 public: | 349 public: |
341 FilterWrapper(IServerFilter *filter, | 350 ServerFilterInstance(IServerFilter *filter, |
342 const std::string& jobId) : | 351 const std::string& jobId) : |
343 filter_(filter), | 352 filter_(filter), |
344 jobId_(jobId) | 353 jobId_(jobId) |
345 { | 354 { |
346 if (filter_ == NULL) | 355 if (filter_ == NULL) |
347 { | 356 { |
348 throw OrthancException(ErrorCode_ParameterOutOfRange); | 357 throw OrthancException(ErrorCode_ParameterOutOfRange); |
349 } | 358 } |
350 } | 359 } |
351 | 360 |
352 virtual ~FilterWrapper() | 361 virtual ~ServerFilterInstance() |
353 { | 362 { |
354 if (filter_ != NULL) | 363 if (filter_ != NULL) |
355 { | 364 { |
356 delete filter_; | 365 delete filter_; |
357 } | 366 } |
365 void AddInput(const std::string& input) | 374 void AddInput(const std::string& input) |
366 { | 375 { |
367 inputs_.push_back(input); | 376 inputs_.push_back(input); |
368 } | 377 } |
369 | 378 |
370 void ConnectNext(FilterWrapper& filter) | 379 void ConnectNext(ServerFilterInstance& filter) |
371 { | 380 { |
372 next_.push_back(&filter); | 381 next_.push_back(&filter); |
373 } | 382 } |
374 | 383 |
375 const std::list<FilterWrapper*>& GetNextFilters() const | 384 const std::list<ServerFilterInstance*>& GetNextFilters() const |
376 { | 385 { |
377 return next_; | 386 return next_; |
378 } | 387 } |
388 | |
389 IServerFilter& GetFilter() const | |
390 { | |
391 return *filter_; | |
392 } | |
379 }; | 393 }; |
380 | 394 |
381 | 395 |
382 class ServerJob | 396 class ServerJob |
383 { | 397 { |
384 friend class ServerScheduler; | 398 friend class ServerScheduler; |
385 | 399 |
386 private: | 400 private: |
387 std::list<FilterWrapper*> filters_; | 401 std::list<ServerFilterInstance*> filters_; |
388 std::string jobId_; | 402 std::string jobId_; |
389 bool submitted_; | 403 bool submitted_; |
390 std::string description_; | 404 std::string description_; |
391 | 405 |
392 | 406 |
393 void CheckOrdering() | 407 void CheckOrdering() |
394 { | 408 { |
395 std::map<FilterWrapper*, unsigned int> index; | 409 std::map<ServerFilterInstance*, unsigned int> index; |
396 | 410 |
397 unsigned int count = 0; | 411 unsigned int count = 0; |
398 for (std::list<FilterWrapper*>::const_iterator | 412 for (std::list<ServerFilterInstance*>::const_iterator |
399 it = filters_.begin(); it != filters_.end(); it++) | 413 it = filters_.begin(); it != filters_.end(); it++) |
400 { | 414 { |
401 index[*it] = count++; | 415 index[*it] = count++; |
402 } | 416 } |
403 | 417 |
404 for (std::list<FilterWrapper*>::const_iterator | 418 for (std::list<ServerFilterInstance*>::const_iterator |
405 it = filters_.begin(); it != filters_.end(); it++) | 419 it = filters_.begin(); it != filters_.end(); it++) |
406 { | 420 { |
407 const std::list<FilterWrapper*>& nextFilters = (*it)->GetNextFilters(); | 421 const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters(); |
408 | 422 |
409 for (std::list<FilterWrapper*>::const_iterator | 423 for (std::list<ServerFilterInstance*>::const_iterator |
410 next = nextFilters.begin(); next != nextFilters.end(); next++) | 424 next = nextFilters.begin(); next != nextFilters.end(); next++) |
411 { | 425 { |
412 if (index.find(*next) == index.end() || | 426 if (index.find(*next) == index.end() || |
413 index[*next] <= index[*it]) | 427 index[*next] <= index[*it]) |
414 { | 428 { |
431 | 445 |
432 CheckOrdering(); | 446 CheckOrdering(); |
433 | 447 |
434 size_t size = filters_.size(); | 448 size_t size = filters_.size(); |
435 | 449 |
436 for (std::list<FilterWrapper*>::iterator | 450 for (std::list<ServerFilterInstance*>::iterator |
437 it = filters_.begin(); it != filters_.end(); it++) | 451 it = filters_.begin(); it != filters_.end(); it++) |
438 { | 452 { |
439 target.Enqueue(*it); | 453 target.Enqueue(*it); |
440 } | 454 } |
441 | 455 |
453 description_ = "no description"; | 467 description_ = "no description"; |
454 } | 468 } |
455 | 469 |
456 ~ServerJob() | 470 ~ServerJob() |
457 { | 471 { |
458 for (std::list<FilterWrapper*>::iterator | 472 for (std::list<ServerFilterInstance*>::iterator |
459 it = filters_.begin(); it != filters_.end(); it++) | 473 it = filters_.begin(); it != filters_.end(); it++) |
460 { | 474 { |
461 delete *it; | 475 delete *it; |
462 } | 476 } |
463 } | 477 } |
475 const std::string& GetDescription() const | 489 const std::string& GetDescription() const |
476 { | 490 { |
477 return description_; | 491 return description_; |
478 } | 492 } |
479 | 493 |
480 FilterWrapper& AddFilter(IServerFilter* filter) | 494 ServerFilterInstance& AddFilter(IServerFilter* filter) |
481 { | 495 { |
482 if (submitted_) | 496 if (submitted_) |
483 { | 497 { |
484 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 498 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
485 } | 499 } |
486 | 500 |
487 filters_.push_back(new FilterWrapper(filter, jobId_)); | 501 filters_.push_back(new ServerFilterInstance(filter, jobId_)); |
488 | 502 |
489 return *filters_.back(); | 503 return *filters_.back(); |
490 } | 504 } |
491 }; | 505 }; |
492 | 506 |
509 JobStatus_Running = 1, | 523 JobStatus_Running = 1, |
510 JobStatus_Success = 2, | 524 JobStatus_Success = 2, |
511 JobStatus_Failure = 3 | 525 JobStatus_Failure = 3 |
512 }; | 526 }; |
513 | 527 |
528 typedef IServerFilter::ListOfStrings ListOfStrings; | |
514 typedef std::map<std::string, JobInfo> Jobs; | 529 typedef std::map<std::string, JobInfo> Jobs; |
515 | 530 |
516 boost::mutex mutex_; | 531 boost::mutex mutex_; |
517 boost::condition_variable jobFinished_; | 532 boost::condition_variable jobFinished_; |
518 Jobs jobs_; | 533 Jobs jobs_; |
582 while (!that->finish_) | 597 while (!that->finish_) |
583 { | 598 { |
584 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); | 599 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); |
585 if (object.get() != NULL) | 600 if (object.get() != NULL) |
586 { | 601 { |
587 FilterWrapper& filter = dynamic_cast<FilterWrapper&>(*object); | 602 ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object); |
588 | 603 |
589 // Skip the execution of this filter if its parent job has | 604 // Skip the execution of this filter if its parent job has |
590 // previously failed. | 605 // previously failed. |
591 bool jobHasFailed; | 606 bool jobHasFailed; |
592 { | 607 { |
667 return true; | 682 return true; |
668 } | 683 } |
669 | 684 |
670 // Add a sink filter to collect all the results of the filters | 685 // Add a sink filter to collect all the results of the filters |
671 // that have no next filter. | 686 // that have no next filter. |
672 FilterWrapper& sink = job.AddFilter(new Sink(outputs)); | 687 ServerFilterInstance& sink = job.AddFilter(new Sink(outputs)); |
673 | 688 |
674 for (std::list<FilterWrapper*>::iterator | 689 for (std::list<ServerFilterInstance*>::iterator |
675 it = job.filters_.begin(); it != job.filters_.end(); it++) | 690 it = job.filters_.begin(); it != job.filters_.end(); it++) |
676 { | 691 { |
677 if ((*it) != &sink && | 692 if ((*it) != &sink && |
678 (*it)->GetNextFilters().size() == 0) | 693 (*it)->GetNextFilters().size() == 0 && |
694 (*it)->GetFilter().SendOutputsToSink()) | |
679 { | 695 { |
680 (*it)->ConnectNext(sink); | 696 (*it)->ConnectNext(sink); |
681 } | 697 } |
682 } | 698 } |
683 | 699 |
815 | 831 |
816 Toolbox::USleep(1000000); | 832 Toolbox::USleep(1000000); |
817 | 833 |
818 return true; | 834 return true; |
819 } | 835 } |
836 | |
837 virtual bool SendOutputsToSink() const | |
838 { | |
839 return true; | |
840 } | |
820 }; | 841 }; |
821 | 842 |
822 | 843 |
823 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) | 844 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) |
824 { | 845 { |
846 typedef IServerFilter::ListOfStrings ListOfStrings; | |
847 | |
825 #if 1 | 848 #if 1 |
826 while (!(*done)) | 849 while (!(*done)) |
827 { | 850 { |
828 ListOfStrings l; | 851 ListOfStrings l; |
829 s->GetListOfJobs(l); | 852 s->GetListOfJobs(l); |
849 TEST(Toto, Toto) | 872 TEST(Toto, Toto) |
850 { | 873 { |
851 ServerScheduler scheduler; | 874 ServerScheduler scheduler; |
852 | 875 |
853 ServerJob job; | 876 ServerJob job; |
854 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); | 877 ServerFilterInstance& f2 = job.AddFilter(new Tutu(2)); |
855 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); | 878 ServerFilterInstance& f3 = job.AddFilter(new Tutu(3)); |
856 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); | 879 ServerFilterInstance& f4 = job.AddFilter(new Tutu(4)); |
857 FilterWrapper& f5 = job.AddFilter(new Tutu(5)); | 880 ServerFilterInstance& f5 = job.AddFilter(new Tutu(5)); |
858 f2.AddInput(boost::lexical_cast<std::string>(42)); | 881 f2.AddInput(boost::lexical_cast<std::string>(42)); |
859 //f3.AddInput(boost::lexical_cast<std::string>(42)); | 882 //f3.AddInput(boost::lexical_cast<std::string>(42)); |
860 //f4.AddInput(boost::lexical_cast<std::string>(42)); | 883 //f4.AddInput(boost::lexical_cast<std::string>(42)); |
861 f2.ConnectNext(f3); | 884 f2.ConnectNext(f3); |
862 f3.ConnectNext(f4); | 885 f3.ConnectNext(f4); |
868 boost::thread t(Tata, &scheduler, &job, &done); | 891 boost::thread t(Tata, &scheduler, &job, &done); |
869 | 892 |
870 | 893 |
871 //scheduler.Submit(job); | 894 //scheduler.Submit(job); |
872 | 895 |
873 ListOfStrings l; | 896 IServerFilter::ListOfStrings l; |
874 scheduler.SubmitAndWait(l, job); | 897 scheduler.SubmitAndWait(l, job); |
875 | 898 |
876 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) | 899 for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++) |
877 { | 900 { |
878 printf("** %s\n", i->c_str()); | 901 printf("** %s\n", i->c_str()); |
879 } | 902 } |
880 | 903 |
881 //Toolbox::ServerBarrier(); | 904 //Toolbox::ServerBarrier(); |