
Source Code for Module flumotion.component.misc.httpserver.cachemanager

# -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*-
# vi:si:et:sw=4:sts=4:ts=4

# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
# Copyright (C) 2010,2011 Flumotion Services, S.A.
# All rights reserved.
#
# This file may be distributed and/or modified under the terms of
# the GNU Lesser General Public License version 2.1 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.LGPL" in the source distribution for more information.
#
# Headers in this file shall remain intact.

import errno
import os
import tempfile
import time
import stat

from twisted.internet import defer, threads, reactor, utils

from flumotion.common import log, common, python, errors
from flumotion.common import format as formatting

LOG_CATEGORY = "cache-manager"

DEFAULT_CACHE_SIZE = 1000 * 1024 * 1024
DEFAULT_CACHE_DIR = "/tmp/httpserver"
DEFAULT_CLEANUP_ENABLED = True
DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0
DEFAULT_CLEANUP_LOW_WATERMARK = 0.6
ID_CACHE_MAX_SIZE = 1024
TEMP_FILE_POSTFIX = ".tmp"
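
# Illustration (not part of the original module): with the defaults above,
# a manager only attempts a cleanup once an allocation would push usage to
# 1.0 * DEFAULT_CACHE_SIZE (about 1000 MiB) or beyond, and _cleanUp() then
# evicts the least recently accessed files until the estimated usage is at
# or below 0.6 * DEFAULT_CACHE_SIZE (about 600 MiB).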


class CacheManager(object, log.Loggable):

    logCategory = LOG_CATEGORY

    def __init__(self, stats,
                 cacheDir=None,
                 cacheSize=None,
                 cleanupEnabled=None,
                 cleanupHighWatermark=None,
                 cleanupLowWatermark=None,
                 cacheRealm=None):

        if cacheDir is None:
            cacheDir = DEFAULT_CACHE_DIR
        if cacheSize is None:
            cacheSize = DEFAULT_CACHE_SIZE
        if cleanupEnabled is None:
            cleanupEnabled = DEFAULT_CLEANUP_ENABLED
        if cleanupHighWatermark is None:
            cleanupHighWatermark = DEFAULT_CLEANUP_HIGH_WATERMARK
        if cleanupLowWatermark is None:
            cleanupLowWatermark = DEFAULT_CLEANUP_LOW_WATERMARK

        self.stats = stats
        self._cacheDir = cacheDir
        self._cacheSize = cacheSize  # in bytes
        self._cleanupEnabled = cleanupEnabled
        highWatermark = max(0.0, min(1.0, float(cleanupHighWatermark)))
        lowWatermark = max(0.0, min(1.0, float(cleanupLowWatermark)))

        self._cachePrefix = (cacheRealm and (cacheRealm + ":")) or ""

        self._identifiers = {}  # {path: identifier}

        self.info("Cache Manager initialized")
        self.debug("Cache directory: '%s'", self._cacheDir)
        self.debug("Cache size: %d bytes", self._cacheSize)
        self.debug("Cache cleanup enabled: %s", self._cleanupEnabled)

        common.ensureDir(self._cacheDir, "cache")

        self._cacheUsage = None
        self._cacheUsageLastUpdate = None
        self._lastCacheTime = None

        self._cacheMaxUsage = self._cacheSize * highWatermark  # in bytes
        self._cacheMinUsage = self._cacheSize * lowWatermark  # in bytes

    def setUp(self):
        """
        Initialize the cache manager.

        @return: a deferred
        @raise: OSError or FlumotionError
        """
        # Initialize cache usage
        return self.updateCacheUsage()

    def getIdentifier(self, path):
        """
        The returned identifier is the hex-encoded digest of the path.
        The hash function used is SHA1.
        Identifiers are cached in a dictionary indexed by path, with a
        maximum number of entries specified by the constant
        ID_CACHE_MAX_SIZE.

        @return: an identifier for path.
        """
        ident = self._identifiers.get(path, None)
        if ident is None:
            sha1Hash = python.sha1()
            sha1Hash.update(self._cachePrefix + path)
            ident = sha1Hash.digest().encode("hex").strip('\n')
            # Prevent the cache from growing endlessly
            if len(self._identifiers) >= ID_CACHE_MAX_SIZE:
                self._identifiers.clear()
            self._identifiers[path] = ident
        return ident
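
    # Illustration (not part of the original module): the identifier is the
    # 40-character hex SHA1 digest of the (optionally realm-prefixed) path,
    # so it is stable across restarts and usable as a flat file name in the
    # cache directory. Assuming python.sha1 is standard SHA1 and the realm
    # is empty, the equivalent computation is:
    #
    #   >>> import hashlib
    #   >>> ident = hashlib.sha1("/videos/clip.ogg").hexdigest()
    #   >>> len(ident)
    #   40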

    def getCachePath(self, path):
        """
        @return: the cached file path for a path.
        """
        ident = self.getIdentifier(path)
        return os.path.join(self._cacheDir, ident)

    def getTempPath(self, path):
        """
        @return: a temporary file path for a path.

        Don't use this function, it's provided for compatibility.
        Use newTempFile() instead.
        """
        ident = self.getIdentifier(path)
        return os.path.join(self._cacheDir, ident + TEMP_FILE_POSTFIX)

    def updateCacheUsageStatistics(self):
        self.stats.onEstimateCacheUsage(self._cacheUsage, self._cacheSize)

    def _updateCacheUsage(self, usage):
        self.log('Disk usage for path %r is %d bytes', self._cacheDir, usage)
        self._cacheUsageLastUpdate = time.time()
        self._cacheUsage = usage
        self.updateCacheUsageStatistics()
        return usage

    def updateCacheUsage(self):
        """
        @return: a deferred firing the cache usage in bytes.
        @raise: OSError or FlumotionError
        """

        # Only calculate cache usage if the cache directory
        # modification time changed since the last time we looked at it.
        try:
            cacheTime = os.path.getmtime(self._cacheDir)
        except OSError, e:
            return defer.fail(e)

        if ((self._cacheUsage is None) or (self._lastCacheTime < cacheTime)):
            self._lastCacheTime = cacheTime
            self.log('Getting disk usage for path %r', self._cacheDir)
            d = utils.getProcessOutput('du', ['-bs', self._cacheDir])
            d.addCallback(lambda o: int(o.split('\t', 1)[0]))
            d.addCallback(self._updateCacheUsage)
            return d
        else:
            return defer.succeed(self._cacheUsage)
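
    # Illustration (not part of the original module): GNU `du -bs <dir>`
    # prints the usage in bytes, a tab and the path, e.g.
    # "3481600\t/tmp/httpserver", so splitting on the first tab and
    # converting to int yields the usage:
    #
    #   >>> int("3481600\t/tmp/httpserver".split('\t', 1)[0])
    #   3481600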

    def _rmfiles(self, files):
        try:
            for path in files:
                os.remove(path)
        except OSError, e:
            if e.errno != errno.ENOENT:
                # TODO: is warning() thread safe?
                self.warning("Error cleaning cached file: %s", str(e))

    def _setCacheUsage(self, _, usage):
        # Update the cache usage
        self._cacheUsage = usage
        self._cacheUsageLastUpdate = time.time()
        return usage

    def _cleanUp(self):
        # Update cleanup statistics
        self.stats.onCleanup()
        # List the cached files along with their stat info
        try:
            listdir = os.listdir(self._cacheDir)
        except OSError, e:
            return defer.fail(e)

        files = []
        for f in listdir:
            f = os.path.join(self._cacheDir, f)
            # There's a possibility of getting an error on os.stat here.
            try:
                files.append((f, os.stat(f)))
            except OSError, e:
                if e.errno == errno.ENOENT:
                    pass
                else:
                    return defer.fail(e)

        # Calculate the total size of the cached files
        usage = sum([d[1].st_size for d in files])
        # Delete cached files, starting with the least recently accessed ones
        files.sort(key=lambda d: d[1].st_atime)
        rmlist = []
        for path, info in files:
            usage -= info.st_size
            rmlist.append(path)
            if usage <= self._cacheMinUsage:
                # We reached the cleanup limit
                self.debug('cleaned up, cache use is now %sbytes',
                           formatting.formatStorage(usage))
                break
        d = threads.deferToThread(self._rmfiles, rmlist)
        d.addBoth(self._setCacheUsage, usage)
        return d
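
    # Illustration (not part of the original module): eviction is LRU by
    # atime. For example, with _cacheMinUsage = 100 and cached files
    # A (atime 10, 80 bytes), B (atime 20, 50 bytes) and C (atime 30,
    # 40 bytes), usage starts at 170; removing A alone brings the estimate
    # to 90 <= 100, so only A ends up in rmlist.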

    def _allocateCacheSpaceAfterCleanUp(self, usage, size):
        if (self._cacheUsage + size) >= self._cacheSize:
            # There is not enough space, the allocation failed
            self.updateCacheUsageStatistics()
            self.debug('not enough space in cache, '
                       'cannot cache %d > %d' %
                       (self._cacheUsage + size, self._cacheSize))
            return None

        # There is enough space to allocate, the allocation succeeded
        self._cacheUsage += size
        self.updateCacheUsageStatistics()
        return (self._cacheUsageLastUpdate, size)

    def _allocateCacheSpace(self, usage, size):
        if usage + size < self._cacheMaxUsage:
            self._cacheUsage += size
            self.updateCacheUsageStatistics()
            return defer.succeed((self._cacheUsageLastUpdate, size))

        self.debug('cache usage will be %sbytes, need more cache',
                   formatting.formatStorage(usage + size))

        if not self._cleanupEnabled:
            # No space available and cleanup disabled: allocation failed.
            self.debug('not allowed to clean up cache, '
                       'so cannot cache %d' % size)
            return defer.succeed(None)

        d = self._cleanUp()
        d.addCallback(self._allocateCacheSpaceAfterCleanUp, size)
        return d

    def allocateCacheSpace(self, size):
        """
        Try to reserve cache space.

        If there is not enough space and cache cleanup is enabled,
        files are deleted from the cache, starting with the ones with
        the oldest access time, until the cache usage drops below the
        fraction specified by the property cleanup-low-threshold.

        Returns a 'tag' that should be used to 'free' the cache space
        using releaseCacheSpace. This tag is needed to better estimate
        the cache usage: if the cache usage has been updated since the
        space was allocated, freeing the space should not change the
        cache usage estimation.

        @param size: size to reserve, in bytes
        @type size: int

        @return: an allocation tag, or None if the allocation failed.
        @rtype: deferred firing a tuple or None
        """
        d = self.updateCacheUsage()
        d.addCallback(self._allocateCacheSpace, size)
        return d
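
    # Usage sketch (not part of the original module), assuming `mgr` is a
    # CacheManager instance: the tag pairs the usage timestamp with the
    # reserved size, so releaseCacheSpace() only rolls the estimate back if
    # no fresh measurement happened in between.
    #
    #   d = mgr.allocateCacheSpace(64 * 1024)
    #
    #   def allocated(tag):
    #       if tag is None:
    #           return              # no space available
    #       # ... use the reserved space ...
    #       mgr.releaseCacheSpace(tag)
    #
    #   d.addCallback(allocated)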

    def releaseCacheSpace(self, tag):
        """
        Low-level function to release reserved cache space.
        """
        lastUpdate, size = tag
        if lastUpdate == self._cacheUsageLastUpdate:
            self._cacheUsage -= size
            self.updateCacheUsageStatistics()

    def openCacheFile(self, path):
        """
        @return: a deferred firing a CachedFile instance, or None
        """
        try:
            return defer.succeed(CachedFile(self, path))
        except:
            return defer.succeed(None)

    def _newTempFile(self, tag, path, size, mtime=None):
        # if allocation fails
        if tag is None:
            return None

        try:
            return TempFile(self, path, tag, size, mtime)
        except OSError:
            return None

    def newTempFile(self, path, size, mtime=None):
        """
        @return: a deferred firing a TempFile instance, or None
        """
        d = self.allocateCacheSpace(size)
        d.addCallback(self._newTempFile, path, size, mtime)
        return d
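
    # Usage sketch (not part of the original module), assuming `mgr` is a
    # CacheManager and `data` is the payload to cache: the TempFile is
    # written, completed (renamed to its final cache path) and closed.
    # Closing an uncompleted TempFile cancels it and releases the reserved
    # space.
    #
    #   d = mgr.newTempFile("/videos/clip.ogg", len(data))
    #
    #   def gotTemp(temp):
    #       if temp is None:
    #           return              # allocation failed, serve uncached
    #       temp.write(data)
    #       temp.complete()
    #       temp.close()
    #
    #   d.addCallback(gotTemp)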


class CachedFile:
    """
    Read only.

    See cachedprovider.py
    @raise: OSError
    """

    def __init__(self, cachemgr, resPath):
        cachedPath = cachemgr.getCachePath(resPath)
        handle = open(cachedPath, 'rb')
        stat = os.fstat(handle.fileno())

        cachemgr.log("Opened cached file %s [fd %d]",
                     cachedPath, handle.fileno())

        self.name = cachedPath
        self.file = handle
        self.stat = stat

    def __getattr__(self, name):
        a = getattr(self.__dict__['file'], name)
        if type(a) != type(0):
            setattr(self, name, a)
        return a
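
    # Note (not part of the original module): __getattr__ delegates any
    # attribute not found on the instance to the underlying file object, so
    # read(), seek(), tell() and close() work directly on a CachedFile;
    # non-integer attributes are memoized on the instance so later lookups
    # bypass __getattr__.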


class TempFile:
    """
    See cachedprovider.py
    """

    def __init__(self, cachemgr, resPath, tag, size, mtime=None):
        """
        @raise: OSError
        """
        self.tag = tag
        self.cachemgr = cachemgr
        self._completed = False
        self._finishPath = cachemgr.getCachePath(resPath)
        self.mtime = mtime
        self.file = None
        self.size = size

        fd, tempPath = tempfile.mkstemp(TEMP_FILE_POSTFIX,
                                        LOG_CATEGORY, cachemgr._cacheDir)
        cachemgr.log("Created temporary file '%s' [fd %d]",
                     tempPath, fd)
        self.file = os.fdopen(fd, "w+b")
        cachemgr.log("Truncating temporary file to size %d", size)
        self.file.truncate(size)
        self.stat = os.fstat(self.file.fileno())
        self.name = tempPath

    def __getattr__(self, name):
        a = getattr(self.__dict__['file'], name)
        if type(a) != type(0):
            setattr(self, name, a)
        return a

    def setModificationTime(self, mtime=None):
        """
        Set the file modification time.
        """
        if mtime:
            self.mtime = mtime
        try:
            if self.mtime:
                mtime = self.mtime
                atime = int(time.time())
                self.cachemgr.log("Setting cache file "
                                  "modification time to %d", mtime)
                # FIXME: Should use futimes, but it's not wrapped by python
                os.utime(self.name, (atime, mtime))
        except OSError, e:
            if e.errno == errno.ENOENT:
                self.cachemgr.releaseCacheSpace(self.tag)
            else:
                self.cachemgr.warning(
                    "Failed to update modification time of temporary "
                    "file: %s", log.getExceptionMessage(e))

    def close(self):
        """
        @raise: OSError
        """
        if self.cachemgr is None:
            return

        try:
            if not self._completed:
                self.cachemgr.log("Temporary file canceled '%s' [fd %d]",
                                  self.name, self.fileno())
                self.cachemgr.releaseCacheSpace(self.tag)
                os.unlink(self.name)
        except OSError:
            pass

        self.file.close()
        self.setModificationTime()
        self.file = None
        self.cachemgr = None

    def write(self, data):
        """
        @raise: OSError
        @raise: IOError if the write would overrun the allocated size
        """
        if self.file.tell() + len(data) > self.size:
            raise IOError("Cache size overrun (%d > %d)" %
                          (self.file.tell() + len(data), self.size))
        return self.file.write(data)

    def complete(self, checkSize=False):
        """
        Make the temporary file available as a cached file.
        Does NOT close the file; afterwards it can be used as a normal
        CachedFile instance.
        Does not raise exceptions on rename errors.

        @raise: IOError if checkSize and tell() != size
        """
        if self.cachemgr is None:
            return
        if self._completed:
            return
        self._completed = True

        _, size = self.tag
        if self.tell() != size and checkSize:
            raise IOError("Did not reach end of file")

        self.cachemgr.log("Temporary file completed '%s' [fd %d]",
                          self.name, self.fileno())
        try:
            if self.mtime is not None:
                mtime = os.path.getmtime(self._finishPath)
                if mtime > self.mtime:
                    self.cachemgr.log("Did not complete(), "
                                      "a more recent version exists already")
                    os.unlink(self.name)
                    self.name = self._finishPath
                    return
        except OSError, e:
            pass

        try:
            os.rename(self.name, self._finishPath)
        except OSError, e:
            if e.errno == errno.ENOENT:
                self.cachemgr.releaseCacheSpace(self.tag)
            self.cachemgr.warning(
                "Failed to rename file '%s': %s" %
                (self.name, str(e)))
            return

        self.setModificationTime()

        self.name = self._finishPath
        self.cachemgr.log("Temporary file renamed to '%s' [fd %d]",
                          self._finishPath, self.fileno())


def main(argv=None):
    # Functional tests
    import random

    CACHE_SIZE = 1 * 1024 * 1024
    MAX_CLEANUPS = 512

    class DummyStats:

        def __init__(self):
            self.oncleanup = 0

        def info(self):
            pass

        def onEstimateCacheUsage(self, usage, size):
            #print "Stat: " + str(usage / (1024))\
            #      + "k / " + str(size / (1024)) + "k"
            pass

        def onCleanup(self):
            self.oncleanup += 1
            print "OnCleanup"

    def makeTemp(tag, size, m, name):
        t = TempFile(m, name, tag, size)
        return t

    def completeAndClose(t):
        try:
            t.complete()
            t.close()
        except:
            print "Got a complete exception"

    def fillTestCache(manager):
        i = 0
        while (manager.stats.oncleanup < MAX_CLEANUPS):
            i += 1
            filesize = 4096 * random.randint(1, 30)
            d = manager.newTempFile(str(i), filesize)
            d.addCallback(completeAndClose)

    def releaseCacheSpace(tag, m):
        print "gotCacheSpace: ", tag
        m.releaseCacheSpace(tag)

    def checkUsage(usage, m, check):
        if (not check(m._cacheUsage)):
            print "Cache overrun!!! %d/%d" % (m._cacheUsage, m._cacheSize)

    def openCacheAndClose(_, m, name):
        d = m.openCacheFile(name)
        d.addCallback(lambda f: f.close())
        return d

    def checkMiss(_):
        if (_ == "cacheMiss"):
            return
        raise errors.FlumotionError("an error")

    def runTests():
        # low-level cache requests
        d = m.allocateCacheSpace(1024)
        d.addCallback(releaseCacheSpace, m)
        d.addCallback(checkUsage, m, lambda u: u == 0)

        d = m.allocateCacheSpace(CACHE_SIZE / 2)
        d.addCallback(makeTemp, CACHE_SIZE / 2, m, "test")
        d.addCallback(lambda t: t.close())
        d.addCallback(checkUsage, m, lambda u: u == 0)

        d = m.allocateCacheSpace(CACHE_SIZE / 2)
        d.addCallback(makeTemp, CACHE_SIZE / 2, m, "test2")
        d.addCallback(completeAndClose)
        d.addCallback(checkUsage, m, lambda u: u > 0)

        # check hit and miss
        m2 = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.5, 0.3)
        d = m2.newTempFile("test3", 12000)
        d.addCallback(completeAndClose)
        d.addCallback(openCacheAndClose, m, "test3")

        d = openCacheAndClose(None, m, "test4_do_not_exists")
        d.addErrback(lambda _: "cacheMiss")
        d.addCallback(checkMiss)

        # multi-thread test, full of races :)
        threads.deferToThread(fillTestCache, m)
        threads.deferToThread(fillTestCache, m)
        threads.deferToThread(fillTestCache, m)

        # check usage
        m.updateCacheUsage().addCallback(checkUsage, m,
                                         lambda u: u < CACHE_SIZE * 1.10)

    cachedir = os.environ['HOME'] + "/tmp/cache"
    m = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.0, 0.0)
    d = m.setUp()

    d.addCallback(lambda x: runTests())

    reactor.callLater(3, reactor.stop)
    reactor.run()
    return 0


if __name__ == '__main__':
    import sys
    status = main()
    sys.exit(status)