Package flumotion :: Package component :: Module padmonitor
[hide private]

Source Code for Module flumotion.component.padmonitor

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19   
 20  import gst 
 21  from twisted.internet import reactor, defer 
 22   
 23  from flumotion.common import log 
 24  from flumotion.common.poller import Poller 
 25   
 26  __version__ = "$Rev$" 
 27   
 28   
class PadMonitor(log.Loggable):
    """
    I watch a GStreamer pad and report whether data is flowing through it.

    I regularly schedule a buffer probe call at PAD_MONITOR_PROBE_INTERVAL;
    the probe records the arrival time of the buffer it sees and then
    removes itself.
    I regularly schedule a check call at PAD_MONITOR_CHECK_INTERVAL that
    compares the recorded time against the clock and tells my watchers
    when the pad switches between active and inactive.
    """

    PAD_MONITOR_PROBE_INTERVAL = 5.0
    PAD_MONITOR_CHECK_INTERVAL = PAD_MONITOR_PROBE_INTERVAL * 2.5

    def __init__(self, pad, name, setActive, setInactive):
        """
        @param pad:         the pad to monitor.
        @type  pad:         L{gst.Pad}
        @param name:        the name of this monitor; it is handed to the
                            watch callbacks.
        @type  name:        str
        @param setActive:   a callable that will be called when the pad is
                            considered active, taking the name of the
                            monitor.
        @type  setActive:   callable
        @param setInactive: a callable that will be called when the pad is
                            considered inactive, taking the name of the
                            monitor.
        @type  setInactive: callable
        """
        self.name = name
        self._pad = pad

        # System time (epoch seconds) of the last buffer seen by a probe.
        # -1 until the first check has run; _check_timeout resets it to 0
        # when that first check finds that nothing has arrived yet.
        self._last_data_time = -1

        self._active = False
        # True until the first buffer probe has fired; see logMessage().
        self._first = True
        # Cleared by detach().
        self._running = True

        # Watch callbacks, registered through addWatch().
        self._doSetActive = []
        self._doSetInactive = []
        self.addWatch(setActive, setInactive)

        # Holds at most one entry, 'id' -> (deferred, probe id).  A dict
        # is used because python's dict operations are atomic w.r.t. the
        # GIL, and the entry is popped from GStreamer threads as well.
        self._probe_id = {}

        self.check_poller = Poller(self._probe_timeout,
                                   self.PAD_MONITOR_PROBE_INTERVAL,
                                   immediately=True)

        self.watch_poller = Poller(self._check_timeout,
                                   self.PAD_MONITOR_CHECK_INTERVAL)

    def logMessage(self, message, *args):
        """
        Log at debug level until the first buffer probe has fired, and at
        log level afterwards.
        """
        if not self._first:
            self.log(message, *args)
            return
        self.debug(message, *args)

    def isActive(self):
        """
        @returns: whether the pad is currently considered active.
        @rtype:   bool
        """
        return self._active

    def detach(self):
        """
        Stop both pollers and remove the pending buffer probe, if any.

        The deferred belonging to a pending probe is fired with None.
        """
        self.check_poller.stop()
        self.watch_poller.stop()
        self._running = False

        # implementation closely tied to _probe_timeout wrt to GIL
        # tricks, threadsafety, and getting the probe deferred to
        # actually return
        pending, probe_id = self._probe_id.pop("id", (None, None))
        if not probe_id:
            # Nothing installed, or the probe callback got there first.
            return

        self._pad.remove_buffer_probe(probe_id)
        pending.callback(None)

    def _probe_timeout(self):
        """
        Install a buffer probe on the pad; called by the check poller.

        @returns: a deferred fired with None once the probe has seen a
                  buffer, or once detach() has removed the probe.
        @rtype:   L{twisted.internet.defer.Deferred}
        """

        def probe_cb(pad, buf):
            """
            Periodically scheduled buffer probe, that ensures that we're
            currently actually having dataflow through our eater
            elements.

            Called from GStreamer threads.

            @param pad: The gst.Pad srcpad for one eater in this
                        component.
            @param buf: A gst.Buffer that has arrived on this pad
            """
            self._last_data_time = time.time()

            self.logMessage('buffer probe on %s has timestamp %s',
                            self.name, gst.TIME_ARGS(buf.timestamp))

            fired, probe_id = self._probe_id.pop("id", (None, None))
            if probe_id:
                # No entry is found when detach() has already taken it,
                # or when this probe fired before the entry was stored.
                self._pad.remove_buffer_probe(probe_id)

                # We are not in the reactor thread, so hand over to it.
                reactor.callFromThread(fired.callback, None)
                # Data received! Return to happy ASAP:
                reactor.callFromThread(self.watch_poller.run)

            self._first = False

            # let the buffer through
            return True

        d = defer.Deferred()
        # FIXME: this is racy: evaluate RHS, drop GIL, buffer probe
        # fires before __setitem__ in LHS; need a mutex.  When that
        # happens probe_cb finds no entry and leaves the probe installed.
        self._probe_id['id'] = (d, self._pad.add_buffer_probe(probe_cb))
        return d

    def _check_timeout(self):
        """
        Check that a buffer probe was triggered recently; called by the
        watch poller and by the probe callback when data arrives.
        """
        self.log('last buffer for %s at %r', self.name, self._last_data_time)

        now = time.time()

        if self._last_data_time < 0:
            # We never received any data in the first timeout period...
            self._last_data_time = 0
            self.setInactive()
            return

        if self._last_data_time == 0:
            # still no data...
            return

        # We received data at some time in the past.
        limit = self.PAD_MONITOR_CHECK_INTERVAL
        delta = now - self._last_data_time

        if self._active:
            if delta > limit:
                self.info("No data received on pad %s for > %r seconds, "
                          "setting to hungry",
                          self.name, limit)
                self.setInactive()
        elif delta < limit:
            self.info("Receiving data again on pad %s, flow active",
                      self.name)
            self.setActive()

    def addWatch(self, setActive, setInactive):
        """
        Register an additional pair of state change callbacks.

        @param setActive:   a callable that will be called when the pad is
                            considered active, taking the name of the
                            monitor.
        @type  setActive:   callable
        @param setInactive: a callable that will be called when the pad is
                            considered inactive, taking the name of the
                            monitor.
        @type  setInactive: callable
        """
        self._doSetActive.append(setActive)
        self._doSetInactive.append(setInactive)

    def setInactive(self):
        """
        Mark the pad inactive and call every registered setInactive
        callback with my name.
        """
        self._active = False
        for callback in self._doSetInactive:
            callback(self.name)

    def setActive(self):
        """
        Mark the pad active and call every registered setActive callback
        with my name.
        """
        self._active = True
        for callback in self._doSetActive:
            callback(self.name)
