Mercurial > hg > orthanc-webviewer
diff Plugin/Cache/CacheScheduler.cpp @ 0:02f7a0400a91
initial commit
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 25 Feb 2015 13:45:35 +0100 |
parents | |
children | ecefd45026bf |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Plugin/Cache/CacheScheduler.cpp Wed Feb 25 13:45:35 2015 +0100 @@ -0,0 +1,385 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2015 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "CacheScheduler.h" + +#include "CacheIndex.h" + +#include "../../Orthanc/OrthancException.h" +#include <stdio.h> + +namespace OrthancPlugins +{ + class DynamicString : public Orthanc::IDynamicObject + { + private: + std::string value_; + + public: + DynamicString(const std::string& value) : value_(value) + { + } + + const std::string& GetValue() const + { + return value_; + } + }; + + + class CacheScheduler::PrefetchQueue : public boost::noncopyable + { + private: + boost::mutex mutex_; + Orthanc::SharedMessageQueue queue_; + std::set<std::string> content_; + + public: + PrefetchQueue(size_t maxSize) : queue_(maxSize) + { + queue_.SetLifoPolicy(); + } + + void Enqueue(const std::string& item) + { + boost::mutex::scoped_lock lock(mutex_); + + if (content_.find(item) != content_.end()) + { + // This cache index is already pending in the queue + return; + } + + content_.insert(item); + queue_.Enqueue(new DynamicString(item)); + } + + DynamicString* Dequeue(int32_t msTimeout) + { + std::auto_ptr<Orthanc::IDynamicObject> message(queue_.Dequeue(msTimeout)); + if (message.get() == NULL) + { + return NULL; + } + + const DynamicString& index = dynamic_cast<const DynamicString&>(*message); + + { + boost::mutex::scoped_lock lock(mutex_); + content_.erase(index.GetValue()); + } + + return dynamic_cast<DynamicString*>(message.release()); + } + }; + + + class CacheScheduler::Prefetcher : public boost::noncopyable + { + private: + int bundleIndex_; + ICacheFactory& factory_; + CacheManager& cache_; + boost::mutex& cacheMutex_; + PrefetchQueue& queue_; + + bool done_; + boost::thread thread_; + boost::mutex invalidatedMutex_; + bool invalidated_; + std::string prefetching_; + + static void Worker(Prefetcher* that) + { + while (!(that->done_)) + { + std::auto_ptr<DynamicString> prefetch(that->queue_.Dequeue(500)); + + if (prefetch.get() != NULL) + { + { + boost::mutex::scoped_lock lock(that->invalidatedMutex_); + that->invalidated_ = false; + that->prefetching_ = prefetch->GetValue(); + } + + { + boost::mutex::scoped_lock lock(that->cacheMutex_); + if (that->cache_.IsCached(that->bundleIndex_, prefetch->GetValue())) + { + // This item is already cached + continue; + } + } + + std::string content; + if (!that->factory_.Create(content, prefetch->GetValue())) + { + // The factory cannot generate this item + continue; + } + + { + boost::mutex::scoped_lock lock(that->invalidatedMutex_); + if (that->invalidated_) + { + // This item has been invalidated + continue; + } + + { + boost::mutex::scoped_lock lock2(that->cacheMutex_); + that->cache_.Store(that->bundleIndex_, prefetch->GetValue(), content); + } + } + } + } + } + + + public: + Prefetcher(int bundleIndex, + ICacheFactory& factory, + CacheManager& cache, + boost::mutex& cacheMutex, + PrefetchQueue& queue) : + bundleIndex_(bundleIndex), + factory_(factory), + cache_(cache), + cacheMutex_(cacheMutex), + queue_(queue) + { + done_ = false; + thread_ = boost::thread(Worker, this); + } + + ~Prefetcher() + { + done_ = true; + if (thread_.joinable()) + { + thread_.join(); + } + } + + void SignalInvalidated(const std::string& item) + { + boost::mutex::scoped_lock lock(invalidatedMutex_); + + if (prefetching_ == item) + { + invalidated_ = true; + } + } + }; + + + + class CacheScheduler::BundleScheduler + { + private: + std::auto_ptr<ICacheFactory> factory_; + PrefetchQueue queue_; + std::vector<Prefetcher*> prefetchers_; + + public: + BundleScheduler(int bundleIndex, + ICacheFactory* factory, + CacheManager& cache, + boost::mutex& cacheMutex, + size_t numThreads, + size_t queueSize) : + factory_(factory), + queue_(queueSize) + { + prefetchers_.resize(numThreads, NULL); + + for (size_t i = 0; i < numThreads; i++) + { + prefetchers_[i] = new Prefetcher(bundleIndex, *factory_, cache, cacheMutex, queue_); + } + } + + ~BundleScheduler() + { + for (size_t i = 0; i < prefetchers_.size(); i++) + { + if (prefetchers_[i] != NULL) + delete prefetchers_[i]; + } + } + + void Invalidate(const std::string& item) + { + for (size_t i = 0; i < prefetchers_.size(); i++) + { + prefetchers_[i]->SignalInvalidated(item); + } + } + + void Prefetch(const std::string& item) + { + queue_.Enqueue(item); + } + + bool CallFactory(std::string& content, + const std::string& item) + { + content.clear(); + return factory_->Create(content, item); + } + }; + + + + CacheScheduler::BundleScheduler& CacheScheduler::GetBundleScheduler(unsigned int bundleIndex) + { + boost::mutex::scoped_lock lock(factoryMutex_); + + BundleSchedulers::iterator it = bundles_.find(bundleIndex); + if (it == bundles_.end()) + { + throw Orthanc::OrthancException("No factory associated with this bundle"); + } + + return *(it->second); + } + + + + CacheScheduler::CacheScheduler(CacheManager& cache, + unsigned int maxPrefetchSize) : + maxPrefetchSize_(maxPrefetchSize), + cache_(cache), + policy_(NULL) + { + } + + + CacheScheduler::~CacheScheduler() + { + for (BundleSchedulers::iterator it = bundles_.begin(); + it != bundles_.end(); it++) + { + delete it->second; + } + } + + + void CacheScheduler::Register(int bundle, + ICacheFactory* factory /* takes ownership */, + size_t numThreads) + { + boost::mutex::scoped_lock lock(factoryMutex_); + + BundleSchedulers::iterator it = bundles_.find(bundle); + if (it != bundles_.end()) + { + // This bundle is already registered + throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); + } + + bundles_[bundle] = new BundleScheduler(bundle, factory, cache_, cacheMutex_, numThreads, maxPrefetchSize_); + } + + + void CacheScheduler::Invalidate(int bundle, + const std::string& item) + { + { + boost::mutex::scoped_lock lock(cacheMutex_); + cache_.Invalidate(bundle, item); + } + + GetBundleScheduler(bundle).Invalidate(item); + } + + + void CacheScheduler::ApplyPrefetchPolicy(int bundle, + const std::string& item, + const std::string& content) + { + boost::recursive_mutex::scoped_lock lock(policyMutex_); + + if (policy_.get() != NULL) + { + std::list<CacheIndex> toPrefetch; + + { + policy_->Apply(toPrefetch, *this, CacheIndex(bundle, item), content); + } + + for (std::list<CacheIndex>::const_reverse_iterator + it = toPrefetch.rbegin(); it != toPrefetch.rend(); ++it) + { + Prefetch(it->GetBundle(), it->GetItem()); + } + } + } + + + bool CacheScheduler::Access(std::string& content, + int bundle, + const std::string& item) + { + bool existing; + + { + boost::mutex::scoped_lock lock(cacheMutex_); + existing = cache_.Access(content, bundle, item); + } + + if (existing) + { + ApplyPrefetchPolicy(bundle, item, content); + return true; + } + + if (!GetBundleScheduler(bundle).CallFactory(content, item)) + { + // This item cannot be generated by the factory + return false; + } + + { + boost::mutex::scoped_lock lock(cacheMutex_); + cache_.Store(bundle, item, content); + } + + ApplyPrefetchPolicy(bundle, item, content); + + return true; + } + + + void CacheScheduler::Prefetch(int bundle, + const std::string& item) + { + GetBundleScheduler(bundle).Prefetch(item); + } + + + void CacheScheduler::RegisterPolicy(IPrefetchPolicy* policy) + { + boost::recursive_mutex::scoped_lock lock(policyMutex_); + policy_.reset(policy); + } + +}