comparison Plugin/Cache/CacheScheduler.cpp @ 0:02f7a0400a91

initial commit
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 25 Feb 2015 13:45:35 +0100
parents
children ecefd45026bf
comparison
equal deleted inserted replaced
-1:000000000000 0:02f7a0400a91
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2015 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU Affero General Public License
8 * as published by the Free Software Foundation, either version 3 of
9 * the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Affero General Public License for more details.
15 *
16 * You should have received a copy of the GNU Affero General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
20
21 #include "CacheScheduler.h"
22
23 #include "CacheIndex.h"
24
25 #include "../../Orthanc/OrthancException.h"
26 #include <stdio.h>
27
28 namespace OrthancPlugins
29 {
30 class DynamicString : public Orthanc::IDynamicObject
31 {
32 private:
33 std::string value_;
34
35 public:
36 DynamicString(const std::string& value) : value_(value)
37 {
38 }
39
40 const std::string& GetValue() const
41 {
42 return value_;
43 }
44 };
45
46
47 class CacheScheduler::PrefetchQueue : public boost::noncopyable
48 {
49 private:
50 boost::mutex mutex_;
51 Orthanc::SharedMessageQueue queue_;
52 std::set<std::string> content_;
53
54 public:
55 PrefetchQueue(size_t maxSize) : queue_(maxSize)
56 {
57 queue_.SetLifoPolicy();
58 }
59
60 void Enqueue(const std::string& item)
61 {
62 boost::mutex::scoped_lock lock(mutex_);
63
64 if (content_.find(item) != content_.end())
65 {
66 // This cache index is already pending in the queue
67 return;
68 }
69
70 content_.insert(item);
71 queue_.Enqueue(new DynamicString(item));
72 }
73
74 DynamicString* Dequeue(int32_t msTimeout)
75 {
76 std::auto_ptr<Orthanc::IDynamicObject> message(queue_.Dequeue(msTimeout));
77 if (message.get() == NULL)
78 {
79 return NULL;
80 }
81
82 const DynamicString& index = dynamic_cast<const DynamicString&>(*message);
83
84 {
85 boost::mutex::scoped_lock lock(mutex_);
86 content_.erase(index.GetValue());
87 }
88
89 return dynamic_cast<DynamicString*>(message.release());
90 }
91 };
92
93
94 class CacheScheduler::Prefetcher : public boost::noncopyable
95 {
96 private:
97 int bundleIndex_;
98 ICacheFactory& factory_;
99 CacheManager& cache_;
100 boost::mutex& cacheMutex_;
101 PrefetchQueue& queue_;
102
103 bool done_;
104 boost::thread thread_;
105 boost::mutex invalidatedMutex_;
106 bool invalidated_;
107 std::string prefetching_;
108
109 static void Worker(Prefetcher* that)
110 {
111 while (!(that->done_))
112 {
113 std::auto_ptr<DynamicString> prefetch(that->queue_.Dequeue(500));
114
115 if (prefetch.get() != NULL)
116 {
117 {
118 boost::mutex::scoped_lock lock(that->invalidatedMutex_);
119 that->invalidated_ = false;
120 that->prefetching_ = prefetch->GetValue();
121 }
122
123 {
124 boost::mutex::scoped_lock lock(that->cacheMutex_);
125 if (that->cache_.IsCached(that->bundleIndex_, prefetch->GetValue()))
126 {
127 // This item is already cached
128 continue;
129 }
130 }
131
132 std::string content;
133 if (!that->factory_.Create(content, prefetch->GetValue()))
134 {
135 // The factory cannot generate this item
136 continue;
137 }
138
139 {
140 boost::mutex::scoped_lock lock(that->invalidatedMutex_);
141 if (that->invalidated_)
142 {
143 // This item has been invalidated
144 continue;
145 }
146
147 {
148 boost::mutex::scoped_lock lock2(that->cacheMutex_);
149 that->cache_.Store(that->bundleIndex_, prefetch->GetValue(), content);
150 }
151 }
152 }
153 }
154 }
155
156
157 public:
158 Prefetcher(int bundleIndex,
159 ICacheFactory& factory,
160 CacheManager& cache,
161 boost::mutex& cacheMutex,
162 PrefetchQueue& queue) :
163 bundleIndex_(bundleIndex),
164 factory_(factory),
165 cache_(cache),
166 cacheMutex_(cacheMutex),
167 queue_(queue)
168 {
169 done_ = false;
170 thread_ = boost::thread(Worker, this);
171 }
172
173 ~Prefetcher()
174 {
175 done_ = true;
176 if (thread_.joinable())
177 {
178 thread_.join();
179 }
180 }
181
182 void SignalInvalidated(const std::string& item)
183 {
184 boost::mutex::scoped_lock lock(invalidatedMutex_);
185
186 if (prefetching_ == item)
187 {
188 invalidated_ = true;
189 }
190 }
191 };
192
193
194
195 class CacheScheduler::BundleScheduler
196 {
197 private:
198 std::auto_ptr<ICacheFactory> factory_;
199 PrefetchQueue queue_;
200 std::vector<Prefetcher*> prefetchers_;
201
202 public:
203 BundleScheduler(int bundleIndex,
204 ICacheFactory* factory,
205 CacheManager& cache,
206 boost::mutex& cacheMutex,
207 size_t numThreads,
208 size_t queueSize) :
209 factory_(factory),
210 queue_(queueSize)
211 {
212 prefetchers_.resize(numThreads, NULL);
213
214 for (size_t i = 0; i < numThreads; i++)
215 {
216 prefetchers_[i] = new Prefetcher(bundleIndex, *factory_, cache, cacheMutex, queue_);
217 }
218 }
219
220 ~BundleScheduler()
221 {
222 for (size_t i = 0; i < prefetchers_.size(); i++)
223 {
224 if (prefetchers_[i] != NULL)
225 delete prefetchers_[i];
226 }
227 }
228
229 void Invalidate(const std::string& item)
230 {
231 for (size_t i = 0; i < prefetchers_.size(); i++)
232 {
233 prefetchers_[i]->SignalInvalidated(item);
234 }
235 }
236
237 void Prefetch(const std::string& item)
238 {
239 queue_.Enqueue(item);
240 }
241
242 bool CallFactory(std::string& content,
243 const std::string& item)
244 {
245 content.clear();
246 return factory_->Create(content, item);
247 }
248 };
249
250
251
252 CacheScheduler::BundleScheduler& CacheScheduler::GetBundleScheduler(unsigned int bundleIndex)
253 {
254 boost::mutex::scoped_lock lock(factoryMutex_);
255
256 BundleSchedulers::iterator it = bundles_.find(bundleIndex);
257 if (it == bundles_.end())
258 {
259 throw Orthanc::OrthancException("No factory associated with this bundle");
260 }
261
262 return *(it->second);
263 }
264
265
266
267 CacheScheduler::CacheScheduler(CacheManager& cache,
268 unsigned int maxPrefetchSize) :
269 maxPrefetchSize_(maxPrefetchSize),
270 cache_(cache),
271 policy_(NULL)
272 {
273 }
274
275
276 CacheScheduler::~CacheScheduler()
277 {
278 for (BundleSchedulers::iterator it = bundles_.begin();
279 it != bundles_.end(); it++)
280 {
281 delete it->second;
282 }
283 }
284
285
286 void CacheScheduler::Register(int bundle,
287 ICacheFactory* factory /* takes ownership */,
288 size_t numThreads)
289 {
290 boost::mutex::scoped_lock lock(factoryMutex_);
291
292 BundleSchedulers::iterator it = bundles_.find(bundle);
293 if (it != bundles_.end())
294 {
295 // This bundle is already registered
296 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
297 }
298
299 bundles_[bundle] = new BundleScheduler(bundle, factory, cache_, cacheMutex_, numThreads, maxPrefetchSize_);
300 }
301
302
303 void CacheScheduler::Invalidate(int bundle,
304 const std::string& item)
305 {
306 {
307 boost::mutex::scoped_lock lock(cacheMutex_);
308 cache_.Invalidate(bundle, item);
309 }
310
311 GetBundleScheduler(bundle).Invalidate(item);
312 }
313
314
315 void CacheScheduler::ApplyPrefetchPolicy(int bundle,
316 const std::string& item,
317 const std::string& content)
318 {
319 boost::recursive_mutex::scoped_lock lock(policyMutex_);
320
321 if (policy_.get() != NULL)
322 {
323 std::list<CacheIndex> toPrefetch;
324
325 {
326 policy_->Apply(toPrefetch, *this, CacheIndex(bundle, item), content);
327 }
328
329 for (std::list<CacheIndex>::const_reverse_iterator
330 it = toPrefetch.rbegin(); it != toPrefetch.rend(); ++it)
331 {
332 Prefetch(it->GetBundle(), it->GetItem());
333 }
334 }
335 }
336
337
338 bool CacheScheduler::Access(std::string& content,
339 int bundle,
340 const std::string& item)
341 {
342 bool existing;
343
344 {
345 boost::mutex::scoped_lock lock(cacheMutex_);
346 existing = cache_.Access(content, bundle, item);
347 }
348
349 if (existing)
350 {
351 ApplyPrefetchPolicy(bundle, item, content);
352 return true;
353 }
354
355 if (!GetBundleScheduler(bundle).CallFactory(content, item))
356 {
357 // This item cannot be generated by the factory
358 return false;
359 }
360
361 {
362 boost::mutex::scoped_lock lock(cacheMutex_);
363 cache_.Store(bundle, item, content);
364 }
365
366 ApplyPrefetchPolicy(bundle, item, content);
367
368 return true;
369 }
370
371
372 void CacheScheduler::Prefetch(int bundle,
373 const std::string& item)
374 {
375 GetBundleScheduler(bundle).Prefetch(item);
376 }
377
378
379 void CacheScheduler::RegisterPolicy(IPrefetchPolicy* policy)
380 {
381 boost::recursive_mutex::scoped_lock lock(policyMutex_);
382 policy_.reset(policy);
383 }
384
385 }