diff Framework/PushMode/ActivePushTransactions.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 4c3437217518
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Framework/PushMode/ActivePushTransactions.cpp	Mon Sep 17 11:34:55 2018 +0200
@@ -0,0 +1,186 @@
+/**
+ * Transfers accelerator plugin for Orthanc
+ * Copyright (C) 2018 Osimis, Belgium
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Affero General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+
+#include "ActivePushTransactions.h"
+
+#include "../DownloadArea.h"
+
+#include <Core/Logging.h>
+
+
+namespace OrthancPlugins
+{
+  class ActivePushTransactions::Transaction : public boost::noncopyable
+  {
+  private:
+    DownloadArea                 area_;
+    std::vector<TransferBucket>  buckets_;
+    BucketCompression            compression_;
+
+  public:
+    Transaction(const std::vector<DicomInstanceInfo>& instances,
+                const std::vector<TransferBucket>& buckets,
+                BucketCompression compression) :
+      area_(instances),
+      buckets_(buckets),
+      compression_(compression)
+    {
+    }
+
+    DownloadArea& GetDownloadArea()
+    {
+      return area_;
+    }
+
+    BucketCompression GetCompression() const
+    {
+      return compression_;
+    }
+
+    const TransferBucket& GetBucket(size_t index) const
+    {
+      if (index >= buckets_.size())
+      {
+        throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
+      }
+      else
+      {
+        return buckets_[index];
+      }
+    }
+
+    void Store(size_t bucketIndex,
+               const void* data,
+               size_t size)
+    {
+      area_.WriteBucket(GetBucket(bucketIndex), data, size, compression_);
+    }
+  };
+    
+
+  void ActivePushTransactions::FinalizeTransaction(OrthancPluginContext* context,
+                                                   const std::string& transactionUuid,
+                                                   bool commit)
+  {
+    boost::mutex::scoped_lock  lock(mutex_);
+
+    Content::iterator found = content_.find(transactionUuid);
+    if (found == content_.end())
+    {
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
+    }
+
+    assert(found->second != NULL);
+    if (commit)
+    {
+      found->second->GetDownloadArea().Commit(context);
+    }
+
+    delete found->second;
+    content_.erase(found);
+    index_.Invalidate(transactionUuid);
+  }
+
+
+  ActivePushTransactions::~ActivePushTransactions()
+  {
+    for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
+    {
+      LOG(WARNING) << "Discarding an uncommitted push transaction "
+                   << "in the transfers accelerator: " << it->first;
+        
+      assert(it->second != NULL);
+      delete it->second;
+    }
+  }
+    
+
+  void ActivePushTransactions::ListTransactions(std::vector<std::string>& target)
+  {
+    boost::mutex::scoped_lock  lock(mutex_);
+
+    target.clear();
+    target.reserve(content_.size());
+
+    for (Content::const_iterator it = content_.begin();
+         it != content_.end(); ++it)
+    {
+      target.push_back(it->first);
+    }
+  }
+
+  
+  std::string ActivePushTransactions::CreateTransaction(const std::vector<DicomInstanceInfo>& instances,
+                                                        const std::vector<TransferBucket>& buckets,
+                                                        BucketCompression compression)
+  {
+    std::string uuid = Orthanc::Toolbox::GenerateUuid();
+    std::auto_ptr<Transaction> tmp(new Transaction(instances, buckets, compression));
+
+    LOG(INFO) << "Creating transaction to receive " << instances.size()
+              << " instances (" << ConvertToMegabytes(tmp->GetDownloadArea().GetTotalSize())
+              << "MB) in push mode: " << uuid;
+      
+    {
+      boost::mutex::scoped_lock  lock(mutex_);
+
+      // Drop the oldest active transaction, if not enough place
+      if (content_.size() == maxSize_)
+      {
+        std::string oldest = index_.RemoveOldest();
+
+        Content::iterator transaction = content_.find(oldest);
+        assert(transaction != content_.end() &&
+               transaction->second != NULL);
+
+        delete transaction->second;
+        content_.erase(transaction);
+
+        LOG(WARNING) << "An inactive push transaction has been discarded: " << oldest;
+      }
+
+      index_.Add(uuid);
+      content_[uuid] = tmp.release();
+    }
+
+    return uuid;
+  }
+    
+
+  void ActivePushTransactions::Store(OrthancPluginContext* context,
+                                     const std::string& transactionUuid,
+                                     size_t bucketIndex,
+                                     const void* data,
+                                     size_t size)
+  {
+    boost::mutex::scoped_lock  lock(mutex_);
+
+    Content::iterator found = content_.find(transactionUuid);
+    if (found == content_.end())
+    {
+      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
+    }
+      
+    assert(found->second != NULL);
+
+    index_.MakeMostRecent(transactionUuid);
+      
+    found->second->Store(bucketIndex, data, size);
+  }
+}