changeset 1681:ee4367497d0d

got rid of buggy BagOfRunnablesBySteps
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 06 Oct 2015 14:02:39 +0200
parents 4113a9a668b1
children 6414043df7d8 7dae55228b16
files CMakeLists.txt Core/MultiThreading/BagOfRunnablesBySteps.cpp Core/MultiThreading/BagOfRunnablesBySteps.h Core/MultiThreading/RunnableWorkersPool.cpp Core/MultiThreading/RunnableWorkersPool.h OrthancServer/DicomProtocol/DicomServer.cpp OrthancServer/DicomProtocol/DicomServer.h UnitTestsSources/MultiThreadingTests.cpp
diffstat 8 files changed, 46 insertions(+), 368 deletions(-) [+]
line wrap: on
line diff
--- a/CMakeLists.txt	Tue Oct 06 13:36:09 2015 +0200
+++ b/CMakeLists.txt	Tue Oct 06 14:02:39 2015 +0200
@@ -116,12 +116,11 @@
   Core/RestApi/RestApiPath.cpp
   Core/RestApi/RestApiOutput.cpp
   Core/RestApi/RestApi.cpp
-  Core/MultiThreading/BagOfRunnablesBySteps.cpp
   Core/MultiThreading/Mutex.cpp
   Core/MultiThreading/ReaderWriterLock.cpp
+  Core/MultiThreading/RunnableWorkersPool.cpp
   Core/MultiThreading/Semaphore.cpp
   Core/MultiThreading/SharedMessageQueue.cpp
-  Core/MultiThreading/RunnableWorkersPool.cpp
   Core/Images/Font.cpp
   Core/Images/FontRegistry.cpp
   Core/Images/ImageAccessor.cpp
