changeset 140:4d863c7b2f44

message queues
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 10 Oct 2012 18:20:04 +0200
parents 3ad78369fcc4
children 24681d35bad9
files Core/Enumerations.h Core/IDynamicObject.h Core/OrthancException.cpp UnitTests/MessageWithDestination.cpp
diffstat 4 files changed, 133 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/Core/Enumerations.h	Wed Oct 10 17:57:03 2012 +0200
+++ b/Core/Enumerations.h	Wed Oct 10 18:20:04 2012 +0200
@@ -51,7 +51,8 @@
     ErrorCode_UriSyntax,
     ErrorCode_InexistentFile,
     ErrorCode_CannotWriteFile,
-    ErrorCode_BadFileFormat
+    ErrorCode_BadFileFormat,
+    ErrorCode_Timeout
   };
 
   enum PixelFormat
--- a/Core/IDynamicObject.h	Wed Oct 10 17:57:03 2012 +0200
+++ b/Core/IDynamicObject.h	Wed Oct 10 18:20:04 2012 +0200
@@ -39,6 +39,8 @@
   /**
    * This class should be the ancestor to any class whose type is
    * determined at the runtime, and that can be dynamically allocated.
+   * Being a child of IDynamicObject only implies the existence of a
+   * virtual destructor.
    **/
   class IDynamicObject : public boost::noncopyable
   {
--- a/Core/OrthancException.cpp	Wed Oct 10 17:57:03 2012 +0200
+++ b/Core/OrthancException.cpp	Wed Oct 10 18:20:04 2012 +0200
@@ -81,6 +81,9 @@
     case ErrorCode_CannotWriteFile:
       return "Cannot write to file";
 
+    case ErrorCode_Timeout:
+      return "Timeout";
+
     case ErrorCode_Custom:
     default:
       return "???";
--- a/UnitTests/MessageWithDestination.cpp	Wed Oct 10 17:57:03 2012 +0200
+++ b/UnitTests/MessageWithDestination.cpp	Wed Oct 10 18:20:04 2012 +0200
@@ -1,14 +1,94 @@
 #include "../Core/IDynamicObject.h"
 
+#include "../Core/OrthancException.h"
+
+#include <memory>
+#include <map>
 #include <gtest/gtest.h>
 #include <string>
+#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
 
 namespace Orthanc
 {
+  class SharedMessageQueue
+  {
+  private:
+    typedef std::list<IDynamicObject*>  Queue;
+
+    unsigned int maxSize_;
+    Queue queue_;
+    boost::mutex mutex_;
+    boost::condition_variable elementAvailable_;
+
+  public:
+    SharedMessageQueue(unsigned int maxSize = 0)
+    {
+      maxSize_ = maxSize;
+    }
+
+    ~SharedMessageQueue()
+    {
+      for (Queue::iterator it = queue_.begin(); it != queue_.end(); it++)
+      {
+        delete *it;
+      }
+    }
+
+    void Enqueue(IDynamicObject* message)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      if (maxSize_ != 0 && queue_.size() > maxSize_)
+      {
+        // Too many elements in the queue: First remove the oldest
+        delete queue_.front();
+        queue_.pop_front();
+      }
+
+      queue_.push_back(message);
+      elementAvailable_.notify_one();
+    }
+
+    IDynamicObject* Dequeue(int32_t timeout)
+    {
+      boost::mutex::scoped_lock lock(mutex_);
+
+      // Wait for a message to arrive in the FIFO queue
+      while (queue_.empty())
+      {
+        if (timeout == 0)
+        {
+          elementAvailable_.wait(lock);
+        }
+        else
+        {
+          bool success = elementAvailable_.timed_wait
+            (lock, boost::posix_time::milliseconds(timeout));
+          if (!success)
+          {
+            throw OrthancException(ErrorCode_Timeout);
+          }
+        }
+      }
+
+      std::auto_ptr<IDynamicObject> message(queue_.front());
+      queue_.pop_front();
+
+      return message.release();
+    }
+
+    IDynamicObject* Dequeue()
+    {
+      return Dequeue(0);
+    }
+  };
+
+
   /**
    * This class represents a message that is to be sent to some destination.
    **/
-  class MessageWithDestination : public boost::noncopyable
+  class MessageToDispatch : public boost::noncopyable
   {
   private:
     IDynamicObject* message_;
@@ -20,14 +100,14 @@
      * \param message The content of the message (takes the ownership)
      * \param destination The destination of the message
      **/
-    MessageWithDestination(IDynamicObject* message,
-                           const char* destination)
+    MessageToDispatch(IDynamicObject* message,
+                      const char* destination)
     {
       message_ = message;
       destination_ = destination;
     }
 
-    ~MessageWithDestination()
+    ~MessageToDispatch()
     {
       if (message_)
       {
@@ -35,6 +115,46 @@
       }
     }
   };
+
+
+  class IDestinationContext : public IDynamicObject
+  {
+  public:
+    virtual void Handle(const IDynamicObject& message) = 0;
+  };
+
+
+  class IDestinationContextFactory : public IDynamicObject
+  {
+  public:
+    virtual IDestinationContext* Construct(const char* destination) = 0;
+  };
+
+
+  class MessageDispatcher
+  {
+  private:
+    typedef std::map<std::string, IDestinationContext*>  ActiveContexts;
+
+    std::auto_ptr<IDestinationContextFactory> factory_;
+    ActiveContexts activeContexts_;
+    SharedMessageQueue queue_;
+
+  public:
+    MessageDispatcher(IDestinationContextFactory* factory)  // takes the ownership
+    {
+      factory_.reset(factory);
+    }
+
+    ~MessageDispatcher()
+    {
+      for (ActiveContexts::iterator it = activeContexts_.begin(); 
+           it != activeContexts_.end(); it++)
+      {
+        delete it->second;
+      }
+    }
+  };
 }
 
 
@@ -43,8 +163,8 @@
 
 using namespace Orthanc;
 
-TEST(MessageWithDestination, A)
+TEST(MessageToDispatch, A)
 {
-  MessageWithDestination a(new DicomString("coucou"), "pukkaj");
+  MessageToDispatch a(new DicomString("coucou"), "pukkaj");
 }