Package flumotion :: Package component :: Package misc :: Package httpserver :: Module cachedprovider
[hide private]

Source Code for Module flumotion.component.misc.httpserver.cachedprovider

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import errno 
 19  import os 
 20  import stat 
 21  import tempfile 
 22  import threading 
 23  import time 
 24   
 25  from twisted.internet import defer, reactor, threads, abstract 
 26   
 27  from flumotion.common import log, common 
 28  from flumotion.component.misc.httpserver import cachestats 
 29  from flumotion.component.misc.httpserver import cachemanager 
 30  from flumotion.component.misc.httpserver import fileprovider 
 31  from flumotion.component.misc.httpserver import localpath 
 32  from flumotion.component.misc.httpserver.fileprovider import FileClosedError 
 33  from flumotion.component.misc.httpserver.fileprovider import FileError 
 34  from flumotion.component.misc.httpserver.fileprovider import NotFoundError 
 35   
 36   
 37  SEEK_SET = 0 # os.SEEK_SET is not defined in python 2.4 
 38  FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize 
 39  MAX_LOGNAME_SIZE = 30 # maximum number of characters to use for logging a path 
 40   
 41   
 42  LOG_CATEGORY = "fileprovider-localcached" 
 43   
 44   
 45  errnoLookup = {errno.ENOENT: fileprovider.NotFoundError, 
 46                 errno.EISDIR: fileprovider.CannotOpenError, 
 47                 errno.EACCES: fileprovider.AccessError} 
 48   
 49   
def open_stat(path, mode='rb'):
    """
    Open a file and fetch its stat information from the open descriptor.

    @param path: path of the file to open
    @param mode: mode given to open()
    @rtype: (file, statinfo)
    @raise fileprovider.FileError: or the subclass registered in
        errnoLookup for the failing errno, when opening or stat'ing fails.
        The file is closed again if only the stat call failed.
    """
    try:
        fileObj = open(path, mode)
        descriptor = fileObj.fileno()
    except IOError as err:
        errorClass = errnoLookup.get(err.errno, fileprovider.FileError)
        raise errorClass("Failed to open file '%s': %s" % (path, str(err)))

    try:
        statInfo = os.fstat(descriptor)
    except OSError as err:
        # Do not leak the descriptor when the file cannot be stat'ed
        fileObj.close()
        errorClass = errnoLookup.get(err.errno, fileprovider.FileError)
        raise errorClass("Failed to stat file '%s': %s" % (path, str(err)))

    return fileObj, statInfo
