view OrthancServer/Sources/ServerIndex.cpp @ 4589:bec74e29f86b db-changes

attaching the listener to transactions in IDatabaseWrapper
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 12 Mar 2021 15:33:47 +0100
parents 94147ce2f097
children 4a0bf1019335
line wrap: on
line source

/**
 * Orthanc - A Lightweight, RESTful DICOM Store
 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
 * Department, University Hospital of Liege, Belgium
 * Copyright (C) 2017-2021 Osimis S.A., Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * In addition, as a special exception, the copyright holders of this
 * program give permission to link the code of its release with the
 * OpenSSL project's "OpenSSL" library (or with modified versions of it
 * that use the same license as the "OpenSSL" library), and distribute
 * the linked executables. You must obey the GNU General Public License
 * in all respects for all of the code used other than "OpenSSL". If you
 * modify file(s) with this exception, you may extend this exception to
 * your version of the file(s), but you are not obligated to do so. If
 * you do not wish to do so, delete this exception statement from your
 * version. If you delete this exception statement from all source files
 * in the program, then also delete it here.
 * 
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#include "PrecompiledHeadersServer.h"
#include "ServerIndex.h"

#ifndef NOMINMAX
#define NOMINMAX
#endif

#include "../../OrthancFramework/Sources/Logging.h"
#include "../../OrthancFramework/Sources/Toolbox.h"

#include "OrthancConfiguration.h"
#include "ServerContext.h"
#include "ServerIndexChange.h"
#include "ServerToolbox.h"


static const uint64_t MEGA_BYTES = 1024 * 1024;

namespace Orthanc
{
  class ServerIndex::Listener : public StatelessDatabaseOperations::ITransactionContext
  {
  private:
    struct FileToRemove
    {
    private:
      std::string  uuid_;
      FileContentType  type_;

    public:
      explicit FileToRemove(const FileInfo& info) :
        uuid_(info.GetUuid()), 
        type_(info.GetContentType())
      {
      }

      const std::string& GetUuid() const
      {
        return uuid_;
      }

      FileContentType GetContentType() const 
      {
        return type_;
      }
    };

    ServerContext& context_;
    bool hasRemainingLevel_;
    ResourceType remainingType_;
    std::string remainingPublicId_;
    std::list<FileToRemove> pendingFilesToRemove_;
    std::list<ServerIndexChange> pendingChanges_;
    uint64_t sizeOfFilesToRemove_;
    bool insideTransaction_;
    uint64_t sizeOfAddedAttachments_;

    void Reset()
    {
      sizeOfFilesToRemove_ = 0;
      hasRemainingLevel_ = false;
      pendingFilesToRemove_.clear();
      pendingChanges_.clear();
      sizeOfAddedAttachments_ = 0;
    }

  public:
    explicit Listener(ServerContext& context) :
      context_(context),
      insideTransaction_(false)
    {
      Reset();
      assert(ResourceType_Patient < ResourceType_Study &&
             ResourceType_Study < ResourceType_Series &&
             ResourceType_Series < ResourceType_Instance);
    }

    void StartTransaction()
    {
      Reset();
      insideTransaction_ = true;
    }

    void EndTransaction()
    {
      insideTransaction_ = false;
    }

    uint64_t GetSizeOfFilesToRemove()
    {
      return sizeOfFilesToRemove_;
    }

    void CommitFilesToRemove()
    {
      for (std::list<FileToRemove>::const_iterator 
             it = pendingFilesToRemove_.begin();
           it != pendingFilesToRemove_.end(); ++it)
      {
        try
        {
          context_.RemoveFile(it->GetUuid(), it->GetContentType());
        }
        catch (OrthancException& e)
        {
          LOG(ERROR) << "Unable to remove an attachment from the storage area: "
                     << it->GetUuid() << " (type: " << EnumerationToString(it->GetContentType()) << ")";
        }
      }
    }

    void CommitChanges()
    {
      for (std::list<ServerIndexChange>::const_iterator 
             it = pendingChanges_.begin(); 
           it != pendingChanges_.end(); ++it)
      {
        context_.SignalChange(*it);
      }
    }

    virtual void SignalRemainingAncestor(ResourceType parentType,
                                         const std::string& publicId) ORTHANC_OVERRIDE
    {
      LOG(TRACE) << "Remaining ancestor \"" << publicId << "\" (" << parentType << ")";

      if (hasRemainingLevel_)
      {
        if (parentType < remainingType_)
        {
          remainingType_ = parentType;
          remainingPublicId_ = publicId;
        }
      }
      else
      {
        hasRemainingLevel_ = true;
        remainingType_ = parentType;
        remainingPublicId_ = publicId;
      }        
    }

    virtual void SignalAttachmentDeleted(const FileInfo& info) ORTHANC_OVERRIDE
    {
      assert(Toolbox::IsUuid(info.GetUuid()));
      pendingFilesToRemove_.push_back(FileToRemove(info));
      sizeOfFilesToRemove_ += info.GetCompressedSize();
    }

    virtual void SignalResourceDeleted(ResourceType type,
                                       const std::string& publicId) ORTHANC_OVERRIDE
    {
      SignalChange(ServerIndexChange(ChangeType_Deleted, type, publicId));
    }

    virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE
    {
      LOG(TRACE) << "Change related to resource " << change.GetPublicId() << " of type " 
                 << EnumerationToString(change.GetResourceType()) << ": " 
                 << EnumerationToString(change.GetChangeType());

      if (insideTransaction_)
      {
        pendingChanges_.push_back(change);
      }
      else
      {
        context_.SignalChange(change);
      }
    }

    virtual void SignalAttachmentsAdded(uint64_t compressedSize) ORTHANC_OVERRIDE
    {
      sizeOfAddedAttachments_ += compressedSize;
    }

    bool HasRemainingLevel() const
    {
      return hasRemainingLevel_;
    }

    ResourceType GetRemainingType() const
    {
      assert(HasRemainingLevel());
      return remainingType_;
    }

    const std::string& GetRemainingPublicId() const
    {
      assert(HasRemainingLevel());
      return remainingPublicId_;
    }

    uint64_t GetSizeOfAddedAttachments() const
    {
      return sizeOfAddedAttachments_;
    }

    virtual bool LookupRemainingLevel(std::string& remainingPublicId /* out */,
                                      ResourceType& remainingLevel   /* out */) ORTHANC_OVERRIDE
    {
      if (HasRemainingLevel())
      {
        remainingPublicId = GetRemainingPublicId();
        remainingLevel = GetRemainingType();
        return true;
      }
      else
      {
        return false;
      }        
    };

    virtual void MarkAsUnstable(int64_t id,
                                Orthanc::ResourceType type,
                                const std::string& publicId) ORTHANC_OVERRIDE
    {
      context_.GetIndex().MarkAsUnstable(id, type, publicId);
    }

    virtual bool IsUnstableResource(int64_t id) ORTHANC_OVERRIDE
    {
      return context_.GetIndex().IsUnstableResource(id);
    }

    virtual void Commit() ORTHANC_OVERRIDE
    {
      // We can remove the files once the SQLite transaction has
      // been successfully committed. Some files might have to be
      // deleted because of recycling.
      CommitFilesToRemove();
      
      // Send all the pending changes to the Orthanc plugins
      CommitChanges();
    }

    virtual int64_t GetCompressedSizeDelta() ORTHANC_OVERRIDE
    {
      return (static_cast<int64_t>(GetSizeOfAddedAttachments()) -
              static_cast<int64_t>(GetSizeOfFilesToRemove()));
    }
  };


  class ServerIndex::TransactionContextFactory : public ITransactionContextFactory
  {
  private:
    class Context : public ITransactionContext
    {
    private:
      Listener&  listener_;

    public:
      Context(ServerIndex& index) :
        listener_(*index.listener_)
      {
        listener_.StartTransaction();
      }

      ~Context()
      {
        listener_.EndTransaction();
      }

      virtual bool IsUnstableResource(int64_t id) ORTHANC_OVERRIDE
      {
        return listener_.IsUnstableResource(id);
      }

      virtual bool LookupRemainingLevel(std::string& remainingPublicId /* out */,
                                        ResourceType& remainingLevel   /* out */) ORTHANC_OVERRIDE
      {
        return listener_.LookupRemainingLevel(remainingPublicId, remainingLevel);
      }

      virtual void MarkAsUnstable(int64_t id,
                                  Orthanc::ResourceType type,
                                  const std::string& publicId) ORTHANC_OVERRIDE
      {
        listener_.MarkAsUnstable(id, type, publicId);
      }

      virtual void SignalAttachmentsAdded(uint64_t compressedSize) ORTHANC_OVERRIDE
      {
        listener_.SignalAttachmentsAdded(compressedSize);
      }

      virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE
      {
        listener_.SignalChange(change);
      }

      virtual void Commit() ORTHANC_OVERRIDE
      {
        listener_.Commit();
      }

      virtual int64_t GetCompressedSizeDelta() ORTHANC_OVERRIDE
      {
        return listener_.GetCompressedSizeDelta();
      }

      virtual void SignalRemainingAncestor(ResourceType parentType,
                                           const std::string& publicId) ORTHANC_OVERRIDE
      {
        listener_.SignalRemainingAncestor(parentType, publicId);
      }

      virtual void SignalAttachmentDeleted(const FileInfo& info) ORTHANC_OVERRIDE
      {
        listener_.SignalAttachmentDeleted(info);
      }

      virtual void SignalResourceDeleted(ResourceType type,
                                         const std::string& publicId) ORTHANC_OVERRIDE
      {
        listener_.SignalResourceDeleted(type, publicId);
      }
    };

    ServerIndex& index_;
      
  public:
    TransactionContextFactory(ServerIndex& index) :
      index_(index)
    {
    }

    virtual ITransactionContext* Create()
    {
      return new Context(index_);
    }
  };    
  
  
  class ServerIndex::UnstableResourcePayload
  {
  private:
    ResourceType type_;
    std::string publicId_;
    boost::posix_time::ptime time_;

  public:
    UnstableResourcePayload() : type_(ResourceType_Instance)
    {
    }

    UnstableResourcePayload(Orthanc::ResourceType type,
                            const std::string& publicId) : 
      type_(type),
      publicId_(publicId),
      time_(boost::posix_time::second_clock::local_time())
    {
    }

    unsigned int GetAge() const
    {
      return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
    }

    ResourceType GetResourceType() const
    {
      return type_;
    }
    
    const std::string& GetPublicId() const
    {
      return publicId_;
    }
  };


  void ServerIndex::FlushThread(ServerIndex* that,
                                unsigned int threadSleepGranularityMilliseconds)
  {
    // By default, wait for 10 seconds before flushing
    static const unsigned int SLEEP_SECONDS = 10;

    if (threadSleepGranularityMilliseconds > 1000)
    {
      throw OrthancException(ErrorCode_ParameterOutOfRange);
    }

    LOG(INFO) << "Starting the database flushing thread (sleep = " << SLEEP_SECONDS << " seconds)";

    unsigned int count = 0;
    unsigned int countThreshold = (1000 * SLEEP_SECONDS) / threadSleepGranularityMilliseconds;

    while (!that->done_)
    {
      boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds));
      count++;
      
      if (count >= countThreshold)
      {
        Logging::Flush();
        that->FlushToDisk();
        
        count = 0;
      }
    }

    LOG(INFO) << "Stopping the database flushing thread";
  }


  bool ServerIndex::IsUnstableResource(int64_t id)
  {
    boost::mutex::scoped_lock lock(monitoringMutex_);
    return unstableResources_.Contains(id);
  }


  ServerIndex::ServerIndex(ServerContext& context,
                           IDatabaseWrapper& db,
                           unsigned int threadSleepGranularityMilliseconds) :
    StatelessDatabaseOperations(db),
    done_(false),
    maximumStorageSize_(0),
    maximumPatients_(0)
  {
    listener_.reset(new Listener(context));

    SetTransactionContextFactory(new TransactionContextFactory(*this));

    // Initial recycling if the parameters have changed since the last
    // execution of Orthanc
    StandaloneRecycling(maximumStorageSize_, maximumPatients_);

    if (HasFlushToDisk())
    {
      flushThread_ = boost::thread(FlushThread, this, threadSleepGranularityMilliseconds);
    }

    unstableResourcesMonitorThread_ = boost::thread
      (UnstableResourcesMonitorThread, this, threadSleepGranularityMilliseconds);
  }


  ServerIndex::~ServerIndex()
  {
    if (!done_)
    {
      LOG(ERROR) << "INTERNAL ERROR: ServerIndex::Stop() should be invoked manually to avoid mess in the destruction order!";
      Stop();
    }
  }


  void ServerIndex::Stop()
  {
    if (!done_)
    {
      done_ = true;

      if (flushThread_.joinable())
      {
        flushThread_.join();
      }

      if (unstableResourcesMonitorThread_.joinable())
      {
        unstableResourcesMonitorThread_.join();
      }
    }
  }


  void ServerIndex::SetMaximumPatientCount(unsigned int count) 
  {
    {
      boost::mutex::scoped_lock lock(monitoringMutex_);
      maximumPatients_ = count;
      
      if (count == 0)
      {
        LOG(WARNING) << "No limit on the number of stored patients";
      }
      else
      {
        LOG(WARNING) << "At most " << count << " patients will be stored";
      }
    }

    StandaloneRecycling(maximumStorageSize_, maximumPatients_);
  }

  
  void ServerIndex::SetMaximumStorageSize(uint64_t size) 
  {
    {
      boost::mutex::scoped_lock lock(monitoringMutex_);
      maximumStorageSize_ = size;
      
      if (size == 0)
      {
        LOG(WARNING) << "No limit on the size of the storage area";
      }
      else
      {
        LOG(WARNING) << "At most " << (size / MEGA_BYTES) << "MB will be used for the storage area";
      }
    }

    StandaloneRecycling(maximumStorageSize_, maximumPatients_);
  }


  void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that,
                                                   unsigned int threadSleepGranularityMilliseconds)
  {
    int stableAge;
    
    {
      OrthancConfiguration::ReaderLock lock;
      stableAge = lock.GetConfiguration().GetUnsignedIntegerParameter("StableAge", 60);
    }

    if (stableAge <= 0)
    {
      stableAge = 60;
    }

    LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";

    while (!that->done_)
    {
      // Check for stable resources each few seconds
      boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds));

      for (;;)
      {
        UnstableResourcePayload stableResource;
        int64_t stableId;

        {      
          boost::mutex::scoped_lock lock(that->monitoringMutex_);

          if (!that->unstableResources_.IsEmpty() &&
              that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
          {
            // This DICOM resource has not received any new instance for
            // some time. It can be considered as stable.
            stableId = that->unstableResources_.RemoveOldest(stableResource);
            //LOG(TRACE) << "Stable resource: " << EnumerationToString(stableResource.GetResourceType()) << " " << stableId;
          }
          else
          {
            // No more stable DICOM resource, leave the internal loop
            break;
          }
        }

        /**
         * WARNING: Don't protect the calls to "LogChange()" using
         * "monitoringMutex_", as this could lead to deadlocks in
         * other threads (typically, if "Store()" is being running in
         * another thread, which leads to calls to "MarkAsUnstable()",
         * which leads to two lockings of "monitoringMutex_").
         **/
        switch (stableResource.GetResourceType())
        {
          case ResourceType_Patient:
            that->LogChange(stableId, ChangeType_StablePatient, stableResource.GetPublicId(), ResourceType_Patient);
            break;
            
          case ResourceType_Study:
            that->LogChange(stableId, ChangeType_StableStudy, stableResource.GetPublicId(), ResourceType_Study);
            break;
            
          case ResourceType_Series:
            that->LogChange(stableId, ChangeType_StableSeries, stableResource.GetPublicId(), ResourceType_Series);
            break;
            
          default:
            throw OrthancException(ErrorCode_InternalError);
        }
      }
    }

    LOG(INFO) << "Closing the monitor thread for stable resources";
  }
  

  void ServerIndex::MarkAsUnstable(int64_t id,
                                   Orthanc::ResourceType type,
                                   const std::string& publicId)
  {
    assert(type == Orthanc::ResourceType_Patient ||
           type == Orthanc::ResourceType_Study ||
           type == Orthanc::ResourceType_Series);

    {
      boost::mutex::scoped_lock lock(monitoringMutex_);
      UnstableResourcePayload payload(type, publicId);
      unstableResources_.AddOrMakeMostRecent(id, payload);
      //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
    }
  }


  StoreStatus ServerIndex::Store(std::map<MetadataType, std::string>& instanceMetadata,
                                 const DicomMap& dicomSummary,
                                 const ServerIndex::Attachments& attachments,
                                 const ServerIndex::MetadataMap& metadata,
                                 const DicomInstanceOrigin& origin,
                                 bool overwrite,
                                 bool hasTransferSyntax,
                                 DicomTransferSyntax transferSyntax,
                                 bool hasPixelDataOffset,
                                 uint64_t pixelDataOffset)
  {
    uint64_t maximumStorageSize;
    unsigned int maximumPatients;
    
    {
      boost::mutex::scoped_lock lock(monitoringMutex_);
      maximumStorageSize = maximumStorageSize_;
      maximumPatients = maximumPatients_;
    }

    return StatelessDatabaseOperations::Store(
      instanceMetadata, dicomSummary, attachments, metadata, origin, overwrite, hasTransferSyntax,
      transferSyntax, hasPixelDataOffset, pixelDataOffset, maximumStorageSize, maximumPatients);
  }

  
  StoreStatus ServerIndex::AddAttachment(const FileInfo& attachment,
                                         const std::string& publicId)
  {
    uint64_t maximumStorageSize;
    unsigned int maximumPatients;
    
    {
      boost::mutex::scoped_lock lock(monitoringMutex_);
      maximumStorageSize = maximumStorageSize_;
      maximumPatients = maximumPatients_;
    }

    return StatelessDatabaseOperations::AddAttachment(
      attachment, publicId, maximumStorageSize, maximumPatients);
  }
}