changeset 6511:ebf563bfe42d

new BlockingSharedMessageQueue + refactored ArchiveJob to use it
author Alain Mazy <am@orthanc.team>
date Tue, 02 Dec 2025 10:09:49 +0100
parents b3718e91d224
children 81f2cea4ab5f ef77a536195b
files OrthancFramework/Resources/CMake/OrthancFrameworkConfiguration.cmake OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.cpp OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h OrthancFramework/UnitTestsSources/JobsTests.cpp OrthancServer/Sources/ServerJobs/ArchiveJob.cpp
diffstat 5 files changed, 373 insertions(+), 69 deletions(-) [+]
line wrap: on
line diff
--- a/OrthancFramework/Resources/CMake/OrthancFrameworkConfiguration.cmake	Mon Dec 01 08:35:27 2025 +0100
+++ b/OrthancFramework/Resources/CMake/OrthancFrameworkConfiguration.cmake	Tue Dec 02 10:09:49 2025 +0100
@@ -657,6 +657,7 @@
     ${CMAKE_CURRENT_LIST_DIR}/../../Sources/FileBuffer.cpp
     ${CMAKE_CURRENT_LIST_DIR}/../../Sources/FileStorage/FilesystemStorage.cpp
     ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MetricsRegistry.cpp
+    ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/BlockingSharedMessageQueue.cpp
     ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/RunnableWorkersPool.cpp
     ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/Semaphore.cpp
     ${CMAKE_CURRENT_LIST_DIR}/../../Sources/MultiThreading/SharedMessageQueue.cpp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.cpp	Tue Dec 02 10:09:49 2025 +0100
@@ -0,0 +1,134 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2023 Osimis S.A., Belgium
+ * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
+ * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "../PrecompiledHeaders.h"
+#include "BlockingSharedMessageQueue.h"
+
+
+#include "../Compatibility.h"
+
+
+namespace Orthanc
+{
+  BlockingSharedMessageQueue::BlockingSharedMessageQueue(unsigned int maxSize) :
+    maxSize_(maxSize)
+  {
+  }
+
+
+  BlockingSharedMessageQueue::~BlockingSharedMessageQueue()
+  {
+    for (Queue::iterator it = queue_.begin(); it != queue_.end(); ++it)
+    {
+      delete *it;
+    }
+  }
+
+
+  bool BlockingSharedMessageQueue::Enqueue(std::unique_ptr<IDynamicObject>& message, int32_t millisecondsTimeout)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (maxSize_ != 0 && queue_.size() >= maxSize_)
+    {
+      if (!roomAvailable_.timed_wait(lock, boost::posix_time::milliseconds(millisecondsTimeout)))
+      {
+        return false;
+      }
+    }
+
+    queue_.push_back(message.release());  // take ownership only when pushed into the queue
+    elementAvailable_.notify_one();
+    
+    return true;
+  }
+
+  void BlockingSharedMessageQueue::Enqueue(IDynamicObject* message)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (maxSize_ != 0 && queue_.size() >= maxSize_)
+    {
+      roomAvailable_.wait(lock);
+    }
+
+    queue_.push_back(message);  // take ownership
+    elementAvailable_.notify_one();
+  }
+
+
+  IDynamicObject* BlockingSharedMessageQueue::Dequeue(int32_t millisecondsTimeout)
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    // If it is empty, wait for a message to arrive in the queue
+    while (queue_.empty())
+    {
+      if (millisecondsTimeout == 0)
+      {
+        elementAvailable_.wait(lock);
+      }
+      else
+      {
+        if (!elementAvailable_.timed_wait(lock, boost::posix_time::milliseconds(millisecondsTimeout)))
+        {
+          return NULL;
+        }
+      }
+    }
+
+    std::unique_ptr<IDynamicObject> message(queue_.front());
+    queue_.pop_front();
+
+    roomAvailable_.notify_one();
+
+    return message.release();
+  }
+
+  void BlockingSharedMessageQueue::Clear()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+
+    if (queue_.empty())
+    {
+      return;
+    }
+    else
+    {
+      while (!queue_.empty())
+      {
+        std::unique_ptr<IDynamicObject> message(queue_.front());
+        queue_.pop_front();
+        
+        roomAvailable_.notify_one();
+      }
+    }
+  }
+
+  size_t BlockingSharedMessageQueue::GetSize()
+  {
+    boost::mutex::scoped_lock lock(mutex_);
+    return queue_.size();
+  }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h	Tue Dec 02 10:09:49 2025 +0100
@@ -0,0 +1,66 @@
+/**
+ * Orthanc - A Lightweight, RESTful DICOM Store
+ * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
+ * Department, University Hospital of Liege, Belgium
+ * Copyright (C) 2017-2023 Osimis S.A., Belgium
+ * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
+ * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ **/
+
+
+#pragma once
+
+#include "../IDynamicObject.h"
+
+#include <stdint.h>
+#include <list>
+#include <boost/thread.hpp>
+
+namespace Orthanc
+{
+  // Compared to SharedMessageQueue that is discarding old messages when it is full,
+  // this queue blocks the Enqueue method until there is room for a new message.
+  class ORTHANC_PUBLIC BlockingSharedMessageQueue : public boost::noncopyable
+  {
+  private:
+    typedef std::list<IDynamicObject*>  Queue;
+
+    unsigned int maxSize_;
+    Queue queue_;
+    boost::mutex mutex_;
+    boost::condition_variable elementAvailable_;
+    boost::condition_variable roomAvailable_;
+
+  public:
+    explicit BlockingSharedMessageQueue(unsigned int maxSize = 0);
+    
+    ~BlockingSharedMessageQueue();
+
+    // This transfers the ownership of the message only if it is actually pushed in the queue (hence the unique_ptr)
+    bool Enqueue(std::unique_ptr<IDynamicObject>& message, int32_t millisecondsTimeout);
+
+    // This transfers the ownership of the message
+    void Enqueue(IDynamicObject* message);
+
+    // The caller is responsible to delete the dequeued message!
+    IDynamicObject* Dequeue(int32_t millisecondsTimeout);
+
+    void Clear();
+
+    size_t GetSize();
+  };
+}
--- a/OrthancFramework/UnitTestsSources/JobsTests.cpp	Mon Dec 01 08:35:27 2025 +0100
+++ b/OrthancFramework/UnitTestsSources/JobsTests.cpp	Tue Dec 02 10:09:49 2025 +0100
@@ -43,6 +43,7 @@
 #include "../../OrthancFramework/Sources/JobsEngine/Operations/StringOperationValue.h"
 #include "../../OrthancFramework/Sources/JobsEngine/SetOfInstancesJob.h"
 #include "../../OrthancFramework/Sources/Logging.h"
