diff UnitTestsSources/MultiThreading.cpp @ 768:476febedc516 lua-scripting

improvements
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 23 Apr 2014 17:23:18 +0200
parents fc97f762834c
children a64ca424e0e2
line wrap: on
line diff
--- a/UnitTestsSources/MultiThreading.cpp	Wed Apr 23 16:53:59 2014 +0200
+++ b/UnitTestsSources/MultiThreading.cpp	Wed Apr 23 17:23:18 2014 +0200
@@ -281,17 +281,41 @@
 
   class FilterWrapper : public IDynamicObject
   {
+    friend class ServerScheduler;
+
   private:
-    IServerFilterListener *listener_;
     IServerFilter *filter_;
     std::string jobId_;
     ListOfStrings inputs_;
     std::list<FilterWrapper*> next_;
 
+    bool Execute(IServerFilterListener& listener)
+    {
+      ListOfStrings outputs;
+      if (!filter_->Apply(outputs, inputs_))
+      {
+        listener.SignalFailure(jobId_);
+        return true;
+      }
+
+      for (std::list<FilterWrapper*>::iterator
+             it = next_.begin(); it != next_.end(); it++)
+      {
+        for (ListOfStrings::const_iterator
+               output = outputs.begin(); output != outputs.end(); output++)
+        {
+          (*it)->AddInput(*output);
+        }
+      }
+
+      listener.SignalSuccess(jobId_);
+      return true;
+    }
+
+
   public:
     FilterWrapper(IServerFilter *filter,
                   const std::string& jobId) : 
-      listener_(NULL),
       filter_(filter), 
       jobId_(jobId)
     {
@@ -309,11 +333,6 @@
       }
     }
 
-    void SetListener(IServerFilterListener& listener)
-    {
-      listener_ = &listener;
-    }
-
     const std::string& GetJobId() const
     {
       return jobId_;
@@ -324,37 +343,6 @@
       inputs_.push_back(input);
     }
 
-    bool Execute()
-    {
-      ListOfStrings outputs;
-      if (!filter_->Apply(outputs, inputs_))
-      {
-        if (listener_)
-        {
-          listener_->SignalFailure(jobId_);
-        }
-
-        return true;
-      }
-
-      for (std::list<FilterWrapper*>::iterator
-             it = next_.begin(); it != next_.end(); it++)
-      {
-        for (ListOfStrings::const_iterator
-               output = outputs.begin(); output != outputs.end(); output++)
-        {
-          (*it)->AddInput(*output);
-        }
-      }
-
-      if (listener_)
-      {
-        listener_->SignalSuccess(jobId_);
-      }
-
-      return true;
-    }
-
     void ConnectNext(FilterWrapper& filter)
     {
       next_.push_back(&filter);
@@ -367,14 +355,6 @@
   };
 
 
-  enum ServerJobStatus
-  {
-    ServerJobStatus_Running = 1,
-    ServerJobStatus_Success = 2,
-    ServerJobStatus_Failure = 3
-  };
-
-
   class ServerJob
   {
     friend class ServerScheduler;
@@ -432,7 +412,6 @@
       for (std::list<FilterWrapper*>::iterator 
              it = filters_.begin(); it != filters_.end(); it++)
       {
-        (*it)->SetListener(listener);
         target.Enqueue(*it);
       }
 
@@ -501,6 +480,13 @@
       std::string description_;
     };
 
+    enum JobStatus
+    {
+      JobStatus_Running = 1,
+      JobStatus_Success = 2,
+      JobStatus_Failure = 3
+    };
+
     typedef std::map<std::string, JobInfo> Jobs;
 
     boost::mutex mutex_;
@@ -509,7 +495,7 @@
     SharedMessageQueue queue_;
     bool finish_;
     boost::thread worker_;
-    std::map<std::string, ServerJobStatus> watchedJobStatus_;
+    std::map<std::string, JobStatus> watchedJobStatus_;
 
     JobInfo& GetJobInfo(const std::string& jobId)
     {
@@ -536,7 +522,7 @@
       {
         if (info.watched_)
         {
-          watchedJobStatus_[jobId] = ServerJobStatus_Success;
+          watchedJobStatus_[jobId] = JobStatus_Success;
           jobFinished_.notify_all();
         }
 
@@ -556,7 +542,7 @@
       {
         if (info.watched_)
         {
-          watchedJobStatus_[jobId] = ServerJobStatus_Failure;
+          watchedJobStatus_[jobId] = JobStatus_Failure;
           jobFinished_.notify_all();
         }
 
@@ -591,7 +577,7 @@
           }
           else
           {
-            filter.Execute();
+            filter.Execute(*that);
           }
         }
       }
