comparison Framework/Plugins/DatabaseBackendAdapterV3.cpp @ 417:15bfd9a76f8d pg-transactions

merge
author Alain Mazy <am@osimis.io>
date Fri, 23 Jun 2023 14:26:58 +0200
parents 91124cc8a8c7
children ecd0b719cff5
comparison
equal deleted inserted replaced
370:d2b5d9c92214 417:15bfd9a76f8d
1 /** 1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store 2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics 3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium 4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2022 Osimis S.A., Belgium 5 * Copyright (C) 2017-2023 Osimis S.A., Belgium
6 * Copyright (C) 2021-2022 Sebastien Jodogne, ICTEAM UCLouvain, Belgium 6 * Copyright (C) 2021-2023 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
7 * 7 *
8 * This program is free software: you can redistribute it and/or 8 * This program is free software: you can redistribute it and/or
9 * modify it under the terms of the GNU Affero General Public License 9 * modify it under the terms of the GNU Affero General Public License
10 * as published by the Free Software Foundation, either version 3 of 10 * as published by the Free Software Foundation, either version 3 of
11 * the License, or (at your option) any later version. 11 * the License, or (at your option) any later version.
23 #include "DatabaseBackendAdapterV3.h" 23 #include "DatabaseBackendAdapterV3.h"
24 24
25 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1 25 #if defined(ORTHANC_PLUGINS_VERSION_IS_ABOVE) // Macro introduced in Orthanc 1.3.1
26 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 2) 26 # if ORTHANC_PLUGINS_VERSION_IS_ABOVE(1, 9, 2)
27 27
28 #include "IndexConnectionsPool.h"
29
28 #include <Logging.h> 30 #include <Logging.h>
29 #include <MultiThreading/SharedMessageQueue.h>
30 #include <OrthancException.h> 31 #include <OrthancException.h>
31 32
32 #include <stdexcept> 33 #include <stdexcept>
33 #include <list> 34 #include <list>
34 #include <string> 35 #include <string>
80 target.push_back(*it); 81 target.push_back(*it);
81 } 82 }
82 } 83 }
83 84
84 85
85 class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable
86 {
87 private:
88 class ManagerReference : public Orthanc::IDynamicObject
89 {
90 private:
91 DatabaseManager* manager_;
92
93 public:
94 ManagerReference(DatabaseManager& manager) :
95 manager_(&manager)
96 {
97 }
98
99 DatabaseManager& GetManager()
100 {
101 assert(manager_ != NULL);
102 return *manager_;
103 }
104 };
105
106 std::unique_ptr<IndexBackend> backend_;
107 OrthancPluginContext* context_;
108 boost::shared_mutex connectionsMutex_;
109 size_t countConnections_;
110 std::list<DatabaseManager*> connections_;
111 Orthanc::SharedMessageQueue availableConnections_;
112
113 public:
114 Adapter(IndexBackend* backend,
115 size_t countConnections) :
116 backend_(backend),
117 countConnections_(countConnections)
118 {
119 if (countConnections == 0)
120 {
121 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange,
122 "There must be a non-zero number of connections to the database");
123 }
124 else if (backend == NULL)
125 {
126 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
127 }
128 else
129 {
130 context_ = backend_->GetContext();
131 }
132 }
133
134 ~Adapter()
135 {
136 for (std::list<DatabaseManager*>::iterator
137 it = connections_.begin(); it != connections_.end(); ++it)
138 {
139 assert(*it != NULL);
140 delete *it;
141 }
142 }
143
144 OrthancPluginContext* GetContext() const
145 {
146 return context_;
147 }
148
149 void OpenConnections()
150 {
151 boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_);
152
153 if (connections_.size() == 0)
154 {
155 assert(backend_.get() != NULL);
156
157 {
158 std::unique_ptr<DatabaseManager> manager(new DatabaseManager(backend_->CreateDatabaseFactory()));
159 manager->GetDatabase(); // Make sure to open the database connection
160
161 backend_->ConfigureDatabase(*manager);
162 connections_.push_back(manager.release());
163 }
164
165 for (size_t i = 1; i < countConnections_; i++)
166 {
167 connections_.push_back(new DatabaseManager(backend_->CreateDatabaseFactory()));
168 connections_.back()->GetDatabase(); // Make sure to open the database connection
169 }
170
171 for (std::list<DatabaseManager*>::iterator
172 it = connections_.begin(); it != connections_.end(); ++it)
173 {
174 assert(*it != NULL);
175 availableConnections_.Enqueue(new ManagerReference(**it));
176 }
177 }
178 else
179 {
180 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
181 }
182 }
183
184 void CloseConnections()
185 {
186 boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_);
187
188 if (connections_.size() != countConnections_)
189 {
190 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
191 }
192 else if (availableConnections_.GetSize() != countConnections_)
193 {
194 throw Orthanc::OrthancException(Orthanc::ErrorCode_Database, "Some connections are still in use, bug in the Orthanc core");
195 }
196 else
197 {
198 for (std::list<DatabaseManager*>::iterator
199 it = connections_.begin(); it != connections_.end(); ++it)
200 {
201 assert(*it != NULL);
202 (*it)->Close();
203 }
204 }
205 }
206
207 class DatabaseAccessor : public boost::noncopyable
208 {
209 private:
210 boost::shared_lock<boost::shared_mutex> lock_;
211 Adapter& adapter_;
212 DatabaseManager* manager_;
213
214 public:
215 DatabaseAccessor(Adapter& adapter) :
216 lock_(adapter.connectionsMutex_),
217 adapter_(adapter),
218 manager_(NULL)
219 {
220 for (;;)
221 {
222 std::unique_ptr<Orthanc::IDynamicObject> manager(adapter.availableConnections_.Dequeue(100));
223 if (manager.get() != NULL)
224 {
225 manager_ = &dynamic_cast<ManagerReference&>(*manager).GetManager();
226 return;
227 }
228 }
229 }
230
231 ~DatabaseAccessor()
232 {
233 assert(manager_ != NULL);
234 adapter_.availableConnections_.Enqueue(new ManagerReference(*manager_));
235 }
236
237 IndexBackend& GetBackend() const
238 {
239 return *adapter_.backend_;
240 }
241
242 DatabaseManager& GetManager() const
243 {
244 assert(manager_ != NULL);
245 return *manager_;
246 }
247 };
248 };
249
250
251 class DatabaseBackendAdapterV3::Output : public IDatabaseBackendOutput 86 class DatabaseBackendAdapterV3::Output : public IDatabaseBackendOutput
252 { 87 {
253 private: 88 private:
254 struct Metadata 89 struct Metadata
255 { 90 {
800 635
801 636
802 class DatabaseBackendAdapterV3::Transaction : public boost::noncopyable 637 class DatabaseBackendAdapterV3::Transaction : public boost::noncopyable
803 { 638 {
804 private: 639 private:
805 Adapter& adapter_; 640 IndexConnectionsPool& pool_;
806 std::unique_ptr<Adapter::DatabaseAccessor> accessor_; 641 std::unique_ptr<IndexConnectionsPool::Accessor> accessor_;
807 std::unique_ptr<Output> output_; 642 std::unique_ptr<Output> output_;
808 643
809 public: 644 public:
810 Transaction(Adapter& adapter) : 645 Transaction(IndexConnectionsPool& pool) :
811 adapter_(adapter), 646 pool_(pool),
812 accessor_(new Adapter::DatabaseAccessor(adapter)), 647 accessor_(new IndexConnectionsPool::Accessor(pool)),
813 output_(new Output) 648 output_(new Output)
814 { 649 {
815 } 650 }
816 651
817 ~Transaction() 652 ~Transaction()
959 } 794 }
960 795
961 796
962 static OrthancPluginErrorCode Open(void* database) 797 static OrthancPluginErrorCode Open(void* database)
963 { 798 {
964 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 799 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
965 800
966 try 801 try
967 { 802 {
968 adapter->OpenConnections(); 803 std::list<IdentifierTag> identifierTags;
969 return OrthancPluginErrorCode_Success; 804 pool->OpenConnections(false, identifierTags);
970 } 805 return OrthancPluginErrorCode_Success;
971 ORTHANC_PLUGINS_DATABASE_CATCH(adapter->GetContext()); 806 }
807 ORTHANC_PLUGINS_DATABASE_CATCH(pool->GetContext());
972 } 808 }
973 809
974 810
975 static OrthancPluginErrorCode Close(void* database) 811 static OrthancPluginErrorCode Close(void* database)
976 { 812 {
977 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 813 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
978 814
979 try 815 try
980 { 816 {
981 adapter->CloseConnections(); 817 pool->CloseConnections();
982 return OrthancPluginErrorCode_Success; 818 return OrthancPluginErrorCode_Success;
983 } 819 }
984 ORTHANC_PLUGINS_DATABASE_CATCH(adapter->GetContext()); 820 ORTHANC_PLUGINS_DATABASE_CATCH(pool->GetContext());
985 } 821 }
986 822
987 823
988 static OrthancPluginErrorCode DestructDatabase(void* database) 824 static OrthancPluginErrorCode DestructDatabase(void* database)
989 { 825 {
990 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 826 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
991 827
992 if (adapter == NULL) 828 if (pool == NULL)
993 { 829 {
994 return OrthancPluginErrorCode_InternalError; 830 return OrthancPluginErrorCode_InternalError;
995 } 831 }
996 else 832 else
997 { 833 {
999 { 835 {
1000 isBackendInUse_ = false; 836 isBackendInUse_ = false;
1001 } 837 }
1002 else 838 else
1003 { 839 {
1004 OrthancPluginLogError(adapter->GetContext(), "More than one index backend was registered, internal error"); 840 OrthancPluginLogError(pool->GetContext(), "More than one index backend was registered, internal error");
1005 } 841 }
1006 842
1007 delete adapter; 843 delete pool;
1008 844
1009 return OrthancPluginErrorCode_Success; 845 return OrthancPluginErrorCode_Success;
1010 } 846 }
1011 } 847 }
1012 848
1013 849
1014 static OrthancPluginErrorCode GetDatabaseVersion(void* database, 850 static OrthancPluginErrorCode GetDatabaseVersion(void* database,
1015 uint32_t* version) 851 uint32_t* version)
1016 { 852 {
1017 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 853 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
1018 854
1019 try 855 try
1020 { 856 {
1021 DatabaseBackendAdapterV3::Adapter::DatabaseAccessor accessor(*adapter); 857 IndexConnectionsPool::Accessor accessor(*pool);
1022 *version = accessor.GetBackend().GetDatabaseVersion(accessor.GetManager()); 858 *version = accessor.GetBackend().GetDatabaseVersion(accessor.GetManager());
1023 return OrthancPluginErrorCode_Success; 859 return OrthancPluginErrorCode_Success;
1024 } 860 }
1025 ORTHANC_PLUGINS_DATABASE_CATCH(adapter->GetContext()); 861 ORTHANC_PLUGINS_DATABASE_CATCH(pool->GetContext());
1026 } 862 }
1027 863
1028 864
1029 static OrthancPluginErrorCode UpgradeDatabase(void* database, 865 static OrthancPluginErrorCode UpgradeDatabase(void* database,
1030 OrthancPluginStorageArea* storageArea, 866 OrthancPluginStorageArea* storageArea,
1031 uint32_t targetVersion) 867 uint32_t targetVersion)
1032 { 868 {
1033 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 869 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
1034 870
1035 try 871 try
1036 { 872 {
1037 DatabaseBackendAdapterV3::Adapter::DatabaseAccessor accessor(*adapter); 873 IndexConnectionsPool::Accessor accessor(*pool);
1038 accessor.GetBackend().UpgradeDatabase(accessor.GetManager(), targetVersion, storageArea); 874 accessor.GetBackend().UpgradeDatabase(accessor.GetManager(), targetVersion, storageArea);
1039 return OrthancPluginErrorCode_Success; 875 return OrthancPluginErrorCode_Success;
1040 } 876 }
1041 ORTHANC_PLUGINS_DATABASE_CATCH(adapter->GetContext()); 877 ORTHANC_PLUGINS_DATABASE_CATCH(pool->GetContext());
1042 } 878 }
1043 879
1044 880
1045 static OrthancPluginErrorCode HasRevisionsSupport(void* database, 881 static OrthancPluginErrorCode HasRevisionsSupport(void* database,
1046 uint8_t* target) 882 uint8_t* target)
1047 { 883 {
1048 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 884 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
1049 885
1050 try 886 try
1051 { 887 {
1052 DatabaseBackendAdapterV3::Adapter::DatabaseAccessor accessor(*adapter); 888 IndexConnectionsPool::Accessor accessor(*pool);
1053 *target = (accessor.GetBackend().HasRevisionsSupport() ? 1 : 0); 889 *target = (accessor.GetBackend().HasRevisionsSupport() ? 1 : 0);
1054 return OrthancPluginErrorCode_Success; 890 return OrthancPluginErrorCode_Success;
1055 } 891 }
1056 ORTHANC_PLUGINS_DATABASE_CATCH(adapter->GetContext()); 892 ORTHANC_PLUGINS_DATABASE_CATCH(pool->GetContext());
1057 } 893 }
1058 894
1059 895
1060 static OrthancPluginErrorCode StartTransaction(void* database, 896 static OrthancPluginErrorCode StartTransaction(void* database,
1061 OrthancPluginDatabaseTransaction** target /* out */, 897 OrthancPluginDatabaseTransaction** target /* out */,
1062 OrthancPluginDatabaseTransactionType type) 898 OrthancPluginDatabaseTransactionType type)
1063 { 899 {
1064 DatabaseBackendAdapterV3::Adapter* adapter = reinterpret_cast<DatabaseBackendAdapterV3::Adapter*>(database); 900 IndexConnectionsPool* pool = reinterpret_cast<IndexConnectionsPool*>(database);
1065 901
1066 try 902 try
1067 { 903 {
1068 std::unique_ptr<DatabaseBackendAdapterV3::Transaction> transaction(new DatabaseBackendAdapterV3::Transaction(*adapter)); 904 std::unique_ptr<DatabaseBackendAdapterV3::Transaction> transaction(new DatabaseBackendAdapterV3::Transaction(*pool));
1069 905
1070 switch (type) 906 switch (type)
1071 { 907 {
1072 case OrthancPluginDatabaseTransactionType_ReadOnly: 908 case OrthancPluginDatabaseTransactionType_ReadOnly:
1073 transaction->GetManager().StartTransaction(TransactionType_ReadOnly); 909 transaction->GetManager().StartTransaction(TransactionType_ReadOnly);
1083 919
1084 *target = reinterpret_cast<OrthancPluginDatabaseTransaction*>(transaction.release()); 920 *target = reinterpret_cast<OrthancPluginDatabaseTransaction*>(transaction.release());
1085 921
1086 return OrthancPluginErrorCode_Success; 922 return OrthancPluginErrorCode_Success;
1087 } 923 }
1088 ORTHANC_PLUGINS_DATABASE_CATCH(adapter->GetContext()); 924 ORTHANC_PLUGINS_DATABASE_CATCH(pool->GetContext());
1089 } 925 }
1090 926
1091 927
1092 static OrthancPluginErrorCode DestructTransaction(OrthancPluginDatabaseTransaction* transaction) 928 static OrthancPluginErrorCode DestructTransaction(OrthancPluginDatabaseTransaction* transaction)
1093 { 929 {
1664 { 1500 {
1665 DatabaseBackendAdapterV3::Transaction* t = reinterpret_cast<DatabaseBackendAdapterV3::Transaction*>(transaction); 1501 DatabaseBackendAdapterV3::Transaction* t = reinterpret_cast<DatabaseBackendAdapterV3::Transaction*>(transaction);
1666 1502
1667 try 1503 try
1668 { 1504 {
1669 OrthancPluginExportedResource exported; 1505 t->GetOutput().Clear();
1670 exported.seq = 0; 1506 t->GetBackend().LogExportedResource(t->GetManager(), resourceType, publicId, modality, date,
1671 exported.resourceType = resourceType; 1507 patientId, studyInstanceUid, seriesInstanceUid, sopInstanceUid);
1672 exported.publicId = publicId;
1673 exported.modality = modality;
1674 exported.date = date;
1675 exported.patientId = patientId;
1676 exported.studyInstanceUid = studyInstanceUid;
1677 exported.seriesInstanceUid = seriesInstanceUid;
1678 exported.sopInstanceUid = sopInstanceUid;
1679
1680 t->GetOutput().Clear();
1681 t->GetBackend().LogExportedResource(t->GetManager(), exported);
1682 return OrthancPluginErrorCode_Success; 1508 return OrthancPluginErrorCode_Success;
1683 } 1509 }
1684 ORTHANC_PLUGINS_DATABASE_CATCH(t->GetBackend().GetContext()); 1510 ORTHANC_PLUGINS_DATABASE_CATCH(t->GetBackend().GetContext());
1685 } 1511 }
1686 1512
1819 for (uint32_t i = 0; i < constraintsCount; i++) 1645 for (uint32_t i = 0; i < constraintsCount; i++)
1820 { 1646 {
1821 lookup.push_back(Orthanc::DatabaseConstraint(constraints[i])); 1647 lookup.push_back(Orthanc::DatabaseConstraint(constraints[i]));
1822 } 1648 }
1823 1649
1824 t->GetBackend().LookupResources(t->GetOutput(), t->GetManager(), lookup, queryLevel, limit, (requestSomeInstanceId != 0)); 1650 std::set<std::string> noLabel;
1651 t->GetBackend().LookupResources(t->GetOutput(), t->GetManager(), lookup, queryLevel, noLabel,
1652 Orthanc::LabelsConstraint_All, limit, (requestSomeInstanceId != 0));
1825 return OrthancPluginErrorCode_Success; 1653 return OrthancPluginErrorCode_Success;
1826 } 1654 }
1827 ORTHANC_PLUGINS_DATABASE_CATCH(t->GetBackend().GetContext()); 1655 ORTHANC_PLUGINS_DATABASE_CATCH(t->GetBackend().GetContext());
1828 } 1656 }
1829 1657
1986 1814
1987 void DatabaseBackendAdapterV3::Register(IndexBackend* backend, 1815 void DatabaseBackendAdapterV3::Register(IndexBackend* backend,
1988 size_t countConnections, 1816 size_t countConnections,
1989 unsigned int maxDatabaseRetries) 1817 unsigned int maxDatabaseRetries)
1990 { 1818 {
1819 std::unique_ptr<IndexBackend> protection(backend);
1820
1991 if (isBackendInUse_) 1821 if (isBackendInUse_)
1992 { 1822 {
1993 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 1823 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
1994 } 1824 }
1995 1825
2069 params.setGlobalProperty = SetGlobalProperty; 1899 params.setGlobalProperty = SetGlobalProperty;
2070 params.setMetadata = SetMetadata; 1900 params.setMetadata = SetMetadata;
2071 params.setProtectedPatient = SetProtectedPatient; 1901 params.setProtectedPatient = SetProtectedPatient;
2072 params.setResourcesContent = SetResourcesContent; 1902 params.setResourcesContent = SetResourcesContent;
2073 1903
2074 OrthancPluginContext* context = backend->GetContext(); 1904 OrthancPluginContext* context = protection->GetContext();
2075 1905
2076 if (OrthancPluginRegisterDatabaseBackendV3( 1906 if (OrthancPluginRegisterDatabaseBackendV3(
2077 context, &params, sizeof(params), maxDatabaseRetries, 1907 context, &params, sizeof(params), maxDatabaseRetries,
2078 new Adapter(backend, countConnections)) != OrthancPluginErrorCode_Success) 1908 new IndexConnectionsPool(protection.release(), countConnections)) != OrthancPluginErrorCode_Success)
2079 { 1909 {
1910 delete backend;
2080 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend"); 1911 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend");
2081 } 1912 }
2082 1913
2083 backend->SetOutputFactory(new Factory); 1914 backend->SetOutputFactory(new Factory);
2084 1915