185 186
class EaterPadMonitor(PadMonitor):
    """
    I am a pad monitor that additionally polls a reconnect callable for
    as long as my pad is inactive.
    """

    def __init__(self, pad, name, setActive, setInactive,
                 reconnectEater, *args):
        """
        @param pad:            the pad to monitor.
        @type  pad:            L{gst.Pad}
        @param name:           the name of this monitor.
        @type  name:           str
        @param setActive:      called with the monitor name when the pad
                               is considered active.
        @type  setActive:      callable
        @param setInactive:    called with the monitor name when the pad
                               is considered inactive.
        @type  setInactive:    callable
        @param reconnectEater: called with *args by the reconnect poller.
        @type  reconnectEater: callable
        """
        PadMonitor.__init__(self, pad, name, setActive, setInactive)

        def reconnect():
            return reconnectEater(*args)

        # Created stopped; setInactive() starts it, setActive() and
        # detach() stop it again.
        self._reconnectPoller = Poller(reconnect,
                                       self.PAD_MONITOR_CHECK_INTERVAL,
                                       start=False)

    def setInactive(self):
        """
        Mark the pad inactive, notify the watchers and, unless I got
        detached in the meantime, start polling for a reconnect.
        """
        PadMonitor.setInactive(self)

        # It might be that we got detached while calling
        # PadMonitor.setInactive(). For example someone might have
        # stopped the component as it went hungry, which would happen
        # inside the PadMonitor.setInactive() call. The component
        # would then detach us and the reconnect poller would get
        # stopped. If that happened don't bother restarting it, as it
        # will result in the reactor ending up in an unclean state.
        #
        # A prominent example of such situation is
        # flumotion.test.test_component_disker, where the component
        # gets stopped right after it goes hungry
        if not self._running:
            return

        # If an eater received a buffer before being marked as
        # disconnected, and still within the buffer check interval,
        # the next _check_timeout call could accidentally think the
        # eater was reconnected properly. Resetting the time to 0 here
        # makes _check_timeout treat it as "still no data" instead.
        self._last_data_time = 0

        self.debug('starting the reconnect poller')
        self._reconnectPoller.start(immediately=True)

    def setActive(self):
        """
        Mark the pad active, notify the watchers and stop polling for a
        reconnect.
        """
        PadMonitor.setActive(self)
        self.debug('stopping the reconnect poller')
        self._reconnectPoller.stop()

    def detach(self):
        """
        Detach from the pad and stop polling for a reconnect.
        """
        PadMonitor.detach(self)
        self.debug('stopping the reconnect poller')
        self._reconnectPoller.stop()
