comparison Framework/Plugins/StorageBackend.cpp @ 269:567761f0c1ea

fix issue #151: support of retries in the storage area plugins to deal with multiple writers
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 21 Apr 2021 17:54:31 +0200
parents cd73e34d5411
children 5931c2ff22ca
comparison
equal deleted inserted replaced
268:9b003f265a8f 269:567761f0c1ea
30 30
31 #include <Compatibility.h> // For std::unique_ptr<> 31 #include <Compatibility.h> // For std::unique_ptr<>
32 #include <Logging.h> 32 #include <Logging.h>
33 #include <OrthancException.h> 33 #include <OrthancException.h>
34 34
35 #include <boost/thread.hpp>
35 #include <cassert> 36 #include <cassert>
36 #include <limits> 37 #include <limits>
37 38
38 39
39 #define ORTHANC_PLUGINS_DATABASE_CATCH \ 40 #define ORTHANC_PLUGINS_DATABASE_CATCH \
54 } 55 }
55 56
56 57
57 namespace OrthancDatabases 58 namespace OrthancDatabases
58 { 59 {
// Comparison rows: on the first two rows the left column (revision 268) still
// shows the header of the old one-argument StorageBackend constructor, while
// the right column (revision 269) starts the new ReadWholeOperation class.
//
// ReadWholeOperation is an IDatabaseOperation whose Execute() forwards to
// IAccessor::ReadWhole() with the visitor, UUID and content type that were
// captured at construction time.
//
// The visitor is held by reference and the UUID as a raw "const char*", so
// both must remain valid for as long as Execute() may be invoked on this
// object.
59 StorageBackend::StorageBackend(IDatabaseFactory* factory) : 60 class StorageBackend::ReadWholeOperation : public StorageBackend::IDatabaseOperation
60 manager_(factory) 61 {
62 private:
63 IFileContentVisitor& visitor_;
64 const char* uuid_;
65 OrthancPluginContentType type_;
66
67 public:
// Only stores its arguments; nothing is read until Execute() is called.
68 ReadWholeOperation(IFileContentVisitor& visitor,
69 const char* uuid,
70 OrthancPluginContentType type) :
71 visitor_(visitor),
72 uuid_(uuid),
73 type_(type)
74 {
75 }
76
// Performs the read on the accessor supplied by the caller.
77 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
78 {
79 accessor.ReadWhole(visitor_, uuid_, type_);
80 }
81 };
82
83
// Two-argument constructor introduced in revision 269 (right column); the
// braces on the last two rows are shared with the old constructor body shown
// in the left column.
//
// "factory" is handed to manager_, and "maxRetries" is stored in maxRetries_,
// which StorageBackend::Execute() compares against its attempt counter before
// rethrowing ErrorCode_DatabaseCannotSerialize.
// NOTE(review): whether manager_ takes ownership of "factory" is not visible
// in this excerpt -- confirm against the manager's declaration.
84 StorageBackend::StorageBackend(IDatabaseFactory* factory,
85 unsigned int maxRetries) :
86 manager_(factory),
87 maxRetries_(maxRetries)
61 { 88 {
62 } 89 }
63 90
64 void StorageBackend::AccessorBase::Create(const std::string& uuid, 91 void StorageBackend::AccessorBase::Create(const std::string& uuid,
65 const void* content, 92 const void* content,
231 statement.Execute(args); 258 statement.Execute(args);
232 } 259 }
233 260
234 transaction.Commit(); 261 transaction.Commit();
235 } 262 }
236 263
237
238 264
239 static OrthancPluginContext* context_ = NULL; 265 static OrthancPluginContext* context_ = NULL;
240 static std::unique_ptr<StorageBackend> backend_; 266 static std::unique_ptr<StorageBackend> backend_;
241 267
242 268
// StorageCreate: invokes IAccessor::Create() for one attachment through the
// file-level "backend_" and reports the outcome as an OrthancPluginErrorCode.
//
// Left column (revision 268): an accessor was created directly and Create()
// was called once. Right column (revision 269): the call is wrapped in a
// local IDatabaseOperation and submitted to backend_->Execute().
//
// Throws ErrorCode_BadSequenceOfCalls inside the try block if "backend_" has
// not been set; returns OrthancPluginErrorCode_Success otherwise.
243 static OrthancPluginErrorCode StorageCreate(const char* uuid, 269 static OrthancPluginErrorCode StorageCreate(const char* uuid,
244 const void* content, 270 const void* content,
245 int64_t size, 271 int64_t size,
246 OrthancPluginContentType type) 272 OrthancPluginContentType type)
247 { 273 {
// Local operation that captures the arguments of this call by raw pointer /
// by value, so that Execute() can replay the very same Create() request.
274 class Operation : public StorageBackend::IDatabaseOperation
275 {
276 private:
277 const char* uuid_;
278 const void* content_;
279 int64_t size_;
280 OrthancPluginContentType type_;
281
282 public:
283 Operation(const char* uuid,
284 const void* content,
285 int64_t size,
286 OrthancPluginContentType type) :
287 uuid_(uuid),
288 content_(content),
289 size_(size),
290 type_(type)
291 {
292 }
293
// NOTE(review): revision 268 passed static_cast<size_t>(size) to Create();
// here the int64_t size_ is passed as-is -- confirm that the implicit
// conversion to the parameter type of Create() is intended.
294 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
295 {
296 accessor.Create(uuid_, content_, size_, type_);
297 }
298 };
299
300
248 try 301 try
249 { 302 {
250 if (backend_.get() == NULL) 303 if (backend_.get() == NULL)
251 { 304 {
252 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 305 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
253 } 306 }
254 else 307 else
255 { 308 {
256 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); 309 Operation operation(uuid, content, size, type);
257 accessor->Create(uuid, content, static_cast<size_t>(size), type); 310 backend_->Execute(operation);
258 return OrthancPluginErrorCode_Success; 311 return OrthancPluginErrorCode_Success;
259 } 312 }
260 } 313 }
// The macro body is defined earlier in the file and is not visible in this
// excerpt; presumably it supplies the catch clauses that turn exceptions
// into an error code -- TODO confirm.
261 ORTHANC_PLUGINS_DATABASE_CATCH; 314 ORTHANC_PLUGINS_DATABASE_CATCH;
262 } 315 }
325 else 378 else
326 { 379 {
327 Visitor visitor(target); 380 Visitor visitor(target);
328 381
329 { 382 {
330 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); 383 StorageBackend::ReadWholeOperation operation(visitor, uuid, type);
331 accessor->ReadWhole(visitor, uuid, type); 384 backend_->Execute(operation);
332 } 385 }
333 386
334 return OrthancPluginErrorCode_Success; 387 return OrthancPluginErrorCode_Success;
335 } 388 }
336 } 389 }
387 } 440 }
388 } 441 }
389 }; 442 };
390 443
391 444
// Local IDatabaseOperation added in revision 269 (these rows exist only in
// the right column). Its Execute() forwards to IAccessor::ReadRange() with
// the visitor, UUID, content type, start offset and length captured at
// construction time.
//
// The visitor is held by reference and the UUID as a raw "const char*", so
// both must remain valid while the operation may still be executed.
445 class Operation : public StorageBackend::IDatabaseOperation
446 {
447 private:
448 Visitor& visitor_;
449 const char* uuid_;
450 OrthancPluginContentType type_;
451 uint64_t start_;
452 size_t length_;
453
454 public:
// Only stores its arguments; the range is read when Execute() is called.
455 Operation(Visitor& visitor,
456 const char* uuid,
457 OrthancPluginContentType type,
458 uint64_t start,
459 size_t length) :
460 visitor_(visitor),
461 uuid_(uuid),
462 type_(type),
463 start_(start),
464 length_(length)
465 {
466 }
467
468 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
469 {
470 accessor.ReadRange(visitor_, uuid_, type_, start_, length_);
471 }
472 };
473
474
392 try 475 try
393 { 476 {
394 if (backend_.get() == NULL) 477 if (backend_.get() == NULL)
395 { 478 {
396 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 479 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
398 else 481 else
399 { 482 {
400 Visitor visitor(target); 483 Visitor visitor(target);
401 484
402 { 485 {
403 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); 486 Operation operation(visitor, uuid, type, start, target->size);
404 accessor->ReadRange(visitor, uuid, type, start, target->size); 487 backend_->Execute(operation);
405 } 488 }
406 489
407 return OrthancPluginErrorCode_Success; 490 return OrthancPluginErrorCode_Success;
408 } 491 }
409 } 492 }
509 else 592 else
510 { 593 {
511 Visitor visitor(data, size); 594 Visitor visitor(data, size);
512 595
513 { 596 {
514 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); 597 StorageBackend::ReadWholeOperation operation(visitor, uuid, type);
515 accessor->ReadWhole(visitor, uuid, type); 598 backend_->Execute(operation);
516 } 599 }
517 600
518 visitor.Release(); 601 visitor.Release();
519 602
520 return OrthancPluginErrorCode_Success; 603 return OrthancPluginErrorCode_Success;
525 608
526 609
// StorageRemove: invokes IAccessor::Remove() for the attachment identified by
// (uuid, type) through the file-level "backend_" and reports the outcome as
// an OrthancPluginErrorCode.
//
// Left column (revision 268): an accessor was created directly and Remove()
// was called once. Right column (revision 269): the call is wrapped in a
// local IDatabaseOperation and submitted to backend_->Execute().
//
// Throws ErrorCode_BadSequenceOfCalls inside the try block if "backend_" has
// not been set; returns OrthancPluginErrorCode_Success otherwise.
527 static OrthancPluginErrorCode StorageRemove(const char* uuid, 610 static OrthancPluginErrorCode StorageRemove(const char* uuid,
528 OrthancPluginContentType type) 611 OrthancPluginContentType type)
529 { 612 {
// Local operation capturing the UUID (as a raw pointer) and the content type,
// so that Execute() can replay the same Remove() request.
613 class Operation : public StorageBackend::IDatabaseOperation
614 {
615 private:
616 const char* uuid_;
617 OrthancPluginContentType type_;
618
619 public:
620 Operation(const char* uuid,
621 OrthancPluginContentType type) :
622 uuid_(uuid),
623 type_(type)
624 {
625 }
626
627 virtual void Execute(StorageBackend::IAccessor& accessor) ORTHANC_OVERRIDE
628 {
629 accessor.Remove(uuid_, type_);
630 }
631 };
632
633
530 try 634 try
531 { 635 {
532 if (backend_.get() == NULL) 636 if (backend_.get() == NULL)
533 { 637 {
534 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 638 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
535 } 639 }
536 else 640 else
537 { 641 {
538 std::unique_ptr<StorageBackend::IAccessor> accessor(backend_->CreateAccessor()); 642 Operation operation(uuid, type);
539 accessor->Remove(uuid, type); 643 backend_->Execute(operation);
540 return OrthancPluginErrorCode_Success; 644 return OrthancPluginErrorCode_Success;
541 } 645 }
542 } 646 }
// The macro body is defined earlier in the file and is not visible in this
// excerpt; presumably it supplies the catch clauses that turn exceptions
// into an error code -- TODO confirm.
543 ORTHANC_PLUGINS_DATABASE_CATCH; 647 ORTHANC_PLUGINS_DATABASE_CATCH;
544 } 648 }
550 if (context == NULL || 654 if (context == NULL ||
551 backend == NULL) 655 backend == NULL)
552 { 656 {
553 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); 657 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
554 } 658 }
555 659 else if (context_ != NULL ||
556 if (context_ != NULL || 660 backend_.get() != NULL)
557 backend_.get() != NULL)
558 { 661 {
559 // This function can only be invoked once in the plugin 662 // This function can only be invoked once in the plugin
560 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 663 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
561 } 664 }
562 else 665 else
568 671
569 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1 672 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1
570 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0) 673 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 0)
571 if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1) 674 if (OrthancPluginCheckVersionAdvanced(context, 1, 9, 0) == 1)
572 { 675 {
676 LOG(WARNING) << "The storage area plugin will retry up to " << backend_->GetMaxRetries()
677 << " time(s) in the case of a collision";
678
573 OrthancPluginStorageReadRange readRange = NULL; 679 OrthancPluginStorageReadRange readRange = NULL;
574 if (backend_->HasReadRange()) 680 if (backend_->HasReadRange())
575 { 681 {
576 readRange = StorageReadRange; 682 readRange = StorageReadRange;
577 } 683 }
659 if (!visitor.IsSuccess()) 765 if (!visitor.IsSuccess())
660 { 766 {
661 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); 767 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
662 } 768 }
663 } 769 }
770
771
// Runs "operation" against an accessor obtained from CreateAccessor(),
// retrying when the operation fails with ErrorCode_DatabaseCannotSerialize.
//
// - Throws ErrorCode_InternalError if CreateAccessor() returns NULL.
// - After the first attempt, at most maxRetries_ further attempts are made;
//   once that bound is reached the serialization error is rethrown.
// - An OrthancException with any other error code is rethrown immediately;
//   exceptions of other types are not caught here at all.
// - The same accessor object is reused for every attempt.
//
// These rows exist only in the right column (added in revision 269).
772 void StorageBackend::Execute(IDatabaseOperation& operation)
773 {
774 std::unique_ptr<IAccessor> accessor(CreateAccessor());
775 if (accessor.get() == NULL)
776 {
777 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
778 }
779
// Number of retries performed so far (0 during the first attempt).
780 unsigned int attempt = 0;
781
782 for (;;)
783 {
784 try
785 {
786 operation.Execute(*accessor);
787 return; // Success
788 }
789 catch (Orthanc::OrthancException& e)
790 {
791 if (e.GetErrorCode() == Orthanc::ErrorCode_DatabaseCannotSerialize)
792 {
793 if (attempt >= maxRetries_)
794 {
795 throw;
796 }
797 else
798 {
799 attempt++;
800
// Back-off before the next attempt: 100 ms per retry already counted, plus
// a pseudo-random jitter of 0, 5, ..., 45 ms.
// NOTE(review): no <cstdlib> include and no seeding of rand() are visible in
// this excerpt -- confirm both are provided elsewhere.
801 // The "rand()" adds some jitter to de-synchronize writers
802 boost::this_thread::sleep(boost::posix_time::milliseconds(100 * attempt + 5 * (rand() % 10)));
803 }
804 }
805 else
806 {
807 throw;
808 }
809 }
810 }
811 }
664 } 812 }