changeset 450:58b433bb9762

SharedMessageQueue
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 04 Jul 2013 09:34:09 +0200
parents 694f06a84bf4
children 0e8bd937a0f3
files CMakeLists.txt Core/MultiThreading/SharedMessageQueue.cpp Core/MultiThreading/SharedMessageQueue.h UnitTests/main.cpp
diffstat 4 files changed, 211 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/CMakeLists.txt	Tue Jun 25 16:30:21 2013 +0200
+++ b/CMakeLists.txt	Thu Jul 04 09:34:09 2013 +0200
@@ -137,6 +137,7 @@
   Core/RestApi/RestApiOutput.cpp
   Core/RestApi/RestApi.cpp
   Core/MultiThreading/BagOfRunnablesBySteps.cpp
+  Core/MultiThreading/SharedMessageQueue.cpp
   Core/PngWriter.cpp
   Core/SQLite/Connection.cpp
   Core/SQLite/FunctionContext.cpp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Core/MultiThreading/SharedMessageQueue.cpp	Thu Jul 04 09:34:09 2013 +0200
@@ -0,0 +1,95 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2013 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "SharedMessageQueue.h"
+
+namespace Orthanc
+{
+  SharedMessageQueue::SharedMessageQueue(unsigned int maxSize)
+  {
+    maxSize_ = maxSize;
+  }
+
+
+  SharedMessageQueue::~SharedMessageQueue()
+  {
+    for (Queue::iterator it = queue_.begin(); it != queue_.end(); it++)
+    {
+      delete *it;
+    }
+  }
+
+
+  void SharedMessageQueue::Enqueue(IDynamicObject* message)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (maxSize_ != 0 && queue_.size() > maxSize_)
+    {
+      // Too many elements in the queue: First remove the oldest
+      delete queue_.front();
+      queue_.pop_front();
+    }
+
+    queue_.push_back(message);
+    elementAvailable_.notify_one();
+  }
+
+
+  IDynamicObject* SharedMessageQueue::Dequeue(int32_t millisecondsTimeout)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    // Wait for a message to arrive in the FIFO queue
+    while (queue_.empty())
+    {
+      if (millisecondsTimeout == 0)
+      {
+        elementAvailable_.wait(lock);
+      }
+      else
+      {
+        bool success = elementAvailable_.timed_wait
+          (lock, boost::posix_time::milliseconds(millisecondsTimeout));
+        if (!success)
+        {
+          return NULL;
+        }
+      }
+    }
+
+    std::auto_ptr<IDynamicObject> message(queue_.front());
+    queue_.pop_front();
+
+    return message.release();
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Core/MultiThreading/SharedMessageQueue.h	Thu Jul 04 09:34:09 2013 +0200
@@ -0,0 +1,63 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2013 Medical Physics Department, CHU of Liege,
+ * Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders of this
+ * program give permission to link the code of its release with the
+ * OpenSSL project's "OpenSSL" library (or with modified versions of it
+ * that use the same license as the "OpenSSL" library), and distribute
+ * the linked executables. You must obey the GNU General Public License
+ * in all respects for all of the code used other than "OpenSSL". If you
+ * modify file(s) with this exception, you may extend this exception to
+ * your version of the file(s), but you are not obligated to do so. If
+ * you do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source files
+ * in the program, then also delete it here.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "../IDynamicObject.h"
+
+#include <list>
+#include <boost/thread.hpp>
+
+namespace Orthanc
+{
+  class SharedMessageQueue
+  {
+  private:
+    typedef std::list<IDynamicObject*>  Queue;
+
+    unsigned int maxSize_;
+    Queue queue_;
+    boost::mutex mutex_;
+    boost::condition_variable elementAvailable_;
+
+  public:
+    SharedMessageQueue(unsigned int maxSize = 0);
+
+    ~SharedMessageQueue();
+
+    // This transfers the ownership of the message
+    void Enqueue(IDynamicObject* message);
+
+    // The caller is responsible to delete the dequeud message!
+    IDynamicObject* Dequeue(int32_t millisecondsTimeout);
+  };
+}
--- a/UnitTests/main.cpp	Tue Jun 25 16:30:21 2013 +0200
+++ b/UnitTests/main.cpp	Thu Jul 04 09:34:09 2013 +0200
@@ -13,6 +13,7 @@
 #include "../Core/Uuid.h"
 #include "../OrthancServer/FromDcmtkBridge.h"
 #include "../OrthancServer/OrthancInitialization.h"
+#include "../Core/MultiThreading/SharedMessageQueue.h"
 
 using namespace Orthanc;
 
@@ -397,6 +398,57 @@
 }
 
 
+
+class DynamicInteger : public IDynamicObject
+{
+private:
+  int value_;
+
+public:
+  DynamicInteger(int value) : value_(value)
+  {
+  }
+
+  int GetValue() const
+  {
+    return value_;
+  }
+};
+
+
+TEST(SharedMessageQueue, Basic)
+{
+  SharedMessageQueue q;
+  q.Enqueue(new DynamicInteger(10));
+  q.Enqueue(new DynamicInteger(20));
+  q.Enqueue(new DynamicInteger(30));
+  q.Enqueue(new DynamicInteger(40));
+
+  std::auto_ptr<DynamicInteger> i;
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(10))); ASSERT_EQ(10, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(10))); ASSERT_EQ(20, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(10))); ASSERT_EQ(30, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(10))); ASSERT_EQ(40, i->GetValue());
+  
+  ASSERT_EQ(NULL, q.Dequeue(10));
+}
+
+
+TEST(SharedMessageQueue, Clean)
+{
+  try
+  {
+    SharedMessageQueue q;
+    q.Enqueue(new DynamicInteger(10));
+    q.Enqueue(new DynamicInteger(20));  
+    throw OrthancException("Nope");
+  }
+  catch (OrthancException)
+  {
+  }
+}
+
+
 int main(int argc, char **argv)
 {
   // Initialize Google's logging library.