comparison Framework/PushMode/ActivePushTransactions.cpp @ 0:95226b754d9e

initial release
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 17 Sep 2018 11:34:55 +0200
parents
children 4c3437217518
comparison
equal deleted inserted replaced
-1:000000000000 0:95226b754d9e
1 /**
2 * Transfers accelerator plugin for Orthanc
3 * Copyright (C) 2018 Osimis, Belgium
4 *
5 * This program is free software: you can redistribute it and/or
6 * modify it under the terms of the GNU Affero General Public License
7 * as published by the Free Software Foundation, either version 3 of
8 * the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 **/
18
19
20 #include "ActivePushTransactions.h"
21
22 #include "../DownloadArea.h"
23
24 #include <Core/Logging.h>
25
26
27 namespace OrthancPlugins
28 {
29 class ActivePushTransactions::Transaction : public boost::noncopyable
30 {
31 private:
32 DownloadArea area_;
33 std::vector<TransferBucket> buckets_;
34 BucketCompression compression_;
35
36 public:
37 Transaction(const std::vector<DicomInstanceInfo>& instances,
38 const std::vector<TransferBucket>& buckets,
39 BucketCompression compression) :
40 area_(instances),
41 buckets_(buckets),
42 compression_(compression)
43 {
44 }
45
46 DownloadArea& GetDownloadArea()
47 {
48 return area_;
49 }
50
51 BucketCompression GetCompression() const
52 {
53 return compression_;
54 }
55
56 const TransferBucket& GetBucket(size_t index) const
57 {
58 if (index >= buckets_.size())
59 {
60 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
61 }
62 else
63 {
64 return buckets_[index];
65 }
66 }
67
68 void Store(size_t bucketIndex,
69 const void* data,
70 size_t size)
71 {
72 area_.WriteBucket(GetBucket(bucketIndex), data, size, compression_);
73 }
74 };
75
76
77 void ActivePushTransactions::FinalizeTransaction(OrthancPluginContext* context,
78 const std::string& transactionUuid,
79 bool commit)
80 {
81 boost::mutex::scoped_lock lock(mutex_);
82
83 Content::iterator found = content_.find(transactionUuid);
84 if (found == content_.end())
85 {
86 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
87 }
88
89 assert(found->second != NULL);
90 if (commit)
91 {
92 found->second->GetDownloadArea().Commit(context);
93 }
94
95 delete found->second;
96 content_.erase(found);
97 index_.Invalidate(transactionUuid);
98 }
99
100
101 ActivePushTransactions::~ActivePushTransactions()
102 {
103 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
104 {
105 LOG(WARNING) << "Discarding an uncommitted push transaction "
106 << "in the transfers accelerator: " << it->first;
107
108 assert(it->second != NULL);
109 delete it->second;
110 }
111 }
112
113
114 void ActivePushTransactions::ListTransactions(std::vector<std::string>& target)
115 {
116 boost::mutex::scoped_lock lock(mutex_);
117
118 target.clear();
119 target.reserve(content_.size());
120
121 for (Content::const_iterator it = content_.begin();
122 it != content_.end(); ++it)
123 {
124 target.push_back(it->first);
125 }
126 }
127
128
129 std::string ActivePushTransactions::CreateTransaction(const std::vector<DicomInstanceInfo>& instances,
130 const std::vector<TransferBucket>& buckets,
131 BucketCompression compression)
132 {
133 std::string uuid = Orthanc::Toolbox::GenerateUuid();
134 std::auto_ptr<Transaction> tmp(new Transaction(instances, buckets, compression));
135
136 LOG(INFO) << "Creating transaction to receive " << instances.size()
137 << " instances (" << ConvertToMegabytes(tmp->GetDownloadArea().GetTotalSize())
138 << "MB) in push mode: " << uuid;
139
140 {
141 boost::mutex::scoped_lock lock(mutex_);
142
143 // Drop the oldest active transaction, if not enough place
144 if (content_.size() == maxSize_)
145 {
146 std::string oldest = index_.RemoveOldest();
147
148 Content::iterator transaction = content_.find(oldest);
149 assert(transaction != content_.end() &&
150 transaction->second != NULL);
151
152 delete transaction->second;
153 content_.erase(transaction);
154
155 LOG(WARNING) << "An inactive push transaction has been discarded: " << oldest;
156 }
157
158 index_.Add(uuid);
159 content_[uuid] = tmp.release();
160 }
161
162 return uuid;
163 }
164
165
166 void ActivePushTransactions::Store(OrthancPluginContext* context,
167 const std::string& transactionUuid,
168 size_t bucketIndex,
169 const void* data,
170 size_t size)
171 {
172 boost::mutex::scoped_lock lock(mutex_);
173
174 Content::iterator found = content_.find(transactionUuid);
175 if (found == content_.end())
176 {
177 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
178 }
179
180 assert(found->second != NULL);
181
182 index_.MakeMostRecent(transactionUuid);
183
184 found->second->Store(bucketIndex, data, size);
185 }
186 }