0
|
1 /**
|
|
2 * Transfers accelerator plugin for Orthanc
|
|
3 * Copyright (C) 2018 Osimis, Belgium
|
|
4 *
|
|
5 * This program is free software: you can redistribute it and/or
|
|
6 * modify it under the terms of the GNU Affero General Public License
|
|
7 * as published by the Free Software Foundation, either version 3 of
|
|
8 * the License, or (at your option) any later version.
|
|
9 *
|
|
10 * This program is distributed in the hope that it will be useful, but
|
|
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
13 * Affero General Public License for more details.
|
|
14 *
|
|
15 * You should have received a copy of the GNU Affero General Public License
|
|
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
17 **/
|
|
18
|
|
19
|
|
20 #include "ActivePushTransactions.h"
|
|
21
|
|
22 #include "../DownloadArea.h"
|
|
23
|
|
24 #include <Core/Logging.h>
|
|
25
|
|
26
|
|
27 namespace OrthancPlugins
|
|
28 {
|
|
29 class ActivePushTransactions::Transaction : public boost::noncopyable
|
|
30 {
|
|
31 private:
|
|
32 DownloadArea area_;
|
|
33 std::vector<TransferBucket> buckets_;
|
|
34 BucketCompression compression_;
|
|
35
|
|
36 public:
|
|
37 Transaction(const std::vector<DicomInstanceInfo>& instances,
|
|
38 const std::vector<TransferBucket>& buckets,
|
|
39 BucketCompression compression) :
|
|
40 area_(instances),
|
|
41 buckets_(buckets),
|
|
42 compression_(compression)
|
|
43 {
|
|
44 }
|
|
45
|
|
46 DownloadArea& GetDownloadArea()
|
|
47 {
|
|
48 return area_;
|
|
49 }
|
|
50
|
|
51 BucketCompression GetCompression() const
|
|
52 {
|
|
53 return compression_;
|
|
54 }
|
|
55
|
|
56 const TransferBucket& GetBucket(size_t index) const
|
|
57 {
|
|
58 if (index >= buckets_.size())
|
|
59 {
|
|
60 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
|
|
61 }
|
|
62 else
|
|
63 {
|
|
64 return buckets_[index];
|
|
65 }
|
|
66 }
|
|
67
|
|
68 void Store(size_t bucketIndex,
|
|
69 const void* data,
|
|
70 size_t size)
|
|
71 {
|
|
72 area_.WriteBucket(GetBucket(bucketIndex), data, size, compression_);
|
|
73 }
|
|
74 };
|
|
75
|
|
76
|
|
77 void ActivePushTransactions::FinalizeTransaction(OrthancPluginContext* context,
|
|
78 const std::string& transactionUuid,
|
|
79 bool commit)
|
|
80 {
|
|
81 boost::mutex::scoped_lock lock(mutex_);
|
|
82
|
|
83 Content::iterator found = content_.find(transactionUuid);
|
|
84 if (found == content_.end())
|
|
85 {
|
|
86 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
|
|
87 }
|
|
88
|
|
89 assert(found->second != NULL);
|
|
90 if (commit)
|
|
91 {
|
|
92 found->second->GetDownloadArea().Commit(context);
|
|
93 }
|
|
94
|
|
95 delete found->second;
|
|
96 content_.erase(found);
|
|
97 index_.Invalidate(transactionUuid);
|
|
98 }
|
|
99
|
|
100
|
|
101 ActivePushTransactions::~ActivePushTransactions()
|
|
102 {
|
|
103 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
|
|
104 {
|
|
105 LOG(WARNING) << "Discarding an uncommitted push transaction "
|
|
106 << "in the transfers accelerator: " << it->first;
|
|
107
|
|
108 assert(it->second != NULL);
|
|
109 delete it->second;
|
|
110 }
|
|
111 }
|
|
112
|
|
113
|
|
114 void ActivePushTransactions::ListTransactions(std::vector<std::string>& target)
|
|
115 {
|
|
116 boost::mutex::scoped_lock lock(mutex_);
|
|
117
|
|
118 target.clear();
|
|
119 target.reserve(content_.size());
|
|
120
|
|
121 for (Content::const_iterator it = content_.begin();
|
|
122 it != content_.end(); ++it)
|
|
123 {
|
|
124 target.push_back(it->first);
|
|
125 }
|
|
126 }
|
|
127
|
|
128
|
|
129 std::string ActivePushTransactions::CreateTransaction(const std::vector<DicomInstanceInfo>& instances,
|
|
130 const std::vector<TransferBucket>& buckets,
|
|
131 BucketCompression compression)
|
|
132 {
|
|
133 std::string uuid = Orthanc::Toolbox::GenerateUuid();
|
|
134 std::auto_ptr<Transaction> tmp(new Transaction(instances, buckets, compression));
|
|
135
|
|
136 LOG(INFO) << "Creating transaction to receive " << instances.size()
|
|
137 << " instances (" << ConvertToMegabytes(tmp->GetDownloadArea().GetTotalSize())
|
|
138 << "MB) in push mode: " << uuid;
|
|
139
|
|
140 {
|
|
141 boost::mutex::scoped_lock lock(mutex_);
|
|
142
|
|
143 // Drop the oldest active transaction, if not enough place
|
|
144 if (content_.size() == maxSize_)
|
|
145 {
|
|
146 std::string oldest = index_.RemoveOldest();
|
|
147
|
|
148 Content::iterator transaction = content_.find(oldest);
|
|
149 assert(transaction != content_.end() &&
|
|
150 transaction->second != NULL);
|
|
151
|
|
152 delete transaction->second;
|
|
153 content_.erase(transaction);
|
|
154
|
|
155 LOG(WARNING) << "An inactive push transaction has been discarded: " << oldest;
|
|
156 }
|
|
157
|
|
158 index_.Add(uuid);
|
|
159 content_[uuid] = tmp.release();
|
|
160 }
|
|
161
|
|
162 return uuid;
|
|
163 }
|
|
164
|
|
165
|
|
166 void ActivePushTransactions::Store(OrthancPluginContext* context,
|
|
167 const std::string& transactionUuid,
|
|
168 size_t bucketIndex,
|
|
169 const void* data,
|
|
170 size_t size)
|
|
171 {
|
|
172 boost::mutex::scoped_lock lock(mutex_);
|
|
173
|
|
174 Content::iterator found = content_.find(transactionUuid);
|
|
175 if (found == content_.end())
|
|
176 {
|
|
177 throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
|
|
178 }
|
|
179
|
|
180 assert(found->second != NULL);
|
|
181
|
|
182 index_.MakeMostRecent(transactionUuid);
|
|
183
|
|
184 found->second->Store(bucketIndex, data, size);
|
|
185 }
|
|
186 }
|