Mercurial > hg > orthanc
changeset 140:4d863c7b2f44
message queues
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 10 Oct 2012 18:20:04 +0200 |
parents | 3ad78369fcc4 |
children | 24681d35bad9 |
files | Core/Enumerations.h Core/IDynamicObject.h Core/OrthancException.cpp UnitTests/MessageWithDestination.cpp |
diffstat | 4 files changed, 133 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/Enumerations.h Wed Oct 10 17:57:03 2012 +0200 +++ b/Core/Enumerations.h Wed Oct 10 18:20:04 2012 +0200 @@ -51,7 +51,8 @@ ErrorCode_UriSyntax, ErrorCode_InexistentFile, ErrorCode_CannotWriteFile, - ErrorCode_BadFileFormat + ErrorCode_BadFileFormat, + ErrorCode_Timeout }; enum PixelFormat
--- a/Core/IDynamicObject.h Wed Oct 10 17:57:03 2012 +0200 +++ b/Core/IDynamicObject.h Wed Oct 10 18:20:04 2012 +0200 @@ -39,6 +39,8 @@ /** * This class should be the ancestor to any class whose type is * determined at the runtime, and that can be dynamically allocated. + * Being a child of IDynamicObject only implies the existence of a + * virtual destructor. **/ class IDynamicObject : public boost::noncopyable {
--- a/Core/OrthancException.cpp Wed Oct 10 17:57:03 2012 +0200 +++ b/Core/OrthancException.cpp Wed Oct 10 18:20:04 2012 +0200 @@ -81,6 +81,9 @@ case ErrorCode_CannotWriteFile: return "Cannot write to file"; + case ErrorCode_Timeout: + return "Timeout"; + case ErrorCode_Custom: default: return "???";
--- a/UnitTests/MessageWithDestination.cpp Wed Oct 10 17:57:03 2012 +0200 +++ b/UnitTests/MessageWithDestination.cpp Wed Oct 10 18:20:04 2012 +0200 @@ -1,14 +1,94 @@ #include "../Core/IDynamicObject.h" +#include "../Core/OrthancException.h" + +#include <memory> +#include <map> #include <gtest/gtest.h> #include <string> +#include <boost/thread.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> namespace Orthanc { + class SharedMessageQueue + { + private: + typedef std::list<IDynamicObject*> Queue; + + unsigned int maxSize_; + Queue queue_; + boost::mutex mutex_; + boost::condition_variable elementAvailable_; + + public: + SharedMessageQueue(unsigned int maxSize = 0) + { + maxSize_ = maxSize; + } + + ~SharedMessageQueue() + { + for (Queue::iterator it = queue_.begin(); it != queue_.end(); it++) + { + delete *it; + } + } + + void Enqueue(IDynamicObject* message) + { + boost::mutex::scoped_lock lock(mutex_); + + if (maxSize_ != 0 && queue_.size() > maxSize_) + { + // Too many elements in the queue: First remove the oldest + delete queue_.front(); + queue_.pop_front(); + } + + queue_.push_back(message); + elementAvailable_.notify_one(); + } + + IDynamicObject* Dequeue(int32_t timeout) + { + boost::mutex::scoped_lock lock(mutex_); + + // Wait for a message to arrive in the FIFO queue + while (queue_.empty()) + { + if (timeout == 0) + { + elementAvailable_.wait(lock); + } + else + { + bool success = elementAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(timeout)); + if (!success) + { + throw OrthancException(ErrorCode_Timeout); + } + } + } + + std::auto_ptr<IDynamicObject> message(queue_.front()); + queue_.pop_front(); + + return message.release(); + } + + IDynamicObject* Dequeue() + { + return Dequeue(0); + } + }; + + /** * This class represents a message that is to be sent to some destination. **/ - class MessageWithDestination : public boost::noncopyable + class MessageToDispatch : public boost::noncopyable { private: IDynamicObject* message_; @@ -20,14 +100,14 @@ * \param message The content of the message (takes the ownership) * \param destination The destination of the message **/ - MessageWithDestination(IDynamicObject* message, - const char* destination) + MessageToDispatch(IDynamicObject* message, + const char* destination) { message_ = message; destination_ = destination; } - ~MessageWithDestination() + ~MessageToDispatch() { if (message_) { @@ -35,6 +115,46 @@ } } }; + + + class IDestinationContext : public IDynamicObject + { + public: + virtual void Handle(const IDynamicObject& message) = 0; + }; + + + class IDestinationContextFactory : public IDynamicObject + { + public: + virtual IDestinationContext* Construct(const char* destination) = 0; + }; + + + class MessageDispatcher + { + private: + typedef std::map<std::string, IDestinationContext*> ActiveContexts; + + std::auto_ptr<IDestinationContextFactory> factory_; + ActiveContexts activeContexts_; + SharedMessageQueue queue_; + + public: + MessageDispatcher(IDestinationContextFactory* factory) // takes the ownership + { + factory_.reset(factory); + } + + ~MessageDispatcher() + { + for (ActiveContexts::iterator it = activeContexts_.begin(); + it != activeContexts_.end(); it++) + { + delete it->second; + } + } + }; } @@ -43,8 +163,8 @@ using namespace Orthanc; -TEST(MessageWithDestination, A) +TEST(MessageToDispatch, A) { - MessageWithDestination a(new DicomString("coucou"), "pukkaj"); + MessageToDispatch a(new DicomString("coucou"), "pukkaj"); }