changeset 723:0da078f3affc

multithreading tests
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 18 Feb 2014 16:18:42 +0100
parents 3596177682a9
children 96a2d2da0fee
files CMakeLists.txt Core/MultiThreading/ArrayFilledByThreads.h Core/MultiThreading/SharedMessageQueue.cpp Core/MultiThreading/ThreadedCommandProcessor.cpp UnitTestsSources/MultiThreading.cpp UnitTestsSources/main.cpp
diffstat 6 files changed, 200 insertions(+), 57 deletions(-) [+]
line wrap: on
line diff
--- a/CMakeLists.txt	Fri Feb 14 16:12:44 2014 +0100
+++ b/CMakeLists.txt	Tue Feb 18 16:18:42 2014 +0100
@@ -263,6 +263,7 @@
   UnitTestsSources/Versions.cpp
   UnitTestsSources/Zip.cpp
   UnitTestsSources/Lua.cpp
+  UnitTestsSources/MultiThreading.cpp
   UnitTestsSources/main.cpp
   )
 target_link_libraries(UnitTests ServerLibrary CoreLibrary)
--- a/Core/MultiThreading/ArrayFilledByThreads.h	Fri Feb 14 16:12:44 2014 +0100
+++ b/Core/MultiThreading/ArrayFilledByThreads.h	Tue Feb 18 16:18:42 2014 +0100
@@ -2,7 +2,7 @@
 
 #include <boost/thread.hpp>
 
-#include "../ICommand.h"
+#include "../IDynamicObject.h"
 
 namespace Orthanc
 {
--- a/Core/MultiThreading/SharedMessageQueue.cpp	Fri Feb 14 16:12:44 2014 +0100
+++ b/Core/MultiThreading/SharedMessageQueue.cpp	Tue Feb 18 16:18:42 2014 +0100
@@ -89,7 +89,11 @@
 
     std::auto_ptr<IDynamicObject> message(queue_.front());
     queue_.pop_front();
-    emptied_.notify_all();
+
+    if (queue_.empty())
+    {
+      emptied_.notify_all();
+    }
 
     return message.release();
   }
@@ -101,7 +105,7 @@
     boost::mutex::scoped_lock lock(mutex_);
     
     // Wait for the queue to become empty
