1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import errno
19 import os
20 import stat
21 import tempfile
22 import threading
23 import time
24
25 from twisted.internet import defer, reactor, threads, abstract
26
27 from flumotion.common import log, common
28 from flumotion.component.misc.httpserver import cachestats
29 from flumotion.component.misc.httpserver import cachemanager
30 from flumotion.component.misc.httpserver import fileprovider
31 from flumotion.component.misc.httpserver import localpath
32 from flumotion.component.misc.httpserver.fileprovider import FileClosedError
33 from flumotion.component.misc.httpserver.fileprovider import FileError
34 from flumotion.component.misc.httpserver.fileprovider import NotFoundError
35
36
# Whence value for seek(): offsets are relative to the start of the file.
SEEK_SET = 0
# Presumably the block size used when copying files to the cache; it is
# borrowed from Twisted's default FileDescriptor buffer size.
FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
# Upper bound on the length of the log names built by getLogName().
MAX_LOGNAME_SIZE = 30

LOG_CATEGORY = "fileprovider-localcached"

# Maps the errno of a failed file operation to the fileprovider exception
# class to raise; lookups fall back to FileError for unlisted codes.
errnoLookup = {
    errno.ENOENT: fileprovider.NotFoundError,
    errno.EISDIR: fileprovider.CannotOpenError,
    errno.EACCES: fileprovider.AccessError,
}
48
49
67
68
71 """
72
73 WARNING: Currently does not work properly in combination with rate-control.
74
75 I'm caching the files taken from a mounted
76 network file system to a shared local directory.
77 Multiple instances can share the same cache directory,
78 but it's recommended to use slightly different values
79 for the property cleanup-high-watermark.
80 I'm using the directory access time to know when
81 the cache usage changed and keep an estimation
82 of the cache usage for statistics.
83
84 I'm creating a unique thread to do the file copying block by block,
85 for all files to be copied to the cache.
86 Using a thread instead of a reactor.callLater 'loop' allows for
87 higher copy throughput and does not slow down the main loop when
88 lots of files are copied at the same time.
89 Simulations with real request logs show that using a thread
90 gives better results than the equivalent asynchronous implementation.
91 """
92
93 logCategory = LOG_CATEGORY
94
96 props = args['properties']
97 self._sourceDir = props.get('path')
98 cacheDir = props.get('cache-dir')
99 cacheSizeInMB = props.get('cache-size')
100 if cacheSizeInMB is not None:
101 cacheSize = cacheSizeInMB * 10 ** 6
102 else:
103 cacheSize = None
104 cleanupEnabled = props.get('cleanup-enabled')
105 cleanupHighWatermark = props.get('cleanup-high-watermark')
106 cleanupLowWatermark = props.get('cleanup-low-watermark')
107
108 self._sessions = {}
109 self._index = {}
110
111 self.stats = cachestats.CacheStatistics()
112
113 self.cache = cachemanager.CacheManager(self.stats,
114 cacheDir, cacheSize,
115 cleanupEnabled,
116 cleanupHighWatermark,
117 cleanupLowWatermark)
118
119 common.ensureDir(self._sourceDir, "source")
120
121
122 self._thread = CopyThread(self)
123
def start(self, component):
    """Start the plug: set up the cache, then launch the copy thread.

    @param component: the component owning this plug (only logged here).
    @return: the Deferred returned by the cache manager's setUp(), with a
             callback chained on it that starts the copy thread.
    """
    self.debug('Starting cachedprovider plug for component %r', component)

    def launchCopyThread(_result):
        # The copy thread only starts once the cache is ready.
        return self._thread.start()

    setupDone = self.cache.setUp()
    setupDone.addCallback(launchCopyThread)
    return setupDone
129
def stop(self, component):
    """Stop the plug: stop the copy thread, then close every session.

    @param component: the component owning this plug (only logged here).
    @return: a DeferredList over the Deferreds returned by the sessions'
             close(), or None when none of them returned one.
    """
    self.debug('Stopping cachedprovider plug for component %r', component)
    # CopyThread.stop() joins the thread, so this waits for it to exit.
    self._thread.stop()
    # close() may return a Deferred; false results are not waited for.
    closing = [d for d in [s.close() for s in self._index.values()] if d]
    if closing:
        return defer.DeferredList(closing)
    return None