231 232
class PadMonitorSet(dict, log.Loggable):
    """
    I am a dict of monitor name -> monitor.

    I call one pair of callbacks for the set as a whole: setInactive
    when a monitor goes inactive while the set was considered active,
    and setActive once all my monitors are active again after that.
    """

    def __init__(self, setActive, setInactive):
        """
        @param setActive:   called without arguments when the set as a
                            whole becomes active again.
        @type  setActive:   callable
        @param setInactive: called without arguments when the set as a
                            whole becomes inactive.
        @type  setInactive: callable
        """
        # These callbacks will be called when the set as a whole is
        # active or inactive.
        self._doSetActive = setActive
        self._doSetInactive = setInactive
        # The set starts out considered active, so the very first
        # monitor turning active does not trigger setActive.
        self._wasActive = True

    def attach(self, pad, name, klass=PadMonitor, *args):
        """
        Watch for data flow through this pad periodically.
        If data flow ceases for too long, we turn hungry. If data flow
        resumes, we return to happy.

        @param pad:   the pad to monitor; passed on to klass.
        @param name:  the name for the new monitor; must not be in use.
        @type  name:  str
        @param klass: the monitor factory, called as
                      klass(pad, name, monitorActive, monitorInactive,
                      *args).
        @param args:  extra positional arguments for klass.

        @raises AssertionError: if a monitor called name already exists.
        """

        def monitorActive(name):
            self.info('Pad data flow at %s is active', name)
            if self.isActive() and not self._wasActive:
                # The wasActive check prevents _doSetActive from being
                # called when the monitors become active initially; it
                # is only called if we previously went inactive because
                # of an inactive monitor. A curious interface.
                self._wasActive = True
                self._doSetActive()

        def monitorInactive(name):
            self.info('Pad data flow at %s is inactive', name)
            if self._wasActive:
                # Only report the first monitor that goes inactive.
                self._doSetInactive()
                self._wasActive = False

        # Raised explicitly instead of using an assert statement, which
        # is stripped under python -O: a duplicate name would then
        # replace the existing monitor in the dict without detaching it.
        if name in self:
            raise AssertionError(
                "pad monitor %r is already attached" % (name, ))

        monitor = klass(pad, name, monitorActive, monitorInactive, *args)
        self[monitor.name] = monitor
        self.info("Added pad monitor %s", monitor.name)

    def remove(self, name):
        """
        Detach and forget the monitor called name.

        A warning is logged, and nothing else happens, when there is no
        such monitor.

        @type name: str
        """
        if name not in self:
            self.warning("No pad monitor with name %s", name)
            return

        monitor = self.pop(name)
        monitor.detach()

    def isActive(self):
        """
        @returns: True if every monitor in the set is active; also True
                  when the set is empty.
        @rtype:   bool
        """
        for monitor in self.values():
            if not monitor.isActive():
                return False
        return True
286