# HG changeset patch # User Sebastien Jodogne # Date 1372923249 -7200 # Node ID 58b433bb9762ed66b5d74347f01253e065beea40 # Parent 694f06a84bf4138587c4bf1476ae49b1d6f0907e SharedMessageQueue diff -r 694f06a84bf4 -r 58b433bb9762 CMakeLists.txt --- a/CMakeLists.txt Tue Jun 25 16:30:21 2013 +0200 +++ b/CMakeLists.txt Thu Jul 04 09:34:09 2013 +0200 @@ -137,6 +137,7 @@ Core/RestApi/RestApiOutput.cpp Core/RestApi/RestApi.cpp Core/MultiThreading/BagOfRunnablesBySteps.cpp + Core/MultiThreading/SharedMessageQueue.cpp Core/PngWriter.cpp Core/SQLite/Connection.cpp Core/SQLite/FunctionContext.cpp diff -r 694f06a84bf4 -r 58b433bb9762 Core/MultiThreading/SharedMessageQueue.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/SharedMessageQueue.cpp Thu Jul 04 09:34:09 2013 +0200 @@ -0,0 +1,95 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2013 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "SharedMessageQueue.h" + +namespace Orthanc +{ + SharedMessageQueue::SharedMessageQueue(unsigned int maxSize) + { + maxSize_ = maxSize; + } + + + SharedMessageQueue::~SharedMessageQueue() + { + for (Queue::iterator it = queue_.begin(); it != queue_.end(); it++) + { + delete *it; + } + } + + + void SharedMessageQueue::Enqueue(IDynamicObject* message) + { + boost::mutex::scoped_lock lock(mutex_); + + if (maxSize_ != 0 && queue_.size() > maxSize_) + { + // Too many elements in the queue: First remove the oldest + delete queue_.front(); + queue_.pop_front(); + } + + queue_.push_back(message); + elementAvailable_.notify_one(); + } + + + IDynamicObject* SharedMessageQueue::Dequeue(int32_t millisecondsTimeout) + { + boost::mutex::scoped_lock lock(mutex_); + + // Wait for a message to arrive in the FIFO queue + while (queue_.empty()) + { + if (millisecondsTimeout == 0) + { + elementAvailable_.wait(lock); + } + else + { + bool success = elementAvailable_.timed_wait + (lock, boost::posix_time::milliseconds(millisecondsTimeout)); + if (!success) + { + return NULL; + } + } + } + + std::auto_ptr message(queue_.front()); + queue_.pop_front(); + + return message.release(); + } +} diff -r 694f06a84bf4 -r 58b433bb9762 Core/MultiThreading/SharedMessageQueue.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/SharedMessageQueue.h Thu Jul 04 09:34:09 2013 +0200 @@ -0,0 +1,63 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2013 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "../IDynamicObject.h" + +#include +#include + +namespace Orthanc +{ + class SharedMessageQueue + { + private: + typedef std::list Queue; + + unsigned int maxSize_; + Queue queue_; + boost::mutex mutex_; + boost::condition_variable elementAvailable_; + + public: + SharedMessageQueue(unsigned int maxSize = 0); + + ~SharedMessageQueue(); + + // This transfers the ownership of the message + void Enqueue(IDynamicObject* message); + + // The caller is responsible to delete the dequeud message! + IDynamicObject* Dequeue(int32_t millisecondsTimeout); + }; +} diff -r 694f06a84bf4 -r 58b433bb9762 UnitTests/main.cpp --- a/UnitTests/main.cpp Tue Jun 25 16:30:21 2013 +0200 +++ b/UnitTests/main.cpp Thu Jul 04 09:34:09 2013 +0200 @@ -13,6 +13,7 @@ #include "../Core/Uuid.h" #include "../OrthancServer/FromDcmtkBridge.h" #include "../OrthancServer/OrthancInitialization.h" +#include "../Core/MultiThreading/SharedMessageQueue.h" using namespace Orthanc; @@ -397,6 +398,57 @@ } + +class DynamicInteger : public IDynamicObject +{ +private: + int value_; + +public: + DynamicInteger(int value) : value_(value) + { + } + + int GetValue() const + { + return value_; + } +}; + + +TEST(SharedMessageQueue, Basic) +{ + SharedMessageQueue q; + q.Enqueue(new DynamicInteger(10)); + q.Enqueue(new DynamicInteger(20)); + q.Enqueue(new DynamicInteger(30)); + q.Enqueue(new DynamicInteger(40)); + + std::auto_ptr i; + i.reset(dynamic_cast(q.Dequeue(10))); ASSERT_EQ(10, i->GetValue()); + i.reset(dynamic_cast(q.Dequeue(10))); ASSERT_EQ(20, i->GetValue()); + i.reset(dynamic_cast(q.Dequeue(10))); ASSERT_EQ(30, i->GetValue()); + i.reset(dynamic_cast(q.Dequeue(10))); ASSERT_EQ(40, i->GetValue()); + + ASSERT_EQ(NULL, q.Dequeue(10)); +} + + +TEST(SharedMessageQueue, Clean) +{ + try + { + SharedMessageQueue q; + q.Enqueue(new DynamicInteger(10)); + q.Enqueue(new DynamicInteger(20)); + throw OrthancException("Nope"); + } + catch (OrthancException) + { + } +} + + int main(int argc, char **argv) { // Initialize Google's logging library.