140
146
149
151 if self._sourceDir is None:
152 return None
153 return LocalPath(self, self._sourceDir)
154
155
156
157
159 """
160 Returns a log name for a path, shortened to a maximum size
161 specified by the global variable MAX_LOGNAME_SIZE.
162 The log name will be the filename part of the path postfixed
163 by the id in brackets if id is not None.
164 """
165 filename = os.path.basename(path)
166 basename, postfix = os.path.splitext(filename)
167 if id is not None:
168 postfix += "[%s]" % id
169 prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
170 if len(basename) > prefixMaxLen:
171 basename = basename[:prefixMaxLen-1] + "*"
172 return basename + postfix
173
176
184
189
195
203
211
214
217
218
219 -class LocalPath(localpath.LocalPath, log.Loggable):
247
248
250
251 logCategory = LOG_CATEGORY
252
254 threading.Thread.__init__(self)
255 self.plug = plug
256 self._running = True
257 self._event = threading.Event()
258
260 self._running = False
261 self._event.set()
262 self.join()
263
266
269
285
286
289
290
292 """
293 I'm serving a file at the same time I'm copying it
294 from the network file system to the cache.
295 If the client asks for data not yet copied, the source file
296 read operation is delegated to the copy thread as an asynchronous
297 operation because file seeking/reading is not thread safe.
298
299 The copy session has to open the temporary file twice,
300 once read-only and once write-only,
301 because closing a read/write file changes the modification time.
302 We want the modification time to be set to a known value
303 when the copy is finished, while still keeping read access to the file.
304
305 The session manages a reference counter to know how many TempFileDelegate
306 instances are using the session to delegate read operations.
307 This is done for two reasons:
308 - To avoid the circular references that would result from having the
309 session manage a list of delegate instances.
310 - If not cancelled, sessions should not be deleted
311 when no delegates reference them anymore, so weakref cannot be used.
312 """
313
314 logCategory = LOG_CATEGORY
315
def __init__(self, plug, sourcePath, sourceFile, sourceInfo):
    """Create the copy session for one source file and start the copy.

    @param plug: the cached provider plug; its getLogName() and its cache
                 manager are used to name the session and its files.
    @param sourcePath: path of the file in the source directory.
    @param sourceFile: the opened source file object.
    @param sourceInfo: stat information of the source file, indexed with
                       the C{stat.ST_*} constants.
    """
    self.plug = plug
    self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
    self.sourcePath = sourcePath
    self.tempPath = plug.cache.getTempPath(sourcePath)
    self.cachePath = plug.cache.getCachePath(sourcePath)
    self.mtime = sourceInfo[stat.ST_MTIME]
    self.size = sourceInfo[stat.ST_SIZE]

    # Set to True by the copy start-up code once copying is active.
    self.copying = None
    self._cancelled = False
    # Tag of the cache space allocated for this copy.
    self._allocTag = None
    # (closeSource, closeTempWrite) remembered when a cancel has to wait
    # for the copy in progress.
    self._waitCancel = None

    self._sourceFile = sourceFile
    # The temporary file gets opened twice: once for writing, once for
    # reading.
    self._wTempFile = None
    self._rTempFile = None

    # Reads queued for the copy thread, as (position, size, deferred).
    self._pending = []
    # Number of delegates currently reading through this session.
    self._refCount = 0
    # Bytes copied so far; None once the whole file has been copied.
    self._copied = 0
    # Pending byte count reported by read() as the third onBytesRead()
    # argument.
    self._correction = 0

    # Kept last so that _startCopying() runs on a fully initialized session.
    self._startCopyingDefer = self._startCopying()
