changeset 1453:c0bdc47165ef

code to warn about possible threading problems
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 02 Jul 2015 12:26:44 +0200
parents b737acb13da5
children 9de4fa64e29c
files Core/MultiThreading/BagOfRunnablesBySteps.cpp Core/MultiThreading/BagOfRunnablesBySteps.h OrthancServer/DicomProtocol/DicomServer.cpp OrthancServer/DicomProtocol/ReusableDicomUserConnection.cpp OrthancServer/DicomProtocol/ReusableDicomUserConnection.h OrthancServer/Scheduler/ServerScheduler.cpp OrthancServer/Scheduler/ServerScheduler.h OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp
diffstat 13 files changed, 154 insertions(+), 41 deletions(-) [+]
line wrap: on
line diff
--- a/Core/MultiThreading/BagOfRunnablesBySteps.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/Core/MultiThreading/BagOfRunnablesBySteps.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -35,6 +35,7 @@
 
 #include <stack>
 #include <boost/thread.hpp>
+#include <glog/logging.h>
 
 namespace Orthanc
 {
@@ -128,15 +129,10 @@
 
   BagOfRunnablesBySteps::~BagOfRunnablesBySteps()
   {
-    StopAll();
-
-    // Stop the finish listener
-    pimpl_->stopFinishListener_ = true;
-    pimpl_->oneThreadIsStopped_.notify_one();  // Awakens the listener
-
-    if (pimpl_->finishListener_->joinable())
+    if (!pimpl_->stopFinishListener_)
     {
-      pimpl_->finishListener_->join();
+      LOG(ERROR) << "INTERNAL ERROR: BagOfRunnablesBySteps::Finalize() should be invoked manually to avoid mess in the destruction order!";
+      Finalize();
     }
   }
 
@@ -165,4 +161,24 @@
 
     pimpl_->continue_ = true;
   }
+
+
+
+  void BagOfRunnablesBySteps::Finalize()
+  {
+    if (!pimpl_->stopFinishListener_)
+    {
+      StopAll();
+
+      // Stop the finish listener
+      pimpl_->stopFinishListener_ = true;
+      pimpl_->oneThreadIsStopped_.notify_one();  // Awakens the listener
+
+      if (pimpl_->finishListener_->joinable())
+      {
+        pimpl_->finishListener_->join();
+      }
+    }
+  }
+
 }
--- a/Core/MultiThreading/BagOfRunnablesBySteps.h	Thu Jul 02 11:35:41 2015 +0200
+++ b/Core/MultiThreading/BagOfRunnablesBySteps.h	Thu Jul 02 12:26:44 2015 +0200
@@ -58,5 +58,7 @@
     void Add(IRunnableBySteps* runnable);
 
     void StopAll();
+
+    void Finalize();
   };
 }
--- a/OrthancServer/DicomProtocol/DicomServer.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/DicomProtocol/DicomServer.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -223,7 +223,11 @@
 
   DicomServer::~DicomServer()
   {
-    Stop();
+    if (continue_)
+    {
+      LOG(ERROR) << "INTERNAL ERROR: DicomServer::Stop() should be invoked manually to avoid mess in the destruction order!";
+      Stop();
+    }
   }
 
   void DicomServer::SetPortNumber(uint16_t port)
@@ -409,16 +413,21 @@
     }
   }
 
+
   void DicomServer::Stop()
   {
-    continue_ = false;
+    if (continue_)
+    {
+      continue_ = false;
 
-    if (pimpl_->thread_.joinable())
-    {
-      pimpl_->thread_.join();
+      if (pimpl_->thread_.joinable())
+      {
+        pimpl_->thread_.join();
+      }
     }
   }
 
+
   bool DicomServer::IsMyAETitle(const std::string& aet) const
   {
     if (!HasCalledApplicationEntityTitleCheck())
--- a/OrthancServer/DicomProtocol/ReusableDicomUserConnection.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/DicomProtocol/ReusableDicomUserConnection.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -131,9 +131,11 @@
 
   ReusableDicomUserConnection::~ReusableDicomUserConnection()
   {
-    continue_ = false;
-    closeThread_.join();
-    Close();
+    if (continue_)
+    {
+      LOG(ERROR) << "INTERNAL ERROR: ReusableDicomUserConnection::Finalize() should be invoked manually to avoid mess in the destruction order!";
+      Finalize();
+    }
   }
 
   void ReusableDicomUserConnection::SetMillisecondsBeforeClose(uint64_t ms)
@@ -166,5 +168,21 @@
     lastUse_ = Now();
     mutex_.unlock();
   }
+
+  
+  void ReusableDicomUserConnection::Finalize()
+  {
+    if (continue_)
+    {
+      continue_ = false;
+
+      if (closeThread_.joinable())
+      {
+        closeThread_.join();
+      }
+
+      Close();
+    }
+  }
 }
 
