Mercurial > hg > orthanc
view OrthancFramework/Sources/DataSource/DataSourceReader.cpp @ 6939:ae404101b2ae streaming tip
renamed DicomDataSource threads to differentiate them from the current DICOM network threads
| author | Alain Mazy <am@orthanc.team> |
|---|---|
| date | Tue, 09 Jun 2026 09:53:51 +0200 |
| parents | d602f35554c5 |
| children |
line wrap: on
line source
/** * Orthanc - A Lightweight, RESTful DICOM Store * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics * Department, University Hospital of Liege, Belgium * Copyright (C) 2017-2023 Osimis S.A., Belgium * Copyright (C) 2024-2026 Orthanc Team SRL, Belgium * Copyright (C) 2021-2026 Sebastien Jodogne, ICTEAM UCLouvain, Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this program. If not, see * <http://www.gnu.org/licenses/>. **/ #include "../PrecompiledHeaders.h" #include "DataSourceReader.h" #include "../Cache/SharedObjectCache.h" #include "../Constants.h" #include "../MetricsRegistry.h" #include "../OrthancException.h" #include "DataSourceMemoryBudget.h" #include <boost/make_shared.hpp> #include <boost/weak_ptr.hpp> #include <cassert> namespace Orthanc { DataSourceReader::MetricsConfiguration::MetricsConfiguration(const boost::shared_ptr<MetricsRegistry>& metrics, const std::string& cacheSizeMegabytesName, const std::string& cacheCountName, const std::string& cacheHitCountName, const std::string& cacheMissCountName) { if (metrics.get() == NULL) { throw OrthancException(ErrorCode_NullPointer); } else if (cacheSizeMegabytesName.empty() || cacheCountName.empty() || cacheHitCountName.empty() || cacheMissCountName.empty()) { throw OrthancException(ErrorCode_ParameterOutOfRange); } else { metrics_ = metrics; cacheSizeMegabytesName_ = cacheSizeMegabytesName; cacheCountName_ = cacheCountName; cacheHitCountName_ = cacheHitCountName; cacheMissCountName_ = cacheMissCountName; metrics_->SetFloatValue(cacheSizeMegabytesName_, 0); metrics_->SetIntegerValue(cacheCountName_, 0); metrics_->SetIntegerValue(cacheHitCountName_, 0); metrics_->SetIntegerValue(cacheMissCountName_, 0); } } void DataSourceReader::MetricsConfiguration::SetCacheStatistics(SharedObjectCache& cache) { if (metrics_) { size_t count, size; cache.GetStatistics(count, size); metrics_->SetFloatValue(cacheSizeMegabytesName_, static_cast<float>(size) / static_cast<float>(MEGABYTE)); metrics_->SetIntegerValue(cacheCountName_, count); } } void DataSourceReader::MetricsConfiguration::IncrementCacheHitCount() { if (metrics_) { metrics_->IncrementIntegerValue(cacheHitCountName_, 1); } } void DataSourceReader::MetricsConfiguration::IncrementCacheMissCount() { if (metrics_) { metrics_->IncrementIntegerValue(cacheMissCountName_, 1); } } class DataSourceReader::DataSourceRunnable : public IRunnable { private: boost::weak_ptr<DataSourceAnswer> answer_; IDataSource& source_; std::unique_ptr<IDataIdentifier> id_; boost::shared_ptr<SharedObjectCache> cache_; boost::shared_ptr<Internals::DataSourceMemoryBudget> budget_; MetricsConfiguration metricsConfiguration_; public: DataSourceRunnable(const boost::shared_ptr<DataSourceAnswer>& answer, IDataSource& source, IDataIdentifier* id, boost::shared_ptr<SharedObjectCache>& cache, boost::shared_ptr<Internals::DataSourceMemoryBudget>& budget, const MetricsConfiguration& metrics) : answer_(answer), source_(source), id_(id), cache_(cache), budget_(budget), metricsConfiguration_(metrics) { if (id == NULL || answer.get() == NULL) { throw OrthancException(ErrorCode_NullPointer); } } virtual void Run() ORTHANC_OVERRIDE { // Phase 1: Make sure the target answer is still alive before doing unnecessary work. { boost::shared_ptr<DataSourceAnswer> lock = answer_.lock(); if (!lock) { // The answer was abandonned, give up the request return; } } // Phase 2: Do all the work WITHOUT holding a strong reference to "DataSourceAnswer". boost::shared_ptr<IDynamicObject> value; std::unique_ptr<OrthancException> error; size_t size = 0; try { std::string cacheKey; bool hasCacheKey = id_->GetCacheKey(cacheKey); if (cache_ && hasCacheKey) { value = cache_->GetCachedValue(cacheKey); if (value) { metricsConfiguration_.IncrementCacheHitCount(); } else { metricsConfiguration_.IncrementCacheMissCount(); } } if (!value) { std::unique_ptr<Internals::DataSourceMemoryBudget::Lock> preReservation; size_t estimatedSize = 0; if (id_->EstimateValueSize(estimatedSize)) { preReservation.reset(new Internals::DataSourceMemoryBudget::Lock(*budget_, estimatedSize)); } value.reset(source_.Load(*id_, cache_)); if (!value) { error.reset(new OrthancException(ErrorCode_NullPointer)); } if (!error && cache_ && hasCacheKey) { cache_->Store(cacheKey, value, source_.GetValueSize(*value)); metricsConfiguration_.SetCacheStatistics(*cache_); } } if (!error) { size = source_.GetValueSize(*value); } } catch (OrthancException& e) { error.reset(new OrthancException(e)); } catch (...) { error.reset(new OrthancException(ErrorCode_InternalError, "Unknown exception in Datasource::Run")); } // Phase 3: Acquire budget WITHOUT holding a strong reference to "DataSourceAnswer". // If the "DataSourceAnswer" has been dropped, "answer_.lock()" below will return NULL // and we will immediately "Release()" to revert this. if (!error) { budget_->Acquire(size); // may block; "DataSourceAnswer" CAN be destroyed here } // Phase 4: Only now take a strong reference to "DataSourceAnswer". { boost::shared_ptr<DataSourceAnswer> lock = answer_.lock(); if (!lock) { // Answer was abandoned while we were blocked in Acquire(). // Undo the charge and exit cleanly. if (!error) { budget_->Release(size); } } else { if (error) { lock->EnqueueError(id_.release(), *error); } else { lock->EnqueueValue(id_.release(), value, budget_, size); } } } } }; DataSourceReader::DataSourceReader(const boost::shared_ptr<IExecutorService>& executor /* takes ownership */, IDataSource* source /* takes ownership */) : executor_(executor), source_(source), budget_(new Internals::DataSourceMemoryBudget(0)) { if (executor.get() == NULL || source == NULL) { throw OrthancException(ErrorCode_NullPointer); } } DataSourceReader::~DataSourceReader() { executor_.reset(); source_.reset(); assert(budget_->GetCurrentMemory() == 0); } void DataSourceReader::CreateCache(size_t capacity) { cache_ = boost::make_shared<SharedObjectCache>(capacity); } void DataSourceReader::SetCapacity(uint64_t maximumMemory) { budget_ = boost::make_shared<Internals::DataSourceMemoryBudget>(maximumMemory); } boost::shared_ptr<DataSourceAnswer> DataSourceReader::Submit(DataSourceRequest* request /* takes ownership */) { std::unique_ptr<DataSourceRequest> protection(request); boost::shared_ptr<DataSourceAnswer> answer(new DataSourceAnswer(protection->GetSize())); while (!protection->IsEmpty()) { std::unique_ptr<IDataIdentifier> identifier(protection->Dequeue()); executor_->Submit(new DataSourceRunnable(answer, *source_, identifier.release(), cache_, budget_, metricsConfiguration_)); } return answer; } DataSourceAnswer::Item* DataSourceReader::ReadSingle(IDataIdentifier* id /* takes ownership */) { std::unique_ptr<IDataIdentifier> protection(id); std::unique_ptr<DataSourceRequest> request(new DataSourceRequest); request->Enqueue(protection.release()); boost::shared_ptr<DataSourceAnswer> answer(Submit(request.release())); std::unique_ptr<DataSourceAnswer::Item> item(answer->Dequeue()); assert(item != NULL); assert(answer->Dequeue() == NULL); return item.release(); } void DataSourceReader::GetStatistics(uint64_t& tasksMaximumMemory, uint64_t& tasksCurrentMemory, unsigned int& tasksReservations) { { boost::shared_ptr<Internals::DataSourceMemoryBudget> budgetCopy(budget_); budgetCopy->GetStatistics(tasksMaximumMemory, tasksCurrentMemory, tasksReservations); } } size_t DataSourceReader::GetCacheCapacity() const { boost::shared_ptr<SharedObjectCache> lock(cache_); if (lock) { return lock->GetCapacity(); } else { return 0; } } void DataSourceReader::StoreIntoCache(const std::string& key, IDynamicObject* value /* takes ownership */) { boost::shared_ptr<IDynamicObject> protection(value); if (value == NULL) { throw OrthancException(ErrorCode_NullPointer); } if (cache_) { size_t size = source_->GetValueSize(*value); cache_->Store(key, protection, size); metricsConfiguration_.SetCacheStatistics(*cache_); } } }