338
342
343 - def read(self, position, size, stats):
344
345 if self._rTempFile:
346
347
348 if (self._copied is None) or ((position + size) <= self._copied):
349 try:
350 self._rTempFile.seek(position)
351 data = self._rTempFile.read(size)
352
353 size = len(data)
354
355
356
357
358
359 diff = min(self._correction, size)
360 self._correction -= diff
361 stats.onBytesRead(0, size, diff)
362 return data
363 except Exception, e:
364 self.warning("Failed to read from temporary file: %s",
365 log.getExceptionMessage(e))
366 self._cancelSession()
367
368 if self._sourceFile is None:
369 raise FileError("File caching error, cannot proceed")
370
371 try:
372
373
374
375
376 if self.copying:
377
378
379
380 d = defer.Deferred()
381
382 def updateStats(data):
383 stats.onBytesRead(len(data), 0, 0)
384 return data
385
386 d.addCallback(updateStats)
387 self._pending.append((position, size, d))
388 return d
389
390 self._sourceFile.seek(position)
391 data = self._sourceFile.read(size)
392 stats.onBytesRead(len(data), 0, 0)
393 return data
394 except IOError, e:
395 cls = errnoLookup.get(e.errno, FileError)
396 raise cls("Failed to read source file: %s" % str(e))
397
400
402 self._refCount -= 1
403
404
405 if (self._refCount == 1) and self._cancelled:
406
407 self._cancelCopy(False, True)
408
409 if (self._refCount == 0) and (self._copied is None):
410 self.close()
411
419
421 if self._startCopyingDefer:
422 d = self._startCopyingDefer
423 self._startCopyingDefer = None
424 d.addCallback(lambda _: self._close())
425 return d
426
428 if not (self.copying and self._pending):
429
430 return False
431
432 position, size, d = self._pending.pop(0)
433 self._sourceFile.seek(position)
434 data = self._sourceFile.read(size)
435
436 reactor.callFromThread(d.callback, data)
437 return len(self._pending) > 0
438
481
482
483
484
488
490 if not (self._cancelled or self._allocTag is None):
491 self.plug.cache.releaseCacheSpace(self._allocTag)
492 self._allocTag = None
493
495 if not self._cancelled:
496 self.log("Canceling copy session")
497
498 self._cancelled = True
499
500
501 if self._refCount <= 1:
502
503 self._cancelCopy(False, True)
504
506 self._allocTag = tag
507
508 if not tag:
509
510 self._cancelSession()
511 return
512 self.plug.stats.onCopyStarted()
513
514 try:
515 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
516 self.log("Created transient file '%s'", transientPath)
517 self._wTempFile = os.fdopen(fd, "wb")
518 self.log("Opened temporary file for writing [fd %d]",
519 self._wTempFile.fileno())
520 self._rTempFile = file(transientPath, "rb")
521 self.log("Opened temporary file for reading [fd %d]",
522 self._rTempFile.fileno())
523 except IOError, e:
524 self.warning("Failed to open temporary file: %s",
525 log.getExceptionMessage(e))
526 self._cancelSession()
527 return
528
529 try:
530 self.log("Truncating temporary file to size %d", self.size)
531 self._wTempFile.truncate(self.size)
532 except IOError, e:
533 self.warning("Failed to truncate temporary file: %s",
534 log.getExceptionMessage(e))
535 self._cancelSession()
536 return
537
538 try:
539 self.log("Renaming transient file to '%s'", self.tempPath)
540 os.rename(transientPath, self.tempPath)
541 except IOError, e:
542 self.warning("Failed to rename transient temporary file: %s",
543 log.getExceptionMessage(e))
544
545 self.debug("Start caching '%s' [fd %d]",
546 self.sourcePath, self._sourceFile.fileno())
547
548 self.copying = True
549 self.plug.activateSession(self)
550
559
561 if self.copying:
562 self.log("Canceling file copy")
563 if self._waitCancel:
564
565 return
566 self.debug("Cancel caching '%s' [fd %d]",
567 self.sourcePath, self._sourceFile.fileno())
568
569
570
571 self._waitCancel = (closeSource, closeTempWrite)
572 return
573
574 if closeSource:
575 self._closeSourceFile()
576 if closeTempWrite:
577 self._closeWriteTempFile()
578
603
605 if self._sourceFile is None:
606 return
607
608 self.debug("Finished caching '%s' [fd %d]",
609 self.sourcePath, self._sourceFile.fileno())
610 self.plug.stats.onCopyFinished(self.size)
611
612
613 self._copied = None
614
615 self._closeSourceFile()
616 self._closeWriteTempFile()
617
618 try:
619 mtime = self.mtime
620 atime = int(time.time())
621 self.log("Setting temporary file modification time to %d", mtime)
622
623 os.utime(self.tempPath, (atime, mtime))
624 except OSError, e:
625 if e.errno == errno.ENOENT:
626
627 self._releaseCacheSpace()
628 else:
629 self.warning("Failed to update modification time of temporary "
630 "file: %s", log.getExceptionMessage(e))
631 self._cancelSession()
632 try:
633 self.log("Renaming temporary file to '%s'", self.cachePath)
634 os.rename(self.tempPath, self.cachePath)
635 except OSError, e:
636 if e.errno == errno.ENOENT:
637 self._releaseCacheSpace()
638 else:
639 self.warning("Failed to rename temporary file: %s",
640 log.getExceptionMessage(e))
641 self._cancelSession()
642
643 for position, size, d in self._pending:
644 try:
645 self._rTempFile.seek(position)
646 data = self._rTempFile.read(size)
647 d.callback(data)
648 except Exception, e:
649 self.warning("Failed to read from temporary file: %s",
650 log.getExceptionMessage(e))
651 d.errback(e)
652 self._pending = []
653 if self._refCount == 0:
654
655 self.close()
656
671
673 if self._sourceFile is not None:
674 self.log("Closing source file [fd %d]", self._sourceFile.fileno())
675 try:
676 try:
677 self._sourceFile.close()
678 finally:
679 self._sourceFile = None
680 except IOError, e:
681 self.warning("Failed to close source file: %s",
682 log.getExceptionMessage(e))
683
685 if self._rTempFile is not None:
686 self.log("Closing temporary file for reading [fd %d]",
687 self._rTempFile.fileno())
688 try:
689 try:
690 self._rTempFile.close()
691 finally:
692 self._rTempFile = None
693 except IOError, e:
694 self.warning("Failed to close temporary file for reading: %s",
695 log.getExceptionMessage(e))
696
698 if self._wTempFile is not None:
699
700 if not self._cancelled and self._copied is not None:
701 self._removeTempFile()
702 self.log("Closing temporary file for writing [fd %d]",
703 self._wTempFile.fileno())
704 try:
705 try:
706 self._wTempFile.close()
707 finally:
708 self._wTempFile = None
709 except Exception, e:
710 self.warning("Failed to close temporary file for writing: %s",
711 log.getExceptionMessage(e))
712
713
715
716 logCategory = LOG_CATEGORY
717
726
728 return self._position
729
def seek(self, offset):
    """Set the absolute offset that the next read() starts from.

    No I/O happens here; the session is only asked for data by read().
    """
    self._position = offset