--- a/Core/MultiThreading/BagOfRunnablesBySteps.cpp	Tue Oct 06 13:36:09 2015 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,185 +0,0 @@
-/**
- * Orthanc - A Lightweight, RESTful DICOM Store
- * Copyright (C) 2012-2015 Sebastien Jodogne, Medical Physics
- * Department, University Hospital of Liege, Belgium
- *
- * This program is free software: you can redistribute it and/or
- * modify it under the terms of the GNU General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * In addition, as a special exception, the copyright holders of this
- * program give permission to link the code of its release with the
- * OpenSSL project's "OpenSSL" library (or with modified versions of it
- * that use the same license as the "OpenSSL" library), and distribute
- * the linked executables. You must obey the GNU General Public License
- * in all respects for all of the code used other than "OpenSSL". If you
- * modify file(s) with this exception, you may extend this exception to
- * your version of the file(s), but you are not obligated to do so. If
- * you do not wish to do so, delete this exception statement from your
- * version. If you delete this exception statement from all source files
- * in the program, then also delete it here.
- * 
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- **/
-
-
-#include "../PrecompiledHeaders.h"
-#include "BagOfRunnablesBySteps.h"
-
-#include "../Logging.h"
-
-#include <stack>
-#include <boost/thread.hpp>
-
-namespace Orthanc
-{
-  struct BagOfRunnablesBySteps::PImpl
-  {
-    bool continue_;
-    bool stopFinishListener_;
-
-    boost::mutex mutex_;
-    boost::condition_variable oneThreadIsStopped_;
-    boost::condition_variable oneThreadIsJoined_;
-
-    // The list of threads that are waiting to be joined.
-    typedef std::stack<IRunnableBySteps*>  StoppedThreads;
-    StoppedThreads  stoppedThreads_;
-
-    // The set of active runnables, i.e. the runnables that have not
-    // finished their job yet, plus the runnables that have not been
-    // joined yet.
-    typedef std::map<IRunnableBySteps*, boost::thread*>  ActiveThreads;
-    ActiveThreads  activeThreads_;
-
-    // The thread that joins the runnables after they stop
-    std::auto_ptr<boost::thread> finishListener_;
-  };
-
-
-
-  void BagOfRunnablesBySteps::RunnableThread(BagOfRunnablesBySteps* bag,
-                                             IRunnableBySteps* runnable)
-  {
-    while (bag->pimpl_->continue_)
-    {
-      if (!runnable->Step())
-      {
-        break;
-      }
-    }
-
-    {
-      // Register this runnable as having stopped
-      boost::mutex::scoped_lock lock(bag->pimpl_->mutex_);
-      bag->pimpl_->stoppedThreads_.push(runnable);
-      bag->pimpl_->oneThreadIsStopped_.notify_one();
-    }
-  }
-
-  
-  void BagOfRunnablesBySteps::FinishListener(BagOfRunnablesBySteps* bag)
-  {
-    boost::mutex::scoped_lock lock(bag->pimpl_->mutex_);
-
-    while (!bag->pimpl_->stopFinishListener_)
-    {
-      while (!bag->pimpl_->stoppedThreads_.empty())
-      {
-        std::auto_ptr<IRunnableBySteps> r(bag->pimpl_->stoppedThreads_.top());
-        bag->pimpl_->stoppedThreads_.pop();
-
-        assert(r.get() != NULL);
-        assert(bag->pimpl_->activeThreads_.find(r.get()) != bag->pimpl_->activeThreads_.end());
-
-        std::auto_ptr<boost::thread> t(bag->pimpl_->activeThreads_[r.get()]);
-        bag->pimpl_->activeThreads_.erase(r.get());
-
-        assert(t.get() != NULL);
-        assert(bag->pimpl_->activeThreads_.find(r.get()) == bag->pimpl_->activeThreads_.end());
-
-        if (t->joinable())
-        {
-          t->join();
-        }
-
-        bag->pimpl_->oneThreadIsJoined_.notify_one();
-      }
-
-      bag->pimpl_->oneThreadIsStopped_.wait(lock);
-    }
-  }
-
-
-  BagOfRunnablesBySteps::BagOfRunnablesBySteps() : pimpl_(new PImpl)
-  {
-    pimpl_->continue_ = true;
-    pimpl_->stopFinishListener_ = false;
-
-    // Everyting is set up, the finish listener can be started
-    pimpl_->finishListener_.reset(new boost::thread(FinishListener, this));
-  }
-
-
-  BagOfRunnablesBySteps::~BagOfRunnablesBySteps()
-  {
-    if (!pimpl_->stopFinishListener_)
-    {
-      LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!";
-      Finalize();
-    }
-  }
-
-
-  void BagOfRunnablesBySteps::Add(IRunnableBySteps* runnable)
-  {
-    // Make sure the runnable is deleted is something goes wrong
-    std::auto_ptr<IRunnableBySteps> runnableRabi(runnable);
-
-    boost::mutex::scoped_lock lock(pimpl_->mutex_);
-    boost::thread* t(new boost::thread(RunnableThread, this, runnable));
-
-    pimpl_->activeThreads_.insert(std::make_pair(runnableRabi.release(), t));
-  }
-
-
-  void BagOfRunnablesBySteps::StopAll()
-  {
-    boost::mutex::scoped_lock lock(pimpl_->mutex_);
-    pimpl_->continue_ = false;
-
-    while (pimpl_->activeThreads_.size() > 0)
-    {
-      pimpl_->oneThreadIsJoined_.wait(lock);
-    }
-
-    pimpl_->continue_ = true;
-  }
-
-
-
-  void BagOfRunnablesBySteps::Finalize()
-  {
-    if (!pimpl_->stopFinishListener_)
-    {
-      StopAll();
-
-      // Stop the finish listener
-      pimpl_->stopFinishListener_ = true;
-      pimpl_->oneThreadIsStopped_.notify_one();  // Awakens the listener
-
-      if (pimpl_->finishListener_->joinable())
-      {
-        pimpl_->finishListener_->join();
-      }
-    }
-  }
-
-}
--- a/Core/MultiThreading/BagOfRunnablesBySteps.h	Tue Oct 06 13:36:09 2015 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,64 +0,0 @@
-/**
- * Orthanc - A Lightweight, RESTful DICOM Store
- * Copyright (C) 2012-2015 Sebastien Jodogne, Medical Physics
- * Department, University Hospital of Liege, Belgium
- *
- * This program is free software: you can redistribute it and/or
- * modify it under the terms of the GNU General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * In addition, as a special exception, the copyright holders of this
- * program give permission to link the code of its release with the
- * OpenSSL project's "OpenSSL" library (or with modified versions of it
- * that use the same license as the "OpenSSL" library), and distribute
- * the linked executables. You must obey the GNU General Public License
- * in all respects for all of the code used other than "OpenSSL". If you
- * modify file(s) with this exception, you may extend this exception to
- * your version of the file(s), but you are not obligated to do so. If
- * you do not wish to do so, delete this exception statement from your
- * version. If you delete this exception statement from all source files
- * in the program, then also delete it here.
- * 
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- **/
-
-
-#pragma once
-
-#include "IRunnableBySteps.h"
-
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace Orthanc
-{
-  class BagOfRunnablesBySteps : public boost::noncopyable
-  {
-  private:
-    struct PImpl;
-    boost::shared_ptr<PImpl> pimpl_;
-
-    static void RunnableThread(BagOfRunnablesBySteps* bag,
-                               IRunnableBySteps* runnable);
-
-    static void FinishListener(BagOfRunnablesBySteps* bag);
-
-  public:
-    BagOfRunnablesBySteps();
-
-    ~BagOfRunnablesBySteps();
-
-    void Add(IRunnableBySteps* runnable);
-
-    void StopAll();
-
-    void Finalize();
-  };
-}
--- a/Core/MultiThreading/RunnableWorkersPool.cpp	Tue Oct 06 13:36:09 2015 +0200
+++ b/Core/MultiThreading/RunnableWorkersPool.cpp	Tue Oct 06 14:02:39 2015 +0200
@@ -119,31 +119,39 @@
   }
 
 
