comparison UnitTestsSources/MultiThreading.cpp @ 779:76eb563f08f0 lua-scripting

improvements
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 30 Apr 2014 18:10:16 +0200
parents 9ae0bb3f188b
children f0ac3a53ccf2
comparison
equal deleted inserted replaced
777:9ae0bb3f188b 779:76eb563f08f0
249 #include <boost/lexical_cast.hpp> 249 #include <boost/lexical_cast.hpp>
250 250
251 251
252 namespace Orthanc 252 namespace Orthanc
253 { 253 {
254 typedef std::list<std::string> ListOfStrings;
255
256 class IServerFilter 254 class IServerFilter
257 { 255 {
258 public: 256 public:
257 typedef std::list<std::string> ListOfStrings;
258
259 virtual ~IServerFilter() 259 virtual ~IServerFilter()
260 { 260 {
261 } 261 }
262 262
263 virtual bool Apply(ListOfStrings& outputs, 263 virtual bool Apply(ListOfStrings& outputs,
264 const ListOfStrings& inputs) = 0; 264 const ListOfStrings& inputs) = 0;
265
266 virtual bool SendOutputsToSink() const = 0;
265 }; 267 };
266 268
267 269
268 class Sink : public IServerFilter 270 class Sink : public IServerFilter
269 { 271 {
271 ListOfStrings& target_; 273 ListOfStrings& target_;
272 274
273 public: 275 public:
274 Sink(ListOfStrings& target) : target_(target) 276 Sink(ListOfStrings& target) : target_(target)
275 { 277 {
278 }
279
280 virtual bool SendOutputsToSink() const
281 {
282 return false;
276 } 283 }
277 284
278 virtual bool Apply(ListOfStrings& outputs, 285 virtual bool Apply(ListOfStrings& outputs,
279 const ListOfStrings& inputs) 286 const ListOfStrings& inputs)
280 { 287 {
301 308
302 virtual void SignalFailure(const std::string& jobId) = 0; 309 virtual void SignalFailure(const std::string& jobId) = 0;
303 }; 310 };
304 311
305 312
306 class FilterWrapper : public IDynamicObject 313 class ServerFilterInstance : public IDynamicObject
307 { 314 {
308 friend class ServerScheduler; 315 friend class ServerScheduler;
309 316
310 private: 317 private:
318 typedef IServerFilter::ListOfStrings ListOfStrings;
319
311 IServerFilter *filter_; 320 IServerFilter *filter_;
312 std::string jobId_; 321 std::string jobId_;
313 ListOfStrings inputs_; 322 ListOfStrings inputs_;
314 std::list<FilterWrapper*> next_; 323 std::list<ServerFilterInstance*> next_;
315 324
316 bool Execute(IServerFilterListener& listener) 325 bool Execute(IServerFilterListener& listener)
317 { 326 {
318 ListOfStrings outputs; 327 ListOfStrings outputs;
319 if (!filter_->Apply(outputs, inputs_)) 328 if (!filter_->Apply(outputs, inputs_))
320 { 329 {
321 listener.SignalFailure(jobId_); 330 listener.SignalFailure(jobId_);
322 return true; 331 return true;
323 } 332 }
324 333
325 for (std::list<FilterWrapper*>::iterator 334 for (std::list<ServerFilterInstance*>::iterator
326 it = next_.begin(); it != next_.end(); it++) 335 it = next_.begin(); it != next_.end(); it++)
327 { 336 {
328 for (ListOfStrings::const_iterator 337 for (ListOfStrings::const_iterator
329 output = outputs.begin(); output != outputs.end(); output++) 338 output = outputs.begin(); output != outputs.end(); output++)
330 { 339 {
336 return true; 345 return true;
337 } 346 }
338 347
339 348
340 public: 349 public:
341 FilterWrapper(IServerFilter *filter, 350 ServerFilterInstance(IServerFilter *filter,
342 const std::string& jobId) : 351 const std::string& jobId) :
343 filter_(filter), 352 filter_(filter),
344 jobId_(jobId) 353 jobId_(jobId)
345 { 354 {
346 if (filter_ == NULL) 355 if (filter_ == NULL)
347 { 356 {
348 throw OrthancException(ErrorCode_ParameterOutOfRange); 357 throw OrthancException(ErrorCode_ParameterOutOfRange);
349 } 358 }
350 } 359 }
351 360
352 virtual ~FilterWrapper() 361 virtual ~ServerFilterInstance()
353 { 362 {
354 if (filter_ != NULL) 363 if (filter_ != NULL)
355 { 364 {
356 delete filter_; 365 delete filter_;
357 } 366 }
365 void AddInput(const std::string& input) 374 void AddInput(const std::string& input)
366 { 375 {
367 inputs_.push_back(input); 376 inputs_.push_back(input);
368 } 377 }
369 378
370 void ConnectNext(FilterWrapper& filter) 379 void ConnectNext(ServerFilterInstance& filter)
371 { 380 {
372 next_.push_back(&filter); 381 next_.push_back(&filter);
373 } 382 }
374 383
375 const std::list<FilterWrapper*>& GetNextFilters() const 384 const std::list<ServerFilterInstance*>& GetNextFilters() const
376 { 385 {
377 return next_; 386 return next_;
378 } 387 }
388
389 IServerFilter& GetFilter() const
390 {
391 return *filter_;
392 }
379 }; 393 };
380 394
381 395
382 class ServerJob 396 class ServerJob
383 { 397 {
384 friend class ServerScheduler; 398 friend class ServerScheduler;
385 399
386 private: 400 private:
387 std::list<FilterWrapper*> filters_; 401 std::list<ServerFilterInstance*> filters_;
388 std::string jobId_; 402 std::string jobId_;
389 bool submitted_; 403 bool submitted_;
390 std::string description_; 404 std::string description_;
391 405
392 406
393 void CheckOrdering() 407 void CheckOrdering()
394 { 408 {
395 std::map<FilterWrapper*, unsigned int> index; 409 std::map<ServerFilterInstance*, unsigned int> index;
396 410
397 unsigned int count = 0; 411 unsigned int count = 0;
398 for (std::list<FilterWrapper*>::const_iterator 412 for (std::list<ServerFilterInstance*>::const_iterator
399 it = filters_.begin(); it != filters_.end(); it++) 413 it = filters_.begin(); it != filters_.end(); it++)
400 { 414 {
401 index[*it] = count++; 415 index[*it] = count++;
402 } 416 }
403 417
404 for (std::list<FilterWrapper*>::const_iterator 418 for (std::list<ServerFilterInstance*>::const_iterator
405 it = filters_.begin(); it != filters_.end(); it++) 419 it = filters_.begin(); it != filters_.end(); it++)
406 { 420 {
407 const std::list<FilterWrapper*>& nextFilters = (*it)->GetNextFilters(); 421 const std::list<ServerFilterInstance*>& nextFilters = (*it)->GetNextFilters();
408 422
409 for (std::list<FilterWrapper*>::const_iterator 423 for (std::list<ServerFilterInstance*>::const_iterator
410 next = nextFilters.begin(); next != nextFilters.end(); next++) 424 next = nextFilters.begin(); next != nextFilters.end(); next++)
411 { 425 {
412 if (index.find(*next) == index.end() || 426 if (index.find(*next) == index.end() ||
413 index[*next] <= index[*it]) 427 index[*next] <= index[*it])
414 { 428 {
431 445
432 CheckOrdering(); 446 CheckOrdering();
433 447
434 size_t size = filters_.size(); 448 size_t size = filters_.size();
435 449
436 for (std::list<FilterWrapper*>::iterator 450 for (std::list<ServerFilterInstance*>::iterator
437 it = filters_.begin(); it != filters_.end(); it++) 451 it = filters_.begin(); it != filters_.end(); it++)
438 { 452 {
439 target.Enqueue(*it); 453 target.Enqueue(*it);
440 } 454 }
441 455
453 description_ = "no description"; 467 description_ = "no description";
454 } 468 }
455 469
456 ~ServerJob() 470 ~ServerJob()
457 { 471 {
458 for (std::list<FilterWrapper*>::iterator 472 for (std::list<ServerFilterInstance*>::iterator
459 it = filters_.begin(); it != filters_.end(); it++) 473 it = filters_.begin(); it != filters_.end(); it++)
460 { 474 {
461 delete *it; 475 delete *it;
462 } 476 }
463 } 477 }
475 const std::string& GetDescription() const 489 const std::string& GetDescription() const
476 { 490 {
477 return description_; 491 return description_;
478 } 492 }
479 493
480 FilterWrapper& AddFilter(IServerFilter* filter) 494 ServerFilterInstance& AddFilter(IServerFilter* filter)
481 { 495 {
482 if (submitted_) 496 if (submitted_)
483 { 497 {
484 throw OrthancException(ErrorCode_BadSequenceOfCalls); 498 throw OrthancException(ErrorCode_BadSequenceOfCalls);
485 } 499 }
486 500
487 filters_.push_back(new FilterWrapper(filter, jobId_)); 501 filters_.push_back(new ServerFilterInstance(filter, jobId_));
488 502
489 return *filters_.back(); 503 return *filters_.back();
490 } 504 }
491 }; 505 };
492 506
509 JobStatus_Running = 1, 523 JobStatus_Running = 1,
510 JobStatus_Success = 2, 524 JobStatus_Success = 2,
511 JobStatus_Failure = 3 525 JobStatus_Failure = 3
512 }; 526 };
513 527
528 typedef IServerFilter::ListOfStrings ListOfStrings;
514 typedef std::map<std::string, JobInfo> Jobs; 529 typedef std::map<std::string, JobInfo> Jobs;
515 530
516 boost::mutex mutex_; 531 boost::mutex mutex_;
517 boost::condition_variable jobFinished_; 532 boost::condition_variable jobFinished_;
518 Jobs jobs_; 533 Jobs jobs_;
582 while (!that->finish_) 597 while (!that->finish_)
583 { 598 {
584 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT)); 599 std::auto_ptr<IDynamicObject> object(that->queue_.Dequeue(TIMEOUT));
585 if (object.get() != NULL) 600 if (object.get() != NULL)
586 { 601 {
587 FilterWrapper& filter = dynamic_cast<FilterWrapper&>(*object); 602 ServerFilterInstance& filter = dynamic_cast<ServerFilterInstance&>(*object);
588 603
589 // Skip the execution of this filter if its parent job has 604 // Skip the execution of this filter if its parent job has
590 // previously failed. 605 // previously failed.
591 bool jobHasFailed; 606 bool jobHasFailed;
592 { 607 {
667 return true; 682 return true;
668 } 683 }
669 684
670 // Add a sink filter to collect all the results of the filters 685 // Add a sink filter to collect all the results of the filters
671 // that have no next filter. 686 // that have no next filter.
672 FilterWrapper& sink = job.AddFilter(new Sink(outputs)); 687 ServerFilterInstance& sink = job.AddFilter(new Sink(outputs));
673 688
674 for (std::list<FilterWrapper*>::iterator 689 for (std::list<ServerFilterInstance*>::iterator
675 it = job.filters_.begin(); it != job.filters_.end(); it++) 690 it = job.filters_.begin(); it != job.filters_.end(); it++)
676 { 691 {
677 if ((*it) != &sink && 692 if ((*it) != &sink &&
678 (*it)->GetNextFilters().size() == 0) 693 (*it)->GetNextFilters().size() == 0 &&
694 (*it)->GetFilter().SendOutputsToSink())
679 { 695 {
680 (*it)->ConnectNext(sink); 696 (*it)->ConnectNext(sink);
681 } 697 }
682 } 698 }
683 699
815 831
816 Toolbox::USleep(1000000); 832 Toolbox::USleep(1000000);
817 833
818 return true; 834 return true;
819 } 835 }
836
837 virtual bool SendOutputsToSink() const
838 {
839 return true;
840 }
820 }; 841 };
821 842
822 843
823 static void Tata(ServerScheduler* s, ServerJob* j, bool* done) 844 static void Tata(ServerScheduler* s, ServerJob* j, bool* done)
824 { 845 {
846 typedef IServerFilter::ListOfStrings ListOfStrings;
847
825 #if 1 848 #if 1
826 while (!(*done)) 849 while (!(*done))
827 { 850 {
828 ListOfStrings l; 851 ListOfStrings l;
829 s->GetListOfJobs(l); 852 s->GetListOfJobs(l);
849 TEST(Toto, Toto) 872 TEST(Toto, Toto)
850 { 873 {
851 ServerScheduler scheduler; 874 ServerScheduler scheduler;
852 875
853 ServerJob job; 876 ServerJob job;
854 FilterWrapper& f2 = job.AddFilter(new Tutu(2)); 877 ServerFilterInstance& f2 = job.AddFilter(new Tutu(2));
855 FilterWrapper& f3 = job.AddFilter(new Tutu(3)); 878 ServerFilterInstance& f3 = job.AddFilter(new Tutu(3));
856 FilterWrapper& f4 = job.AddFilter(new Tutu(4)); 879 ServerFilterInstance& f4 = job.AddFilter(new Tutu(4));
857 FilterWrapper& f5 = job.AddFilter(new Tutu(5)); 880 ServerFilterInstance& f5 = job.AddFilter(new Tutu(5));
858 f2.AddInput(boost::lexical_cast<std::string>(42)); 881 f2.AddInput(boost::lexical_cast<std::string>(42));
859 //f3.AddInput(boost::lexical_cast<std::string>(42)); 882 //f3.AddInput(boost::lexical_cast<std::string>(42));
860 //f4.AddInput(boost::lexical_cast<std::string>(42)); 883 //f4.AddInput(boost::lexical_cast<std::string>(42));
861 f2.ConnectNext(f3); 884 f2.ConnectNext(f3);
862 f3.ConnectNext(f4); 885 f3.ConnectNext(f4);
868 boost::thread t(Tata, &scheduler, &job, &done); 891 boost::thread t(Tata, &scheduler, &job, &done);
869 892
870 893
871 //scheduler.Submit(job); 894 //scheduler.Submit(job);
872 895
873 ListOfStrings l; 896 IServerFilter::ListOfStrings l;
874 scheduler.SubmitAndWait(l, job); 897 scheduler.SubmitAndWait(l, job);
875 898
876 for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) 899 for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
877 { 900 {
878 printf("** %s\n", i->c_str()); 901 printf("** %s\n", i->c_str());
879 } 902 }
880 903
881 //Toolbox::ServerBarrier(); 904 //Toolbox::ServerBarrier();