732
def read(self, size, stats):
    """Read up to C{size} bytes from the current position.

    @param size: maximum number of bytes to read.
    @param stats: statistics object forwarded to the session.
    @return: the data, or a Deferred fired with it when the session has to
             wait for the copy thread. The position advances by the number
             of bytes actually returned.
    """
    assert not self._reading, "Simultaneous read not supported"
    d = self._session.read(self._position, size, stats)
    if isinstance(d, defer.Deferred):
        self._reading = True

        def readFailed(failure):
            # Clear the in-progress flag on errors too. Otherwise every
            # later read() on this delegate would trip the assertion above.
            self._reading = False
            return failure

        return d.addCallbacks(self._cbGotData, readFailed)
    self._position += len(d)
    return d
741
743 if self._session is not None:
744 self._session.decRef()
745 self._session = None
746
747
748
749
751 self._reading = False
752 self._position += len(data)
753 return data
754
755
801
802
804
805 - def read(self, size, stats):
809
814
815
816 -class CachedFile(fileprovider.File, log.Loggable):
845
853
855 return "<CachedFile '%s'>" % self._path
856
861
866
871
872 - def seek(self, offset):
876
877 - def read(self, size):
890
896
899
902
903
904
905
913
915 sourcePath = self._path
916 self.log("Selecting delegate for source file %r [fd %d]",
917 sourcePath, sourceFile.fileno())
918
919 self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
920
921 cachedPath = self.plug.cache.getCachePath(sourcePath)
922 try:
923 cachedFile, cachedInfo = open_stat(cachedPath)
924 self.log("Opened cached file [fd %d]", cachedFile.fileno())
925 except NotFoundError:
926 self.debug("Did not find cached file '%s'", cachedPath)
927 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
928 except FileError, e:
929 self.debug("Failed to open cached file: %s", str(e))
930 self._removeCachedFile(cachedPath)
931 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
932
933 self.debug("Found cached file '%s'", cachedPath)
934 sourceTime = sourceInfo[stat.ST_MTIME]
935 cacheTime = cachedInfo[stat.ST_MTIME]
936 if sourceTime != cacheTime:
937
938 self.debug("Cached file out-of-date (%d != %d)",
939 sourceTime, cacheTime)
940 self.stats.onCacheOutdated()
941 self.plug.outdateCopySession(sourcePath)
942 self._removeCachedFile(cachedPath)
943 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
944 self._closeSourceFile(sourceFile)
945
946 self.debug("Serving cached file '%s'", cachedPath)
947 delegate = CachedFileDelegate(self.plug, cachedPath,
948 cachedFile, cachedInfo)
949 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
950 return delegate
951
953 try:
954 os.remove(cachePath)
955 self.debug("Deleted cached file '%s'", cachePath)
956 except OSError, e:
957 if e.errno != errno.ENOENT:
958 self.warning("Error deleting cached file: %s", str(e))
959
960 - def _tryTempFile(self, sourcePath, sourceFile, sourceInfo):
978
979 - def _cacheFile(self, sourcePath, sourceFile, sourceInfo):
986