1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import errno
19 import os
20 import tempfile
21 import time
22 import stat
23
24 from twisted.internet import defer, threads, reactor, utils
25
26 from flumotion.common import log, common, python, errors
27 from flumotion.common import format as formatting
28
# Log category of the cache manager; also used as the name prefix of
# the temporary files created in the cache directory.
LOG_CATEGORY = "cache-manager"

# Fallback values used by the cache manager constructor for every
# argument left to None.
DEFAULT_CACHE_SIZE = 1000 * 1024 * 1024  # in bytes
DEFAULT_CACHE_DIR = "/tmp/httpserver"
DEFAULT_CLEANUP_ENABLED = True
# Watermarks are fractions of the cache size; the constructor clamps
# them to the range [0.0, 1.0].
DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0
DEFAULT_CLEANUP_LOW_WATERMARK = 0.6
# Number of path -> identifier entries memoized before the memo
# dictionary is cleared.
ID_CACHE_MAX_SIZE = 1024
# Suffix of the temporary file names.
TEMP_FILE_POSTFIX = ".tmp"
38
39
41
42 logCategory = LOG_CATEGORY
43
def __init__(self, stats,
             cacheDir=None,
             cacheSize=None,
             cleanupEnabled=None,
             cleanupHighWatermark=None,
             cleanupLowWatermark=None,
             cacheRealm=None):
    """
    Set up the cache manager state and make sure the cache directory
    exists.  Every argument left to None gets the matching module-level
    DEFAULT_* value.

    @param stats:                statistics collector, kept as self.stats
    @param cacheDir:             directory holding the cached files
    @param cacheSize:            size of the cache, in bytes
    @param cleanupEnabled:       whether the cache cleanup is enabled
    @param cleanupHighWatermark: fraction of the cache size, clamped to
                                 [0.0, 1.0], giving the maximum usage
    @param cleanupLowWatermark:  fraction of the cache size, clamped to
                                 [0.0, 1.0], giving the minimum usage
    @param cacheRealm:           optional realm; when set, "<realm>:" is
                                 prepended to the paths before hashing
    """

    def orDefault(value, default):
        if value is None:
            return default
        return value

    def asFraction(value):
        return max(0.0, min(1.0, float(value)))

    self.stats = stats
    self._cacheDir = orDefault(cacheDir, DEFAULT_CACHE_DIR)
    self._cacheSize = orDefault(cacheSize, DEFAULT_CACHE_SIZE)
    self._cleanupEnabled = orDefault(cleanupEnabled,
                                     DEFAULT_CLEANUP_ENABLED)
    highFraction = asFraction(orDefault(cleanupHighWatermark,
                                        DEFAULT_CLEANUP_HIGH_WATERMARK))
    lowFraction = asFraction(orDefault(cleanupLowWatermark,
                                       DEFAULT_CLEANUP_LOW_WATERMARK))

    if cacheRealm:
        self._cachePrefix = cacheRealm + ":"
    else:
        self._cachePrefix = ""

    # Memo of the identifiers already computed, indexed by path.
    self._identifiers = {}

    self.info("Cache Manager initialized")
    self.debug("Cache directory: '%s'", self._cacheDir)
    self.debug("Cache size: %d bytes", self._cacheSize)
    self.debug("Cache cleanup enabled: %s", self._cleanupEnabled)

    common.ensureDir(self._cacheDir, "cache")

    # Nothing is known about the usage until it gets measured.
    self._cacheUsage = None
    self._cacheUsageLastUpdate = None
    self._lastCacheTime = None

    self._cacheMaxUsage = self._cacheSize * highFraction
    self._cacheMinUsage = self._cacheSize * lowFraction
87
def setUp(self):
    """
    Initialize the cache manager by measuring the current cache usage.

    @return: the deferred returned by updateCacheUsage()
    @raise: OSError or FlumotionError
    """
    usageDeferred = self.updateCacheUsage()
    return usageDeferred
