Package flumotion :: Package component :: Package combiners :: Package switch :: Module switch
[hide private]

Source Code for Module flumotion.component.combiners.switch.switch

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  import gobject 
 20   
 21  from twisted.internet import defer, reactor 
 22   
 23  from flumotion.common import errors, messages, log, python 
 24  from flumotion.common.i18n import N_, gettexter 
 25  from flumotion.common.planet import moods 
 26  from flumotion.component import feedcomponent 
 27  from flumotion.component.base import scheduler 
 28  from flumotion.component.padmonitor import PadMonitor 
 29  from flumotion.component.plugs import base 
 30  from flumotion.worker.checks import check 
 31   
 32  __version__ = "$Rev$" 
 33  T_ = gettexter() 
 34   
 35   
class SwitchMedium(feedcomponent.FeedComponentMedium):
    """
    Medium exposing the switch component's feed selection as remote
    methods.

    Every remote method forwards to C{self.comp.switch_to} and returns
    whatever that call returns.
    """

    def remote_switchToMaster(self):
        """
        Ask the component to switch to the "master" logical feed.
        """
        switch = self.comp.switch_to
        return switch("master")

    def remote_switchToBackup(self):
        """
        Ask the component to switch to the "backup" logical feed.
        """
        switch = self.comp.switch_to
        return switch("backup")

    def remote_switchTo(self, logicalFeed):
        """
        Ask the component to switch to an arbitrary logical feed.

        @param logicalFeed: name of the logical feed to switch to
        """
        switch = self.comp.switch_to
        return switch(logicalFeed)
