changeset 206:4453a010d0db

flush to disk thread
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 28 Nov 2012 12:03:18 +0100
parents 6ab754744446
children 7f74209ea0f8
files Core/SQLite/Connection.cpp Core/SQLite/Connection.h NEWS OrthancServer/DatabaseWrapper.cpp OrthancServer/DatabaseWrapper.h OrthancServer/PrepareDatabase.sql OrthancServer/ServerEnumerations.h OrthancServer/ServerIndex.cpp OrthancServer/ServerIndex.h UnitTests/ServerIndex.cpp
diffstat 10 files changed, 100 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/Core/SQLite/Connection.cpp	Wed Nov 28 11:34:54 2012 +0100
+++ b/Core/SQLite/Connection.cpp	Wed Nov 28 12:03:18 2012 +0100
@@ -372,5 +372,16 @@
       return func;
     }
 
+
+    void Connection::FlushToDisk()
+    {
+      VLOG(1) << "SQLite::Connection::FlushToDisk";
+      int err = sqlite3_wal_checkpoint(db_, NULL);
+
+      if (err != SQLITE_OK)
+      {
+        throw OrthancException("SQLite: Unable to flush the database");
+      }
+    }
   }
 }
--- a/Core/SQLite/Connection.h	Wed Nov 28 11:34:54 2012 +0100
+++ b/Core/SQLite/Connection.h	Wed Nov 28 12:03:18 2012 +0100
@@ -111,6 +111,8 @@
         return Execute(sql.c_str());
       }
 
+      void FlushToDisk();
+
       IScalarFunction* Register(IScalarFunction* func);  // Takes the ownership of the function
 
       // Info querying -------------------------------------------------------------
@@ -169,7 +171,7 @@
 
       bool BeginTransaction();
       void RollbackTransaction();
-      bool CommitTransaction();
+      bool CommitTransaction();      
     };
   }
 }
--- a/NEWS	Wed Nov 28 11:34:54 2012 +0100
+++ b/NEWS	Wed Nov 28 12:03:18 2012 +0100
@@ -1,9 +1,19 @@
 Pending changes in the mainline
 ===============================
 
+Major changes
+-------------
+
 * Full refactoring of the DB schema
 * Generate a sample configuration file from command line
 
+Minor changes
+-------------
+
+* "CompletedSeries" event in the changes API
+* Flush to disk thread
+
+
 
 Version 0.2.3 (2012/10/26)
 ==========================
--- a/OrthancServer/DatabaseWrapper.cpp	Wed Nov 28 11:34:54 2012 +0100
+++ b/OrthancServer/DatabaseWrapper.cpp	Wed Nov 28 12:03:18 2012 +0100
@@ -130,21 +130,21 @@
 
 
   
-  void DatabaseWrapper::SetGlobalProperty(const std::string& name,
+  void DatabaseWrapper::SetGlobalProperty(GlobalProperty property,
                                           const std::string& value)
   {
     SQLite::Statement s(db_, SQLITE_FROM_HERE, "INSERT OR REPLACE INTO GlobalProperties VALUES(?, ?)");
-    s.BindString(0, name);
+    s.BindInt(0, property);
     s.BindString(1, value);
     s.Run();
   }
 
   bool DatabaseWrapper::LookupGlobalProperty(std::string& target,
-                                             const std::string& name)
+                                             GlobalProperty property)
   {
     SQLite::Statement s(db_, SQLITE_FROM_HERE, 
-                        "SELECT value FROM GlobalProperties WHERE name=?");
-    s.BindString(0, name);
+                        "SELECT value FROM GlobalProperties WHERE property=?");
+    s.BindInt(0, property);
 
     if (!s.Step())
     {
@@ -157,11 +157,11 @@
     }
   }
 
-  std::string DatabaseWrapper::GetGlobalProperty(const std::string& name,
+  std::string DatabaseWrapper::GetGlobalProperty(GlobalProperty property,
                                                  const std::string& defaultValue)
   {
     std::string s;
-    if (LookupGlobalProperty(s, name))
+    if (LookupGlobalProperty(s, property))
     {
       return s;
     }
--- a/OrthancServer/DatabaseWrapper.h	Wed Nov 28 11:34:54 2012 +0100
+++ b/OrthancServer/DatabaseWrapper.h	Wed Nov 28 12:03:18 2012 +0100
@@ -62,13 +62,13 @@
     void Open();
 
   public:
-    void SetGlobalProperty(const std::string& name,
+    void SetGlobalProperty(GlobalProperty property,
                            const std::string& value);
 
     bool LookupGlobalProperty(std::string& target,
-                              const std::string& name);
+                              GlobalProperty property);
 
-    std::string GetGlobalProperty(const std::string& name,
+    std::string GetGlobalProperty(GlobalProperty property,
                                   const std::string& defaultValue = "");
 
     int64_t CreateResource(const std::string& publicId,
@@ -190,5 +190,10 @@
     {
       return db_.GetErrorMessage();
     }
+
+    void FlushToDisk()
+    {
+      db_.FlushToDisk();
+    }
   };
 }
--- a/OrthancServer/PrepareDatabase.sql	Wed Nov 28 11:34:54 2012 +0100
+++ b/OrthancServer/PrepareDatabase.sql	Wed Nov 28 12:03:18 2012 +0100
@@ -1,5 +1,5 @@
 CREATE TABLE GlobalProperties(
-       name TEXT PRIMARY KEY,
+       property INTEGER PRIMARY KEY,
        value TEXT
        );
 
--- a/OrthancServer/ServerEnumerations.h	Wed Nov 28 11:34:54 2012 +0100
+++ b/OrthancServer/ServerEnumerations.h	Wed Nov 28 12:03:18 2012 +0100
@@ -35,6 +35,11 @@
 
 namespace Orthanc
 {
+  enum GlobalProperty
+  {
+    GlobalProperty_FlushSleep = 1
+  };
+
   enum SeriesStatus
   {
     SeriesStatus_Complete,
--- a/OrthancServer/ServerIndex.cpp	Wed Nov 28 11:34:54 2012 +0100
+++ b/OrthancServer/ServerIndex.cpp	Wed Nov 28 12:03:18 2012 +0100
@@ -127,7 +127,7 @@
                                    const std::string& uuid,
                                    ResourceType expectedType)
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
 
     listener_->Reset();
 
@@ -165,6 +165,35 @@
   }
 
 
+  static void FlushThread(DatabaseWrapper* db,
+                          boost::mutex* mutex)
+  {
+    // By default, wait for 10 seconds before flushing
+    unsigned int sleep = 10;
+
+    {
+      boost::mutex::scoped_lock lock(*mutex);
+      std::string s = db->GetGlobalProperty(GlobalProperty_FlushSleep);
+      try
+      {
+        sleep = boost::lexical_cast<unsigned int>(s);
+      }
+      catch (boost::bad_lexical_cast&)
+      {
+      }
+    }
+
+    LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")";
+
+    while (1)
+    {
+      boost::this_thread::sleep(boost::posix_time::seconds(sleep));
+      boost::mutex::scoped_lock lock(*mutex);
+      db->FlushToDisk();
+    }
+  }
+
+
   ServerIndex::ServerIndex(FileStorage& fileStorage,
                            const std::string& dbPath)
   {
@@ -188,6 +217,16 @@
 
       db_.reset(new DatabaseWrapper(p.string() + "/index", *listener_));
     }
+
+    flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_);
+  }
+
+
+  ServerIndex::~ServerIndex()
+  {
+    LOG(INFO) << "Stopping the database flushing thread";
+    flushThread_.interrupt();
+    flushThread_.join();
   }
 
 