+#include "../../OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h"
 #include "../../OrthancFramework/Sources/MultiThreading/SharedMessageQueue.h"
 #include "../../OrthancFramework/Sources/OrthancException.h"
 #include "../../OrthancFramework/Sources/SerializationToolbox.h"
@@ -251,6 +252,12 @@
     DynamicInteger(int value, std::set<int>& target) : 
       value_(value), target_(target)
     {
+      target_.insert(value);
+    }
+
+    virtual ~DynamicInteger()
+    {
+      target_.erase(value_);
     }
 
     int GetValue() const
@@ -263,40 +270,148 @@
 
 TEST(MultiThreading, SharedMessageQueueBasic)
 {
-  std::set<int> s;
+  std::set<int> s; // keeps a copy of all DynamicInteger objects
+
+  {
+    SharedMessageQueue q;
+    ASSERT_TRUE(q.WaitEmpty(0));
+    q.Enqueue(new DynamicInteger(10, s));
+    ASSERT_FALSE(q.WaitEmpty(1));
+    q.Enqueue(new DynamicInteger(20, s));
+    q.Enqueue(new DynamicInteger(30, s));
+    q.Enqueue(new DynamicInteger(40, s));
 
-  SharedMessageQueue q;
-  ASSERT_TRUE(q.WaitEmpty(0));
-  q.Enqueue(new DynamicInteger(10, s));
-  ASSERT_FALSE(q.WaitEmpty(1));
-  q.Enqueue(new DynamicInteger(20, s));
-  q.Enqueue(new DynamicInteger(30, s));
-  q.Enqueue(new DynamicInteger(40, s));
+    ASSERT_EQ(4, s.size());
 
-  std::unique_ptr<DynamicInteger> i;
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue());
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
-  ASSERT_FALSE(q.WaitEmpty(1));
-  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue());
-  ASSERT_TRUE(q.WaitEmpty(0));
-  ASSERT_EQ(NULL, q.Dequeue(1));
+    std::unique_ptr<DynamicInteger> i;
+    i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
+    i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(20, i->GetValue());
+    i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
+    ASSERT_FALSE(q.WaitEmpty(1));
+    i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(40, i->GetValue());
+    ASSERT_TRUE(q.WaitEmpty(0));
+    ASSERT_EQ(NULL, q.Dequeue(1));
+  }
+
+  ASSERT_EQ(0, s.size());
 }
 
 
 TEST(MultiThreading, SharedMessageQueueClean)
 {
+  std::set<int> s; // keeps a copy of all DynamicInteger objects
   try
   {
-    std::set<int> s;
-
     SharedMessageQueue q;
     q.Enqueue(new DynamicInteger(10, s));
     q.Enqueue(new DynamicInteger(20, s));  
+    ASSERT_EQ(2, s.size());
     throw OrthancException(ErrorCode_InternalError);
   }
   catch (OrthancException&)
   {
+    // the SharedMessageQueue is destroyed -> make sure the elements have been deleted
+    ASSERT_EQ(0, s.size());
+  }
+}
+
+
+TEST(MultiThreading, BlockingSharedMessageQueueBasicUnlimited)
+{
+  std::set<int> s; // keeps a copy of all DynamicInteger objects
+
+  std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s));
+  std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s));
+  std::unique_ptr<DynamicInteger> i;
+
+  BlockingSharedMessageQueue q;
+  
+  ASSERT_TRUE(q.Enqueue(o10, 0));
+  ASSERT_TRUE(q.Enqueue(o20, 1));
+  q.Enqueue(new DynamicInteger(30, s)); 
+
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(20, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(30, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(NULL, i.get());
+
+  ASSERT_EQ(0, s.size());
+}
+
+
+TEST(MultiThreading, BlockingSharedMessageQueueBasicLimitedSize)
+{
+  std::set<int> s; // keeps a copy of all DynamicInteger objects
+
+  std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s));
+  std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s));
+  std::unique_ptr<IDynamicObject> o30(new DynamicInteger(30, s));
+  ASSERT_EQ(3, s.size());
+
+  std::unique_ptr<DynamicInteger > i;
+
+  BlockingSharedMessageQueue q(2);
+  
+  q.Enqueue(o10.release());
+  ASSERT_TRUE(q.Enqueue(o20, 1));
+  ASSERT_FALSE(q.Enqueue(o30, 1));
+
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(10, i->GetValue());
+  ASSERT_TRUE(q.Enqueue(o30, 1));
+
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(20, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(0))); ASSERT_EQ(30, i->GetValue());
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(NULL, i.get());
+
+  // ensures all objects have been destroyed
+  ASSERT_EQ(0, s.size());
+}
+
+TEST(MultiThreading, BlockingSharedMessageQueueBasicClear)
+{
+  std::set<int> s; // keeps a copy of all DynamicInteger objects
+  
+  std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s));
+  std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s));
+  std::unique_ptr<IDynamicObject> o30(new DynamicInteger(30, s));
+  std::unique_ptr<IDynamicObject> o40(new DynamicInteger(40, s));
+  std::unique_ptr<DynamicInteger > i;
+
+  BlockingSharedMessageQueue q(2);
+  
+  ASSERT_TRUE(q.Enqueue(o10, 0));
+  ASSERT_TRUE(q.Enqueue(o20, 1));
+  ASSERT_EQ(2, q.GetSize());
+  ASSERT_FALSE(q.Enqueue(o30, 1));
+  ASSERT_EQ(2, q.GetSize());
+
+  q.Clear();
+  ASSERT_EQ(0, q.GetSize());
+
+  ASSERT_TRUE(q.Enqueue(o30, 1));
+
+  i.reset(dynamic_cast<DynamicInteger*>(q.Dequeue(1))); ASSERT_EQ(30, i->GetValue());
+  ASSERT_TRUE(q.Enqueue(o40, 1));
+}
+
+TEST(MultiThreading, BlockingSharedMessageQueueClean)
+{
+  std::set<int> s; // keeps a copy of all DynamicInteger objects
+  try
+  {
+    BlockingSharedMessageQueue q;
+    std::unique_ptr<IDynamicObject> o10(new DynamicInteger(10, s));
+    std::unique_ptr<IDynamicObject> o20(new DynamicInteger(20, s));
+
+    q.Enqueue(o10, 0);
+    q.Enqueue(o20, 0);
+    ASSERT_EQ(2, s.size());
+    throw OrthancException(ErrorCode_InternalError);
+  }
+  catch (OrthancException&)
+  {
+    // the BlockingSharedMessageQueue is destroyed -> make sure the elements have been deleted
+    ASSERT_EQ(0, s.size());
   }
 }
 