--- a/OrthancServer/DicomProtocol/ReusableDicomUserConnection.h	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/DicomProtocol/ReusableDicomUserConnection.h	Thu Jul 02 12:26:44 2015 +0200
@@ -81,6 +81,8 @@
     virtual ~ReusableDicomUserConnection();
 
     void SetMillisecondsBeforeClose(uint64_t ms);
+
+    void Finalize();
   };
 }
 
--- a/OrthancServer/Scheduler/ServerScheduler.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/Scheduler/ServerScheduler.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -199,8 +199,25 @@
 
   ServerScheduler::~ServerScheduler()
   {
-    finish_ = true;
-    worker_.join();
+    if (!finish_)
+    {
+      LOG(ERROR) << "INTERNAL ERROR: ServerScheduler::Finalize() should be invoked manually to avoid mess in the destruction order!";
+      Stop();
+    }
+  }
+
+
+  void ServerScheduler::Stop()
+  {
+    if (!finish_)
+    {
+      finish_ = true;
+
+      if (worker_.joinable())
+      {
+        worker_.join();
+      }
+    }
   }
 
 
--- a/OrthancServer/Scheduler/ServerScheduler.h	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/Scheduler/ServerScheduler.h	Thu Jul 02 12:26:44 2015 +0200
@@ -86,6 +86,8 @@
 
     ~ServerScheduler();
 
