Package flumotion :: Package component :: Package base :: Module watcher
[hide private]

Source Code for Module flumotion.component.base.watcher

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18   
 19  import os 
 20  import time 
 21   
 22  from twisted.internet import reactor 
 23   
 24  from flumotion.common import log 
 25   
 26  __version__ = "$Rev$" 
 27   
 28   
29 -class BaseWatcher(log.Loggable):
30 """I watch for file changes. 31 32 I am a base class for a file watcher. I can be specialized to watch 33 any set of files. 34 """ 35
36 - def __init__(self, timeout):
37 """Make a file watcher object. 38 39 @param timeout: timeout between checks, in seconds 40 @type timeout: int 41 """ 42 self.timeout = timeout 43 self._reset() 44 self._subscribeId = 0 45 self.subscribers = {}
46
47 - def _reset(self):
48 self._stableData = {} 49 self._changingData = {} 50 self._delayedCall = None
51
52 - def _subscribe(self, **events):
53 """Subscribe to events. 54 55 @param events: The events to subscribe to. Subclasses are 56 expected to formalize this dict, specifying which events they 57 support via declaring their kwargs explicitly. 58 59 @returns: A subscription ID that can later be passed to 60 unsubscribe(). 61 """ 62 sid = self._subscribeId 63 self._subscribeId += 1 64 self.subscribers[sid] = events 65 return sid
66
67 - def subscribe(self, fileChanged=None, fileDeleted=None):
68 """Subscribe to events. 69 70 @param fileChanged: A function to call when a file changes. This 71 function will only be called if the file's details (size, mtime) 72 do not change during the timeout period. 73 @type fileChanged: filename -> None 74 @param fileDeleted: A function to call when a file is deleted. 75 @type fileDeleted: filename -> None 76 77 @returns: A subscription ID that can later be passed to 78 unsubscribe(). 79 """ 80 return self._subscribe(fileChanged=fileChanged, 81 fileDeleted=fileDeleted)
82
83 - def unsubscribe(self, id):
84 """Unsubscribe from file change notifications. 85 86 @param id: Subscription ID received from subscribe() 87 """ 88 del self.subscribers[id]
89
90 - def event(self, event, *args, **kwargs):
91 """Fire an event. 92 93 This method is intended for use by object implementations. 94 """ 95 for s in self.subscribers.values(): 96 if s[event]: 97 # Exceptions raised by subscribers need to be catched to 98 # continue polling for changes 99 try: 100 s[event](*args, **kwargs) 101 except Exception, e: 102 self.warning("A callback for event %s raised an error: %s" 103 % (event, log.getExceptionMessage(e)))
104 105 # FIXME: this API has tripped up two people thus far, including its 106 # author. make subscribe() call start() if necessary? 107
108 - def start(self):
109 """Start checking for file changes. 110 111 Subscribers will be notified asynchronously of changes to the 112 watched files. 113 """ 114 115 def checkFiles(): 116 self.log("checking for file changes") 117 new = self.getFileData() 118 changing = self._changingData 119 stable = self._stableData 120 for f in new: 121 if f not in changing: 122 if not f in stable and self.isNewFileStable(f, new[f]): 123 self.debug('file %s stable when noted', f) 124 stable[f] = new[f] 125 self.event('fileChanged', f) 126 elif f in stable and new[f] == stable[f]: 127 # no change 128 pass 129 else: 130 self.debug('change start noted for %s', f) 131 changing[f] = new[f] 132 else: 133 if new[f] == changing[f]: 134 self.debug('change finished for %s', f) 135 del changing[f] 136 stable[f] = new[f] 137 self.event('fileChanged', f) 138 else: 139 self.log('change continues for %s', f) 140 changing[f] = new[f] 141 for f in stable.keys(): 142 if f not in new: 143 # deletion 144 del stable[f] 145 self.debug('file %s has been deleted', f) 146 self.event('fileDeleted', f) 147 for f in changing.keys(): 148 if f not in new: 149 self.debug('file %s has been deleted', f) 150 del changing[f] 151 self._delayedCall = reactor.callLater(self.timeout, 152 checkFiles)
153 154 assert self._delayedCall is None 155 checkFiles()
156
157 - def stop(self):
158 """Stop checking for file changes. 159 """ 160 self._delayedCall.cancel() 161 self._reset()
162
163 - def getFileData(self):
164 """ 165 @returns: a dict, {filename => DATA} 166 DATA can be anything. In the default implementation it is a pair 167 of (mtime, size). 168 """ 169 ret = {} 170 for f in self.getFilesToStat(): 171 try: 172 stat = os.stat(f) 173 ret[f] = (stat.st_mtime, stat.st_size) 174 except OSError, e: 175 self.debug('could not read file %s: %s', f, 176 log.getExceptionMessage(e)) 177 return ret
178
179 - def isNewFileStable(self, fName, fData):
180 """ 181 Check if the file is already stable when being added to the 182 set of watched files. 183 184 @param fName: filename 185 @type fName: str 186 @param fData: DATA, as returned by L{getFileData} method. In 187 the default implementation it is a pair of 188 (mtime, size). 189 190 @rtype: bool 191 """ 192 __pychecker__ = 'unusednames=fName' 193 194 ret = fData[0] + self.timeout < time.time() 195 return ret
196
197 - def getFilesToStat(self):
198 """ 199 @returns: sequence of filename 200 """ 201 raise NotImplementedError
202 203
204 -class DirectoryWatcher(BaseWatcher):
205 """ 206 Directory Watcher 207 Watches a directory for new files. 208 """ 209
210 - def __init__(self, path, ignorefiles=(), timeout=30):
211 BaseWatcher.__init__(self, timeout) 212 self.path = path 213 self._ignorefiles = ignorefiles
214
215 - def getFilesToStat(self):
216 return [os.path.join(self.path, f) 217 for f in os.listdir(self.path) 218 if f not in self._ignorefiles]
219 220
221 -class FilesWatcher(BaseWatcher):
222 """ 223 Watches a collection of files for modifications. 224 """ 225
226 - def __init__(self, files, timeout=30):
227 BaseWatcher.__init__(self, timeout) 228 self._files = files
229
230 - def getFilesToStat(self):
231 return self._files
232