1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import time
19
20 import gst
21 from twisted.internet import reactor, defer
22
23 from flumotion.common import log
24 from flumotion.common.poller import Poller
25
26 __version__ = "$Rev$"
27
28
30 """
31 I monitor data flow on a GStreamer pad.
32 I regularly schedule a buffer probe call at PAD_MONITOR_PROBE_INTERVAL.
33 I regularly schedule a check call at PAD_MONITOR_CHECK_INTERVAL
34 that makes sure a buffer probe was triggered since the last check call.
35 """
36
37 PAD_MONITOR_PROBE_INTERVAL = 5.0
38 PAD_MONITOR_CHECK_INTERVAL = PAD_MONITOR_PROBE_INTERVAL * 2.5
39
40 - def __init__(self, pad, name, setActive, setInactive):
41 """
42 @type pad: L{gst.Pad}
43 @type name: str
44 @param setActive: a callable that will be called when the pad is
45 considered active, taking the name of the monitor.
46 @type setActive: callable
47 @param setInactive: a callable that will be called when the pad is
48 considered inactive, taking the name of the
49 monitor.
50 @type setInactive: callable
51 """
52 self._last_data_time = -1
53 self._pad = pad
54 self.name = name
55 self._active = False
56 self._first = True
57 self._running = True
58
59 self._doSetActive = []
60 self._doSetInactive = []
61 self.addWatch(setActive, setInactive)
62
63
64
65 self._probe_id = {}
66
67 self.check_poller = Poller(self._probe_timeout,
68 self.PAD_MONITOR_PROBE_INTERVAL,
69 immediately=True)
70
71 self.watch_poller = Poller(self._check_timeout,
72 self.PAD_MONITOR_CHECK_INTERVAL)
73
79
82
84 self.check_poller.stop()
85 self.watch_poller.stop()
86 self._running = False
87
88
89
90
91 d, probe_id = self._probe_id.pop("id", (None, None))
92 if probe_id:
93 self._pad.remove_buffer_probe(probe_id)
94 d.callback(None)
95
97
98
99 def probe_cb(pad, buffer):
100 """
101 Periodically scheduled buffer probe, that ensures that we're
102 currently actually having dataflow through our eater
103 elements.
104
105 Called from GStreamer threads.
106
107 @param pad: The gst.Pad srcpad for one eater in this
108 component.
109 @param buffer: A gst.Buffer that has arrived on this pad
110 """
111 self._last_data_time = time.time()
112
113 self.logMessage('buffer probe on %s has timestamp %s', self.name,
114 gst.TIME_ARGS(buffer.timestamp))
115
116 deferred, probe_id = self._probe_id.pop("id", (None, None))
117 if probe_id:
118
119 self._pad.remove_buffer_probe(probe_id)
120
121 reactor.callFromThread(deferred.callback, None)
122
123 reactor.callFromThread(self.watch_poller.run)
124
125 self._first = False
126
127
128 return True
129
130 d = defer.Deferred()
131
132
133 self._probe_id['id'] = (d, self._pad.add_buffer_probe(probe_cb))
134 return d
135
137
138 self.log('last buffer for %s at %r', self.name, self._last_data_time)
139
140 now = time.time()
141
142 if self._last_data_time < 0:
143
144 self._last_data_time = 0
145 self.setInactive()
146 elif self._last_data_time == 0:
147
148 pass
149 else:
150
151 delta = now - self._last_data_time
152
153 if self._active and delta > self.PAD_MONITOR_CHECK_INTERVAL:
154 self.info("No data received on pad %s for > %r seconds, "
155 "setting to hungry",
156 self.name, self.PAD_MONITOR_CHECK_INTERVAL)
157 self.setInactive()
158 elif not self._active and delta < self.PAD_MONITOR_CHECK_INTERVAL:
159 self.info("Receiving data again on pad %s, flow active",
160 self.name)
161 self.setActive()
162
163 - def addWatch(self, setActive, setInactive):
164 """
165 @param setActive: a callable that will be called when the pad is
166 considered active, taking the name of the monitor.
167 @type setActive: callable
168 @param setInactive: a callable that will be called when the pad is
169 considered inactive, taking the name of the
170 monitor.
171 @type setInactive: callable
172 """
173 self._doSetActive.append(setActive)
174 self._doSetInactive.append(setInactive)
175
180
185
186
188
189 - def __init__(self, pad, name, setActive, setInactive,
190 reconnectEater, *args):
196
198 PadMonitor.setInactive(self)
199
200
201
202
203
204
205
206
207
208
209
210
211 if self._running:
212
213
214
215
216
217 self._last_data_time = 0
218
219 self.debug('starting the reconnect poller')
220 self._reconnectPoller.start(immediately=True)
221
226
231
232
234 """
235 I am a dict of monitor name -> monitor.
236 """
237
238 - def __init__(self, setActive, setInactive):
239
240
241 self._doSetActive = setActive
242 self._doSetInactive = setInactive
243 self._wasActive = True
244
246 """
247 Watch for data flow through this pad periodically.
248 If data flow ceases for too long, we turn hungry. If data flow resumes,
249 we return to happy.
250 """
251
252 def monitorActive(name):
253 self.info('Pad data flow at %s is active', name)
254 if self.isActive() and not self._wasActive:
255
256
257
258
259 self._wasActive = True
260 self._doSetActive()
261
262 def monitorInactive(name):
263 self.info('Pad data flow at %s is inactive', name)
264 if self._wasActive:
265 self._doSetInactive()
266 self._wasActive = False
267
268 assert name not in self
269 monitor = klass(pad, name, monitorActive, monitorInactive, *args)
270 self[monitor.name] = monitor
271 self.info("Added pad monitor %s", monitor.name)
272
274 if name not in self:
275 self.warning("No pad monitor with name %s", name)
276 return
277
278 monitor = self.pop(name)
279 monitor.detach()
280
282 for monitor in self.values():
283 if not monitor.isActive():
284 return False
285 return True
286