67 68
class FileProviderLocalCachedPlug(fileprovider.FileProviderPlug,
                                  log.Loggable):
    """
    WARNING: Currently does not work properly in combination with
    rate-control.

    I'm caching the files taken from a mounted
    network file system to a shared local directory.
    Multiple instances can share the same cache directory,
    but it's recommended to use slightly different values
    for the property cleanup-high-watermark.
    I'm using the directory access time to know when
    the cache usage changed and keep an estimation
    of the cache usage for statistics.

    I'm creating a unique thread to do the file copying block by block,
    for all files to be copied to the cache.
    Using a thread instead of a reactor.callLater 'loop' allows for
    higher copy throughput and does not slow down the main loop when
    lots of files are copied at the same time.
    Simulations with real request logs show that using a thread
    gives better results than the equivalent asynchronous implementation.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, args):
        props = args['properties']
        self._sourceDir = props.get('path')
        cacheDir = props.get('cache-dir')
        cacheSizeInMB = props.get('cache-size')
        if cacheSizeInMB is not None:
            cacheSize = cacheSizeInMB * 10 ** 6 # in bytes
        else:
            cacheSize = None
        cleanupEnabled = props.get('cleanup-enabled')
        cleanupHighWatermark = props.get('cleanup-high-watermark')
        cleanupLowWatermark = props.get('cleanup-low-watermark')

        # Sessions currently driven by the copy thread
        self._sessions = {} # {CopySession: None}
        # Most recent copy session for each source path
        self._index = {} # {path: CopySession}

        self.stats = cachestats.CacheStatistics()

        self.cache = cachemanager.CacheManager(self.stats,
                                               cacheDir, cacheSize,
                                               cleanupEnabled,
                                               cleanupHighWatermark,
                                               cleanupLowWatermark)

        common.ensureDir(self._sourceDir, "source")

        # The copy thread is only created here; start() launches it once
        # the cache manager is set up.
        self._thread = CopyThread(self)

    def start(self, component):
        """
        Set up the cache manager, then start the copy thread.

        @rtype: L{defer.Deferred}
        """
        self.debug('Starting cachedprovider plug for component %r', component)
        d = self.cache.setUp()
        d.addCallback(lambda x: self._thread.start())
        return d

    def stop(self, component):
        """
        Stop the copy thread and close every indexed copy session.

        @return: a DeferredList of the session close operations,
                 or None when there is nothing to wait for.
        """
        self.debug('Stopping cachedprovider plug for component %r', component)
        self._thread.stop()
        dl = []
        # Iterate over a snapshot: closing a session can synchronously
        # remove it from the index through removeCopySession().
        for s in list(self._index.values()):
            d = s.close()
            if d:
                dl.append(d)
        if dl:
            return defer.DeferredList(dl)

    def startStatsUpdates(self, updater):
        #FIXME: This is temporary. Should be done with plug UI.
        # Used for the UI to know which plug is used
        updater.update("provider-name", "fileprovider-localcached")
        self.stats.startUpdates(updater)

    def stopStatsUpdates(self):
        self.stats.stopUpdates()

    def getRootPath(self):
        """
        @return: the root L{LocalPath}, or None when no source path is set.
        """
        if self._sourceDir is None:
            return None
        return LocalPath(self, self._sourceDir)


    ## Protected Methods ##

    def getLogName(self, path, id=None):
        """
        Returns a log name for a path, shortened to a maximum size
        specified by the global variable MAX_LOGNAME_SIZE.
        The log name will be the filename part of the path postfixed
        by the id in brackets if id is not None.
        A truncated name ends with '*'. When the extension and id alone
        exceed the maximum, the name part is reduced to just '*'.
        """
        filename = os.path.basename(path)
        basename, postfix = os.path.splitext(filename)
        if id is not None:
            postfix += "[%s]" % id
        prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
        if len(basename) > prefixMaxLen:
            # Reserve one character for the '*' marker. Clamp at 0: a
            # negative slice bound would keep almost the whole name.
            basename = basename[:max(prefixMaxLen - 1, 0)] + "*"
        return basename + postfix

    def getCopySession(self, path):
        return self._index.get(path, None)

    def createCopySession(self, path, file, info):
        """
        Outdate any existing session for the path and index a new one.
        """
        # First outdate existing session for the path
        self.outdateCopySession(path)
        # Then create a new one
        session = CopySession(self, path, file, info)
        self._index[path] = session
        return session

    def outdateCopySession(self, path):
        session = self._index.get(path, None)
        if session is not None:
            session.outdate()

    def removeCopySession(self, session):
        """
        Unregister a closing session and stop copying it.
        """
        path = session.sourcePath
        # Only drop the index entry if it still points to this session.
        # An outdated session must not unregister the session that
        # replaced it for the same path.
        if self._index.get(path) is session:
            del self._index[path]
        self.disableSession(session)

    def activateSession(self, session):
        """Hand a session over to the copy thread."""
        self.debug("Starting Copy Session '%s' (%d)",
                   session.logName, len(self._sessions))
        if session in self._sessions:
            return
        self._sessions[session] = None
        self._activateCopyLoop()

    def disableSession(self, session):
        """Withdraw a session from the copy thread."""
        self.debug("Stopping Copy Session '%s' (%d)",
                   session.logName, len(self._sessions))
        if session in self._sessions:
            del self._sessions[session]
        if not self._sessions:
            self._disableCopyLoop()

    def _activateCopyLoop(self):
        self._thread.wakeup()

    def _disableCopyLoop(self):
        self._thread.sleep()
217 218
class LocalPath(localpath.LocalPath, log.Loggable):
    """
    Local path bound to the caching plug: children are LocalPath
    instances too, and opening a path yields a L{CachedFile}.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, plug, path):
        localpath.LocalPath.__init__(self, path)
        self.plug = plug
        self.logName = plug.getLogName(path)

    def child(self, name):
        return LocalPath(self.plug, self._getChildPath(name))

    def open(self):
        """
        @return: whatever L{CachedFile.open} returns for this path.
        """
        cachedFile = CachedFile(self.plug, self._path, self.mimeType)
        return cachedFile.open()


    ## Private Methods ##

    def _removeCachedFile(self, sourcePath):
        # Delete the cached copy of sourcePath; a missing file is fine.
        cachePath = self.plug.cache.getCachePath(sourcePath)
        try:
            os.remove(cachePath)
        except OSError as e:
            if e.errno == errno.ENOENT:
                return
            self.warning("Error deleting file: %s", str(e))
        else:
            self.debug("Deleted cached file '%s'", cachePath)