97
def getIdentifier(self, path):
    """
    Give the identifier of a path: the SHA1 digest, as a hex string,
    of the cache prefix followed by the path.

    Identifiers are memoized in a dictionary indexed by path; once the
    dictionary holds ID_CACHE_MAX_SIZE entries it is emptied before
    the next identifier is stored.

    @return: an identifier for path.
    """
    known = self._identifiers.get(path)
    if known is not None:
        return known

    hasher = python.sha1()
    hasher.update(self._cachePrefix + path)
    ident = hasher.hexdigest()

    if len(self._identifiers) >= ID_CACHE_MAX_SIZE:
        self._identifiers.clear()
    self._identifiers[path] = ident
    return ident
117
def getCachePath(self, path):
    """
    @return: the cached file path for a path, i.e. the identifier of
             the path inside the cache directory.
    """
    return os.path.join(self._cacheDir, self.getIdentifier(path))
124
def getTempPath(self, path):
    """
    @return: a temporary file path for a path, i.e. the cached file
             path with TEMP_FILE_POSTFIX appended.

    Provided for compatibility only, do not use this function:
    use newTempFile() instead.
    """
    # NOTE(review): the def line is not visible in this view; the method
    # name is assumed from the docstring - confirm.
    fileName = self.getIdentifier(path) + TEMP_FILE_POSTFIX
    return os.path.join(self._cacheDir, fileName)
134
137
144
def updateCacheUsage(self):
    """
    Give the cache usage.  'du' is only run when no usage is known
    yet or when the modification time of the cache directory is newer
    than the one recorded at the previous measurement; otherwise the
    usage already known is returned.

    @return: a deferred with the cache usage in bytes.
    @raise: OSError or FlumotionError
    """
    try:
        dirTime = os.path.getmtime(self._cacheDir)
    except OSError as e:
        return defer.fail(e)

    upToDate = (self._cacheUsage is not None
                and not (self._lastCacheTime < dirTime))
    if upToDate:
        return defer.succeed(self._cacheUsage)

    self._lastCacheTime = dirTime
    self.log('Getting disk usage for path %r', self._cacheDir)

    def parseUsage(output):
        # the usage is the first tab-separated field of the output
        return int(output.split('\t', 1)[0])

    d = utils.getProcessOutput('du', ['-bs', self._cacheDir])
    d.addCallback(parseUsage)
    d.addCallback(self._updateCacheUsage)
    return d
