Mercurial > hg > orthanc
changeset 6511:ebf563bfe42d
new BlockingSharedMessageQueue + refactored ArchiveJob to use it
| author | Alain Mazy <am@orthanc.team> |
|---|---|
| date | Tue, 02 Dec 2025 10:09:49 +0100 |
| parents | b3718e91d224 |
| children | 81f2cea4ab5f ef77a536195b |
| files | OrthancFramework/Resources/CMake/OrthancFrameworkConfiguration.cmake OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.cpp OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h OrthancFramework/UnitTestsSources/JobsTests.cpp OrthancServer/Sources/ServerJobs/ArchiveJob.cpp |
| diffstat | 5 files changed, 373 insertions(+), 69 deletions(-) [+] |
line wrap: on
line diff
--- a/OrthancFramework/Resources/CMake/OrthancFrameworkConfiguration.cmake Mon Dec 01 08:35:27 2025 +0100 +++ b/OrthancFramework/Resources/CMake/OrthancFrameworkConfiguration.cmake Tue Dec 02 10:09:49 2025 +0100 @@ -657,6 +657,7 @@ ${CMAKE_CURRENT_LIST_DIR}/../../Sources/FileBuffer.cpp ${CMAKE_CURRENT_LIST_DIR}/../../Sources/FileStorage/FilesystemStorage.cpp ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MetricsRegistry.cpp + ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/BlockingSharedMessageQueue.cpp ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/RunnableWorkersPool.cpp ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/Semaphore.cpp ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/SharedMessageQueue.cpp
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.cpp Tue Dec 02 10:09:49 2025 +0100 @@ -0,0 +1,134 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2023 Osimis S.A., Belgium + * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium + * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeaders.h" +#include "BlockingSharedMessageQueue.h" + + +#include "../Compatibility.h" + + +namespace Orthanc +{ + BlockingSharedMessageQueue::BlockingSharedMessageQueue(unsigned int maxSize) : + maxSize_(maxSize) + { + } + + + BlockingSharedMessageQueue::~BlockingSharedMessageQueue() + { + for (Queue::iterator it = queue_.begin(); it != queue_.end(); ++it) + { + delete *it; + } + } + + + bool BlockingSharedMessageQueue::Enqueue(std::unique_ptr<IDynamicObject>& message, int32_t millisecondsTimeout) + { + boost::mutex::scoped_lock lock(mutex_); + + if (maxSize_ != 0 && queue_.size() >= maxSize_) + { + if (!roomAvailable_.timed_wait(lock, boost::posix_time::milliseconds(millisecondsTimeout))) + { + return false; + } + } + + queue_.push_back(message.release()); // take ownership only when pushed into the queue + elementAvailable_.notify_one(); + + return true; + } + + void BlockingSharedMessageQueue::Enqueue(IDynamicObject* message) + { + boost::mutex::scoped_lock lock(mutex_); + + if (maxSize_ != 0 && queue_.size() >= maxSize_) + { + roomAvailable_.wait(lock); + } + + queue_.push_back(message); // take ownership + elementAvailable_.notify_one(); + } + + + IDynamicObject* BlockingSharedMessageQueue::Dequeue(int32_t millisecondsTimeout) + { + boost::mutex::scoped_lock lock(mutex_); + + // If it is empty, wait for a message to arrive in the queue + while (queue_.empty()) + { + if (millisecondsTimeout == 0) + { + elementAvailable_.wait(lock); + } + else + { + if (!elementAvailable_.timed_wait(lock, boost::posix_time::milliseconds(millisecondsTimeout))) + { + return NULL; + } + } + } + + std::unique_ptr<IDynamicObject> message(queue_.front()); + queue_.pop_front(); + + roomAvailable_.notify_one(); + + return message.release(); + } + + void BlockingSharedMessageQueue::Clear() + { + boost::mutex::scoped_lock lock(mutex_); + + if (queue_.empty()) + { + return; + } + else + { + while (!queue_.empty()) + { + std::unique_ptr<IDynamicObject> message(queue_.front()); + queue_.pop_front(); + + roomAvailable_.notify_one(); + } + } + } + + size_t BlockingSharedMessageQueue::GetSize() + { + boost::mutex::scoped_lock lock(mutex_); + return queue_.size(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h Tue Dec 02 10:09:49 2025 +0100 @@ -0,0 +1,66 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2023 Osimis S.A., Belgium + * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium + * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../IDynamicObject.h" + +#include <stdint.h> +#include <list> +#include <boost/thread.hpp> + +namespace Orthanc +{ + // Compared to SharedMessageQueue that is discarding old messages when it is full, + // this queue blocks the Enqueue method until there is room for a new message. + class ORTHANC_PUBLIC BlockingSharedMessageQueue : public boost::noncopyable + { + private: + typedef std::list<IDynamicObject*> Queue; + + unsigned int maxSize_; + Queue queue_; + boost::mutex mutex_; + boost::condition_variable elementAvailable_; + boost::condition_variable roomAvailable_; + + public: + explicit BlockingSharedMessageQueue(unsigned int maxSize = 0); + + ~BlockingSharedMessageQueue(); + + // This transfers the ownership of the message only if it is actually pushed in the queue (hence the unique_ptr) + bool Enqueue(std::unique_ptr<IDynamicObject>& message, int32_t millisecondsTimeout); + + // This transfers the ownership of the message + void Enqueue(IDynamicObject* message); + + // The caller is responsible to delete the dequeued message! + IDynamicObject* Dequeue(int32_t millisecondsTimeout); + + void Clear(); + + size_t GetSize(); + }; +}
--- a/OrthancFramework/UnitTestsSources/JobsTests.cpp Mon Dec 01 08:35:27 2025 +0100 +++ b/OrthancFramework/UnitTestsSources/JobsTests.cpp Tue Dec 02 10:09:49 2025 +0100 @@ -43,6 +43,7 @@ #include "../../OrthancFramework/Sources/JobsEngine/Operations/StringOperationValue.h" #include "../../OrthancFramework/Sources/JobsEngine/SetOfInstancesJob.h" #include "../../OrthancFramework/Sources/Logging.h" +#include "../../OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h" #include "../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h" #include "../../OrthancFramework/Sources/OrthancException.h" #include "../../OrthancFramework/Sources/SerializationToolbox.h" @@ -251,6 +252,12 @@ DynamicInteger(int value, std::set<int>& target) : value_(value), target_(target) { + target_.insert(value); + } + + virtual ~DynamicInteger() + { + target_.erase(value_); } int GetValue() const @@ -263,40 +270,148 @@ TEST(MultiThreading, SharedMessageQueueBasic) { - std::set<int> s; + std::set<int> s; // keeps a copy of all DynamicInteger objects + + { + SharedMessageQueue q; + ASSERT_TRUE(q.WaitEmpty(0)); + q.Enqueue(new DynamicInteger(10, s)); + ASSERT_FALSE(q.WaitEmpty(1)); + q.Enqueue(new DynamicInteger(20, s)); + q.Enqueue(new DynamicInteger(30, s)); + q.Enqueue(new DynamicInteger(40, s)); - SharedMessageQueue q; - ASSERT_TRUE(q.WaitEmpty(0)); - q.Enqueue(new DynamicInteger(10, s)); - ASSERT_FALSE(q.WaitEmpty(1)); - q.Enqueue(new DynamicInteger(20, s)); - q.Enqueue(new DynamicInteger(30, s)); - q.Enqueue(new DynamicInteger(40, s)); + ASSERT_EQ(4, s.size()); - std::unique_ptr<DynamicInteger> i; - i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); - i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue()); - i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); - ASSERT_FALSE(q.WaitEmpty(1)); - i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue()); - ASSERT_TRUE(q.WaitEmpty(0)); - ASSERT_EQ(NULL, q.Dequeue(1)); + std::unique_ptr<DynamicInteger> i; + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); + ASSERT_FALSE(q.WaitEmpty(1)); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue()); + ASSERT_TRUE(q.WaitEmpty(0)); + ASSERT_EQ(NULL, q.Dequeue(1)); + } + + ASSERT_EQ(0, s.size()); } TEST(MultiThreading, SharedMessageQueueClean) { + std::set<int> s; // keeps a copy of all DynamicInteger objects try { - std::set<int> s; - SharedMessageQueue q; q.Enqueue(new DynamicInteger(10, s)); q.Enqueue(new DynamicInteger(20, s)); + ASSERT_EQ(2, s.size()); throw OrthancException(ErrorCode_InternalError); } catch (OrthancException&) { + // the SharedMessageQueue is destroyed -> make sure the elements have been deleted + ASSERT_EQ(0, s.size()); + } +} + + +TEST(MultiThreading, BlockingSharedMessageQueueBasicUnlimited) +{ + std::set<int> s; // keeps a copy of all DynamicInteger objects + + std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s)); + std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s)); + std::unique_ptr<DynamicInteger> i; + + BlockingSharedMessageQueue q; + + ASSERT_TRUE(q.Enqueue(o10, 0)); + ASSERT_TRUE(q.Enqueue(o20, 1)); + q.Enqueue(new DynamicInteger(30, s)); + + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(20, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(30, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(NULL, i.get()); + + ASSERT_EQ(0, s.size()); +} + + +TEST(MultiThreading, BlockingSharedMessageQueueBasicLimitedSize) +{ + std::set<int> s; // keeps a copy of all DynamicInteger objects + + std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s)); + std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s)); + std::unique_ptr<IDynamicObject> o30(new DynamicInteger(30, s)); + ASSERT_EQ(3, s.size()); + + std::unique_ptr<DynamicInteger > i; + + BlockingSharedMessageQueue q(2); + + q.Enqueue(o10.release()); + ASSERT_TRUE(q.Enqueue(o20, 1)); + ASSERT_FALSE(q.Enqueue(o30, 1)); + + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue()); + ASSERT_TRUE(q.Enqueue(o30, 1)); + + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(20, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(30, i->GetValue()); + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(NULL, i.get()); + + // ensures all objects have been destroyed + ASSERT_EQ(0, s.size()); +} + +TEST(MultiThreading, BlockingSharedMessageQueueBasicClear) +{ + std::set<int> s; // keeps a copy of all DynamicInteger objects + + std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s)); + std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s)); + std::unique_ptr<IDynamicObject> o30(new DynamicInteger(30, s)); + std::unique_ptr<IDynamicObject> o40(new DynamicInteger(40, s)); + std::unique_ptr<DynamicInteger > i; + + BlockingSharedMessageQueue q(2); + + ASSERT_TRUE(q.Enqueue(o10, 0)); + ASSERT_TRUE(q.Enqueue(o20, 1)); + ASSERT_EQ(2, q.GetSize()); + ASSERT_FALSE(q.Enqueue(o30, 1)); + ASSERT_EQ(2, q.GetSize()); + + q.Clear(); + ASSERT_EQ(0, q.GetSize()); + + ASSERT_TRUE(q.Enqueue(o30, 1)); + + i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue()); + ASSERT_TRUE(q.Enqueue(o40, 1)); +} + +TEST(MultiThreading, BlockingSharedMessageQueueClean) +{ + std::set<int> s; // keeps a copy of all DynamicInteger objects + try + { + BlockingSharedMessageQueue q; + std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s)); + std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s)); + + q.Enqueue(o10, 0); + q.Enqueue(o20, 0); + ASSERT_EQ(2, s.size()); + throw OrthancException(ErrorCode_InternalError); + } + catch (OrthancException&) + { + // the BlockingSharedMessageQueue is destroyed -> make sure the elements have been deleted + ASSERT_EQ(0, s.size()); } }
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Mon Dec 01 08:35:27 2025 +0100 +++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp Tue Dec 02 10:09:49 2025 +0100 @@ -31,6 +31,7 @@ #include "../../../OrthancFramework/Sources/Logging.h" #include "../../../OrthancFramework/Sources/OrthancException.h" #include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h" +#include "../../../OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h" #include "../../../OrthancFramework/Sources/SerializationToolbox.h" #include "../OrthancConfiguration.h" #include "../ServerContext.h" @@ -133,7 +134,7 @@ virtual void GetDicom(std::string& dicom, const std::string& instanceId, const FileInfo& fileInfo) = 0; - virtual void Clear() + virtual void Clear(bool isAbort) { } }; @@ -185,11 +186,10 @@ class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader { - Semaphore availableInstancesSemaphore_; - Semaphore bufferedInstancesSemaphore_; + boost::condition_variable condInstanceAvailable_; std::map<std::string, boost::shared_ptr<std::string> > availableInstances_; boost::mutex availableInstancesMutex_; - SharedMessageQueue instancesToPreload_; + BlockingSharedMessageQueue instancesToPreload_; std::vector<boost::thread*> threads_; bool loadersShouldStop_; @@ -197,8 +197,7 @@ public: ThreadedInstanceLoader(ServerContext& context, size_t threadCount, bool transcode, DicomTransferSyntax transferSyntax, unsigned int lossyQuality) : InstanceLoader(context, transcode, transferSyntax, lossyQuality), - availableInstancesSemaphore_(0), - bufferedInstancesSemaphore_(3*threadCount), + instancesToPreload_ (3*threadCount), loadersShouldStop_(false) { for (size_t i = 0; i < threadCount; i++) @@ -209,29 +208,31 @@ virtual ~ThreadedInstanceLoader() ORTHANC_OVERRIDE { - ThreadedInstanceLoader::Clear(); + ThreadedInstanceLoader::Clear(false); } - virtual void Clear() ORTHANC_OVERRIDE + virtual void Clear(bool isAbort) ORTHANC_OVERRIDE { if (threads_.size() > 0) { - LOG(INFO) << "Waiting for loader threads to complete"; loadersShouldStop_ = true; // not need to protect this by a mutex. This is the only "writer" and all loaders are "readers" + if (isAbort) + { + LOG(INFO) << "Cancelling the loader threads"; + instancesToPreload_.Clear(); + } + else + { + LOG(INFO) << "Waiting for loader threads to complete"; + } + // unlock the loaders if they are waiting on this message queue (this happens when the job completes sucessfully) for (size_t i = 0; i < threads_.size(); i++) { instancesToPreload_.Enqueue(NULL); } - // If the consumer stops e.g. because the HttpClient disconnected, we must make sure the loader threads are not blocked waiting for room in the bufferedInstances. - // If the loader threads have completed their jobs, this is harmless to release the bufferedInstances since they won't be used anymore. - for (size_t i = 0; i < threads_.size(); i++) - { - bufferedInstancesSemaphore_.Release(); - } - for (size_t i = 0; i < threads_.size(); i++) { if (threads_[i]->joinable()) @@ -264,9 +265,6 @@ return; } - // wait for the consumers (zip writer), no need to accumulate instances in memory if loaders are faster than writers - that->bufferedInstancesSemaphore_.Acquire(); - try { boost::shared_ptr<std::string> dicomContent(new std::string()); @@ -284,9 +282,8 @@ { boost::mutex::scoped_lock lock(that->availableInstancesMutex_); that->availableInstances_[instanceToPreload->GetId()] = dicomContent; + that->condInstanceAvailable_.notify_one(); } - - that->availableInstancesSemaphore_.Release(); } catch (OrthancException& e) { @@ -294,7 +291,7 @@ boost::mutex::scoped_lock lock(that->availableInstancesMutex_); // store a NULL result to notify that we could not read the instance that->availableInstances_[instanceToPreload->GetId()] = boost::shared_ptr<std::string>(); - that->availableInstancesSemaphore_.Release(); + that->condInstanceAvailable_.notify_one(); } catch (...) { @@ -302,7 +299,7 @@ boost::mutex::scoped_lock lock(that->availableInstancesMutex_); // store a NULL result to notify that we could not read the instance that->availableInstances_[instanceToPreload->GetId()] = boost::shared_ptr<std::string>(); - that->availableInstancesSemaphore_.Release(); + that->condInstanceAvailable_.notify_one(); } } } @@ -314,39 +311,29 @@ virtual void GetDicom(std::string& dicom, const std::string& instanceId, const FileInfo& fileInfo) ORTHANC_OVERRIDE { + boost::mutex::scoped_lock lock(availableInstancesMutex_); + while (true) { - // wait for an instance to be available but this might not be the one we are waiting for ! - availableInstancesSemaphore_.Acquire(); - bufferedInstancesSemaphore_.Release(); // unlock the "flow" of loaders + // wait for this instance to be available but this might not be the one we are waiting for ! + while (availableInstances_.find(instanceId) == availableInstances_.end()) + { + condInstanceAvailable_.wait(lock); + } boost::shared_ptr<std::string> dicomContent; - { - boost::mutex::scoped_lock lock(availableInstancesMutex_); - if (availableInstances_.find(instanceId) != availableInstances_.end()) - { - // this is the instance we were waiting for - dicomContent = availableInstances_[instanceId]; - availableInstances_.erase(instanceId); + // this is the instance we were waiting for + dicomContent = availableInstances_[instanceId]; + availableInstances_.erase(instanceId); - if (dicomContent.get() == NULL) // there has been an error while reading the file - { - throw OrthancException(ErrorCode_InexistentItem); - } - dicom.swap(*dicomContent); + if (dicomContent.get() == NULL) // there has been an error while reading the file + { + throw OrthancException(ErrorCode_InexistentItem); + } + dicom.swap(*dicomContent); - if (availableInstances_.size() > 0) - { - // we have just read the instance we were waiting for but there are still other instances available -> - // make sure the next GetDicom call does not wait ! - availableInstancesSemaphore_.Release(); - } - return; - } - // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when - // a new instance is available - } + return; } } }; @@ -803,6 +790,7 @@ try { + LOG(INFO) << "Adding instance " << instanceId_ << " in zip"; instanceLoader.GetDicom(content, instanceId_, fileInfo_); } catch (OrthancException& e) @@ -1495,7 +1483,7 @@ if (instanceLoader_.get() != NULL) { - instanceLoader_->Clear(); + instanceLoader_->Clear(false); } if (asynchronousTarget_.get() != NULL) @@ -1567,7 +1555,7 @@ // clear the loader threads if (instanceLoader_.get() != NULL) { - instanceLoader_->Clear(); + instanceLoader_->Clear(true); } } }