46 47
48 -class ICalSwitchPlug(base.ComponentPlug):
49 logCategory = "ical-switch" 50
51 - def start(self, component):
52 self._sid = None 53 self.sched = None 54 try: 55 56 def eventStarted(eventInstance): 57 self.debug("event started %r", eventInstance.event.uid) 58 component.switch_to("backup")
59 60 def eventEnded(eventInstance): 61 self.debug("event ended %r", eventInstance.event.uid) 62 component.switch_to("master")
63 64 # if an event starts, semantics are to switch to backup 65 # if an event ends, semantics are to switch to master 66 filename = self.args['properties']['ical-schedule'] 67 self.sched = scheduler.ICalScheduler(open(filename, 'r')) 68 self._sid = self.sched.subscribe(eventStarted, eventEnded) 69 if self.sched.getCalendar().getActiveEventInstances(): 70 component.idealFeed = "backup" 71 except ValueError: 72 fmt = N_("Error parsing ical file %s, so not scheduling " 73 "any events.") 74 component.addWarning("error-parsing-ical", fmt, filename) 75 except ImportError, e: 76 fmt = N_("An ical file has been specified for scheduling, " 77 "but the necessary modules are not installed.") 78 component.addWarning("error-parsing-ical", fmt, debug=e.message) 79
80 - def stop(self, component):
81 if self.sched: 82 self.sched.unsubscribe(self._sid)
83 84
class Switch(feedcomponent.MultiInputParseLaunchComponent):
    """
    Base class for components that switch between logical feeds.

    A logical feed (for example "master" or "backup") groups one or more
    eaters; each eater ends up on a sink pad of a switch element.
    Subclasses must implement L{get_logical_feeds} and
    L{get_switch_elements}.
    """
    logCategory = 'switch'
    componentMediumClass = SwitchMedium

    def init(self):
        """
        Set up the UI state key and all switch bookkeeping attributes.
        """
        self.uiState.addKey("active-eater")
        self.icalScheduler = None

        # This structure maps logical feeds to sets of eaters. For
        # example, "master" and "backup" could be logical feeds, and
        # would be the keys in this dict, mapping to lists of eater
        # aliases corresponding to those feeds. The lengths of those
        # lists is equal to the number of feeders that the element has,
        # which is the number of individual streams in a logical feed.
        #
        # For example, {"master": ["audio-master", "video-master"],
        #               "backup": ["audio-backup", "video-backup"]}
        # logical feed name -> [eater alias]
        self.logicalFeeds = {}
        # logical feed names in order of preference
        self.feedsByPriority = []

        # eater alias -> (sink pad, switch element)
        self.switchPads = {}

        # Two variables form the state of the switch component.
        #    idealFeed
        #        The feed that we would like to provide, as chosen by
        #        the user, either by the UI, an ical file, a pattern
        #        detection, etc.
        #    activeFeed
        #        The feed currently being provided
        self.idealFeed = None
        self.activeFeed = None

        # store of new segment events consumed on switch pads
        # due to them having gone inactive
        # switch sink pad -> event
        self.newSegmentEvents = {}

        # probe ids
        # pad -> probe handler id
        self.eventProbeIds = {}
        self.bufferProbeIds = {}

        # pad monitors for switch sink pads
        # eater alias -> PadMonitor
        self._padMonitors = {}

    def addWarning(self, id, format, *args, **kwargs):
        """
        Log a warning and post it as a WARNING message on the component.

        @param id:     message id, usable later with L{clearWarning}
        @param format: format string for the message
        @param args:   arguments for the format string
        @param kwargs: extra keyword arguments passed to the Message
        """
        self.warning(format, *args)
        m = messages.Message(messages.WARNING, T_(format, *args),
                             mid=id, **kwargs)
        self.addMessage(m)

    def clearWarning(self, id):
        """
        Remove every message with the given id from the component state.

        @param id: message id as passed to L{addWarning}
        """
        # iterate over a copy, since we remove from the list as we go
        for m in self.state.get('messages')[:]:
            if m.id == id:
                self.state.remove('messages', m)

    def do_check(self):
        """
        Check that an input-selector element providing the 'block'
        signal is available.

        @returns: a deferred firing with the value of the check result
        """

        def checkSignal(fact):
            fact = fact.load()
            signals = gobject.signal_list_names(fact.get_element_type())
            return 'block' in signals

        def cb(result):
            # forward the messages produced by the check to this component
            for m in result.messages:
                self.addMessage(m)
            return result.value

        self.debug("checking for input-selector element")
        if gst.version() >= (0, 10, 32, 0):
            # In release 0.10.32 input-selector was moved to coreelements.
            d = check.checkPlugin('coreelements', 'gst-plugins',
                (0, 10, 5, 2), 'input-selector', checkSignal)
        else:
            d = check.checkPlugin('selector', 'gst-plugins-bad',
                (0, 10, 5, 2), 'input-selector', checkSignal)
        d.addCallback(cb)
        return d

    def do_setup(self):
        """
        Start an L{ICalSwitchPlug} if the 'ical-schedule' property is set.
        """
        ical = self.config['properties'].get('ical-schedule', None)
        if ical:
            args = {'properties': {'ical-schedule': ical}}
            self.icalScheduler = ICalSwitchPlug(args)
            self.icalScheduler.start(self)

    def create_pipeline(self):
        """
        Register the logical feeds given by L{get_logical_feeds}, then
        create the pipeline through the parent class.

        The first logical feed becomes the ideal feed unless one was
        already chosen.
        """
        for name, aliases in self.get_logical_feeds():
            assert name not in self.logicalFeeds
            for alias in aliases:
                assert alias in self.eaters
            self.logicalFeeds[name] = aliases
            if self.idealFeed is None:
                self.debug("idealFeed being set to %s", name)
                self.idealFeed = name
            self.feedsByPriority.append(name)

        return feedcomponent.MultiInputParseLaunchComponent.create_pipeline(
            self)

    def get_logical_feeds(self):
        """
        Return the logical feeds of this switch.

        Consumed by L{create_pipeline} and L{switch_to} as an iterable of
        (logical feed name, [eater alias]) pairs.
        """
        raise errors.NotImplementedError('subclasses should implement '
                                         'get_logical_feeds')

    def configure_pipeline(self, pipeline, properties):
        """
        Map every eater to its switch sink pad, activate the pads of the
        ideal feed and install the logical feed watches.

        @param pipeline:   the pipeline that was created
        @param properties: the component properties (unused here)
        """

        def getDownstreamElement(e):
            # follow the first source pad of e to the peer pad and its
            # parent element
            for pad in e.pads():
                if pad.get_direction() is gst.PAD_SRC:
                    peer = pad.get_peer()
                    return peer, peer.get_parent()
            raise AssertionError('failed to find the switch')

        switchElements = self.get_switch_elements(pipeline)
        for alias in self.eaters:
            e = pipeline.get_by_name(self.eaters[alias].elementName)
            pad = None
            # walk downstream until we arrive at one of the switches
            while e not in switchElements:
                self.log("Element: %s", e.get_name())
                pad, e = getDownstreamElement(e)
            self.debug('eater %s maps to pad %s', alias, pad)
            self.switchPads[alias] = pad, e

        # set active pad correctly on each of the switch elements
        # (pad, switch)
        pairs = [self.switchPads[alias]
                 for alias in self.logicalFeeds[self.idealFeed]]

        for p, s in pairs:
            s.set_property('active-pad', p)
        self.activeFeed = self.idealFeed
        self.uiState.set("active-eater", self.idealFeed)

        self.install_logical_feed_watches()

        self.do_switch()

    def install_logical_feed_watches(self):
        """
        Add pad monitors on the switch sink pads, so that eaters going
        active or inactive are reported as logical feed changes.
        """

        def eaterSetActive(eaterAlias):
            for feed, aliases in self.logicalFeeds.items():
                if eaterAlias in aliases:
                    if feed not in activeFeeds:
                        activeFeeds.append(feed)
                        self.feedSetActive(feed)
                        return

        def eaterSetInactive(eaterAlias):
            for feed, aliases in self.logicalFeeds.items():
                if eaterAlias in aliases and feed in activeFeeds:
                    activeFeeds.remove(feed)
                    self.feedSetInactive(feed)
                    # add an event and buffer probe to the switch pad
                    # so we can rewrite the newsegment that comes
                    # when eater is active again
                    # Not rewriting it causes the pad running time
                    # to be wrong due to the new segment having a start
                    # time being much lower than any subsequent buffers.
                    pad = self.switchPads[eaterAlias][0]
                    self.eventProbeIds[pad] = \
                        pad.add_event_probe(self._eventProbe)
                    self.bufferProbeIds[pad] = \
                        pad.add_buffer_probe(self._bufferProbe)
                    return

        # logical feeds currently considered active; shared by both
        # closures above
        activeFeeds = []
        for alias in self.eaters:
            self._padMonitors[alias] = PadMonitor(self.switchPads[alias][0],
                alias, eaterSetActive, eaterSetInactive)

    def _eventProbe(self, pad, event):
        """
        Event probe: hold back the next newsegment event seen on pad.

        The event is stored in newSegmentEvents so L{_bufferProbe} can
        rewrite it, and the probe removes itself.

        @returns: False for a newsegment event, True for any other event
        """
        # called from GStreamer threads
        ret = True
        if event.type == gst.EVENT_NEWSEGMENT:
            ret = False
            self.newSegmentEvents[pad] = event
            # membership test, like _bufferProbe, so that a missing
            # entry cannot raise KeyError in a streaming thread
            if pad in self.eventProbeIds:
                pad.remove_event_probe(self.eventProbeIds[pad])
                del self.eventProbeIds[pad]
        return ret

    def _bufferProbe(self, pad, buffer):
        """
        Buffer probe: push a rewritten newsegment in front of the first
        buffer seen on pad, then remove the probe.

        The rewritten event is built from the stored one, with its
        fourth field replaced by the timestamp of this buffer.

        @returns: True
        """
        # called from GStreamer threads
        ts = buffer.timestamp
        if pad in self.newSegmentEvents:
            parsed = self.newSegmentEvents[pad].parse_new_segment()
            newEvent = gst.event_new_new_segment(parsed[0], parsed[1],
                parsed[2], ts, parsed[4], parsed[5])
            pad.push_event(newEvent)
            del self.newSegmentEvents[pad]
        if pad in self.bufferProbeIds:
            pad.remove_buffer_probe(self.bufferProbeIds[pad])
            del self.bufferProbeIds[pad]
        return True

    def get_switch_elements(self, pipeline):
        """
        Return the list of switch elements in the pipeline.

        @param pipeline: the pipeline to look the elements up in
        """
        raise errors.NotImplementedError('subclasses should implement '
                                         'get_switch_elements')

    def is_active(self, feed):
        """
        Tell whether all eaters of the given logical feed are active.

        @param feed: a logical feed
        """
        return python.all([self.eaters[alias].isActive()
                           for alias in self.logicalFeeds[feed]])

    def feedSetActive(self, feed):
        """
        Called when a logical feed became active; switches to it if it
        is the ideal feed.

        @param feed: a logical feed
        """
        self.debug('feed %r is now active', feed)
        if feed == self.idealFeed:
            self.do_switch()

    def feedSetInactive(self, feed):
        """
        Called when a logical feed became inactive; only logs.

        @param feed: a logical feed
        """
        self.debug('feed %r is now inactive', feed)

    # this function is used by the watchdogs

    def auto_switch(self):
        """
        Switch to the first active feed in order of priority.

        If no feed is active, fall back to the highest priority feed.
        With an empty priority list the fallback is None, which makes
        L{do_switch} use the ideal feed.
        """
        for feed in self.feedsByPriority:
            if self.is_active(feed):
                self.debug('autoswitch selects feed %r', feed)
                self.do_switch(feed)
                return
            self.debug("could not select feed %r because not active", feed)

        # feedsByPriority is a list, so it has no get(); index it
        # explicitly and guard against it being empty
        fallback = None
        if self.feedsByPriority:
            fallback = self.feedsByPriority[0]
        self.debug('no feeds active during autoswitch, choosing %r',
                   fallback)
        self.do_switch(fallback)

    # switch_to should only be called when the ideal feed is requested to be
    # changed, so not by watchdog reasons.

    def switch_to(self, feed):
        """
        Make the given feed the ideal feed, and switch to it right away
        if the pipeline exists and the feed is active. If the feed is
        not active a warning is added instead.

        @param feed: a logical feed
        """
        if feed not in self.logicalFeeds:
            self.warning("unknown logical feed: %s", feed)
            return None

        self.debug('scheduling switch to feed %s', feed)
        self.idealFeed = feed
        # here we should bump this feed above others in feedsByPriority
        self.feedsByPriority = [feed]
        for name, aliases in self.get_logical_feeds():
            if name != feed:
                self.feedsByPriority.append(name)

        if not self.pipeline:
            return

        if self.is_active(feed):
            self.do_switch()
        else:
            fmt = N_("Tried to switch to %s, but feed is unavailable. "
                     "Will retry when the feed is back.")
            self.addWarning("temporary-switch-problem", fmt, feed)

    # Switching multiple eaters is easy. The only trick is that we have
    # to close the previous segment at the same running time, on both
    # switch elements, and open the new segment at the same running
    # time. The block()/switch() signal API on switch elements lets us
    # do this. See the docs for switch's `block' and `switch' signals
    # for more information.

    def do_switch(self, feed=None):
        """
        Switch all switch elements to the pads of the given feed.

        Does nothing if the feed is already the active feed or is not a
        known logical feed.

        @param feed: a logical feed, or None to use the ideal feed
        """
        if feed is None:
            feed = self.idealFeed

        self.clearWarning('temporary-switch-problem')
        if feed == self.activeFeed:
            self.debug("already streaming from feed %r", feed)
            return
        if feed not in self.logicalFeeds:
            self.warning("unknown logical feed: %s", feed)
            return

        # (pad, switch)
        pairs = [self.switchPads[alias]
                 for alias in self.logicalFeeds[feed]]

        stop_times = [e.emit('block') for p, e in pairs]
        start_times = [p.get_property('running-time') for p, e in pairs]

        stop_time = max(stop_times)
        self.debug('stop time = %d', stop_time)
        self.debug('stop time = %s', gst.TIME_ARGS(stop_time))

        if stop_time != gst.CLOCK_TIME_NONE:
            # warn when the switch elements disagree by more than 10s
            diff = float(max(stop_times) - min(stop_times))
            if diff > gst.SECOND * 10:
                fmt = N_("When switching to %s, feed timestamps out"
                         " of sync by %us")
                self.addWarning('large-timestamp-difference', fmt,
                                feed, diff / gst.SECOND, priority=40)

        start_time = min(start_times)
        self.debug('start time = %s', gst.TIME_ARGS(start_time))

        self.debug('switching from %r to %r', self.activeFeed, feed)
        for p, e in pairs:
            self.debug("switching to pad %r", p)
            e.emit('switch', p, stop_time, start_time)

        self.activeFeed = feed
        self.uiState.set("active-eater", feed)