167
def _rmfiles(self, files):
    """
    Remove the given files from the filesystem.

    Each file is handled on its own: a file that cannot be removed
    does not prevent the following ones from being removed.  Files
    that do not exist anymore are silently skipped, any other error
    is logged as a warning.

    @param files: paths of the files to remove
    @type  files: iterable of str
    """
    for path in files:
        try:
            os.remove(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                self.warning("Error cleaning cached file: %s", str(e))
176
178
def _setCacheUsage(self, _, usage):
    """
    Record usage as the current cache usage and timestamp the update.

    Chained with addBoth(), so the first argument is the result (or
    failure) of the previous step; it is ignored.

    @return: usage
    """
    # NOTE(review): the def line is not visible in this view; the name of
    # the ignored first parameter is assumed - confirm.
    self._cacheUsageLastUpdate = time.time()
    self._cacheUsage = usage
    return usage
182
184
def _cleanUp(self):
    """
    Remove files from the cache directory, least recently accessed
    first, until the estimated usage is at most the minimum usage
    (low watermark).  The removal itself runs in a thread.

    @return: a deferred fired with the estimated usage left after the
             removal; a failed deferred if the directory cannot be
             listed or a file cannot be examined.
    """
    # NOTE(review): the def line is not visible in this view; the method
    # name is assumed - confirm.
    self.stats.onCleanup()

    try:
        names = os.listdir(self._cacheDir)
    except OSError as e:
        return defer.fail(e)

    entries = []
    usage = 0
    for name in names:
        fullPath = os.path.join(self._cacheDir, name)
        try:
            info = os.stat(fullPath)
        except OSError as e:
            # a file that vanished since the listing is just skipped
            if e.errno != errno.ENOENT:
                return defer.fail(e)
            continue
        entries.append((fullPath, info))
        usage += info.st_size

    entries.sort(key=lambda entry: entry[1].st_atime)
    doomed = []
    for fullPath, info in entries:
        doomed.append(fullPath)
        usage -= info.st_size
        if usage <= self._cacheMinUsage:
            self.debug('cleaned up, cache use is now %sbytes',
                       formatting.formatStorage(usage))
            break

    d = threads.deferToThread(self._rmfiles, doomed)
    d.addBoth(self._setCacheUsage, usage)
    return d
220
234
253
def allocateCacheSpace(self, size):
    """
    Try to reserve cache space.

    When space is lacking and the cache cleanup is enabled, files
    are deleted from the cache, oldest access time first, until the
    cache usage drops below the fraction given by the property
    cleanup-low-threshold.

    The result is a 'tag' to hand over to releaseCacheSpace() in
    order to free the reserved space.  The tag helps estimating the
    cache usage: if the usage has been updated after the space was
    allocated, freeing the space should not change the estimation.

    @param size: size to reserve, in bytes
    @type  size: int

    @return: an allocation tag or None if the allocation failed.
    @rtype:  defer to tuple
    """
    # addCallback() returns the deferred it was called on
    return self.updateCacheUsage().addCallback(
        self._allocateCacheSpace, size)
279
def releaseCacheSpace(self, tag):
    """
    Low-level function to release reserved cache space.

    The estimated usage is decreased only if it has not been updated
    since the tag was issued; the usage statistics are refreshed in
    any case.

    @param tag: (usage update time, reserved size) pair
    """
    issuedAt, reserved = tag
    if self._cacheUsageLastUpdate == issuedAt:
        self._cacheUsage = self._cacheUsage - reserved
    self.updateCacheUsageStatistics()
288
297
307
315
316
318 """
319 Read only.
320
321 See cachedprovider.py
322 @raise: OSError
323 """
324
def __init__(self, cachemgr, resPath):
    """
    Open the cached file of a resource path for binary reading and
    keep its path, file object and stat result on the instance.

    @raise: OSError
    """
    # NOTE(review): the def line is not visible in this view; the
    # signature is assumed from the names used in the body - confirm.
    self.name = cachemgr.getCachePath(resPath)
    self.file = open(self.name, 'rb')
    fd = self.file.fileno()
    self.stat = os.fstat(fd)

    cachemgr.log("Opened cached file %s [fd %d]",
                 self.name, fd)
336
def unlink(self):
    """
    Delete the cached file from the filesystem, except when the file
    now found at that path is more recent than the one that was
    opened.  The check and the removal are not atomic.  Any OSError
    is ignored.
    """
    # NOTE(review): the def line is not visible in this view; the method
    # name is assumed - confirm.
    try:
        current = os.stat(self.name)
        if not (current[stat.ST_MTIME] > self.stat[stat.ST_MTIME]):
            os.unlink(self.name)
    except OSError:
        pass
349
def __getattr__(self, name):
    """
    Look up on the wrapped file object the attributes not found on
    the instance.  Values that are not of type int are stored on the
    instance, so that they are found directly the next time.
    """
    wrapped = self.__dict__['file']
    value = getattr(wrapped, name)
    if type(value) is not int:
        setattr(self, name, value)
    return value
355
356
358 """
359 See cachedprovider.py
360 """
361
def __init__(self, cachemgr, resPath, tag, size, mtime=None):
    """
    Create a temporary file of the given size in the cache directory.

    The file is created with mkstemp() and truncated to size.  If a
    step following the creation fails, the file is closed and removed
    again before the error is propagated, so that neither a file
    descriptor nor a stray file is left behind.

    @param cachemgr: the cache manager owning the cache directory
    @param resPath:  resource path; its cache path is where complete()
                     moves the file
    @param tag:      allocation tag of the space reserved for the file
    @param size:     size of the file, in bytes
    @param mtime:    modification time to give to the file, if any

    @raise: OSError
    """
    self.tag = tag
    self.cachemgr = cachemgr
    self._completed = False
    self._finishPath = cachemgr.getCachePath(resPath)
    self.mtime = mtime
    self.file = None
    self.size = size

    fd, tempPath = tempfile.mkstemp(TEMP_FILE_POSTFIX,
                                    LOG_CATEGORY, cachemgr._cacheDir)
    cachemgr.log("Created temporary file '%s' [fd %d]",
                 tempPath, fd)
    created = False
    try:
        self.file = os.fdopen(fd, "w+b")
        cachemgr.log("Truncating temporary file to size %d", size)
        self.file.truncate(size)
        self.stat = os.fstat(self.file.fileno())
        created = True
    finally:
        if not created:
            # Undo the creation; the pending error keeps propagating
            # once this clause is done.
            try:
                if self.file is None:
                    os.close(fd)
                else:
                    self.file.close()
            except (IOError, OSError):
                pass
            try:
                os.unlink(tempPath)
            except OSError:
                pass
    self.name = tempPath
383
def __getattr__(self, name):
    """
    Look up on the wrapped file object the attributes not found on
    the instance.  Values that are not of type int are stored on the
    instance, so that they are found directly the next time.
    """
    wrapped = self.__dict__['file']
    value = getattr(wrapped, name)
    if type(value) is not int:
        setattr(self, name, value)
    return value
389
def setModificationTime(self, mtime=None):
    """
    Set file modification time.

    A true mtime argument replaces the one stored on the instance;
    nothing is done when no modification time is known.  The access
    time of the file is set to the current time.

    If the file does not exist anymore its reserved cache space is
    released; other errors are only logged as warnings.
    """
    if mtime:
        self.mtime = mtime
    if not self.mtime:
        return

    try:
        stamp = self.mtime
        now = int(time.time())
        self.cachemgr.log("Setting cache file modification time to %d",
                          stamp)
        os.utime(self.name, (now, stamp))
    except OSError as e:
        if e.errno != errno.ENOENT:
            self.cachemgr.warning(
                "Failed to update modification time of temporary "
                "file: %s", log.getExceptionMessage(e))
        else:
            self.cachemgr.releaseCacheSpace(self.tag)
411
def close(self):
    """
    Close the file; closing an already closed instance does nothing.

    A file that has not been completed is canceled: its reserved
    cache space is released and the temporary file is removed (errors
    of the removal are ignored).  A completed file gets its
    modification time applied.

    @raise: OSError
    """
    if self.cachemgr is None:
        return

    canceled = not self._completed
    if canceled:
        try:
            self.cachemgr.log("Temporary file canceled '%s' [fd %d]",
                              self.name, self.fileno())
            self.cachemgr.releaseCacheSpace(self.tag)
            os.unlink(self.name)
        except OSError:
            pass

    self.file.close()
    if not canceled:
        # A canceled file has just been removed and its space released:
        # setting its modification time would fail with ENOENT, which
        # setModificationTime() answers by releasing the very same
        # reservation a second time.
        self.setModificationTime()
    self.file = None
    self.cachemgr = None
432
def write(self, str):
    """
    Write data at the current position of the file, refusing any
    write that would end beyond the allocated size.

    @return: the result of the write() of the underlying file
    @raise: OSError
    @raise: IOError if the write would end beyond the allocated size
    """
    end = self.file.tell() + len(str)
    if end > self.size:
        raise IOError("Cache size overrun (%d > %d)" %
                      (end, self.size))
    return self.file.write(str)
443
def complete(self, checkSize=False):
    """
    Make the temporary file available as a cached file.
    Do NOT close the file, afterward the file can be used
    as a normal CachedFile instance.
    Do not raise exceptions on rename error.

    Nothing is done on a closed or already completed file.  When a
    cached file more recent than this one exists already, the
    temporary file is dropped in favor of it.

    @param checkSize: whether to require that the file position has
                      reached the allocated size
    @raise: IOError if checkSize and tell() != size
    """
    # NOTE(review): the def line is not visible in this view; the default
    # of checkSize is assumed from the argument-less calls - confirm.
    if self.cachemgr is None:
        return
    if self._completed:
        return

    _, size = self.tag
    if checkSize and self.tell() != size:
        # The file is deliberately not flagged as completed yet, so
        # that close() still cancels it (space released, temporary
        # file removed) instead of leaving it behind.
        raise IOError("Did not reach end of file")
    self._completed = True

    self.cachemgr.log("Temporary file completed '%s' [fd %d]",
                      self.name, self.fileno())
    try:
        if self.mtime is not None:
            mtime = os.path.getmtime(self._finishPath)
            if mtime > self.mtime:
                self.cachemgr.log("Did not complete(), "
                                  "a more recent version exists already")
                os.unlink(self.name)
                self.name = self._finishPath
                return
    except OSError:
        # typically: no cached file exists yet, go on with the rename
        pass

    try:
        os.rename(self.name, self._finishPath)
    except OSError as e:
        if e.errno == errno.ENOENT:
            self.cachemgr.releaseCacheSpace(self.tag)
        self.cachemgr.warning(
            "Failed to rename file '%s': %s" %
            (self.name, str(e)))
        return

    self.setModificationTime()

    self.name = self._finishPath
    self.cachemgr.log("Temporary file renamed to '%s' [fd %d]",
                      self._finishPath, self.fileno())
492
493
def main(argv=None):
    """
    Manual smoke test of the cache manager.

    Creates a cache manager on $HOME/tmp/cache, sets it up, runs a few
    allocation / temporary file / cached file scenarios and stops the
    reactor after 3 seconds.

    @param argv: not used
    @return: 0
    """

    import random

    CACHE_SIZE = 1 * 1024 * 1024
    MAX_CLEANUPS = 512

    class DummyStats:
        """Statistics stand-in that only counts the cleanups."""

        def __init__(self):
            self.oncleanup = 0

        def info(self):
            pass

        def onEstimateCacheUsage(self, usage, size):
            pass

        def onCleanup(self):
            self.oncleanup += 1
            print("OnCleanup")

    def makeTemp(tag, size, m, name):
        return TempFile(m, name, tag, size)

    def completeAndClose(t):
        try:
            t.complete()
            t.close()
        except Exception:
            print("Got a complete exception")

    def fillTestCache(manager):
        # keep creating files until enough cleanups have been triggered
        i = 0
        while manager.stats.oncleanup < MAX_CLEANUPS:
            i += 1
            filesize = 4096 * random.randint(1, 30)
            d = manager.newTempFile(str(i), filesize)
            d.addCallback(completeAndClose)

    def releaseCacheSpace(tag, m):
        print("gotCacheSpace:  %s" % (tag, ))
        m.releaseCacheSpace(tag)

    def checkUsage(usage, m, check):
        if not check(m._cacheUsage):
            print("Cache overrun!!! %d/%d" % (m._cacheUsage, m._cacheSize))

    def openCacheAndClose(_, m, name):
        d = m.openCacheFile(name)
        d.addCallback(lambda f: f.close())
        return d

    def checkMiss(result):
        if result == "cacheMiss":
            return
        raise errors.FlumotionError("an error")

    def runTests():
        # reserve then release: the usage must be back to zero
        d = m.allocateCacheSpace(1024)
        d.addCallback(releaseCacheSpace, m)
        d.addCallback(checkUsage, m, lambda u: u == 0)

        # a canceled temporary file must not use any space
        d = m.allocateCacheSpace(CACHE_SIZE // 2)
        d.addCallback(makeTemp, CACHE_SIZE // 2, m, "test")
        d.addCallback(lambda t: t.close())
        d.addCallback(checkUsage, m, lambda u: u == 0)

        # a completed temporary file must use some space
        d = m.allocateCacheSpace(CACHE_SIZE // 2)
        d.addCallback(makeTemp, CACHE_SIZE // 2, m, "test2")
        d.addCallback(completeAndClose)
        d.addCallback(checkUsage, m, lambda u: u > 0)

        # a file cached through a second manager on the same directory
        # must be readable through the first one
        m2 = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.5, 0.3)
        d = m2.newTempFile("test3", 12000)
        d.addCallback(completeAndClose)
        d.addCallback(openCacheAndClose, m, "test3")

        # opening a file that is not cached must fail
        d = openCacheAndClose(None, m, "test4_do_not_exists")
        d.addErrback(lambda _: "cacheMiss")
        d.addCallback(checkMiss)

        threads.deferToThread(fillTestCache, m)
        threads.deferToThread(fillTestCache, m)
        threads.deferToThread(fillTestCache, m)

        m.updateCacheUsage().addCallback(checkUsage, m,
                                         lambda u: u < CACHE_SIZE * 1.10)

    cachedir = os.environ['HOME'] + "/tmp/cache"
    m = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.0, 0.0)
    d = m.setUp()

    # the tests are chained on the deferred returned by setUp(); the
    # manager itself has no addCallback()
    d.addCallback(lambda _: runTests())

    reactor.callLater(3, reactor.stop)
    reactor.run()
    return 0
600
# Run the smoke test when executed as a script and use its result as
# the exit status.
if __name__ == '__main__':
    import sys
    sys.exit(main())
605