comparison Framework/Plugins/DatabaseBackendAdapterV4.cpp @ 375:824d70ce85ff db-protobuf

implemented database operations
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 28 Mar 2023 18:11:27 +0200
parents be7de633695c
children 59bba5fbb425
comparison
equal deleted inserted replaced
374:4a3985088723 375:824d70ce85ff
23 #include "DatabaseBackendAdapterV4.h" 23 #include "DatabaseBackendAdapterV4.h"
24 24
25 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1 25 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1
26 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 12, 0) 26 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 12, 0)
27 27
28 #include "IndexConnectionsPool.h"
29
28 #include <OrthancDatabasePlugin.pb.h> // Include protobuf messages 30 #include <OrthancDatabasePlugin.pb.h> // Include protobuf messages
29 31
30 #include <Logging.h> 32 #include <Logging.h>
31 #include <MultiThreading/SharedMessageQueue.h>
32 #include <OrthancException.h> 33 #include <OrthancException.h>
33 34
34 #include <stdexcept> 35 #include <stdexcept>
35 #include <list> 36 #include <list>
36 #include <string> 37 #include <string>
37 #include <cassert> 38 #include <cassert>
38 39
39 40
40 #define ORTHANC_PLUGINS_DATABASE_CATCH(context) \ 41 #define ORTHANC_PLUGINS_DATABASE_CATCH(context) \
41 42
42 43
43 namespace OrthancDatabases 44 namespace OrthancDatabases
44 { 45 {
45 static bool isBackendInUse_ = false; // Only for sanity checks 46 static bool isBackendInUse_ = false; // Only for sanity checks
46 47
48
49 class Output : public IDatabaseBackendOutput
50 {
51 private:
52 Orthanc::DatabasePluginMessages::DeleteAttachment::Response* deleteAttachment_;
53
54 void Clear()
55 {
56 deleteAttachment_ = NULL;
57 }
58
59 public:
60 Output(Orthanc::DatabasePluginMessages::DeleteAttachment::Response& deleteAttachment)
61 {
62 Clear();
63 deleteAttachment_ = &deleteAttachment;
64 }
65
66 virtual void SignalDeletedAttachment(const std::string& uuid,
67 int32_t contentType,
68 uint64_t uncompressedSize,
69 const std::string& uncompressedHash,
70 int32_t compressionType,
71 uint64_t compressedSize,
72 const std::string& compressedHash) ORTHANC_OVERRIDE
73 {
74 Orthanc::DatabasePluginMessages::FileInfo* attachment;
75 if (deleteAttachment_ != NULL)
76 {
77 if (deleteAttachment_->has_deleted_attachment())
78 {
79 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
80 }
81
82 attachment = deleteAttachment_->mutable_deleted_attachment();
83 }
84 else
85 {
86 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
87 }
88
89 attachment->set_uuid(uuid);
90 attachment->set_content_type(contentType);
91 attachment->set_uncompressed_size(uncompressedSize);
92 attachment->set_uncompressed_hash(uncompressedHash);
93 attachment->set_compression_type(compressionType);
94 attachment->set_compressed_size(compressedSize);
95 attachment->set_compressed_hash(compressedHash);
96 }
97
98 virtual void SignalDeletedResource(const std::string& publicId,
99 OrthancPluginResourceType resourceType) ORTHANC_OVERRIDE
100 {
101 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
102 }
103
104 virtual void SignalRemainingAncestor(const std::string& ancestorId,
105 OrthancPluginResourceType ancestorType) ORTHANC_OVERRIDE
106 {
107 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
108 }
109
110 virtual void AnswerAttachment(const std::string& uuid,
111 int32_t contentType,
112 uint64_t uncompressedSize,
113 const std::string& uncompressedHash,
114 int32_t compressionType,
115 uint64_t compressedSize,
116 const std::string& compressedHash) ORTHANC_OVERRIDE
117 {
118 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
119 }
120
121 virtual void AnswerChange(int64_t seq,
122 int32_t changeType,
123 OrthancPluginResourceType resourceType,
124 const std::string& publicId,
125 const std::string& date) ORTHANC_OVERRIDE
126 {
127 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
128 }
129
130 virtual void AnswerDicomTag(uint16_t group,
131 uint16_t element,
132 const std::string& value) ORTHANC_OVERRIDE
133 {
134 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
135 }
136
137 virtual void AnswerExportedResource(int64_t seq,
138 OrthancPluginResourceType resourceType,
139 const std::string& publicId,
140 const std::string& modality,
141 const std::string& date,
142 const std::string& patientId,
143 const std::string& studyInstanceUid,
144 const std::string& seriesInstanceUid,
145 const std::string& sopInstanceUid) ORTHANC_OVERRIDE
146 {
147 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
148 }
149
150 virtual void AnswerMatchingResource(const std::string& resourceId) ORTHANC_OVERRIDE
151 {
152 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
153 }
154
155 virtual void AnswerMatchingResource(const std::string& resourceId,
156 const std::string& someInstanceId) ORTHANC_OVERRIDE
157 {
158 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
159 }
160 };
47 161
162
48 static void ProcessDatabaseOperation(Orthanc::DatabasePluginMessages::DatabaseResponse& response, 163 static void ProcessDatabaseOperation(Orthanc::DatabasePluginMessages::DatabaseResponse& response,
49 const Orthanc::DatabasePluginMessages::DatabaseRequest& request, 164 const Orthanc::DatabasePluginMessages::DatabaseRequest& request,
50 IndexBackend& backend) 165 IndexConnectionsPool& pool)
51 { 166 {
52 switch (request.operation()) 167 switch (request.operation())
53 { 168 {
54 case Orthanc::DatabasePluginMessages::OPERATION_GET_SYSTEM_INFORMATION: 169 case Orthanc::DatabasePluginMessages::OPERATION_GET_SYSTEM_INFORMATION:
55 response.mutable_get_system_information()->set_supports_revisions(backend.HasRevisionsSupport()); 170 {
56 break; 171 IndexConnectionsPool::Accessor accessor(pool);
172 response.mutable_get_system_information()->set_database_version(accessor.GetBackend().GetDatabaseVersion(accessor.GetManager()));
173 response.mutable_get_system_information()->set_supports_flush_to_disk(false);
174 response.mutable_get_system_information()->set_supports_revisions(accessor.GetBackend().HasRevisionsSupport());
175 break;
176 }
177
178 case Orthanc::DatabasePluginMessages::OPERATION_OPEN:
179 {
180 pool.OpenConnections();
181 break;
182 }
183
184 case Orthanc::DatabasePluginMessages::OPERATION_CLOSE:
185 {
186 pool.CloseConnections();
187 break;
188 }
189
190 case Orthanc::DatabasePluginMessages::OPERATION_FLUSH_TO_DISK:
191 {
192 // Raise an exception since "set_supports_flush_to_disk(false)"
193 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
194 }
195
196 case Orthanc::DatabasePluginMessages::OPERATION_START_TRANSACTION:
197 {
198 std::unique_ptr<IndexConnectionsPool::Accessor> transaction(new IndexConnectionsPool::Accessor(pool));
199
200 switch (request.start_transaction().type())
201 {
202 case Orthanc::DatabasePluginMessages::TRANSACTION_READ_ONLY:
203 transaction->GetManager().StartTransaction(TransactionType_ReadOnly);
204 break;
205
206 case Orthanc::DatabasePluginMessages::TRANSACTION_READ_WRITE:
207 transaction->GetManager().StartTransaction(TransactionType_ReadWrite);
208 break;
209
210 default:
211 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
212 }
213
214 response.mutable_start_transaction()->set_transaction(reinterpret_cast<intptr_t>(transaction.release()));
215 break;
216 }
217
218 case Orthanc::DatabasePluginMessages::OPERATION_UPGRADE:
219 {
220 IndexConnectionsPool::Accessor accessor(pool);
221 OrthancPluginStorageArea* storageArea = reinterpret_cast<OrthancPluginStorageArea*>(request.upgrade().storage_area());
222 accessor.GetBackend().UpgradeDatabase(accessor.GetManager(), request.upgrade().target_version(), storageArea);
223 break;
224 }
57 225
58 default: 226 default:
59 LOG(ERROR) << "Not implemented database operation from protobuf: " << request.operation(); 227 LOG(ERROR) << "Not implemented database operation from protobuf: " << request.operation();
60 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); 228 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
61 } 229 }
62 } 230 }
63 231
64 232
65 static void ProcessTransactionOperation(Orthanc::DatabasePluginMessages::TransactionResponse& response, 233 static void ProcessTransactionOperation(Orthanc::DatabasePluginMessages::TransactionResponse& response,
66 const Orthanc::DatabasePluginMessages::TransactionRequest& request, 234 const Orthanc::DatabasePluginMessages::TransactionRequest& request,
67 IndexBackend& backend) 235 IndexConnectionsPool::Accessor& transaction)
68 { 236 {
69 switch (request.operation()) 237 switch (request.operation())
70 { 238 {
239 case Orthanc::DatabasePluginMessages::OPERATION_ROLLBACK:
240 {
241 transaction.GetManager().RollbackTransaction();
242 break;
243 }
244
245 case Orthanc::DatabasePluginMessages::OPERATION_COMMIT:
246 {
247 transaction.GetManager().CommitTransaction();
248 break;
249 }
250
251 case Orthanc::DatabasePluginMessages::OPERATION_ADD_ATTACHMENT:
252 {
253 OrthancPluginAttachment attachment;
254 attachment.uuid = request.add_attachment().attachment().uuid().c_str();
255 attachment.contentType = request.add_attachment().attachment().content_type();
256 attachment.uncompressedSize = request.add_attachment().attachment().uncompressed_size();
257 attachment.uncompressedHash = request.add_attachment().attachment().uncompressed_hash().c_str();
258 attachment.compressionType = request.add_attachment().attachment().compression_type();
259 attachment.compressedSize = request.add_attachment().attachment().compressed_size();
260 attachment.compressedHash = request.add_attachment().attachment().compressed_hash().c_str();
261
262 transaction.GetBackend().AddAttachment(transaction.GetManager(), request.add_attachment().id(), attachment,
263 request.add_attachment().revision());
264 break;
265 }
266
267 case Orthanc::DatabasePluginMessages::OPERATION_CLEAR_CHANGES:
268 {
269 transaction.GetBackend().ClearChanges(transaction.GetManager());
270 break;
271 }
272
273 case Orthanc::DatabasePluginMessages::OPERATION_CLEAR_EXPORTED_RESOURCES:
274 {
275 transaction.GetBackend().ClearExportedResources(transaction.GetManager());
276 break;
277 }
278
279 case Orthanc::DatabasePluginMessages::OPERATION_DELETE_ATTACHMENT:
280 {
281 Output output(*response.mutable_delete_attachment());
282 transaction.GetBackend().DeleteAttachment(
283 output, transaction.GetManager(), request.delete_attachment().id(), request.delete_attachment().type());
284 break;
285 }
286
287 case Orthanc::DatabasePluginMessages::OPERATION_DELETE_METADATA:
288 {
289 transaction.GetBackend().DeleteMetadata(
290 transaction.GetManager(), request.delete_metadata().id(), request.delete_metadata().type());
291 break;
292 }
293
71 default: 294 default:
72 LOG(ERROR) << "Not implemented transaction operation from protobuf: " << request.operation(); 295 LOG(ERROR) << "Not implemented transaction operation from protobuf: " << request.operation();
73 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); 296 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
74 } 297 }
75 } 298 }
76 299
77 300
78 static OrthancPluginErrorCode CallBackend(OrthancPluginMemoryBuffer64* serializedResponse, 301 static OrthancPluginErrorCode CallBackend(OrthancPluginMemoryBuffer64* serializedResponse,
79 void* rawBackend, 302 void* rawPool,
80 const void* requestData, 303 const void* requestData,
81 uint64_t requestSize) 304 uint64_t requestSize)
82 { 305 {
83 Orthanc::DatabasePluginMessages::Request request; 306 Orthanc::DatabasePluginMessages::Request request;
84 if (!request.ParseFromArray(requestData, requestSize)) 307 if (!request.ParseFromArray(requestData, requestSize))
85 { 308 {
86 LOG(ERROR) << "Cannot parse message from the Orthanc core using protobuf"; 309 LOG(ERROR) << "Cannot parse message from the Orthanc core using protobuf";
87 return OrthancPluginErrorCode_InternalError; 310 return OrthancPluginErrorCode_InternalError;
88 } 311 }
89 312
90 if (rawBackend == NULL) 313 if (rawPool == NULL)
91 { 314 {
92 LOG(ERROR) << "Received a NULL pointer from the database"; 315 LOG(ERROR) << "Received a NULL pointer from the database";
93 return OrthancPluginErrorCode_InternalError; 316 return OrthancPluginErrorCode_InternalError;
94 } 317 }
95 318
96 IndexBackend& backend = *reinterpret_cast<IndexBackend*>(rawBackend); 319 IndexConnectionsPool& pool = *reinterpret_cast<IndexConnectionsPool*>(rawPool);
97 320
98 try 321 try
99 { 322 {
100 Orthanc::DatabasePluginMessages::Response response; 323 Orthanc::DatabasePluginMessages::Response response;
101 324
102 switch (request.type()) 325 switch (request.type())
103 { 326 {
104 case Orthanc::DatabasePluginMessages::REQUEST_DATABASE: 327 case Orthanc::DatabasePluginMessages::REQUEST_DATABASE:
105 ProcessDatabaseOperation(*response.mutable_database_response(), request.database_request(), backend); 328 ProcessDatabaseOperation(*response.mutable_database_response(), request.database_request(), pool);
106 break; 329 break;
107 330
108 case Orthanc::DatabasePluginMessages::REQUEST_TRANSACTION: 331 case Orthanc::DatabasePluginMessages::REQUEST_TRANSACTION:
109 ProcessTransactionOperation(*response.mutable_transaction_response(), request.transaction_request(), backend); 332 {
333 IndexConnectionsPool::Accessor& transaction = *reinterpret_cast<IndexConnectionsPool::Accessor*>(request.transaction_request().transaction());
334 ProcessTransactionOperation(*response.mutable_transaction_response(), request.transaction_request(), transaction);
110 break; 335 break;
336 }
111 337
112 default: 338 default:
113 LOG(ERROR) << "Not implemented request type from protobuf: " << request.type(); 339 LOG(ERROR) << "Not implemented request type from protobuf: " << request.type();
114 break; 340 break;
115 } 341 }
118 if (!response.SerializeToString(&s)) 344 if (!response.SerializeToString(&s))
119 { 345 {
120 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Cannot serialize to protobuf"); 346 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Cannot serialize to protobuf");
121 } 347 }
122 348
123 if (OrthancPluginCreateMemoryBuffer64(backend.GetContext(), serializedResponse, s.size()) != OrthancPluginErrorCode_Success) 349 if (OrthancPluginCreateMemoryBuffer64(pool.GetContext(), serializedResponse, s.size()) != OrthancPluginErrorCode_Success)
124 { 350 {
125 throw Orthanc::OrthancException(Orthanc::ErrorCode_NotEnoughMemory, "Cannot allocate a memory buffer"); 351 throw Orthanc::OrthancException(Orthanc::ErrorCode_NotEnoughMemory, "Cannot allocate a memory buffer");
126 } 352 }
127 353
128 if (!s.empty()) 354 if (!s.empty())
148 LOG(ERROR) << "Native exception"; 374 LOG(ERROR) << "Native exception";
149 return OrthancPluginErrorCode_DatabasePlugin; 375 return OrthancPluginErrorCode_DatabasePlugin;
150 } 376 }
151 } 377 }
152 378
153 static void FinalizeBackend(void* rawBackend) 379 static void FinalizeBackend(void* rawPool)
154 { 380 {
155 if (rawBackend != NULL) 381 if (rawPool != NULL)
156 { 382 {
157 IndexBackend* backend = reinterpret_cast<IndexBackend*>(rawBackend); 383 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(rawPool);
158 384
159 if (isBackendInUse_) 385 if (isBackendInUse_)
160 { 386 {
161 isBackendInUse_ = false; 387 isBackendInUse_ = false;
162 } 388 }
163 else 389 else
164 { 390 {
165 LOG(ERROR) << "More than one index backend was registered, internal error"; 391 LOG(ERROR) << "More than one index backend was registered, internal error";
166 } 392 }
167 393
168 delete backend; 394 delete pool;
169 } 395 }
170 else 396 else
171 { 397 {
172 LOG(ERROR) << "Received a null pointer from the Orthanc core, internal error"; 398 LOG(ERROR) << "Received a null pointer from the Orthanc core, internal error";
173 } 399 }
176 402
177 void DatabaseBackendAdapterV4::Register(IndexBackend* backend, 403 void DatabaseBackendAdapterV4::Register(IndexBackend* backend,
178 size_t countConnections, 404 size_t countConnections,
179 unsigned int maxDatabaseRetries) 405 unsigned int maxDatabaseRetries)
180 { 406 {
181 std::unique_ptr<IndexBackend> protection(backend); 407 std::unique_ptr<IndexConnectionsPool> pool(new IndexConnectionsPool(backend, countConnections));
182 408
183 if (isBackendInUse_) 409 if (isBackendInUse_)
184 { 410 {
185 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 411 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
186 }
187
188 if (backend == NULL)
189 {
190 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
191 } 412 }
192 413
193 OrthancPluginContext* context = backend->GetContext(); 414 OrthancPluginContext* context = backend->GetContext();
194 415
195 if (OrthancPluginRegisterDatabaseBackendV4(context, protection.release(), maxDatabaseRetries, 416 if (OrthancPluginRegisterDatabaseBackendV4(context, pool.release(), maxDatabaseRetries,
196 CallBackend, FinalizeBackend) != OrthancPluginErrorCode_Success) 417 CallBackend, FinalizeBackend) != OrthancPluginErrorCode_Success)
197 { 418 {
198 delete backend; 419 delete backend;
199 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend"); 420 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend");
200 } 421 }