397 398
class SingleSwitch(Switch):
    """
    Switch with one eater per logical feed: "master" and "backup".
    """
    logCategory = "single-switch"

    def get_logical_feeds(self):
        """
        Return the two logical feeds, each made of the single eater
        carrying the same name as the feed.
        """
        return [(name, [name]) for name in ('master', 'backup')]

    def get_muxer_string(self, properties):
        """
        Return the pipeline fragment for the muxer: an input-selector
        named 'muxer' followed by an identity named 'iden'.

        @param properties: the component properties (unused)
        """
        selector = "input-selector name=muxer ! "
        identity = "identity silent=true single-segment=true name=iden "
        return selector + identity

    def get_switch_elements(self, pipeline):
        """
        Return a list holding the single switch element, 'muxer'.

        @param pipeline: the pipeline to look the element up in
        """
        muxer = pipeline.get_by_name('muxer')
        return [muxer]
412 413
class AVSwitch(Switch):
    """
    Switch with a video and an audio eater per logical feed, using one
    input-selector for video ('vswitch') and one for audio ('aswitch').
    """
    logCategory = "av-switch"

    def init(self):
        """
        Set up the tables of optional caps-forcing properties.
        """
        # NOTE(review): this does not call Switch.init itself; presumably
        # the component framework runs the init of every class in the
        # hierarchy -- confirm.
        # property name -> caps property name
        self.vparms = {'video-width': 'width', 'video-height': 'height',
                       'video-framerate': 'framerate',
                       'video-pixel-aspect-ratio': 'par'}
        self.aparms = {'audio-channels': 'channels',
                       'audio-samplerate': 'samplerate'}

    def get_logical_feeds(self):
        """
        Return the "master" and "backup" logical feeds, each made of a
        video eater followed by an audio eater.
        """
        return [('master', ['video-master', 'audio-master']),
                ('backup', ['video-backup', 'audio-backup'])]

    def get_switch_elements(self, pipeline):
        """
        Return the video and audio switch elements.

        @param pipeline: the pipeline to look the elements up in
        """
        # these have to be in the same order as the lists in
        # get_logical_feeds
        return [pipeline.get_by_name('vswitch'),
                pipeline.get_by_name('aswitch')]

    def addError(self, id, format, *args, **kwargs):
        """
        Log the problem, post it as an ERROR message on the component
        and abort the setup.

        @param id:     message id
        @param format: format string for the message
        @param args:   arguments for the format string
        @param kwargs: extra keyword arguments passed to the Message

        @raises errors.ComponentSetupHandledError: always
        """
        self.warning(format, *args)
        # the message id keyword is 'mid', as used by Switch.addWarning
        m = messages.Message(messages.ERROR, T_(format, *args),
                             mid=id, **kwargs)
        self.addMessage(m)
        raise errors.ComponentSetupHandledError()

    def do_check(self):
        """
        Check that the video and the audio parameters are each given
        either completely or not at all.

        A partially specified group is reported through L{addError},
        which raises.
        """
        propkeys = python.set(self.config['properties'].keys())
        vparms = python.set(self.vparms.keys())
        aparms = python.set(self.aparms.keys())

        for kind, parms in ('Video', vparms), ('Audio', aparms):
            missing = parms - (propkeys & parms)
            # some, but not all, parameters of this kind are missing
            if missing and missing != parms:
                fmt = N_("%s parameter(s) were specified but not all. "
                         "Missing parameters are: %r")
                self.addError("video-params-not-specified", fmt, kind,
                              list(missing))

    def get_pipeline_string(self, properties):
        """
        Build the parse-launch pipeline description: two input-selectors
        feeding the video and audio feeders, with every eater linked to
        the selector matching its alias.

        When the video or audio parameters are present in properties,
        the matching eaters are forced to those caps before the switch.

        @param properties: the component properties

        @raises AssertionError: for an eater alias containing neither
                                "video" nor "audio"
        """

        def i420caps(framerate, par, width, height):
            return ("video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d,"
                    "pixel-aspect-ratio=%d/%d,format=(fourcc)I420"
                    % (width, height, framerate[0], framerate[1],
                       par[0], par[1]))

        def audiocaps(channels, samplerate):
            # NOTE(review): the caps field is spelled 'samplerate' here;
            # verify against the raw audio caps GStreamer expects.
            return ("audio/x-raw-int,channels=%d,samplerate=%d,width=16,"
                    "depth=16,signed=true" % (channels, samplerate))

        def props2caps(proc, parms, prefix, suffix=' ! '):
            # collect the configured properties under the keyword names
            # that proc expects
            kw = dict([(parms[prop], properties[prop])
                       for prop in properties if prop in parms])
            if kw:
                return prefix + proc(**kw) + suffix
            else:
                return ''

        vforce = props2caps(i420caps, self.vparms,
                            "ffmpegcolorspace ! videorate ! videoscale "
                            "! capsfilter caps=")
        # NOTE(review): 'audioconvert' appears twice in this chain;
        # confirm whether the second one is intended.
        aforce = props2caps(audiocaps, self.aparms,
                            "audioconvert ! audioconvert ! capsfilter caps=")

        pipeline = ("input-selector name=vswitch"
                    " ! identity silent=true single-segment=true"
                    " ! @feeder:video@ "
                    "input-selector name=aswitch"
                    " ! identity silent=true single-segment=true"
                    " ! @feeder:audio@ ")
        for alias in self.eaters:
            if "video" in alias:
                pipeline += '@eater:%s@ ! %s vswitch. ' % (alias, vforce)
            elif "audio" in alias:
                pipeline += '@eater:%s@ ! %s aswitch. ' % (alias, aforce)
            else:
                raise AssertionError()

        return pipeline