+    void Stop();
+
     void Submit(ServerJob& job);
 
     bool SubmitAndWait(ListOfStrings& outputs,
--- a/OrthancServer/ServerContext.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/ServerContext.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -77,6 +77,7 @@
       {
         const ServerIndexChange& change = dynamic_cast<const ServerIndexChange&>(*obj.get());
 
+        boost::mutex::scoped_lock lock(that->listenersMutex_);
         for (ServerListeners::iterator it = that->listeners_.begin(); 
              it != that->listeners_.end(); ++it)
         {
@@ -119,7 +120,11 @@
   
   ServerContext::~ServerContext()
   {
-    Stop();
+    if (!done_)
+    {
+      LOG(ERROR) << "INTERNAL ERROR: ServerContext::Stop() should be invoked manually to avoid mess in the destruction order!";
+      Stop();
+    }
   }
 
 
@@ -133,6 +138,12 @@
       {
         changeThread_.join();
       }
+
+      scu_.Finalize();
+
+      // Do not change the order below!
+      scheduler_.Stop();
+      index_.Stop();
     }
   }
 
@@ -168,21 +179,25 @@
       // Test if the instance must be filtered out
       bool accepted = true;
 
-      for (ServerListeners::iterator it = listeners_.begin(); it != listeners_.end(); ++it)
       {
-        try
+        boost::mutex::scoped_lock lock(listenersMutex_);
+
+        for (ServerListeners::iterator it = listeners_.begin(); it != listeners_.end(); ++it)
         {
-          if (!it->GetListener().FilterIncomingInstance(simplified, dicom.GetRemoteAet()))
+          try
           {
-            accepted = false;
-            break;
+            if (!it->GetListener().FilterIncomingInstance(simplified, dicom.GetRemoteAet()))
+            {
+              accepted = false;
+              break;
+            }
           }
-        }
-        catch (OrthancException& e)
-        {
-          LOG(ERROR) << "Error in the " << it->GetDescription() 
-                     << " callback while receiving an instance: " << e.What();
-          throw;
+          catch (OrthancException& e)
+          {
+            LOG(ERROR) << "Error in the " << it->GetDescription() 
+                       << " callback while receiving an instance: " << e.What();
+            throw;
+          }
         }
       }
 
@@ -251,6 +266,8 @@
       if (status == StoreStatus_Success ||
           status == StoreStatus_AlreadyStored)
       {
+        boost::mutex::scoped_lock lock(listenersMutex_);
+
         for (ServerListeners::iterator it = listeners_.begin(); it != listeners_.end(); ++it)
         {
           try
@@ -422,6 +439,7 @@
     plugins_ = &plugins;
 
     // TODO REFACTOR THIS
+    boost::mutex::scoped_lock lock(listenersMutex_);
     listeners_.clear();
     listeners_.push_back(ServerListener(lua_, "Lua"));
     listeners_.push_back(ServerListener(plugins, "plugin"));
@@ -433,6 +451,7 @@
     plugins_ = NULL;
 
     // TODO REFACTOR THIS
+    boost::mutex::scoped_lock lock(listenersMutex_);
     listeners_.clear();
     listeners_.push_back(ServerListener(lua_, "Lua"));
   }
--- a/OrthancServer/ServerContext.h	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/ServerContext.h	Thu Jul 02 12:26:44 2015 +0200
@@ -120,6 +120,7 @@
     LuaScripting lua_;
     OrthancPlugins* plugins_;
     ServerListeners listeners_;
+    boost::mutex listenersMutex_;
 
     bool done_;
     SharedMessageQueue  pendingChanges_;
--- a/OrthancServer/ServerIndex.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/ServerIndex.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -564,23 +564,39 @@
   }
 
 
+
   ServerIndex::~ServerIndex()
   {
-    done_ = true;
-
-    if (db_.HasFlushToDisk() &&
-        flushThread_.joinable())
+    if (!done_)
     {
-      flushThread_.join();
-    }
-
-    if (unstableResourcesMonitorThread_.joinable())
-    {
-      unstableResourcesMonitorThread_.join();
+      LOG(ERROR) << "INTERNAL ERROR: ServerIndex::Stop() should be invoked manually to avoid mess in the destruction order!";
+      Stop();
     }
   }
 
 
+
+  void ServerIndex::Stop()
+  {
+    if (!done_)
+    {
+      done_ = true;
+
+      if (db_.HasFlushToDisk() &&
+          flushThread_.joinable())
+      {
+        flushThread_.join();
+      }
+
+      if (unstableResourcesMonitorThread_.joinable())
+      {
+        unstableResourcesMonitorThread_.join();
+      }
+    }
+  }
+
+
+
   StoreStatus ServerIndex::Store(std::map<MetadataType, std::string>& instanceMetadata,
                                  const DicomMap& dicomSummary,
                                  const Attachments& attachments,
--- a/OrthancServer/ServerIndex.h	Thu Jul 02 11:35:41 2015 +0200
+++ b/OrthancServer/ServerIndex.h	Thu Jul 02 12:26:44 2015 +0200
@@ -122,6 +122,8 @@
 
     ~ServerIndex();
 
+    void Stop();
+
     uint64_t GetMaximumStorageSize() const
     {
       return maximumStorageSize_;
--- a/UnitTestsSources/MultiThreadingTests.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/UnitTestsSources/MultiThreadingTests.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -251,6 +251,11 @@
   //Toolbox::ServerBarrier();
   //Toolbox::USleep(3000000);
 
+  scheduler.Stop();
+
   done = true;
-  t.join();
+  if (t.joinable())
+  {
+    t.join();
+  }
 }
--- a/UnitTestsSources/ServerIndexTests.cpp	Thu Jul 02 11:35:41 2015 +0200
+++ b/UnitTestsSources/ServerIndexTests.cpp	Thu Jul 02 12:26:44 2015 +0200
@@ -668,6 +668,8 @@
   ASSERT_EQ(2u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence));
   ASSERT_EQ(3u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence));
   ASSERT_EQ(4u, index.IncrementGlobalSequence(GlobalProperty_AnonymizationSequence));
+
+  context.Stop();
 }
 
 
@@ -779,4 +781,6 @@
 
   // Because the DB is in memory, the SQLite index must not have been created
   ASSERT_THROW(Toolbox::GetFileSize(path + "/index"), OrthancException);  
+
+  context.Stop();
 }