247 248
class CopyThread(threading.Thread, log.Loggable):
    """
    Thread driving every active copy session of a plug.

    Each pass calls doServe() and doCopy() once per session currently
    registered in the plug's session table, then waits on an event.
    The plug sets the event (wakeup) when a session is activated and
    clears it (sleep) when no session is left.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, plug):
        threading.Thread.__init__(self)
        self.plug = plug
        self._running = True
        self._event = threading.Event()

    def stop(self):
        """
        Ask the loop to exit and wait for it to finish.
        """
        self._running = False
        self._event.set()
        # join() raises RuntimeError for a thread that was never started,
        # e.g. when the plug is stopped after its start-up failed.
        if self.is_alive():
            self.join()

    def wakeup(self):
        self._event.set()

    def sleep(self):
        self._event.clear()

    def run(self):
        while self._running:
            # Snapshot the sessions: the main thread adds and removes
            # entries while we iterate.
            sessions = list(self.plug._sessions.keys())
            for session in sessions:
                # Use the Loggable methods: the module-level log.warning()
                # expects a category as its first argument.
                try:
                    session.doServe()
                except Exception as e:
                    self.warning("Error during async file serving: %s",
                                 log.getExceptionMessage(e))
                try:
                    session.doCopy()
                except Exception as e:
                    self.warning("Error during file copy: %s",
                                 log.getExceptionMessage(e))
            self._event.wait()
285 286
class CopySessionCancelled(Exception):
    """
    Error given to source reads that were still pending when a copy
    was cancelled after the session's source file had been closed.
    """
289 290
class CopySession(log.Loggable):
    """
    I'm serving a file at the same time I'm copying it
    from the network file system to the cache.
    If the client asks for data not yet copied, the source file
    read operation is delegated to the copy thread as an asynchronous
    operation because file seeking/reading is not thread safe.

    The copy session has to open the temporary file twice,
    once read-only and once write-only,
    because closing a read/write file changes the modification time.
    We want the modification time to be set to a known value
    when the copy is finished, while keeping read access to the file.

    The session manages a reference counter to know how many
    TempFileDelegate instances are using the session to delegate
    read operations. This is done for two reasons:
     - To avoid the circular references that having the session manage
       a list of delegate instances would create.
     - If not cancelled, sessions should not be deleted
       when no delegates reference them anymore. So weakref cannot be used.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, plug, sourcePath, sourceFile, sourceInfo):
        self.plug = plug
        self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
        self.copying = None # Not yet started
        self.sourcePath = sourcePath
        self.tempPath = plug.cache.getTempPath(sourcePath)
        self.cachePath = plug.cache.getCachePath(sourcePath)
        # The size and modification time are not supposed to change
        self.mtime = sourceInfo[stat.ST_MTIME]
        self.size = sourceInfo[stat.ST_SIZE]
        self._sourceFile = sourceFile
        self._cancelled = False # True when a session has been outdated
        self._wTempFile = None
        self._rTempFile = None
        self._allocTag = None # Tag used to identify cache allocations
        # (closeSource, closeTempWrite) while a cancellation waits for the
        # copy thread to stop
        self._waitCancel = None
        # List of the pending reads from the source file
        self._pending = [] # [(position, size, defer),]
        self._refCount = 0
        self._copied = 0 # None when the file is fully copied
        self._correction = 0 # Copied bytes not yet accounted in the stats
        self._startCopyingDefer = self._startCopying()

    def outdate(self):
        """Mark the session as not matching the source file anymore."""
        self.log("Copy session outdated")
        self._cancelSession()

    def read(self, position, size, stats):
        """
        Read data for a delegate.

        @return: the data, or a Deferred firing with it when the read has
                 to be performed by the copy thread.
        @raise FileError: (or the errnoLookup subclass) when the source
               file cannot be read or is already closed.
        """
        # If the temporary file is open for reading
        if self._rTempFile:
            # And the needed data is already downloaded.
            # Safe to read because it's not used by the copy thread.
            if (self._copied is None) or ((position + size) <= self._copied):
                try:
                    self._rTempFile.seek(position)
                    data = self._rTempFile.read(size)
                    # Adjust the cache/source values to take copy into account
                    size = len(data)
                    # It's safe to use and modify self._correction even if
                    # it's used by the copy thread because the copy thread
                    # only adds and the main thread only subtracts.
                    # The worst that can happen is a less accurate
                    # correction...
                    diff = min(self._correction, size)
                    self._correction -= diff
                    stats.onBytesRead(0, size, diff)
                    return data
                except Exception as e:
                    self.warning("Failed to read from temporary file: %s",
                                 log.getExceptionMessage(e))
                    self._cancelSession()
        # If the source file is not open anymore, we can't continue
        if self._sourceFile is None:
            raise FileError("File caching error, cannot proceed")
        # Otherwise read the data directly from the source
        try:
            # It's safe to not use Lock, because simple type operations
            # are thread safe, and even if the copying state changes
            # from True to False, _onCopyFinished will be called
            # later in the same thread and will process pending reads.
            if self.copying:
                # If we are currently copying the source file,
                # we defer the file read to the copying thread
                # because we can't read a file from two threads.
                d = defer.Deferred()

                def updateStats(data):
                    stats.onBytesRead(len(data), 0, 0)
                    return data

                d.addCallback(updateStats)
                self._pending.append((position, size, d))
                return d
            # Not copying, it's safe to read directly
            self._sourceFile.seek(position)
            data = self._sourceFile.read(size)
            stats.onBytesRead(len(data), 0, 0)
            return data
        except IOError as e:
            cls = errnoLookup.get(e.errno, FileError)
            raise cls("Failed to read source file: %s" % str(e))

    def incRef(self):
        self._refCount += 1

    def decRef(self):
        self._refCount -= 1
        # If there is only one client and the session has been cancelled,
        # stop copying and serve the source file directly
        if (self._refCount == 1) and self._cancelled:
            # Cancel the copy and close the writing temporary file.
            self._cancelCopy(False, True)
        # We close if the copy is finished (if _copied is None)
        if (self._refCount == 0) and (self._copied is None):
            self.close()

    def _close(self):
        self.log("Closing copy session")
        # Cancel the copy, close the source file and the writing temp file.
        self._cancelCopy(True, True)
        self._closeReadTempFile()
        self.plug.removeCopySession(self)
        self.plug = None

    def close(self):
        """
        Close the session once the start of the copy has been processed.

        @return: a Deferred on the first call, None afterwards.
        """
        if self._startCopyingDefer:
            d = self._startCopyingDefer
            self._startCopyingDefer = None
            d.addCallback(lambda _: self._close())
            return d

    def doServe(self):
        """
        Serve one pending source read. Called in the copy thread context.

        @return: True if more reads are pending.
        """
        if not (self.copying and self._pending):
            # Nothing to do anymore.
            return False
        # We have pending source file read operations
        position, size, d = self._pending.pop(0)
        try:
            self._sourceFile.seek(position)
            data = self._sourceFile.read(size)
        except Exception as e:
            # The request was already removed from the pending list:
            # fail it here, otherwise its client would wait forever.
            self.warning("Failed to read from source file: %s",
                         log.getExceptionMessage(e))
            error = e
            if isinstance(e, IOError):
                cls = errnoLookup.get(e.errno, FileError)
                error = cls("Failed to read source file: %s" % str(e))
            # Fire the deferred in the main thread
            reactor.callFromThread(d.errback, error)
        else:
            # Call the deferred in the main thread
            reactor.callFromThread(d.callback, data)
        return len(self._pending) > 0

    def doCopy(self):
        """
        Copy one buffer to the temporary file. Called in the copy thread.

        @return: True if the copy should continue.
        """
        if not self.copying:
            # Nothing to do anymore.
            return False
        # Copy a buffer from the source file to the temporary writing file
        cont = True
        try:
            # It's safe to use self._copied, because it's only set
            # by the copy thread during copy.
            self._sourceFile.seek(self._copied)
            self._wTempFile.seek(self._copied)
            data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE)
            self._wTempFile.write(data)
            self._wTempFile.flush()
        except Exception as e:
            # Any failure aborts the copy. Letting it escape would keep the
            # session active and the copy thread would retry it forever.
            self.warning("Failed to copy source file: %s",
                         log.getExceptionMessage(e))
            # Abort copy and cancel the session
            self.copying = False
            reactor.callFromThread(self.plug.disableSession, self)
            reactor.callFromThread(self._cancelSession)
            # Do not continue
            cont = False
        else:
            size = len(data)
            self._copied += size
            self._correction += size
            if size < FILE_COPY_BUFFER_SIZE:
                # Stop copying
                self.copying = False
                reactor.callFromThread(self.plug.disableSession, self)
                reactor.callFromThread(self._onCopyFinished)
                cont = False
        # Check for cancellation
        if self._waitCancel and self.copying:
            # Copy has been cancelled
            self.copying = False
            reactor.callFromThread(self.plug.disableSession, self)
            reactor.callFromThread(self._onCopyCancelled, *self._waitCancel)
            return False
        return cont


    ## Private Methods ##

    def _allocCacheSpace(self):
        # Retrieve a cache allocation tag, used to track the cache free space
        return self.plug.cache.allocateCacheSpace(self.size)

    def _releaseCacheSpace(self):
        if not (self._cancelled or self._allocTag is None):
            self.plug.cache.releaseCacheSpace(self._allocTag)
        self._allocTag = None

    def _cancelSession(self):
        if not self._cancelled:
            self.log("Canceling copy session")
            # Not a valid copy session anymore
            self._cancelled = True
            # If there is no more than 1 client using the session,
            # stop copying and serve the source file directly
            if self._refCount <= 1:
                # Cancel and close the temp write file.
                self._cancelCopy(False, True)

    def _gotCacheSpace(self, tag):
        self._allocTag = tag

        if not tag:
            # No free space, proxying source file directly
            self._cancelSession()
            return
        self.plug.stats.onCopyStarted()
        # Then open a transient temporary file.
        # NOTE(review): mkstemp() creates it in the default temporary
        # directory; os.rename() below fails (EXDEV) if that directory is
        # not on the same filesystem as self.tempPath -- confirm.
        transientPath = None
        try:
            fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
            self.log("Created transient file '%s'", transientPath)
            self._wTempFile = os.fdopen(fd, "wb")
            self.log("Opened temporary file for writing [fd %d]",
                     self._wTempFile.fileno())
            self._rTempFile = open(transientPath, "rb")
            self.log("Opened temporary file for reading [fd %d]",
                     self._rTempFile.fileno())
        except EnvironmentError as e:
            # mkstemp() fails with OSError, opening files with IOError
            self.warning("Failed to open temporary file: %s",
                         log.getExceptionMessage(e))
            self._abortCopyStart(transientPath)
            return
        # Truncate it to the source size
        try:
            self.log("Truncating temporary file to size %d", self.size)
            self._wTempFile.truncate(self.size)
        except EnvironmentError as e:
            self.warning("Failed to truncate temporary file: %s",
                         log.getExceptionMessage(e))
            self._abortCopyStart(transientPath)
            return
        # And move it to the real temporary file path
        try:
            self.log("Renaming transient file to '%s'", self.tempPath)
            os.rename(transientPath, self.tempPath)
        except EnvironmentError as e:
            # os.rename() fails with OSError. Copying anyway would be
            # useless: the finish and cleanup code only knows about
            # self.tempPath, so the transient file would also be leaked.
            self.warning("Failed to rename transient temporary file: %s",
                         log.getExceptionMessage(e))
            self._abortCopyStart(transientPath)
            return
        # And start copying
        self.debug("Start caching '%s' [fd %d]",
                   self.sourcePath, self._sourceFile.fileno())
        # Activate the copy
        self.copying = True
        self.plug.activateSession(self)

    def _abortCopyStart(self, transientPath):
        # Give up caching before the copy started: drop the transient file
        # (if created), return the reserved cache space and fall back to
        # serving the source file directly.
        if transientPath is not None:
            try:
                os.remove(transientPath)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    self.warning("Error deleting transient file: %s",
                                 log.getExceptionMessage(e))
        self._releaseCacheSpace()
        self._cancelSession()

    def _startCopying(self):
        self.log("Start copy session")
        # First ensure there is not already a temporary file
        self._removeTempFile()
        # Reserve cache space, may trigger a cache cleanup
        d = self._allocCacheSpace()
        d.addCallback(self._gotCacheSpace)
        return d

    def _cancelCopy(self, closeSource, closeTempWrite):
        if self.copying:
            self.log("Canceling file copy")
            if self._waitCancel:
                # Already waiting for cancellation.
                return
            self.debug("Cancel caching '%s' [fd %d]",
                       self.sourcePath, self._sourceFile.fileno())
            # Disable the copy, we do not modify copying directly
            # to let the copying thread terminate current operations.
            # The file close operations are deferred.
            self._waitCancel = (closeSource, closeTempWrite)
            return
        # No pending copy, we can close the files
        if closeSource:
            self._closeSourceFile()
        if closeTempWrite:
            self._closeWriteTempFile()

    def _onCopyCancelled(self, closeSource, closeTempWrite):
        self.log("Copy session cancelled")
        # Called when the copy thread really stopped to read/write
        self._waitCancel = None
        self.plug.stats.onCopyCancelled(self.size, self._copied)
        # Resolve all pending source read operations
        for position, size, d in self._pending:
            if self._sourceFile is None:
                d.errback(CopySessionCancelled())
            else:
                try:
                    self._sourceFile.seek(position)
                    data = self._sourceFile.read(size)
                    d.callback(data)
                except Exception as e:
                    self.warning("Failed to read from source file: %s",
                                 log.getExceptionMessage(e))
                    d.errback(e)
        self._pending = []
        # then we can safely close files
        if closeSource:
            self._closeSourceFile()
        if closeTempWrite:
            self._closeWriteTempFile()

    def _onCopyFinished(self):
        if self._sourceFile is None:
            return
        # Called when the copy thread really stopped to read/write
        self.debug("Finished caching '%s' [fd %d]",
                   self.sourcePath, self._sourceFile.fileno())
        self.plug.stats.onCopyFinished(self.size)
        # Set the copy as finished to prevent the temporary file
        # from being deleted when closed
        self._copied = None
        # Closing source and write files
        self._closeSourceFile()
        self._closeWriteTempFile()
        # Setting the modification time on the temporary file
        try:
            mtime = self.mtime
            atime = int(time.time())
            self.log("Setting temporary file modification time to %d", mtime)
            # FIXME: Should use futimes, but it's not wrapped by python
            os.utime(self.tempPath, (atime, mtime))
        except OSError as e:
            if e.errno == errno.ENOENT:
                # The file may have been deleted by another process
                self._releaseCacheSpace()
            else:
                self.warning("Failed to update modification time of temporary "
                             "file: %s", log.getExceptionMessage(e))
                self._cancelSession()
        else:
            # Only publish the file once its mtime matches the source: a
            # cached file with another mtime is treated as out-of-date and
            # deleted by the next request for it.
            try:
                self.log("Renaming temporary file to '%s'", self.cachePath)
                os.rename(self.tempPath, self.cachePath)
            except OSError as e:
                if e.errno == errno.ENOENT:
                    self._releaseCacheSpace()
                else:
                    self.warning("Failed to rename temporary file: %s",
                                 log.getExceptionMessage(e))
                    self._cancelSession()
        # Complete all pending source read operations with the temporary file.
        for position, size, d in self._pending:
            try:
                self._rTempFile.seek(position)
                data = self._rTempFile.read(size)
                d.callback(data)
            except Exception as e:
                self.warning("Failed to read from temporary file: %s",
                             log.getExceptionMessage(e))
                d.errback(e)
        self._pending = []
        if self._refCount == 0:
            # We were waiting for the file to be copied to close it.
            self.close()

    def _removeTempFile(self):
        try:
            os.remove(self.tempPath)
            self.log("Deleted temporary file '%s'", self.tempPath)
            # Inform the plug that cache space has been released
            self._releaseCacheSpace()
        except OSError as e:
            if e.errno == errno.ENOENT:
                if self._wTempFile is not None:
                    # Already deleted but inform the plug anyway
                    self._releaseCacheSpace()
            else:
                self.warning("Error deleting temporary file: %s",
                             log.getExceptionMessage(e))

    def _closeSourceFile(self):
        if self._sourceFile is not None:
            self.log("Closing source file [fd %d]", self._sourceFile.fileno())
            try:
                try:
                    self._sourceFile.close()
                finally:
                    self._sourceFile = None
            except IOError as e:
                self.warning("Failed to close source file: %s",
                             log.getExceptionMessage(e))

    def _closeReadTempFile(self):
        if self._rTempFile is not None:
            self.log("Closing temporary file for reading [fd %d]",
                     self._rTempFile.fileno())
            try:
                try:
                    self._rTempFile.close()
                finally:
                    self._rTempFile = None
            except IOError as e:
                self.warning("Failed to close temporary file for reading: %s",
                             log.getExceptionMessage(e))

    def _closeWriteTempFile(self):
        if self._wTempFile is not None:
            # If the copy is not finished, remove the temporary file
            if not self._cancelled and self._copied is not None:
                self._removeTempFile()
            self.log("Closing temporary file for writing [fd %d]",
                     self._wTempFile.fileno())
            try:
                try:
                    self._wTempFile.close()
                finally:
                    self._wTempFile = None
            except Exception as e:
                self.warning("Failed to close temporary file for writing: %s",
                             log.getExceptionMessage(e))
