1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import gst
19 import gobject
20
21 from twisted.internet import defer, reactor
22
23 from flumotion.common import errors, messages, log, python
24 from flumotion.common.i18n import N_, gettexter
25 from flumotion.common.planet import moods
26 from flumotion.component import feedcomponent
27 from flumotion.component.base import scheduler
28 from flumotion.component.padmonitor import PadMonitor
29 from flumotion.component.plugs import base
30 from flumotion.worker.checks import check
31
32 __version__ = "$Rev$"
33 T_ = gettexter()
34
35
46
47
49 logCategory = "ical-switch"
50
51 - def start(self, component):
52 self._sid = None
53 self.sched = None
54 try:
55
56 def eventStarted(eventInstance):
57 self.debug("event started %r", eventInstance.event.uid)
58 component.switch_to("backup")
59
60 def eventEnded(eventInstance):
61 self.debug("event ended %r", eventInstance.event.uid)
62 component.switch_to("master")
63
64
65
66 filename = self.args['properties']['ical-schedule']
67 self.sched = scheduler.ICalScheduler(open(filename, 'r'))
68 self._sid = self.sched.subscribe(eventStarted, eventEnded)
69 if self.sched.getCalendar().getActiveEventInstances():
70 component.idealFeed = "backup"
71 except ValueError:
72 fmt = N_("Error parsing ical file %s, so not scheduling "
73 "any events.")
74 component.addWarning("error-parsing-ical", fmt, filename)
75 except ImportError, e:
76 fmt = N_("An ical file has been specified for scheduling, "
77 "but the necessary modules are not installed.")
78 component.addWarning("error-parsing-ical", fmt, debug=e.message)
79
80 - def stop(self, component):
83
84
85 -class Switch(feedcomponent.MultiInputParseLaunchComponent):
86 logCategory = 'switch'
87 componentMediumClass = SwitchMedium
88
90 self.uiState.addKey("active-eater")
91 self.icalScheduler = None
92
93
94
95
96
97
98
99
100
101
102
103 self.logicalFeeds = {}
104
105 self.feedsByPriority = []
106
107
108 self.switchPads = {}
109
110
111
112
113
114
115
116
117 self.idealFeed = None
118 self.activeFeed = None
119
120
121
122
123 self.newSegmentEvents = {}
124
125
126
127 self.eventProbeIds = {}
128 self.bufferProbeIds = {}
129
130
131 self._padMonitors = {}
132
133 - def addWarning(self, id, format, *args, **kwargs):
138
140 for m in self.state.get('messages')[:]:
141 if m.id == id:
142 self.state.remove('messages', m)
143
145
146 def checkSignal(fact):
147 fact = fact.load()
148 signals = gobject.signal_list_names(fact.get_element_type())
149 return 'block' in signals
150
151 def cb(result):
152 for m in result.messages:
153 self.addMessage(m)
154 return result.value
155
156 self.debug("checking for input-selector element")
157 if gst.version() >= (0, 10, 32, 0):
158
159 d = check.checkPlugin('coreelements', 'gst-plugins',
160 (0, 10, 5, 2), 'input-selector', checkSignal)
161 else:
162 d = check.checkPlugin('selector', 'gst-plugins-bad',
163 (0, 10, 5, 2), 'input-selector', checkSignal)
164 d.addCallback(cb)
165 return d
166
168 ical = self.config['properties'].get('ical-schedule', None)
169 if ical:
170 args = {'properties': {'ical-schedule': ical}}
171 self.icalScheduler = ICalSwitchPlug(args)
172 self.icalScheduler.start(self)
173
187
189 raise errors.NotImplementedError('subclasses should implement '
190 'get_logical_feeds')
191
200
201 switchElements = self.get_switch_elements(pipeline)
202 for alias in self.eaters:
203 e = pipeline.get_by_name(self.eaters[alias].elementName)
204 pad = None
205 while e not in switchElements:
206 self.log("Element: %s", e.get_name())
207 pad, e = getDownstreamElement(e)
208 self.debug('eater %s maps to pad %s', alias, pad)
209 self.switchPads[alias] = pad, e
210
211
212
213 pairs = [self.switchPads[alias]
214 for alias in self.logicalFeeds[self.idealFeed]]
215
216 for p, s in pairs:
217 s.set_property('active-pad', p)
218 self.activeFeed = self.idealFeed
219 self.uiState.set("active-eater", self.idealFeed)
220
221 self.install_logical_feed_watches()
222
223 self.do_switch()
224
225
226
236
237 def eaterSetInactive(eaterAlias):
238 for feed, aliases in self.logicalFeeds.items():
239 if eaterAlias in aliases and feed in activeFeeds:
240 activeFeeds.remove(feed)
241 self.feedSetInactive(feed)
242
243
244
245
246
247
248 pad = self.switchPads[eaterAlias][0]
249 self.eventProbeIds[pad] = \
250 pad.add_event_probe(self._eventProbe)
251 self.bufferProbeIds[pad] = \
252 pad.add_buffer_probe(self._bufferProbe)
253 return
254
255 activeFeeds = []
256 for alias in self.eaters:
257 self._padMonitors[alias] = PadMonitor(self.switchPads[alias][0],
258 alias, eaterSetActive, eaterSetInactive)
259
261
262 ret = True
263 if event.type == gst.EVENT_NEWSEGMENT:
264 ret = False
265 self.newSegmentEvents[pad] = event
266 if self.eventProbeIds[pad]:
267 pad.remove_event_probe(self.eventProbeIds[pad])
268 del self.eventProbeIds[pad]
269 return ret
270
272
273 ts = buffer.timestamp
274 if pad in self.newSegmentEvents:
275 parsed = self.newSegmentEvents[pad].parse_new_segment()
276 newEvent = gst.event_new_new_segment(parsed[0], parsed[1],
277 parsed[2], ts, parsed[4], parsed[5])
278 pad.push_event(newEvent)
279 del self.newSegmentEvents[pad]
280 if pad in self.bufferProbeIds:
281 pad.remove_buffer_probe(self.bufferProbeIds[pad])
282 del self.bufferProbeIds[pad]
283 return True
284
286 raise errors.NotImplementedError('subclasses should implement '
287 'get_switch_elements')
288
290 return python.all([self.eaters[alias].isActive()
291 for alias in self.logicalFeeds[feed]])
292
297
299 self.debug('feed %r is now inactive', feed)
300
301
302
304 allFeeds = self.feedsByPriority[:]
305 feed = None
306 while allFeeds:
307 feed = allFeeds.pop(0)
308 if self.is_active(feed):
309 self.debug('autoswitch selects feed %r', feed)
310 self.do_switch(feed)
311 break
312 else:
313 self.debug("could not select feed %r because not active", feed)
314 if feed is None:
315 feed = self.feedsByPriority.get(0, None)
316 self.debug('no feeds active during autoswitch, choosing %r',
317 feed)
318 self.do_switch(feed)
319
320
321
322
324 """
325 @param feed: a logical feed
326 """
327 if feed not in self.logicalFeeds:
328 self.warning("unknown logical feed: %s", feed)
329 return None
330
331 self.debug('scheduling switch to feed %s', feed)
332 self.idealFeed = feed
333
334 self.feedsByPriority = [feed]
335 for name, aliases in self.get_logical_feeds():
336 if name != feed:
337 self.feedsByPriority.append(name)
338
339 if not self.pipeline:
340 return
341
342 if self.is_active(feed):
343 self.do_switch()
344 else:
345 fmt = N_("Tried to switch to %s, but feed is unavailable. "
346 "Will retry when the feed is back.")
347 self.addWarning("temporary-switch-problem", fmt, feed)
348
349
350
351
352
353
354
355
357 if feed == None:
358 feed = self.idealFeed
359
360 self.clearWarning('temporary-switch-problem')
361 if feed == self.activeFeed:
362 self.debug("already streaming from feed %r", feed)
363 return
364 if feed not in self.logicalFeeds:
365 self.warning("unknown logical feed: %s", feed)
366 return
367
368
369 pairs = [self.switchPads[alias]
370 for alias in self.logicalFeeds[feed]]
371
372 stop_times = [e.emit('block') for p, e in pairs]
373 start_times = [p.get_property('running-time') for p, e in pairs]
374
375 stop_time = max(stop_times)
376 self.debug('stop time = %d', stop_time)
377 self.debug('stop time = %s', gst.TIME_ARGS(stop_time))
378
379 if stop_time != gst.CLOCK_TIME_NONE:
380 diff = float(max(stop_times) - min(stop_times))
381 if diff > gst.SECOND * 10:
382 fmt = N_("When switching to %s, feed timestamps out"
383 " of sync by %us")
384 self.addWarning('large-timestamp-difference', fmt,
385 feed, diff / gst.SECOND, priority=40)
386
387 start_time = min(start_times)
388 self.debug('start time = %s', gst.TIME_ARGS(start_time))
389
390 self.debug('switching from %r to %r', self.activeFeed, feed)
391 for p, e in pairs:
392 self.debug("switching to pad %r", p)
393 e.emit('switch', p, stop_time, start_time)
394
395 self.activeFeed = feed
396 self.uiState.set("active-eater", feed)
397
398
400 logCategory = "single-switch"
401
403 return [('master', ['master']),
404 ('backup', ['backup'])]
405
407 return ("input-selector name=muxer ! "
408 "identity silent=true single-segment=true name=iden ")
409
411 return [pipeline.get_by_name('muxer')]
412
413
415 logCategory = "av-switch"
416
418
419 self.vparms = {'video-width': 'width', 'video-height': 'height',
420 'video-framerate': 'framerate',
421 'video-pixel-aspect-ratio': 'par'}
422 self.aparms = {'audio-channels': 'channels',
423 'audio-samplerate': 'samplerate'}
424
426 return [('master', ['video-master', 'audio-master']),
427 ('backup', ['video-backup', 'audio-backup'])]
428
430
431
432 return [pipeline.get_by_name('vswitch'),
433 pipeline.get_by_name('aswitch')]
434
435 - def addError(self, id, format, *args, **kwargs):
441
443 propkeys = python.set(self.config['properties'].keys())
444 vparms = python.set(self.vparms.keys())
445 aparms = python.set(self.aparms.keys())
446
447 for kind, parms in ('Video', vparms), ('Audio', aparms):
448 missing = parms - (propkeys & parms)
449 if missing and missing != parms:
450 fmt = N_("%s parameter(s) were specified but not all. "
451 "Missing parameters are: %r")
452 self.addError("video-params-not-specified", fmt, kind,
453 list(missing))
454
456
457 def i420caps(framerate, par, width, height):
458 return ("video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d,"
459 "pixel-aspect-ratio=%d/%d,format=(fourcc)I420"
460 % (width, height, framerate[0], framerate[1],
461 par[0], par[1]))
462
463 def audiocaps(channels, samplerate):
464 return ("audio/x-raw-int,channels=%d,samplerate=%d,width=16,"
465 "depth=16,signed=true" % (channels, samplerate))
466
467 def props2caps(proc, parms, prefix, suffix=' ! '):
468 kw = dict([(parms[prop], properties[prop])
469 for prop in properties if prop in parms])
470 if kw:
471 return prefix + proc(**kw) + suffix
472 else:
473 return ''
474
475 vforce = props2caps(i420caps, self.vparms,
476 "ffmpegcolorspace ! videorate ! videoscale "
477 "! capsfilter caps=")
478 aforce = props2caps(audiocaps, self.aparms,
479 "audioconvert ! audioconvert ! capsfilter caps=")
480
481 pipeline = ("input-selector name=vswitch"
482 " ! identity silent=true single-segment=true"
483 " ! @feeder:video@ "
484 "input-selector name=aswitch"
485 " ! identity silent=true single-segment=true"
486 " ! @feeder:audio@ ")
487 for alias in self.eaters:
488 if "video" in alias:
489 pipeline += '@eater:%s@ ! %s vswitch. ' % (alias, vforce)
490 elif "audio" in alias:
491 pipeline += '@eater:%s@ ! %s aswitch. ' % (alias, aforce)
492 else:
493 raise AssertionError()
494
495 return pipeline
496