-  RunnableWorkersPool::~RunnableWorkersPool()
+  void RunnableWorkersPool::Stop()
   {
-    pimpl_->continue_ = false;
-
-    for (size_t i = 0; i < pimpl_->workers_.size(); i++)
+    if (pimpl_->continue_)
     {
-      PImpl::Worker* worker = pimpl_->workers_[i];
+      pimpl_->continue_ = false;
+
+      for (size_t i = 0; i < pimpl_->workers_.size(); i++)
+      {
+        PImpl::Worker* worker = pimpl_->workers_[i];
 
-      if (worker != NULL)
-      {
-        worker->Join();
-        delete worker;
+        if (worker != NULL)
+        {
+          worker->Join();
+          delete worker;
+        }
       }
     }
   }
 
 
-  void RunnableWorkersPool::Add(IRunnableBySteps* runnable)
+  RunnableWorkersPool::~RunnableWorkersPool()
   {
-    pimpl_->queue_.Enqueue(runnable);
+    Stop();
   }
 
 
-  void RunnableWorkersPool::WaitDone()
+  void RunnableWorkersPool::Add(IRunnableBySteps* runnable)
   {
-    pimpl_->queue_.WaitEmpty(0);
+    if (!pimpl_->continue_)
+    {
+      throw OrthancException(ErrorCode_BadSequenceOfCalls);
+    }
+
+    pimpl_->queue_.Enqueue(runnable);
   }
 }
--- a/Core/MultiThreading/RunnableWorkersPool.h	Tue Oct 06 13:36:09 2015 +0200
+++ b/Core/MultiThreading/RunnableWorkersPool.h	Tue Oct 06 14:02:39 2015 +0200
@@ -44,13 +44,13 @@
     struct PImpl;
     boost::shared_ptr<PImpl> pimpl_;
 
+    void Stop();
+
   public:
     RunnableWorkersPool(size_t countWorkers);
 
     ~RunnableWorkersPool();
 
     void Add(IRunnableBySteps* runnable);  // Takes the ownership
-
-    void WaitDone();
   };
 }