712 713
class TempFileDelegate(log.Loggable):
    """
    File delegate reading through a shared L{CopySession}.

    Holds one reference on the session (incRef on creation, decRef on
    close) and keeps its own read position.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, plug, session):
        self.logName = plug.getLogName(session.sourcePath)
        self.mtime = session.mtime
        self.size = session.size
        self._session = session
        self._reading = False
        self._position = 0
        session.incRef()

    def tell(self):
        return self._position

    def seek(self, offset):
        self._position = offset

    def read(self, size, stats):
        """
        @return: the data, or a Deferred firing with it when the session
                 had to delegate the read to the copy thread.
        """
        assert not self._reading, "Simultaneous read not supported"
        d = self._session.read(self._position, size, stats)
        if isinstance(d, defer.Deferred):
            self._reading = True
            return d.addCallbacks(self._cbGotData, self._ebReadFailed)
        self._position += len(d)
        return d

    def close(self):
        if self._session is not None:
            self._session.decRef()
            self._session = None


    ## Private Methods ##

    def _cbGotData(self, data):
        self._reading = False
        self._position += len(data)
        return data

    def _ebReadFailed(self, failure):
        # Clear the reading flag so a failed asynchronous read does not
        # block every later read; the failure is passed on unchanged.
        self._reading = False
        return failure
754 755
class DirectFileDelegate(log.Loggable):
    """
    File delegate wrapping an already opened file object.

    IOErrors raised by the wrapped file are converted to the file provider
    error registered in errnoLookup for their errno (FileError otherwise).
    """

    logCategory = LOG_CATEGORY

    # Default values
    _file = None

    def __init__(self, plug, path, file, info):
        self.logName = plug.getLogName(path, file.fileno())
        self._file = file
        # Size and modification time are taken once from the given stat info
        self.size = info[stat.ST_SIZE]
        self.mtime = info[stat.ST_MTIME]

    def tell(self):
        try:
            position = self._file.tell()
        except IOError as e:
            raise self._convertError(e, "Failed to tell position in file: %s")
        return position

    def seek(self, offset):
        try:
            self._file.seek(offset, SEEK_SET)
        except IOError as e:
            raise self._convertError(e, "Failed to seek in cached file: %s")

    def read(self, size):
        try:
            data = self._file.read(size)
        except IOError as e:
            raise self._convertError(e, "Failed to read data from file: %s")
        return data

    def close(self):
        if self._file is None:
            return
        # Detach first so the delegate counts as closed even if close fails
        fileObj, self._file = self._file, None
        try:
            fileObj.close()
        except IOError as e:
            raise self._convertError(e, "Failed to close file: %s")


    ## Private Methods ##

    def _convertError(self, error, template):
        # Build the provider exception matching the IOError's errno.
        errorClass = errnoLookup.get(error.errno, FileError)
        return errorClass(template % str(error))
801 802
class CachedFileDelegate(DirectFileDelegate):
    """
    Delegate serving a complete file from the local cache. Every read is
    reported to the request statistics as cached bytes.
    """

    def read(self, size, stats):
        chunk = DirectFileDelegate.read(self, size)
        stats.onBytesRead(0, len(chunk), 0)
        return chunk

    def close(self):
        if self._file is None:
            return
        self.log("Closing cached file [fd %d]", self._file.fileno())
        DirectFileDelegate.close(self)
814 815
class CachedFile(fileprovider.File, log.Loggable):
    """
    File served by the caching plug.

    Opening it picks a delegate: an up-to-date cached copy, a copy session
    already filling a temporary file, or a newly created copy session.
    """

    logCategory = LOG_CATEGORY

    # Override the parent class property with a plain attribute
    mimeType = None

    # Default values
    _delegate = None

    def __init__(self, plug, path, mimeType):
        self.logName = plug.getLogName(path)
        self.plug = plug
        self._path = path
        self.mimeType = mimeType
        self.stats = cachestats.RequestStatistics(plug.stats)
        self._delegate = None

    def open(self):
        """
        @return: a Deferred firing with this file once a delegate is chosen.
        """
        # Opening source file in a separate thread, as it usually involves
        # accessing a network filesystem (which would block the reactor)
        d = threads.deferToThread(open_stat, self._path)
        d.addCallbacks(self._selectDelegate, self._sourceOpenFailed)

        def _setDelegate(delegate):
            self._delegate = delegate

        d.addCallback(_setDelegate)
        d.addCallback(lambda _: self)
        return d

    def _sourceOpenFailed(self, failure):
        # Only a missing source is handled here; trap() re-raises others.
        failure.trap(NotFoundError)
        self.debug("Source file %r not found", self._path)
        self.plug.outdateCopySession(self._path)
        cachedPath = self.plug.cache.getCachePath(self._path)
        self._removeCachedFile(cachedPath)
        # Keep propagating the failure down the errback chain
        return failure

    def __str__(self):
        return "<CachedFile '%s'>" % self._path

    def getmtime(self):
        if self._delegate is None:
            raise FileClosedError("File closed")
        return self._delegate.mtime

    def getsize(self):
        if self._delegate is None:
            raise FileClosedError("File closed")
        return self._delegate.size

    def tell(self):
        if self._delegate is None:
            raise FileClosedError("File closed")
        return self._delegate.tell()

    def seek(self, offset):
        if self._delegate is None:
            raise FileClosedError("File closed")
        return self._delegate.seek(offset)

    def read(self, size):
        """
        @rtype: L{defer.Deferred}
        @raise FileClosedError: when the file is not open.
        """
        if self._delegate is None:
            raise FileClosedError("File closed")
        try:
            d = self._delegate.read(size, self.stats)
            if isinstance(d, defer.Deferred):
                return d
            return defer.succeed(d)
        except IOError as e:
            cls = errnoLookup.get(e.errno, FileError)
            return defer.fail(cls("Failed to read cached data: %s" % str(e)))
        except Exception:
            # defer.fail() without argument wraps the current exception
            return defer.fail()

    def close(self):
        # Detach the delegate first so that a failing close cannot make a
        # later close() (e.g. from __del__) report the closing twice.
        delegate, self._delegate = self._delegate, None
        if delegate is not None:
            self.stats.onClosed()
            delegate.close()

    def __del__(self):
        self.close()

    def getLogFields(self):
        return self.stats.getLogFields()


    ## Private Methods ##

    def _closeSourceFile(self, sourceFile):
        self.log("Closing source file [fd %d]", sourceFile.fileno())
        try:
            sourceFile.close()
        except Exception as e:
            self.warning("Failed to close source file: %s",
                         log.getExceptionMessage(e))

    def _selectDelegate(self, result):
        # result is the (file, statinfo) pair returned by open_stat()
        sourceFile, sourceInfo = result
        sourcePath = self._path
        self.log("Selecting delegate for source file %r [fd %d]",
                 sourcePath, sourceFile.fileno())
        # Update the log name
        self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
        # Opening cached file
        cachedPath = self.plug.cache.getCachePath(sourcePath)
        try:
            cachedFile, cachedInfo = open_stat(cachedPath)
            self.log("Opened cached file [fd %d]", cachedFile.fileno())
        except NotFoundError:
            self.debug("Did not find cached file '%s'", cachedPath)
            return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
        except FileError as e:
            self.debug("Failed to open cached file: %s", str(e))
            self._removeCachedFile(cachedPath)
            return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
        # Found a cached file, now check the modification time
        self.debug("Found cached file '%s'", cachedPath)
        sourceTime = sourceInfo[stat.ST_MTIME]
        cacheTime = cachedInfo[stat.ST_MTIME]
        if sourceTime != cacheTime:
            # Source file changed, remove file and start caching again
            self.debug("Cached file out-of-date (%d != %d)",
                       sourceTime, cacheTime)
            self.stats.onCacheOutdated()
            self.plug.outdateCopySession(sourcePath)
            # The stale copy is not served: close it to avoid leaking
            # its file descriptor.
            try:
                cachedFile.close()
            except IOError as e:
                self.warning("Failed to close cached file: %s",
                             log.getExceptionMessage(e))
            self._removeCachedFile(cachedPath)
            return self._cacheFile(sourcePath, sourceFile, sourceInfo)
        self._closeSourceFile(sourceFile)
        # We have a valid cached file, just delegate to it.
        self.debug("Serving cached file '%s'", cachedPath)
        delegate = CachedFileDelegate(self.plug, cachedPath,
                                      cachedFile, cachedInfo)
        self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
        return delegate

    def _removeCachedFile(self, cachePath):
        try:
            os.remove(cachePath)
            self.debug("Deleted cached file '%s'", cachePath)
        except OSError as e:
            if e.errno != errno.ENOENT:
                self.warning("Error deleting cached file: %s", str(e))

    def _tryTempFile(self, sourcePath, sourceFile, sourceInfo):
        # Reuse a running copy session for the path when it is up-to-date
        session = self.plug.getCopySession(sourcePath)
        if session is None:
            self.debug("No copy sessions found")
            return self._cacheFile(sourcePath, sourceFile, sourceInfo)
        self.debug("Copy session found")
        if sourceInfo[stat.ST_MTIME] != session.mtime:
            self.debug("Copy session out-of-date (%d != %d)",
                       sourceInfo[stat.ST_MTIME], session.mtime)
            self.stats.onCacheOutdated()
            session.outdate()
            return self._cacheFile(sourcePath, sourceFile, sourceInfo)
        self._closeSourceFile(sourceFile)
        # We have a valid session, just delegate to it.
        self.debug("Serving temporary file '%s'", session.tempPath)
        delegate = TempFileDelegate(self.plug, session)
        self.stats.onStarted(delegate.size, cachestats.TEMP_HIT)
        return delegate

    def _cacheFile(self, sourcePath, sourceFile, sourceInfo):
        # Start a new copy session; it takes ownership of sourceFile
        session = self.plug.createCopySession(sourcePath, sourceFile,
                                              sourceInfo)
        self.debug("Serving temporary file '%s'", session.tempPath)
        delegate = TempFileDelegate(self.plug, session)
        self.stats.onStarted(delegate.size, cachestats.CACHE_MISS)
        return delegate
986