@@ -614,7 +600,7 @@
 
       if (watched)
       {
-        watchedJobStatus_[job.GetId()] = ServerJobStatus_Running;
+        watchedJobStatus_[job.GetId()] = JobStatus_Running;
       }
 
       jobs_[job.GetId()] = info;
@@ -675,12 +661,14 @@
       SubmitInternal(job, true);
 
       // Wait for the job to complete (either success or failure)
-      ServerJobStatus status;
+      JobStatus status;
 
       {
         boost::mutex::scoped_lock lock(mutex_);
+
+        assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end());
         
-        while (watchedJobStatus_[jobId] == ServerJobStatus_Running)
+        while (watchedJobStatus_[jobId] == JobStatus_Running)
         {
           jobFinished_.wait(lock);
         }
@@ -689,7 +677,7 @@
         watchedJobStatus_.erase(jobId);
       }
 
-      return (status == ServerJobStatus_Success);
+      return (status == JobStatus_Success);
     }
 
 
@@ -728,9 +716,18 @@
         return 1;
       }
 
-      float n = static_cast<float>(job->second.failures_ + job->second.success_);
-      float d = static_cast<float>(job->second.size_);
-      return n / d;
+      if (job->second.failures_ != 0)
+      {
+        return 1;
+      }
+
+      if (job->second.size_ == 1)
+      {
+        return job->second.success_;
+      }
+
+      return (static_cast<float>(job->second.success_) / 
+              static_cast<float>(job->second.size_ - 1));
     }
 
     bool IsRunning(const ServerJob& job)
@@ -747,6 +744,19 @@
     {
       return GetProgress(job);
     }
+
+    void GetListOfJobs(ListOfStrings& jobs)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      jobs.clear();
+
+      for (Jobs::const_iterator 
+             it = jobs_.begin(); it != jobs_.end(); it++)
+      {
+        jobs.push_back(it->first);
+      }
+    }
   };
 
 }
@@ -785,6 +795,33 @@
   }
 };
 
+
+static void Tata(ServerScheduler* s, ServerJob* j)
+{
+#if 1
+  for (;;)
+  {
+    ListOfStrings l;
+    s->GetListOfJobs(l);
+    for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+      printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i));
+    Toolbox::USleep(100000);
+  }
+#else
+  ListOfStrings l;
+  s->GetListOfJobs(l);
+  for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+    printf(">> %s\n", i->c_str());
+  Toolbox::USleep(1500000);
+  s->Cancel(*j);
+  Toolbox::USleep(1000000);
+  s->GetListOfJobs(l);
+  for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++)
+    printf(">> %s\n", i->c_str());
+#endif
+}
+
+
 TEST(Toto, Toto)
 {
   ServerScheduler scheduler;
@@ -793,14 +830,19 @@
   FilterWrapper& f2 = job.AddFilter(new Tutu(2));
   FilterWrapper& f3 = job.AddFilter(new Tutu(3));
   FilterWrapper& f4 = job.AddFilter(new Tutu(4));
+  FilterWrapper& f5 = job.AddFilter(new Tutu(5));
   f2.AddInput(boost::lexical_cast<std::string>(42));
   //f3.AddInput(boost::lexical_cast<std::string>(42));
   //f4.AddInput(boost::lexical_cast<std::string>(42));
   f2.ConnectNext(f3);
   f3.ConnectNext(f4);
+  f4.ConnectNext(f5);
 
   job.SetDescription("tutu");
 
+  boost::thread t(Tata, &scheduler, &job);
+
+
   //scheduler.Submit(job);
 
   ListOfStrings l;
@@ -812,5 +854,7 @@
   }
 
   //Toolbox::ServerBarrier();
-  Toolbox::USleep(30000);
+  //Toolbox::USleep(3000000);
+
+  //t.join();
 }