# HG changeset patch # User Sebastien Jodogne # Date 1349886004 -7200 # Node ID 4d863c7b2f44e89dff48cdbc891680d6f0cb6277 # Parent 3ad78369fcc40e68f8efa792f61393cac8df1fe9 message queues diff -r 3ad78369fcc4 -r 4d863c7b2f44 Core/Enumerations.h --- a/Core/Enumerations.h Wed Oct 10 17:57:03 2012 +0200 +++ b/Core/Enumerations.h Wed Oct 10 18:20:04 2012 +0200 @@ -51,7 +51,8 @@ ErrorCode_UriSyntax, ErrorCode_InexistentFile, ErrorCode_CannotWriteFile, - ErrorCode_BadFileFormat + ErrorCode_BadFileFormat, + ErrorCode_Timeout }; enum PixelFormat diff -r 3ad78369fcc4 -r 4d863c7b2f44 Core/IDynamicObject.h --- a/Core/IDynamicObject.h Wed Oct 10 17:57:03 2012 +0200 +++ b/Core/IDynamicObject.h Wed Oct 10 18:20:04 2012 +0200 @@ -39,6 +39,8 @@ /** * This class should be the ancestor to any class whose type is * determined at the runtime, and that can be dynamically allocated. + * Being a child of IDynamicObject only implies the existence of a + * virtual destructor. **/ class IDynamicObject : public boost::noncopyable { diff -r 3ad78369fcc4 -r 4d863c7b2f44 Core/OrthancException.cpp --- a/Core/OrthancException.cpp Wed Oct 10 17:57:03 2012 +0200 +++ b/Core/OrthancException.cpp Wed Oct 10 18:20:04 2012 +0200 @@ -81,6 +81,9 @@ case ErrorCode_CannotWriteFile: return "Cannot write to file"; + case ErrorCode_Timeout: + return "Timeout"; + case ErrorCode_Custom: default: return "???"; diff -r 3ad78369fcc4 -r 4d863c7b2f44 UnitTests/MessageWithDestination.cpp --- a/UnitTests/MessageWithDestination.cpp Wed Oct 10 17:57:03 2012 +0200 +++ b/UnitTests/MessageWithDestination.cpp Wed Oct 10 18:20:04 2012 +0200 @@ -1,14 +1,94 @@ #include "../Core/IDynamicObject.h" +#include "../Core/OrthancException.h" + +#include +#include #include #include +#include +#include namespace Orthanc { + class SharedMessageQueue + { + private: + typedef std::list Queue; + + unsigned int maxSize_; + Queue queue_; + boost::mutex mutex_; + boost::condition_variable elementAvailable_; + + public: + SharedMessageQueue(unsigned int maxSize = 0) + { + maxSize_ = maxSize; + } + + ~SharedMessageQueue() + { + for (Queue::iterator it = queue_.begin(); it != queue_.end(); it++) + { + delete *it; + } + } + + void Enqueue(IDynamicObject* message) + { + boost::mutex::scoped_lock lock(mutex_); + + if (maxSize_ != 0 && queue_.size() > maxSize_) + { + // Too many elements in the queue: First remove the oldest + delete queue_.front(); + queue_.pop_front(); + } + + queue_.push_back(message); + elementAvailable_.notify_one(); + } + + IDynamicObject* Dequeue(int32_t timeout) + { + boost::mutex::scoped_lock lock(mutex_); + + // Wait for a message to arrive in the FIFO queue + while (queue_.empty()) + { + if (timeout == 0) + { + elementAvailable_.wait(lock); + } + else + { + bool success = elementAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + throw OrthancException(ErrorCode_Timeout); + } + } + } + + std::auto_ptr message(queue_.front()); + queue_.pop_front(); + + return message.release(); + } + + IDynamicObject* Dequeue() + { + return Dequeue(0); + } + }; + + /** * This class represents a message that is to be sent to some destination. **/ - class MessageWithDestination : public boost::noncopyable + class MessageToDispatch : public boost::noncopyable { private: IDynamicObject* message_; @@ -20,14 +100,14 @@ * \param message The content of the message (takes the ownership) * \param destination The destination of the message **/ - MessageWithDestination(IDynamicObject* message, - const char* destination) + MessageToDispatch(IDynamicObject* message, + const char* destination) { message_ = message; destination_ = destination; } - ~MessageWithDestination() + ~MessageToDispatch() { if (message_) { @@ -35,6 +115,46 @@ } } }; + + + class IDestinationContext : public IDynamicObject + { + public: + virtual void Handle(const IDynamicObject& message) = 0; + }; + + + class IDestinationContextFactory : public IDynamicObject + { + public: + virtual IDestinationContext* Construct(const char* destination) = 0; + }; + + + class MessageDispatcher + { + private: + typedef std::map ActiveContexts; + + std::auto_ptr factory_; + ActiveContexts activeContexts_; + SharedMessageQueue queue_; + + public: + MessageDispatcher(IDestinationContextFactory* factory) // takes the ownership + { + factory_.reset(factory); + } + + ~MessageDispatcher() + { + for (ActiveContexts::iterator it = activeContexts_.begin(); + it != activeContexts_.end(); it++) + { + delete it->second; + } + } + }; } @@ -43,8 +163,8 @@ using namespace Orthanc; -TEST(MessageWithDestination, A) +TEST(MessageToDispatch, A) { - MessageWithDestination a(new DicomString("coucou"), "pukkaj"); + MessageToDispatch a(new DicomString("coucou"), "pukkaj"); }