--- a/OrthancServer/DicomProtocol/DicomServer.cpp	Tue Oct 06 13:36:09 2015 +0200
+++ b/OrthancServer/DicomProtocol/DicomServer.cpp	Tue Oct 06 14:02:39 2015 +0200
@@ -40,6 +40,7 @@
 #include "../Internals/CommandDispatcher.h"
 #include "../OrthancInitialization.h"
 #include "EmbeddedResources.h"
+#include "../../Core/MultiThreading/RunnableWorkersPool.h"
 
 #include <boost/thread.hpp>
 
@@ -52,14 +53,13 @@
 {
   struct DicomServer::PImpl
   {
-    boost::thread thread_;
-
-    //std::set<
+    boost::thread  thread_;
+    T_ASC_Network *network_;
+    std::auto_ptr<RunnableWorkersPool>  workers_;
   };
 
 
-  void DicomServer::ServerThread(DicomServer* server,
-                                 T_ASC_Network *network)
+  void DicomServer::ServerThread(DicomServer* server)
   {
     LOG(INFO) << "DICOM server started";
 
@@ -67,20 +67,13 @@
     {
       /* receive an association and acknowledge or reject it. If the association was */
       /* acknowledged, offer corresponding services and invoke one or more if required. */
-      std::auto_ptr<Internals::CommandDispatcher> dispatcher(Internals::AcceptAssociation(*server, network));
+      std::auto_ptr<Internals::CommandDispatcher> dispatcher(Internals::AcceptAssociation(*server, server->pimpl_->network_));
 
       try
       {
         if (dispatcher.get() != NULL)
         {
-          if (server->isThreaded_)
-          {
-            server->bagOfDispatchers_.Add(dispatcher.release());
-          }
-          else
-          {
-            IRunnableBySteps::RunUntilDone(*dispatcher);
-          }
+          server->pimpl_->workers_->Add(dispatcher.release());
         }
       }
       catch (OrthancException& e)
@@ -90,19 +83,6 @@
     }
 
     LOG(INFO) << "DICOM server stopping";
-
-    if (server->isThreaded_)
-    {
-      server->bagOfDispatchers_.StopAll();
-    }
-
-    /* drop the network, i.e. free memory of T_ASC_Network* structure. This call */
-    /* is the counterpart of ASC_initializeNetwork(...) which was called above. */
-    OFCondition cond = ASC_dropNetwork(&network);
-    if (cond.bad())
-    {
-      LOG(ERROR) << "Error while dropping the network: " << cond.text();
-    }
   }
 
 
@@ -117,8 +97,7 @@
     applicationEntityFilter_ = NULL;
     checkCalledAet_ = true;
     clientTimeout_ = 30;
-    isThreaded_ = true;
-    continue_ = true;
+    continue_ = false;
   }
 
   DicomServer::~DicomServer()
@@ -141,17 +120,6 @@
     return port_;
   }
 
-  void DicomServer::SetThreaded(bool isThreaded)
-  {
-    Stop();
-    isThreaded_ = isThreaded;
-  }
-
-  bool DicomServer::IsThreaded() const
-  {
-    return isThreaded_;
-  }
-
   void DicomServer::SetClientTimeout(uint32_t timeout)
   {
     Stop();
@@ -305,17 +273,18 @@
     Stop();
 
     /* initialize network, i.e. create an instance of T_ASC_Network*. */
-    T_ASC_Network *network;
     OFCondition cond = ASC_initializeNetwork
-      (NET_ACCEPTOR, OFstatic_cast(int, port_), /*opt_acse_timeout*/ 30, &network);
+      (NET_ACCEPTOR, OFstatic_cast(int, port_), /*opt_acse_timeout*/ 30, &pimpl_->network_);
     if (cond.bad())
     {
       LOG(ERROR) << "cannot create network: " << cond.text();
       throw OrthancException(ErrorCode_DicomPortInUse);
     }
 
+    pimpl_->workers_.reset(new RunnableWorkersPool(4));   // Use 4 workers - TODO as a parameter?
+
     continue_ = true;
-    pimpl_->thread_ = boost::thread(ServerThread, this, network);
+    pimpl_->thread_ = boost::thread(ServerThread, this);
   }
 
 