@@ -197,7 +236,7 @@
                                  const std::string& jsonUuid,
                                  const std::string& remoteAet)
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
 
     DicomInstanceHasher hasher(dicomSummary);
 
@@ -350,13 +389,13 @@
 
   uint64_t ServerIndex::GetTotalCompressedSize()
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
     return db_->GetTotalCompressedSize();
   }
 
   uint64_t ServerIndex::GetTotalUncompressedSize()
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
     return db_->GetTotalUncompressedSize();
   }
 
@@ -442,7 +481,7 @@
   {
     result = Json::objectValue;
 
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
 
     // Lookup for the requested resource
     int64_t id;
@@ -581,7 +620,7 @@
                             const std::string& instanceUuid,
                             AttachedFileType contentType)
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
 
     int64_t id;
     ResourceType type;
@@ -601,7 +640,7 @@
   void ServerIndex::GetAllUuids(Json::Value& target,
                                 ResourceType resourceType)
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
     db_->GetAllPublicIds(target, resourceType);
   }
 
@@ -610,7 +649,7 @@
                                int64_t since,                               
                                unsigned int maxResults)
   {
-    boost::mutex::scoped_lock scoped_lock(mutex_);
+    boost::mutex::scoped_lock lock(mutex_);
 
     db_->GetChanges(target, since, maxResults);
 
--- a/OrthancServer/ServerIndex.h	Wed Nov 28 11:34:54 2012 +0100
+++ b/OrthancServer/ServerIndex.h	Wed Nov 28 12:03:18 2012 +0100
@@ -54,6 +54,7 @@
   {
   private:
     boost::mutex mutex_;
+    boost::thread flushThread_;
 
     std::auto_ptr<Internals::ServerIndexListener> listener_;
     std::auto_ptr<DatabaseWrapper> db_;
@@ -75,6 +76,8 @@
     ServerIndex(FileStorage& fileStorage,
                 const std::string& dbPath);
 
+    ~ServerIndex();
+
     StoreStatus Store(const DicomMap& dicomSummary,
                       const std::string& fileUuid,
                       uint64_t uncompressedFileSize,
--- a/UnitTests/ServerIndex.cpp	Wed Nov 28 11:34:54 2012 +0100
+++ b/UnitTests/ServerIndex.cpp	Wed Nov 28 12:03:18 2012 +0100
@@ -80,7 +80,7 @@
     ASSERT_EQ(3, t.size());
   }
 
-  index.SetGlobalProperty("Hello", "World");
+  index.SetGlobalProperty(GlobalProperty_FlushSleep, "World");
 
   index.AttachChild(a[0], a[1]);
   index.AttachChild(a[1], a[2]);
@@ -150,11 +150,11 @@
   ASSERT_EQ("PINNACLE", index.GetMetadata(a[4], MetadataType_Instance_RemoteAet));
   ASSERT_EQ("None", index.GetMetadata(a[4], MetadataType_Instance_IndexInSeries, "None"));
 
-  ASSERT_TRUE(index.LookupGlobalProperty(s, "Hello"));
-  ASSERT_FALSE(index.LookupGlobalProperty(s, "Hello2"));
+  ASSERT_TRUE(index.LookupGlobalProperty(s, GlobalProperty_FlushSleep));
+  ASSERT_FALSE(index.LookupGlobalProperty(s, static_cast<GlobalProperty>(42)));
   ASSERT_EQ("World", s);
-  ASSERT_EQ("World", index.GetGlobalProperty("Hello"));
-  ASSERT_EQ("None", index.GetGlobalProperty("Hello2", "None"));
+  ASSERT_EQ("World", index.GetGlobalProperty(GlobalProperty_FlushSleep));
+  ASSERT_EQ("None", index.GetGlobalProperty(static_cast<GlobalProperty>(42), "None"));
 
   uint64_t us, cs;
   CompressionType ct;