Mercurial > hg > orthanc
view OrthancServer/Sources/ServerIndex.cpp @ 5905:dad78aa9141e get-scu
move/get jobs: progress
author | Alain Mazy <am@orthanc.team> |
---|---|
date | Fri, 06 Dec 2024 12:06:05 +0100 |
parents | 71c7d260510d |
children | 56352ae88120 8279eaab0d1d |
line wrap: on
line source
/** * Orthanc - A Lightweight, RESTful DICOM Store * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics * Department, University Hospital of Liege, Belgium * Copyright (C) 2017-2023 Osimis S.A., Belgium * Copyright (C) 2024-2024 Orthanc Team SRL, Belgium * Copyright (C) 2021-2024 Sebastien Jodogne, ICTEAM UCLouvain, Belgium * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. **/ #include "PrecompiledHeadersServer.h" #include "ServerIndex.h" #ifndef NOMINMAX #define NOMINMAX #endif #include "../../OrthancFramework/Sources/Logging.h" #include "../../OrthancFramework/Sources/Toolbox.h" #include "OrthancConfiguration.h" #include "ServerContext.h" #include "ServerIndexChange.h" #include "ServerToolbox.h" namespace Orthanc { class ServerIndex::TransactionContext : public StatelessDatabaseOperations::ITransactionContext { private: struct FileToRemove { private: std::string uuid_; FileContentType type_; public: explicit FileToRemove(const FileInfo& info) : uuid_(info.GetUuid()), type_(info.GetContentType()) { } const std::string& GetUuid() const { return uuid_; } FileContentType GetContentType() const { return type_; } }; ServerContext& context_; bool hasRemainingLevel_; ResourceType remainingType_; std::string remainingPublicId_; std::list<FileToRemove> pendingFilesToRemove_; std::list<ServerIndexChange> pendingChanges_; uint64_t sizeOfFilesToRemove_; uint64_t sizeOfAddedAttachments_; void Reset() { sizeOfFilesToRemove_ = 0; hasRemainingLevel_ = false; remainingType_ = ResourceType_Instance; // dummy initialization pendingFilesToRemove_.clear(); pendingChanges_.clear(); sizeOfAddedAttachments_ = 0; } void CommitFilesToRemove() { for (std::list<FileToRemove>::const_iterator it = pendingFilesToRemove_.begin(); it != pendingFilesToRemove_.end(); ++it) { try { context_.RemoveFile(it->GetUuid(), it->GetContentType()); } catch (OrthancException& e) { LOG(ERROR) << "Unable to remove an attachment from the storage area: " << it->GetUuid() << " (type: " << EnumerationToString(it->GetContentType()) << ")"; } } } void CommitChanges() { for (std::list<ServerIndexChange>::const_iterator it = pendingChanges_.begin(); it != pendingChanges_.end(); ++it) { context_.SignalChange(*it); } } public: explicit TransactionContext(ServerContext& context) : context_(context) { Reset(); assert(ResourceType_Patient < ResourceType_Study && ResourceType_Study < ResourceType_Series && ResourceType_Series < ResourceType_Instance); } virtual void SignalRemainingAncestor(ResourceType parentType, const std::string& publicId) ORTHANC_OVERRIDE { LOG(TRACE) << "Remaining ancestor \"" << publicId << "\" (" << parentType << ")"; if (hasRemainingLevel_) { if (parentType < remainingType_) { remainingType_ = parentType; remainingPublicId_ = publicId; } } else { hasRemainingLevel_ = true; remainingType_ = parentType; remainingPublicId_ = publicId; } } virtual void SignalAttachmentDeleted(const FileInfo& info) ORTHANC_OVERRIDE { assert(Toolbox::IsUuid(info.GetUuid())); pendingFilesToRemove_.push_back(FileToRemove(info)); sizeOfFilesToRemove_ += info.GetCompressedSize(); } virtual void SignalResourceDeleted(ResourceType type, const std::string& publicId) ORTHANC_OVERRIDE { SignalChange(ServerIndexChange(ChangeType_Deleted, type, publicId)); } virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE { LOG(TRACE) << "Change related to resource " << change.GetPublicId() << " of type " << EnumerationToString(change.GetResourceType()) << ": " << EnumerationToString(change.GetChangeType()); pendingChanges_.push_back(change); } virtual void SignalAttachmentsAdded(uint64_t compressedSize) ORTHANC_OVERRIDE { sizeOfAddedAttachments_ += compressedSize; } virtual bool LookupRemainingLevel(std::string& remainingPublicId /* out */, ResourceType& remainingLevel /* out */) ORTHANC_OVERRIDE { if (hasRemainingLevel_) { remainingPublicId = remainingPublicId_; remainingLevel = remainingType_; return true; } else { return false; } }; virtual void MarkAsUnstable(ResourceType type, int64_t id, const std::string& publicId) ORTHANC_OVERRIDE { context_.GetIndex().MarkAsUnstable(type, id, publicId); } virtual bool IsUnstableResource(ResourceType type, int64_t id) ORTHANC_OVERRIDE { return context_.GetIndex().IsUnstableResource(type, id); } virtual void Commit() ORTHANC_OVERRIDE { // We can remove the files once the SQLite transaction has // been successfully committed. Some files might have to be // deleted because of recycling. CommitFilesToRemove(); // Send all the pending changes to the Orthanc plugins CommitChanges(); } virtual int64_t GetCompressedSizeDelta() ORTHANC_OVERRIDE { return (static_cast<int64_t>(sizeOfAddedAttachments_) - static_cast<int64_t>(sizeOfFilesToRemove_)); } }; class ServerIndex::TransactionContextFactory : public ITransactionContextFactory { private: ServerContext& context_; public: explicit TransactionContextFactory(ServerContext& context) : context_(context) { } virtual ITransactionContext* Create() { // There can be concurrent calls to this method, which is not an // issue because we simply create an object return new TransactionContext(context_); } }; class ServerIndex::UnstableResourcePayload { private: std::string publicId_; boost::posix_time::ptime time_; public: UnstableResourcePayload() { } explicit UnstableResourcePayload(const std::string& publicId) : publicId_(publicId), time_(boost::posix_time::second_clock::local_time()) { } unsigned int GetAge() const { return (boost::posix_time::second_clock::local_time() - time_).total_seconds(); } const std::string& GetPublicId() const { return publicId_; } }; void ServerIndex::UpdateStatisticsThread(ServerIndex* that, unsigned int threadSleepGranularityMilliseconds) { Logging::SetCurrentThreadName("DB-STATS"); static const unsigned int SLEEP_SECONDS = 60; if (threadSleepGranularityMilliseconds > 1000) { throw OrthancException(ErrorCode_ParameterOutOfRange); } LOG(INFO) << "Starting the update statistics thread (sleep = " << SLEEP_SECONDS << " seconds)"; unsigned int count = 0; unsigned int countThreshold = (1000 * SLEEP_SECONDS) / threadSleepGranularityMilliseconds; while (!that->done_) { boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds)); count++; if (count >= countThreshold) { uint64_t diskSize, uncompressedSize, countPatients, countStudies, countSeries, countInstances; that->GetGlobalStatistics(diskSize, uncompressedSize, countPatients, countStudies, countSeries, countInstances); count = 0; } } LOG(INFO) << "Stopping the update statistics thread"; } void ServerIndex::FlushThread(ServerIndex* that, unsigned int threadSleepGranularityMilliseconds) { Logging::SetCurrentThreadName("DB-FLUSH"); // By default, wait for 10 seconds before flushing static const unsigned int SLEEP_SECONDS = 10; if (threadSleepGranularityMilliseconds > 1000) { throw OrthancException(ErrorCode_ParameterOutOfRange); } LOG(INFO) << "Starting the database flushing thread (sleep = " << SLEEP_SECONDS << " seconds)"; unsigned int count = 0; unsigned int countThreshold = (1000 * SLEEP_SECONDS) / threadSleepGranularityMilliseconds; while (!that->done_) { boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds)); count++; if (count >= countThreshold) { Logging::Flush(); that->FlushToDisk(); count = 0; } } LOG(INFO) << "Stopping the database flushing thread"; } bool ServerIndex::IsUnstableResource(ResourceType type, int64_t id) { boost::mutex::scoped_lock lock(monitoringMutex_); return unstableResources_.Contains(std::make_pair(type, id)); } ServerIndex::ServerIndex(ServerContext& context, IDatabaseWrapper& db, unsigned int threadSleepGranularityMilliseconds) : StatelessDatabaseOperations(db), done_(false), maximumStorageMode_(MaxStorageMode_Recycle), maximumStorageSize_(0), maximumPatients_(0) { SetTransactionContextFactory(new TransactionContextFactory(context)); // Initial recycling if the parameters have changed since the last // execution of Orthanc StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_); // For some DB engines (like SQLite), make sure we flush the DB to disk at regular interval if (GetDatabaseCapabilities().HasFlushToDisk()) { flushThread_ = boost::thread(FlushThread, this, threadSleepGranularityMilliseconds); } // For some DB plugins that implements the UpdateAndGetStatistics function, updating // the statistics can take quite some time if you have not done it for a long time // -> make sure they are updated at regular interval if (GetDatabaseCapabilities().HasUpdateAndGetStatistics()) { updateStatisticsThread_ = boost::thread(UpdateStatisticsThread, this, threadSleepGranularityMilliseconds); } unstableResourcesMonitorThread_ = boost::thread (UnstableResourcesMonitorThread, this, threadSleepGranularityMilliseconds); } ServerIndex::~ServerIndex() { if (!done_) { LOG(ERROR) << "INTERNAL ERROR: ServerIndex::Stop() should be invoked manually to avoid mess in the destruction order!"; Stop(); } } void ServerIndex::Stop() { if (!done_) { done_ = true; if (flushThread_.joinable()) { flushThread_.join(); } if (updateStatisticsThread_.joinable()) { updateStatisticsThread_.join(); } if (unstableResourcesMonitorThread_.joinable()) { unstableResourcesMonitorThread_.join(); } } } void ServerIndex::SetMaximumPatientCount(unsigned int count) { { boost::mutex::scoped_lock lock(monitoringMutex_); maximumPatients_ = count; if (count == 0) { LOG(WARNING) << "No limit on the number of stored patients"; } else { LOG(WARNING) << "At most " << count << " patients will be stored"; } } StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_); } void ServerIndex::SetMaximumStorageSize(uint64_t size) { { boost::mutex::scoped_lock lock(monitoringMutex_); maximumStorageSize_ = size; if (size == 0) { LOG(WARNING) << "No limit on the size of the storage area"; } else { LOG(WARNING) << "At most " << Toolbox::GetHumanFileSize(size) << " will be used for the storage area"; } } StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_); } void ServerIndex::SetMaximumStorageMode(MaxStorageMode mode) { { boost::mutex::scoped_lock lock(monitoringMutex_); maximumStorageMode_ = mode; if (mode == MaxStorageMode_Recycle) { if (maximumStorageSize_ > 0 || maximumPatients_ > 0) { LOG(WARNING) << "Maximum Storage mode: Recycle"; } } else { if (maximumStorageSize_ > 0 || maximumPatients_ > 0) { LOG(WARNING) << "Maximum Storage mode: Reject"; } } } StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_); } void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that, unsigned int threadSleepGranularityMilliseconds) { Logging::SetCurrentThreadName("UNSTABLE-MON"); int stableAge; { OrthancConfiguration::ReaderLock lock; stableAge = lock.GetConfiguration().GetUnsignedIntegerParameter("StableAge", 60); } if (stableAge <= 0) { stableAge = 60; } LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")"; while (!that->done_) { // Check for stable resources each few seconds boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds)); for (;;) { UnstableResourcePayload stablePayload; ResourceType stableLevel; int64_t stableId; { boost::mutex::scoped_lock lock(that->monitoringMutex_); if (!that->unstableResources_.IsEmpty() && that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge)) { // This DICOM resource has not received any new instance for // some time. It can be considered as stable. std::pair<ResourceType, int64_t> stableResource = that->unstableResources_.RemoveOldest(stablePayload); stableLevel = stableResource.first; stableId = stableResource.second; //LOG(TRACE) << "Stable resource: " << EnumerationToString(stablePayload.GetResourceType()) << " " << stableId; } else { // No more stable DICOM resource, leave the internal loop break; } } try { /** * WARNING: Don't protect the calls to "LogChange()" using * "monitoringMutex_", as this could lead to deadlocks in * other threads (typically, if "Store()" is being running in * another thread, which leads to calls to "MarkAsUnstable()", * which leads to two lockings of "monitoringMutex_"). **/ switch (stableLevel) { case ResourceType_Patient: that->LogChange(stableId, ChangeType_StablePatient, stablePayload.GetPublicId(), ResourceType_Patient); break; case ResourceType_Study: that->LogChange(stableId, ChangeType_StableStudy, stablePayload.GetPublicId(), ResourceType_Study); break; case ResourceType_Series: that->LogChange(stableId, ChangeType_StableSeries, stablePayload.GetPublicId(), ResourceType_Series); break; default: throw OrthancException(ErrorCode_InternalError); } } catch (OrthancException& e) { LOG(ERROR) << "Cannot log a change about a stable resource into the database"; } } } LOG(INFO) << "Closing the monitor thread for stable resources"; } void ServerIndex::MarkAsUnstable(ResourceType type, int64_t id, const std::string& publicId) { assert(type == ResourceType_Patient || type == ResourceType_Study || type == ResourceType_Series); { boost::mutex::scoped_lock lock(monitoringMutex_); UnstableResourcePayload payload(publicId); unstableResources_.AddOrMakeMostRecent(std::make_pair(type, id), payload); //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id; } } StoreStatus ServerIndex::Store(std::map<MetadataType, std::string>& instanceMetadata, const DicomMap& dicomSummary, const ServerIndex::Attachments& attachments, const ServerIndex::MetadataMap& metadata, const DicomInstanceOrigin& origin, bool overwrite, bool hasTransferSyntax, DicomTransferSyntax transferSyntax, bool hasPixelDataOffset, uint64_t pixelDataOffset, ValueRepresentation pixelDataVR, bool isReconstruct) { uint64_t maximumStorageSize; unsigned int maximumPatients; MaxStorageMode maximumStorageMode; { boost::mutex::scoped_lock lock(monitoringMutex_); maximumStorageSize = maximumStorageSize_; maximumPatients = maximumPatients_; maximumStorageMode = maximumStorageMode_; } return StatelessDatabaseOperations::Store( instanceMetadata, dicomSummary, attachments, metadata, origin, overwrite, hasTransferSyntax, transferSyntax, hasPixelDataOffset, pixelDataOffset, pixelDataVR, maximumStorageMode, maximumStorageSize, maximumPatients, isReconstruct); } StoreStatus ServerIndex::AddAttachment(int64_t& newRevision, const FileInfo& attachment, const std::string& publicId, bool hasOldRevision, int64_t oldRevision, const std::string& oldMD5) { uint64_t maximumStorageSize; unsigned int maximumPatients; { boost::mutex::scoped_lock lock(monitoringMutex_); maximumStorageSize = maximumStorageSize_; maximumPatients = maximumPatients_; } return StatelessDatabaseOperations::AddAttachment( newRevision, attachment, publicId, maximumStorageSize, maximumPatients, hasOldRevision, oldRevision, oldMD5); } }