@@ -330,7 +299,15 @@
         pimpl_->thread_.join();
       }
 
-      bagOfDispatchers_.Finalize();
+      pimpl_->workers_.reset(NULL);
+
+      /* drop the network, i.e. free memory of T_ASC_Network* structure. This call */
+      /* is the counterpart of ASC_initializeNetwork(...) which was called above. */
+      OFCondition cond = ASC_dropNetwork(&pimpl_->network_);
+      if (cond.bad())
+      {
+        LOG(ERROR) << "Error while dropping the network: " << cond.text();
+      }
     }
   }
 
--- a/OrthancServer/DicomProtocol/DicomServer.h	Tue Oct 06 13:36:09 2015 +0200
+++ b/OrthancServer/DicomProtocol/DicomServer.h	Tue Oct 06 14:02:39 2015 +0200
@@ -36,13 +36,10 @@
 #include "IMoveRequestHandlerFactory.h"
 #include "IStoreRequestHandlerFactory.h"
 #include "IApplicationEntityFilter.h"
-#include "../../Core/MultiThreading/BagOfRunnablesBySteps.h"
 
 #include <boost/shared_ptr.hpp>
 #include <boost/noncopyable.hpp>
 
-struct T_ASC_Network;
-
 namespace Orthanc
 {
   class DicomServer : public boost::noncopyable
@@ -57,16 +54,12 @@
     bool continue_;
     bool started_;
     uint32_t clientTimeout_;
-    bool isThreaded_;
     IFindRequestHandlerFactory* findRequestHandlerFactory_;
     IMoveRequestHandlerFactory* moveRequestHandlerFactory_;
     IStoreRequestHandlerFactory* storeRequestHandlerFactory_;
     IApplicationEntityFilter* applicationEntityFilter_;
 
-    BagOfRunnablesBySteps bagOfDispatchers_;  // This is used iff the server is threaded
-
-    static void ServerThread(DicomServer* server,
-                             T_ASC_Network *net);
+    static void ServerThread(DicomServer* server);
 
   public:
     DicomServer();
@@ -76,9 +69,6 @@
     void SetPortNumber(uint16_t port);
     uint16_t GetPortNumber() const;
 
-    void SetThreaded(bool isThreaded);
-    bool IsThreaded() const;
-
     void SetClientTimeout(uint32_t timeout);
     uint32_t GetClientTimeout() const;
 
--- a/UnitTestsSources/MultiThreadingTests.cpp	Tue Oct 06 13:36:09 2015 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Tue Oct 06 14:02:39 2015 +0200
@@ -39,7 +39,6 @@
 #include "../Core/MultiThreading/Locker.h"
 #include "../Core/MultiThreading/Mutex.h"
 #include "../Core/MultiThreading/ReaderWriterLock.h"
-#include "../Core/MultiThreading/RunnableWorkersPool.h"
 
 using namespace Orthanc;
 
@@ -258,49 +257,3 @@
     t.join();
   }
 }
-
-
-namespace
-{
-  class MyRunnable : public IRunnableBySteps
-  {
-  private:
-    unsigned int& output_;
-    unsigned int count_;
-
-  public:
-    MyRunnable(unsigned int& output) : output_(output), count_(0)
-    {
-    }
-
-    virtual bool Step()
-    {
-      count_ ++;
-      output_ ++;
-
-      boost::this_thread::sleep(boost::posix_time::milliseconds(3));
-
-      return (count_ < 7);
-    }
-  };
-}
-
-
-TEST(MultiThreading, RunnableWorkersPool)
-{
-  unsigned int output = 0;
-
-  {
-    RunnableWorkersPool pool(3);
-  
-    for (size_t i = 0; i < 11; i++)
-    {
-      pool.Add(new MyRunnable(output));
-    }
-
-    pool.WaitDone();
-  }
-
-
-  ASSERT_EQ(11 * 7, output);
-}