Mercurial > hg > orthanc-databases
comparison Framework/Plugins/StorageBackend.cpp @ 269:567761f0c1ea
fix issue #151: support of retries in the storage area plugins to deal with multiple writers
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 21 Apr 2021 17:54:31 +0200 |
parents | cd73e34d5411 |
children | 5931c2ff22ca |
comparison
equal
deleted
inserted
replaced
268:9b003f265a8f | 269:567761f0c1ea |
---|---|
30 | 30 |
31 #include <Compatibility.h> // For std::unique_ptr<> | 31 #include <Compatibility.h> // For std::unique_ptr<> |
32 #include <Logging.h> | 32 #include <Logging.h> |
33 #include <OrthancException.h> | 33 #include <OrthancException.h> |
34 | 34 |
35 #include <boost/thread.hpp> | |
35 #include <cassert> | 36 #include <cassert> |
36 #include <limits> | 37 #include <limits> |
37 | 38 |
38 | 39 |
39 #define ORTHANC_PLUGINS_DATABASE_CATCH \ | 40 #define ORTHANC_PLUGINS_DATABASE_CATCH \ |
54 } | 55 } |
55 | 56 |
56 | 57 |
57 namespace OrthancDatabases | 58 namespace OrthancDatabases |
58 { | 59 { |
59 StorageBackend::StorageBackend(IDatabaseFactory* factory) : | 60 class StorageBackend::ReadWholeOperation : public StorageBackend::IDatabaseOperation |
60 manager_(factory) | 61 { |
62 private: | |
63 IFileContentVisitor& visitor_; | |
64 const char* uuid_; | |
65 OrthancPluginContentType type_; | |
66 | |
67 public: | |
68 ReadWholeOperation(IFileContentVisitor& visitor, | |
69 const char* uuid, | |
70 OrthancPluginContentType type) : | |
71 visitor_(visitor), | |
72 uuid_(uuid), | |
73 type_(type) | |
74 { | |
75 } | |
76 | |
77 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE | |
78 { | |
79 accessor.ReadWhole(visitor_, uuid_, type_); | |
80 } | |
81 }; | |
82 | |
83 | |
84 StorageBackend::StorageBackend(IDatabaseFactory* factory, | |
85 unsigned int maxRetries) : | |
86 manager_(factory), | |
87 maxRetries_(maxRetries) | |
61 { | 88 { |
62 } | 89 } |
63 | 90 |
64 void StorageBackend::AccessorBase::Create(const std::string& uuid, | 91 void StorageBackend::AccessorBase::Create(const std::string& uuid, |
65 const void* content, | 92 const void* content, |
231 statement.Execute(args); | 258 statement.Execute(args); |
232 } | 259 } |
233 | 260 |
234 transaction.Commit(); | 261 transaction.Commit(); |
235 } | 262 } |
236 | 263 |
237 | |
238 | 264 |
239 static OrthancPluginContext* context_ = NULL; | 265 static OrthancPluginContext* context_ = NULL; |
240 static std::unique_ptr<StorageBackend> backend_; | 266 static std::unique_ptr<StorageBackend> backend_; |
241 | 267 |
242 | 268 |
243 static OrthancPluginErrorCode StorageCreate(const char* uuid, | 269 static OrthancPluginErrorCode StorageCreate(const char* uuid, |
244 const void* content, | 270 const void* content, |
245 int64_t size, | 271 int64_t size, |
246 OrthancPluginContentType type) | 272 OrthancPluginContentType type) |
247 { | 273 { |
274 class Operation : public StorageBackend::IDatabaseOperation | |
275 { | |
276 private: | |
277 const char* uuid_; | |
278 const void* content_; | |
279 int64_t size_; | |
280 OrthancPluginContentType type_; | |
281 | |
282 public: | |
283 Operation(const char* uuid, | |
284 const void* content, | |
285 int64_t size, | |
286 OrthancPluginContentType type) : | |
287 uuid_(uuid), | |
288 content_(content), | |
289 size_(size), | |
290 type_(type) | |
291 { | |
292 } | |
293 | |
294 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE | |
295 { | |
296 accessor.Create(uuid_, content_, size_, type_); | |
297 } | |
298 }; | |
299 | |
300 | |
248 try | 301 try |
249 { | 302 { |
250 if (backend_.get() == NULL) | 303 if (backend_.get() == NULL) |
251 { | 304 { |
252 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | 305 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); |
253 } | 306 } |
254 else | 307 else |
255 { | 308 { |
256 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); | 309 Operation operation(uuid, content, size, type); |
257 accessor->Create(uuid, content, static_cast<size_t>(size), type); | 310 backend_->Execute(operation); |
258 return OrthancPluginErrorCode_Success; | 311 return OrthancPluginErrorCode_Success; |
259 } | 312 } |
260 } | 313 } |
261 ORTHANC_PLUGINS_DATABASE_CATCH; | 314 ORTHANC_PLUGINS_DATABASE_CATCH; |
262 } | 315 } |
325 else | 378 else |
326 { | 379 { |
327 Visitor visitor(target); | 380 Visitor visitor(target); |
328 | 381 |
329 { | 382 { |
330 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); | 383 StorageBackend::ReadWholeOperation operation(visitor, uuid, type); |
331 accessor->ReadWhole(visitor, uuid, type); | 384 backend_->Execute(operation); |
332 } | 385 } |
333 | 386 |
334 return OrthancPluginErrorCode_Success; | 387 return OrthancPluginErrorCode_Success; |
335 } | 388 } |
336 } | 389 } |
387 } | 440 } |
388 } | 441 } |
389 }; | 442 }; |
390 | 443 |
391 | 444 |
445 class Operation : public StorageBackend::IDatabaseOperation | |
446 { | |
447 private: | |
448 Visitor& visitor_; | |
449 const char* uuid_; | |
450 OrthancPluginContentType type_; | |
451 uint64_t start_; | |
452 size_t length_; | |
453 | |
454 public: | |
455 Operation(Visitor& visitor, | |
456 const char* uuid, | |
457 OrthancPluginContentType type, | |
458 uint64_t start, | |
459 size_t length) : | |
460 visitor_(visitor), | |
461 uuid_(uuid), | |
462 type_(type), | |
463 start_(start), | |
464 length_(length) | |
465 { | |
466 } | |
467 | |
468 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE | |
469 { | |
470 accessor.ReadRange(visitor_, uuid_, type_, start_, length_); | |
471 } | |
472 }; | |
473 | |
474 | |
392 try | 475 try |
393 { | 476 { |
394 if (backend_.get() == NULL) | 477 if (backend_.get() == NULL) |
395 { | 478 { |
396 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | 479 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); |
398 else | 481 else |
399 { | 482 { |
400 Visitor visitor(target); | 483 Visitor visitor(target); |
401 | 484 |
402 { | 485 { |
403 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); | 486 Operation operation(visitor, uuid, type, start, target->size); |
404 accessor->ReadRange(visitor, uuid, type, start, target->size); | 487 backend_->Execute(operation); |
405 } | 488 } |
406 | 489 |
407 return OrthancPluginErrorCode_Success; | 490 return OrthancPluginErrorCode_Success; |
408 } | 491 } |
409 } | 492 } |
509 else | 592 else |
510 { | 593 { |
511 Visitor visitor(data, size); | 594 Visitor visitor(data, size); |
512 | 595 |
513 { | 596 { |
514 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); | 597 StorageBackend::ReadWholeOperation operation(visitor, uuid, type); |
515 accessor->ReadWhole(visitor, uuid, type); | 598 backend_->Execute(operation); |
516 } | 599 } |
517 | 600 |
518 visitor.Release(); | 601 visitor.Release(); |
519 | 602 |
520 return OrthancPluginErrorCode_Success; | 603 return OrthancPluginErrorCode_Success; |
525 | 608 |
526 | 609 |
527 static OrthancPluginErrorCode StorageRemove(const char* uuid, | 610 static OrthancPluginErrorCode StorageRemove(const char* uuid, |
528 OrthancPluginContentType type) | 611 OrthancPluginContentType type) |
529 { | 612 { |
613 class Operation : public StorageBackend::IDatabaseOperation | |
614 { | |
615 private: | |
616 const char* uuid_; | |
617 OrthancPluginContentType type_; | |
618 | |
619 public: | |
620 Operation(const char* uuid, | |
621 OrthancPluginContentType type) : | |
622 uuid_(uuid), | |
623 type_(type) | |
624 { | |
625 } | |
626 | |
627 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE | |
628 { | |
629 accessor.Remove(uuid_, type_); | |
630 } | |
631 }; | |
632 | |
633 | |
530 try | 634 try |
531 { | 635 { |
532 if (backend_.get() == NULL) | 636 if (backend_.get() == NULL) |
533 { | 637 { |
534 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | 638 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); |
535 } | 639 } |
536 else | 640 else |
537 { | 641 { |
538 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); | 642 Operation operation(uuid, type); |
539 accessor->Remove(uuid, type); | 643 backend_->Execute(operation); |
540 return OrthancPluginErrorCode_Success; | 644 return OrthancPluginErrorCode_Success; |
541 } | 645 } |
542 } | 646 } |
543 ORTHANC_PLUGINS_DATABASE_CATCH; | 647 ORTHANC_PLUGINS_DATABASE_CATCH; |
544 } | 648 } |
550 if (context == NULL || | 654 if (context == NULL || |
551 backend == NULL) | 655 backend == NULL) |
552 { | 656 { |
553 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); | 657 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); |
554 } | 658 } |
555 | 659 else if (context_ != NULL || |
556 if (context_ != NULL || | 660 backend_.get() != NULL) |
557 backend_.get() != NULL) | |
558 { | 661 { |
559 // This function can only be invoked once in the plugin | 662 // This function can only be invoked once in the plugin |
560 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | 663 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); |
561 } | 664 } |
562 else | 665 else |
568 | 671 |
569 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1 | 672 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1 |
570 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0) | 673 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0) |
571 if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1) | 674 if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1) |
572 { | 675 { |
676 LOG(WARNING) << "The storage area plugin will retry up to " << backend_->GetMaxRetries() | |
677 << " time(s) in the case of a collision"; | |
678 | |
573 OrthancPluginStorageReadRange readRange = NULL; | 679 OrthancPluginStorageReadRange readRange = NULL; |
574 if (backend_->HasReadRange()) | 680 if (backend_->HasReadRange()) |
575 { | 681 { |
576 readRange = StorageReadRange; | 682 readRange = StorageReadRange; |
577 } | 683 } |
659 if (!visitor.IsSuccess()) | 765 if (!visitor.IsSuccess()) |
660 { | 766 { |
661 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | 767 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); |
662 } | 768 } |
663 } | 769 } |
770 | |
771 | |
772 void StorageBackend::Execute(IDatabaseOperation& operation) | |
773 { | |
774 std::unique_ptr<IAccessor> accessor(CreateAccessor()); | |
775 if (accessor.get() == NULL) | |
776 { | |
777 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); | |
778 } | |
779 | |
780 unsigned int attempt = 0; | |
781 | |
782 for (;;) | |
783 { | |
784 try | |
785 { | |
786 operation.Execute(*accessor); | |
787 return; // Success | |
788 } | |
789 catch (Orthanc::OrthancException& e) | |
790 { | |
791 if (e.GetErrorCode() == Orthanc::ErrorCode_DatabaseCannotSerialize) | |
792 { | |
793 if (attempt >= maxRetries_) | |
794 { | |
795 throw; | |
796 } | |
797 else | |
798 { | |
799 attempt++; | |
800 | |
801 // The "rand()" adds some jitter to de-synchronize writers | |
802 boost::this_thread::sleep(boost::posix_time::milliseconds(100 * attempt + 5 * (rand() % 10))); | |
803 } | |
804 } | |
805 else | |
806 { | |
807 throw; | |
808 } | |
809 } | |
810 } | |
811 } | |
664 } | 812 } |