comparison Framework/Plugins/DatabaseBackendAdapterV3.cpp @ 234:d1b124d116c1

PostgreSQL index plugin handles retries for collisions between multiple writers
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 08 Apr 2021 10:50:01 +0200
parents 7d46c99523a2
children f2b32d31fc99
comparison
equal deleted inserted replaced
233:7d46c99523a2 234:d1b124d116c1
59 59
60 // TODO - TURN THIS INTO A CONNECTION POOL 60 // TODO - TURN THIS INTO A CONNECTION POOL
61 class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable 61 class DatabaseBackendAdapterV3::Adapter : public boost::noncopyable
62 { 62 {
63 private: 63 private:
64 std::unique_ptr<IndexBackend> backend_; 64 class ManagerReference : public Orthanc::IDynamicObject
65 OrthancPluginContext* context_; 65 {
66 boost::mutex managerMutex_; 66 private:
67 std::unique_ptr<DatabaseManager> manager_; 67 DatabaseManager* manager_;
68 68
69 DatabaseManager& GetManager() const 69 public:
70 { 70 ManagerReference(DatabaseManager& manager) :
71 if (manager_.get() == NULL) 71 manager_(&manager)
72 {
73 }
74
75 DatabaseManager& GetManager()
76 {
77 assert(manager_ != NULL);
78 return *manager_;
79 }
80 };
81
82 std::unique_ptr<IndexBackend> backend_;
83 OrthancPluginContext* context_;
84 boost::shared_mutex connectionsMutex_;
85 size_t countConnections_;
86 std::list<DatabaseManager*> connections_;
87 Orthanc::SharedMessageQueue availableConnections_;
88
89 public:
90 Adapter(IndexBackend* backend,
91 size_t countConnections) :
92 backend_(backend),
93 countConnections_(countConnections)
94 {
95 if (countConnections == 0)
96 {
97 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
98 }
99 else if (backend == NULL)
100 {
101 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
102 }
103 else
104 {
105 context_ = backend_->GetContext();
106 }
107 }
108
109 ~Adapter()
110 {
111 for (std::list<DatabaseManager*>::iterator
112 it = connections_.begin(); it != connections_.end(); ++it)
113 {
114 assert(*it != NULL);
115 delete *it;
116 }
117 }
118
119 OrthancPluginContext* GetContext() const
120 {
121 return context_;
122 }
123
124 void OpenConnections()
125 {
126 boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_);
127
128 if (connections_.size() == 0)
129 {
130 assert(backend_.get() != NULL);
131
132 std::unique_ptr<IDatabase> database(backend_->OpenDatabaseConnection());
133 backend_->ConfigureDatabase(*database);
134
135 connections_.push_back(new DatabaseManager(database.release()));
136
137 for (size_t i = 1; i < countConnections_; i++)
138 {
139 connections_.push_back(new DatabaseManager(backend_->OpenDatabaseConnection()));
140 }
141
142 for (std::list<DatabaseManager*>::iterator
143 it = connections_.begin(); it != connections_.end(); ++it)
144 {
145 assert(*it != NULL);
146 availableConnections_.Enqueue(new ManagerReference(**it));
147 }
148 }
149 else
72 { 150 {
73 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 151 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
74 } 152 }
153 }
154
155 void CloseConnections()
156 {
157 boost::unique_lock<boost::shared_mutex> lock(connectionsMutex_);
158
159 if (connections_.size() != countConnections_)
160 {
161 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
162 }
163 else if (availableConnections_.GetSize() != countConnections_)
164 {
165 throw Orthanc::OrthancException(Orthanc::ErrorCode_Database, "Some connections are still in use, bug in the Orthanc core");
166 }
75 else 167 else
76 { 168 {
77 return *manager_; 169 for (std::list<DatabaseManager*>::iterator
78 } 170 it = connections_.begin(); it != connections_.end(); ++it)
79 } 171 {
80 172 assert(*it != NULL);
81 public: 173 (*it)->Close();
82 Adapter(IndexBackend* backend) : 174 }
83 backend_(backend)
84 {
85 if (backend == NULL)
86 {
87 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
88 }
89 else
90 {
91 context_ = backend_->GetContext();
92 }
93 }
94
95 OrthancPluginContext* GetContext() const
96 {
97 return context_;
98 }
99
100 void OpenConnections()
101 {
102 boost::mutex::scoped_lock lock(managerMutex_);
103
104 if (manager_.get() == NULL)
105 {
106 manager_.reset(IndexBackend::CreateSingleDatabaseManager(*backend_));
107 }
108 else
109 {
110 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
111 }
112 }
113
114 void CloseConnections()
115 {
116 boost::mutex::scoped_lock lock(managerMutex_);
117
118 if (manager_.get() == NULL)
119 {
120 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
121 }
122 else
123 {
124 manager_->Close();
125 manager_.reset(NULL);
126 } 175 }
127 } 176 }
128 177
129 class DatabaseAccessor : public boost::noncopyable 178 class DatabaseAccessor : public boost::noncopyable
130 { 179 {
131 private: 180 private:
132 boost::mutex::scoped_lock lock_; 181 boost::shared_lock<boost::shared_mutex> lock_;
133 IndexBackend& backend_; 182 Adapter& adapter_;
134 DatabaseManager& manager_; 183 DatabaseManager* manager_;
135 184
136 public: 185 public:
137 DatabaseAccessor(Adapter& adapter) : 186 DatabaseAccessor(Adapter& adapter) :
138 lock_(adapter.managerMutex_), 187 lock_(adapter.connectionsMutex_),
139 backend_(*adapter.backend_), 188 adapter_(adapter),
140 manager_(adapter.GetManager()) 189 manager_(NULL)
141 { 190 {
191 for (;;)
192 {
193 std::unique_ptr<Orthanc::IDynamicObject> manager(adapter.availableConnections_.Dequeue(100));
194 if (manager.get() != NULL)
195 {
196 manager_ = &dynamic_cast<ManagerReference&>(*manager).GetManager();
197 return;
198 }
199 }
200 }
201
202 ~DatabaseAccessor()
203 {
204 assert(manager_ != NULL);
205 adapter_.availableConnections_.Enqueue(new ManagerReference(*manager_));
142 } 206 }
143 207
144 IndexBackend& GetBackend() const 208 IndexBackend& GetBackend() const
145 { 209 {
146 return backend_; 210 return *adapter_.backend_;
147 } 211 }
148 212
149 DatabaseManager& GetManager() const 213 DatabaseManager& GetManager() const
150 { 214 {
151 return manager_; 215 assert(manager_ != NULL);
216 return *manager_;
152 } 217 }
153 }; 218 };
154 }; 219 };
155 220
156 221
1860 } 1925 }
1861 ORTHANC_PLUGINS_DATABASE_CATCH(t->GetBackend().GetContext()); 1926 ORTHANC_PLUGINS_DATABASE_CATCH(t->GetBackend().GetContext());
1862 } 1927 }
1863 1928
1864 1929
1865 void DatabaseBackendAdapterV3::Register(IndexBackend* backend) 1930 void DatabaseBackendAdapterV3::Register(IndexBackend* backend,
1931 size_t countConnections,
1932 unsigned int maxDatabaseRetries)
1866 { 1933 {
1867 if (isBackendInUse_) 1934 if (isBackendInUse_)
1868 { 1935 {
1869 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); 1936 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
1870 } 1937 }
1946 params.setProtectedPatient = SetProtectedPatient; 2013 params.setProtectedPatient = SetProtectedPatient;
1947 params.setResourcesContent = SetResourcesContent; 2014 params.setResourcesContent = SetResourcesContent;
1948 2015
1949 OrthancPluginContext* context = backend->GetContext(); 2016 OrthancPluginContext* context = backend->GetContext();
1950 2017
1951 if (OrthancPluginRegisterDatabaseBackendV3(context, &params, sizeof(params), new Adapter(backend)) != OrthancPluginErrorCode_Success) 2018 if (OrthancPluginRegisterDatabaseBackendV3(
2019 context, &params, sizeof(params), maxDatabaseRetries,
2020 new Adapter(backend, countConnections)) != OrthancPluginErrorCode_Success)
1952 { 2021 {
1953 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend"); 2022 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError, "Unable to register the database backend");
1954 } 2023 }
1955 2024
1956 backend->SetOutputFactory(new Factory); 2025 backend->SetOutputFactory(new Factory);