-    if (!queue_.empty())
+    while (!queue_.empty())
     {
       if (millisecondsTimeout == 0)
       {
--- a/Core/MultiThreading/ThreadedCommandProcessor.cpp	Fri Feb 14 16:12:44 2014 +0100
+++ b/Core/MultiThreading/ThreadedCommandProcessor.cpp	Tue Feb 18 16:18:42 2014 +0100
@@ -158,6 +158,11 @@
 
   void ThreadedCommandProcessor::Post(ICommand* command)
   {
+    if (command == NULL)
+    {
+      throw OrthancException(ErrorCode_ParameterOutOfRange);
+    }
+
     boost::mutex::scoped_lock lock(mutex_);
     queue_.Enqueue(command);
     remainingCommands_++;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/UnitTestsSources/MultiThreading.cpp	Tue Feb 18 16:18:42 2014 +0100
@@ -0,0 +1,187 @@
+#include "gtest/gtest.h"
+
+#include "../Core/OrthancException.h"
+#include "../Core/Toolbox.h"
+#include "../Core/MultiThreading/ArrayFilledByThreads.h"
+#include "../Core/MultiThreading/ThreadedCommandProcessor.h"
+
+using namespace Orthanc;
+
+namespace
+{
+  class DynamicInteger : public ICommand
+  {
+  private:
+    int value_;
+    std::set<int>& target_;
+
+  public:
+    DynamicInteger(int value, std::set<int>& target) : 
+      value_(value), target_(target)
+    {
+    }
+
+    int GetValue() const
+    {
+      return value_;
+    }
+
+    virtual bool Execute()
+    {
+      static boost::mutex mutex;
+      boost::mutex::scoped_lock lock(mutex);
+      target_.insert(value_);
+      return true;
+    }
+  };
+
+  class MyFiller : public ArrayFilledByThreads::IFiller
+  {
+  private:
+    int size_;
+    unsigned int created_;
+    std::set<int> set_;
+
+  public:
+    MyFiller(int size) : size_(size), created_(0)
+    {
+    }
+
+    virtual size_t GetFillerSize()
+    {
+      return size_;
+    }
+
+    virtual IDynamicObject* GetFillerItem(size_t index)
+    {
+      static boost::mutex mutex;
+      boost::mutex::scoped_lock lock(mutex);
+      created_++;
+      return new DynamicInteger(index * 2, set_);
+    }
+
+    unsigned int GetCreatedCount() const
+    {
+      return created_;
+    }
+
+    std::set<int> GetSet()
+    {
+      return set_;
+    }    
+  };
+}
+
+
+
+
+TEST(MultiThreading, SharedMessageQueueBasic)
+{
+  std::set<int> s;
+
+  SharedMessageQueue q;
+  ASSERT_TRUE(q.WaitEmpty(0));
+  q.Enqueue(new DynamicInteger(10, s));
+  ASSERT_FALSE(q.WaitEmpty(1));
+  q.Enqueue(new DynamicInteger(20, s));
+  q.Enqueue(new DynamicInteger(30, s));
+  q.Enqueue(new DynamicInteger(40, s));
+
+  std::auto_ptr<DynamicInteger> i;
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
+  ASSERT_FALSE(q.WaitEmpty(1));
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue());
+  ASSERT_TRUE(q.WaitEmpty(0));
+  ASSERT_EQ(NULL, q.Dequeue(1));
+}
+
+
+TEST(MultiThreading, SharedMessageQueueClean)
+{
+  std::set<int> s;
+
+  try
+  {
+    SharedMessageQueue q;
+    q.Enqueue(new DynamicInteger(10, s));
+    q.Enqueue(new DynamicInteger(20, s));  
+    throw OrthancException("Nope");
+  }
+  catch (OrthancException&)
+  {
+  }
+}
+
+
+TEST(MultiThreading, ArrayFilledByThreadEmpty)
+{
+  MyFiller f(0);
+  ArrayFilledByThreads a(f);
+  a.SetThreadCount(1);
+  ASSERT_EQ(0, a.GetSize());
+}
+
+
+TEST(MultiThreading, ArrayFilledByThread1)
+{
+  MyFiller f(100);
+  ArrayFilledByThreads a(f);
+  a.SetThreadCount(1);
+  ASSERT_EQ(100, a.GetSize());
+  for (size_t i = 0; i < a.GetSize(); i++)
+  {
+    ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue());
+  }
+}
+
+
+TEST(MultiThreading, ArrayFilledByThread4)
+{
+  MyFiller f(100);
+  ArrayFilledByThreads a(f);
+  a.SetThreadCount(4);
+  ASSERT_EQ(100, a.GetSize());
+  for (size_t i = 0; i < a.GetSize(); i++)
+  {
+    ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue());
+  }
+
+  ASSERT_EQ(100u, f.GetCreatedCount());
+
+  a.Invalidate();
+
+  ASSERT_EQ(100, a.GetSize());
+  ASSERT_EQ(200u, f.GetCreatedCount());
+  ASSERT_EQ(4u, a.GetThreadCount());
+  ASSERT_TRUE(f.GetSet().empty());
+
+  for (size_t i = 0; i < a.GetSize(); i++)
+  {
+    ASSERT_EQ(2 * i, dynamic_cast<DynamicInteger&>(a.GetItem(i)).GetValue());
+  }
+}
+
+
+TEST(MultiThreading, CommandProcessor)
+{
+  ThreadedCommandProcessor p(4);
+
+  std::set<int> s;
+
+  for (size_t i = 0; i < 100; i++)
+  {
+    p.Post(new DynamicInteger(i * 2, s));
+  }
+
+  p.Join();
+
+  for (size_t i = 0; i < 200; i++)
+  {
+    if (i % 2)
+      ASSERT_TRUE(s.find(i) == s.end());
+    else
+      ASSERT_TRUE(s.find(i) != s.end());
+  }
+}
--- a/UnitTestsSources/main.cpp	Fri Feb 14 16:12:44 2014 +0100
+++ b/UnitTestsSources/main.cpp	Tue Feb 18 16:18:42 2014 +0100
@@ -12,7 +12,6 @@
 #include "../Core/Uuid.h"
 #include "../OrthancServer/FromDcmtkBridge.h"
 #include "../OrthancServer/OrthancInitialization.h"
-#include "../Core/MultiThreading/SharedMessageQueue.h"
 
 using namespace Orthanc;
 
@@ -498,59 +497,6 @@
 
 
 
-class DynamicInteger : public IDynamicObject
-{
-private:
-  int value_;
-
-public:
-  DynamicInteger(int value) : value_(value)
-  {
-  }
-
-  int GetValue() const
-  {
-    return value_;
-  }
-};
-
-
-TEST(SharedMessageQueue, Basic)
-{
-  SharedMessageQueue q;
-  ASSERT_TRUE(q.WaitEmpty(0));
-  q.Enqueue(new DynamicInteger(10));
-  ASSERT_FALSE(q.WaitEmpty(1));
-  q.Enqueue(new DynamicInteger(20));
-  q.Enqueue(new DynamicInteger(30));
-  q.Enqueue(new DynamicInteger(40));
-
-  std::auto_ptr<DynamicInteger> i;
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue());
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
-  ASSERT_FALSE(q.WaitEmpty(1));
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue());
-  ASSERT_TRUE(q.WaitEmpty(0));
-  ASSERT_EQ(NULL, q.Dequeue(1));
-}
-
-
-TEST(SharedMessageQueue, Clean)
-{
-  try
-  {
-    SharedMessageQueue q;
-    q.Enqueue(new DynamicInteger(10));
-    q.Enqueue(new DynamicInteger(20));  
-    throw OrthancException("Nope");
-  }
-  catch (OrthancException&)
-  {
-  }
-}
-
-
 TEST(Toolbox, WriteFile)
 {
   std::string path;