--- a/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Mon Dec 01 08:35:27 2025 +0100
+++ b/OrthancServer/Sources/ServerJobs/ArchiveJob.cpp	Tue Dec 02 10:09:49 2025 +0100
@@ -31,6 +31,7 @@
 #include "../../../OrthancFramework/Sources/Logging.h"
 #include "../../../OrthancFramework/Sources/OrthancException.h"
 #include "../../../OrthancFramework/Sources/MultiThreading/Semaphore.h"
+#include "../../../OrthancFramework/Sources/MultiThreading/BlockingSharedMessageQueue.h"
 #include "../../../OrthancFramework/Sources/SerializationToolbox.h"
 #include "../OrthancConfiguration.h"
 #include "../ServerContext.h"
@@ -133,7 +134,7 @@
 
     virtual void GetDicom(std::string& dicom, const std::string& instanceId, const FileInfo& fileInfo) = 0;
 
-    virtual void Clear()
+    virtual void Clear(bool isAbort)
     {
     }
   };
@@ -185,11 +186,10 @@
 
   class ArchiveJob::ThreadedInstanceLoader : public ArchiveJob::InstanceLoader
   {
-    Semaphore                           availableInstancesSemaphore_;
-    Semaphore                           bufferedInstancesSemaphore_;
+    boost::condition_variable           condInstanceAvailable_;
     std::map<std::string, boost::shared_ptr<std::string> >  availableInstances_;
     boost::mutex                        availableInstancesMutex_;
-    SharedMessageQueue                  instancesToPreload_;
+    BlockingSharedMessageQueue          instancesToPreload_;
     std::vector<boost::thread*>         threads_;
     bool                                loadersShouldStop_;
 
@@ -197,8 +197,7 @@
   public:
     ThreadedInstanceLoader(ServerContext& context, size_t threadCount, bool transcode, DicomTransferSyntax transferSyntax, unsigned int lossyQuality)
     : InstanceLoader(context, transcode, transferSyntax, lossyQuality),
-      availableInstancesSemaphore_(0),
-      bufferedInstancesSemaphore_(3*threadCount),
+      instancesToPreload_ (3*threadCount), 
       loadersShouldStop_(false)
     {
       for (size_t i = 0; i < threadCount; i++)
@@ -209,29 +208,31 @@
 
     virtual ~ThreadedInstanceLoader() ORTHANC_OVERRIDE
     {
-      ThreadedInstanceLoader::Clear();
+      ThreadedInstanceLoader::Clear(false);
     }
 
-    virtual void Clear() ORTHANC_OVERRIDE
+    virtual void Clear(bool isAbort) ORTHANC_OVERRIDE
     {
       if (threads_.size() > 0)
       {
-        LOG(INFO) << "Waiting for loader threads to complete";
         loadersShouldStop_ = true; // not need to protect this by a mutex.  This is the only "writer" and all loaders are "readers"
 
+        if (isAbort)
+        {
+          LOG(INFO) << "Cancelling the loader threads";
+          instancesToPreload_.Clear();
+        }
+        else
+        {
+          LOG(INFO) << "Waiting for loader threads to complete";
+        }
+
         // unlock the loaders if they are waiting on this message queue (this happens when the job completes sucessfully)
         for (size_t i = 0; i < threads_.size(); i++)
         {
           instancesToPreload_.Enqueue(NULL);
         }
 
-        // If the consumer stops e.g. because the HttpClient disconnected, we must make sure the loader threads are not blocked waiting for room in the bufferedInstances.
-        // If the loader threads have completed their jobs, this is harmless to release the bufferedInstances since they won't be used anymore.
-        for (size_t i = 0; i < threads_.size(); i++)
-        {
-          bufferedInstancesSemaphore_.Release();
-        }
-
         for (size_t i = 0; i < threads_.size(); i++)
         {
           if (threads_[i]->joinable())
@@ -264,9 +265,6 @@
           return;
         }
         
-        // wait for the consumers (zip writer), no need to accumulate instances in memory if loaders are faster than writers
-        that->bufferedInstancesSemaphore_.Acquire();
-
         try
         {
           boost::shared_ptr<std::string> dicomContent(new std::string());
@@ -284,9 +282,8 @@
           {
             boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
             that->availableInstances_[instanceToPreload->GetId()] = dicomContent;
+            that->condInstanceAvailable_.notify_one();
           }
-
-          that->availableInstancesSemaphore_.Release();
         }
         catch (OrthancException& e)
         {
@@ -294,7 +291,7 @@
           boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
           // store a NULL result to notify that we could not read the instance
           that->availableInstances_[instanceToPreload->GetId()] = boost::shared_ptr<std::string>(); 
-          that->availableInstancesSemaphore_.Release();
+          that->condInstanceAvailable_.notify_one();
         }
         catch (...)
         {
@@ -302,7 +299,7 @@
           boost::mutex::scoped_lock lock(that->availableInstancesMutex_);
           // store a NULL result to notify that we could not read the instance
           that->availableInstances_[instanceToPreload->GetId()] = boost::shared_ptr<std::string>(); 
-          that->availableInstancesSemaphore_.Release();
+          that->condInstanceAvailable_.notify_one();
         }
       }
     }
@@ -314,39 +311,29 @@
 
     virtual void GetDicom(std::string& dicom, const std::string& instanceId, const FileInfo& fileInfo) ORTHANC_OVERRIDE
     {
+      boost::mutex::scoped_lock lock(availableInstancesMutex_);
+
       while (true)
       {
-        // wait for an instance to be available but this might not be the one we are waiting for !
-        availableInstancesSemaphore_.Acquire();
-        bufferedInstancesSemaphore_.Release(); // unlock the "flow" of loaders
+        // wait for this instance to be available but this might not be the one we are waiting for !
+        while (availableInstances_.find(instanceId) == availableInstances_.end())
+        {
+          condInstanceAvailable_.wait(lock);
+        }
 
         boost::shared_ptr<std::string> dicomContent;
-        {
-          boost::mutex::scoped_lock lock(availableInstancesMutex_);
 
-          if (availableInstances_.find(instanceId) != availableInstances_.end())
-          {
-            // this is the instance we were waiting for
-            dicomContent = availableInstances_[instanceId];
-            availableInstances_.erase(instanceId);
+        // this is the instance we were waiting for
+        dicomContent = availableInstances_[instanceId];
+        availableInstances_.erase(instanceId);
 
-            if (dicomContent.get() == NULL)  // there has been an error while reading the file
-            {
-              throw OrthancException(ErrorCode_InexistentItem);
-            }
-            dicom.swap(*dicomContent);
+        if (dicomContent.get() == NULL)  // there has been an error while reading the file
+        {
+          throw OrthancException(ErrorCode_InexistentItem);
+        }
+        dicom.swap(*dicomContent);
 
-            if (availableInstances_.size() > 0)
-            {
-              // we have just read the instance we were waiting for but there are still other instances available ->
-              // make sure the next GetDicom call does not wait !
-              availableInstancesSemaphore_.Release();
-            }
-            return;
-          }
-          // we have not found the expected instance, simply wait for the next loader thread to signal the semaphore when
-          // a new instance is available
-        }
+        return;
       }
     }
   };
@@ -803,6 +790,7 @@
 
             try
             {
+              LOG(INFO) << "Adding instance " << instanceId_ << " in zip";
               instanceLoader.GetDicom(content, instanceId_, fileInfo_);
             }
             catch (OrthancException& e)
@@ -1495,7 +1483,7 @@
 
     if (instanceLoader_.get() != NULL)
     {
-      instanceLoader_->Clear();
+      instanceLoader_->Clear(false);
     }
 
     if (asynchronousTarget_.get() != NULL)
@@ -1567,7 +1555,7 @@
       // clear the loader threads
       if (instanceLoader_.get() != NULL)
       {
-        instanceLoader_->Clear();
+        instanceLoader_->Clear(true);
       }
     }
   }