Package flumotion :: Package component :: Module feedcomponent010
[hide private]

Source Code for Module flumotion.component.feedcomponent010

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  import gobject 
 20   
 21  import os 
 22  import time 
 23   
 24  from twisted.internet import reactor, defer 
 25   
 26  from flumotion.common import common, errors, pygobject, messages, log 
 27  from flumotion.common import gstreamer 
 28  from flumotion.common.i18n import N_, gettexter 
 29  from flumotion.common.planet import moods 
 30  from flumotion.component import component as basecomponent 
 31  from flumotion.component import padmonitor 
 32  from flumotion.component.feeder import Feeder 
 33  from flumotion.component.eater import Eater 
 34   
 35  __version__ = "$Rev$" 
 36  T_ = gettexter() 
 37   
 38   
class FeedComponent(basecomponent.BaseComponent):
    """
    I am a base class for all Flumotion feed components.

    @type eaters: dict of str -> L{Eater}
    @type feeders: dict of str -> L{Feeder}
    """

    # how often to update the UIState feeder statistics
    FEEDER_STATS_UPDATE_FREQUENCY = 12.5
    # if True, eatFromFD installs a buffer probe that drops streamheader
    # (IN_CAPS-flagged) buffers arriving after a reconnect
    dropStreamHeaders = True
    # if True, the depay event probe drops every newsegment event after
    # the first one seen on an eater
    swallowNewSegment = True

    logCategory = 'feedcomponent'

    ### BaseComponent interface implementations
    def init(self):
        """
        Initialize per-instance state: the eater/feeder registries, the
        uiState keys they are published under, and pipeline / clock
        bookkeeping attributes. Called by the BaseComponent machinery.
        """
        # add keys for eaters and feeders uiState
        self.feeders = {}  # feeder feedName -> Feeder
        self.eaters = {}  # eater eaterAlias -> Eater
        self.uiState.addListKey('feeders')
        self.uiState.addListKey('eaters')
        self.uiState.addKey('gst-debug')

        self.pipeline = None
        self.pipeline_signals = []  # handler ids to disconnect in cleanup()
        self.bus_signal_id = None
        self.effects = {}  # effect name -> effect, see addEffect()
        self._feeder_probe_cl = None  # reactor callLater for feeder stats

        self._pad_monitors = padmonitor.PadMonitorSet(
            lambda: self.setMood(moods.happy),
            lambda: self.setMood(moods.hungry))

        self._clock_slaved = False
        self.clock_provider = None
        self._master_clock_info = None  # (ip, port, basetime) if we're the
                                        # clock master

        self._change_monitor = gstreamer.StateChangeMonitor()

        # multifdsink's get-stats signal had critical bugs before this version
        self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
                                     >= (0, 10, 11, 0))
    def do_setup(self):
        """
        Sets up component.

        Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
        which subclasses can provide.
        """
        config = self.config
        eater_config = config.get('eater', {})
        feeder_config = config.get('feed', [])
        source_config = config.get('source', [])

        self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
        self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
        self.debug("FeedComponent.do_setup(): source_config %r", source_config)
        # for upgrade of code without restarting managers
        # this will only be for components whose eater name in registry is
        # default, so no need to import registry and find eater name
        if eater_config == {} and source_config != []:
            eater_config = {'default': [(x, 'default') for x in source_config]}

        # build the Eater/Feeder objects and publish them on the uiState
        for eaterName in eater_config:
            for feedId, eaterAlias in eater_config[eaterName]:
                self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
                self.uiState.append('eaters', self.eaters[eaterAlias].uiState)

        for feederName in feeder_config:
            self.feeders[feederName] = Feeder(feederName)
            self.uiState.append('feeders',
                                self.feeders[feederName].uiState)

        # we are clock-slaved unless we ourselves are the designated master
        clockMaster = config.get('clock-master', None)
        if clockMaster:
            self._clock_slaved = clockMaster != config['avatarId']
        else:
            self._clock_slaved = False

        pipeline = self.create_pipeline()
        self.connect_feeders(pipeline)
        self.set_pipeline(pipeline)

        self.uiState.set('gst-debug', os.environ.get('GST_DEBUG', '*:0'))
        self.debug("FeedComponent.do_setup(): setup finished")

        self.try_start_pipeline()

        # no race, messages marshalled asynchronously via the bus
        d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
        d.addCallback(lambda x: self.do_pipeline_playing())
135 - def setup_completed(self):
136 # Just log; we override the superclass to not turn happy here. 137 # Instead, we turn happy once the pipeline gets to PLAYING. 138 self.debug("Setup completed")
139 140 ### FeedComponent interface for subclasses 141
142 - def create_pipeline(self):
143 """ 144 Subclasses have to implement this method. 145 146 @rtype: L{gst.Pipeline} 147 """ 148 raise NotImplementedError( 149 "subclass must implement create_pipeline")
150
151 - def set_pipeline(self, pipeline):
152 """ 153 Subclasses can override me. 154 They should chain up first. 155 """ 156 if self.pipeline: 157 self.cleanup() 158 self.pipeline = pipeline 159 self._setup_pipeline()
160
161 - def attachPadMonitorToFeeder(self, feederName):
162 elementName = self.feeders[feederName].payName 163 element = self.pipeline.get_by_name(elementName) 164 if not element: 165 raise errors.ComponentError("No such feeder %s" % feederName) 166 167 pad = element.get_pad('src') 168 self._pad_monitors.attach(pad, "%s:%s" % (self.name, elementName))
169
170 - def attachPadMonitorToElement(self, elementName, 171 setActive=None, setInactive=None):
172 element = self.pipeline.get_by_name(elementName) 173 if not element: 174 raise error.ComponentError("No such element %s" % elementName) 175 pad = element.get_pad('src') 176 name = "%s:%s" % (self.name, elementName) 177 self._pad_monitors.attach(pad, name) 178 179 if setActive and setInactive: 180 self._pad_monitors[name].addWatch(setActive, setInactive)
181 182 ### FeedComponent methods 183
184 - def addEffect(self, effect):
185 self.effects[effect.name] = effect 186 effect.setComponent(self)
187
188 - def connect_feeders(self, pipeline):
189 # Connect to the client-fd-removed signals on each feeder, so we 190 # can clean up properly on removal. 191 192 def client_fd_removed(sink, fd, feeder): 193 # Called (as a signal callback) when the FD is no longer in 194 # use by multifdsink. 195 # This will call the registered callable on the fd. 196 # Called from GStreamer threads. 197 self.debug("cleaning up fd %d", fd) 198 feeder.clientDisconnected(fd)
199 200 for feeder in self.feeders.values(): 201 element = pipeline.get_by_name(feeder.elementName) 202 if element: 203 element.connect('client-fd-removed', client_fd_removed, 204 feeder) 205 self.debug("Connected to client-fd-removed on %r", feeder) 206 else: 207 self.warning("No feeder %s in pipeline", feeder.elementName)
208
209 - def get_pipeline(self):
210 return self.pipeline
211
212 - def do_pipeline_playing(self):
213 """ 214 Invoked when the pipeline has changed the state to playing. 215 The default implementation sets the component's mood to HAPPY. 216 """ 217 self.setMood(moods.happy)
218
219 - def make_message_for_gstreamer_error(self, gerror, debug):
220 """Make a flumotion error message to show to the user. 221 222 This method may be overridden by components that have special 223 knowledge about potential errors. If the component does not know 224 about the error, it can chain up to this implementation, which 225 will make a generic message. 226 227 @param gerror: The GError from the error message posted on the 228 GStreamer message bus. 229 @type gerror: L{gst.GError} 230 @param debug: A string with debugging information. 231 @type debug: str 232 233 @returns: A L{flumotion.common.messages.Message} to show to the 234 user. 235 """ 236 # generate a unique id 237 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code) 238 m = messages.Error(T_(N_( 239 "Internal GStreamer error.")), 240 debug="%s\n%s: %d\n%s" % ( 241 gerror.message, gerror.domain, gerror.code, debug), 242 mid=mid, priority=40) 243 return m
244
    def bus_message_received_cb(self, bus, message):
        """
        Dispatch a message from the pipeline's GStreamer bus.

        Handles state-changed, error and EOS messages; anything else is
        only logged. Always returns True so the watch stays installed.
        """

        def state_changed():
            # only track state changes of the top-level pipeline itself
            if src == self.pipeline:
                old, new, pending = message.parse_state_changed()
                self._change_monitor.state_changed(old, new)
                dump_filename = "%s.%s_%s" % (self.name,
                    gst.element_state_get_name(old),
                    gst.element_state_get_name(new))
                self.dump_gstreamer_debug_dot_file(dump_filename, True)

        def error():
            gerror, debug = message.parse_error()
            self.warning('element %s error %s %s',
                         src.get_path_string(), gerror, debug)
            self.setMood(moods.sad)

            # this method can fail if the component has a mistake
            try:
                m = self.make_message_for_gstreamer_error(gerror, debug)
            except Exception, e:
                msg = log.getExceptionMessage(e)
                m = messages.Error(T_(N_(
                    "Programming error in component.")),
                    debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
                        self.__class__, msg))

            self.state.append('messages', m)
            self._change_monitor.have_error(self.pipeline.get_state(),
                                            message)

        def eos():
            name = src.get_name()
            if name in self._pad_monitors:
                self.info('End of stream in element %s', name)
                self._pad_monitors[name].setInactive()
            else:
                self.info("We got an eos from %s", name)

        def default():
            self.log('message received: %r', message)

        # src is bound here and read by the closures above
        handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
                    gst.MESSAGE_ERROR: error,
                    gst.MESSAGE_EOS: eos}
        t = message.type
        src = message.src
        handlers.get(t, default)()
        return True
    def install_eater_continuity_watch(self, eaterWatchElements):
        """Watch a set of elements for discontinuity messages.

        @param eaterWatchElements: the set of elements to watch for
            discontinuities.
        @type eaterWatchElements: Dict of elementName => Eater.
        """

        def on_element_message(bus, message):
            src = message.src
            name = src.get_name()
            if name in eaterWatchElements:
                eater = eaterWatchElements[name]
                s = message.structure

                def timestampDiscont():
                    # gap between the end of the previous buffer and the
                    # start of the current one, reported to the eater in
                    # seconds
                    prevTs = s["prev-timestamp"]
                    prevDuration = s["prev-duration"]
                    curTs = s["cur-timestamp"]

                    if prevTs == gst.CLOCK_TIME_NONE:
                        self.debug("no previous timestamp")
                        return
                    if prevDuration == gst.CLOCK_TIME_NONE:
                        self.debug("no previous duration")
                        return
                    if curTs == gst.CLOCK_TIME_NONE:
                        self.debug("no current timestamp")
                        return

                    discont = curTs - (prevTs + prevDuration)
                    dSeconds = discont / float(gst.SECOND)
                    self.debug("we have a discont on eater %s of %.9f s "
                               "between %s and %s ", eater.eaterAlias,
                               dSeconds,
                               gst.TIME_ARGS(prevTs + prevDuration),
                               gst.TIME_ARGS(curTs))

                    eater.timestampDiscont(dSeconds,
                                           float(curTs) / float(gst.SECOND))

                def offsetDiscont():
                    # same idea in offset units instead of clock time
                    prevOffsetEnd = s["prev-offset-end"]
                    curOffset = s["cur-offset"]
                    discont = curOffset - prevOffsetEnd
                    self.debug("we have a discont on eater %s of %d "
                               "units between %d and %d ",
                               eater.eaterAlias, discont, prevOffsetEnd,
                               curOffset)
                    eater.offsetDiscont(discont, curOffset)

                handlers = {'imperfect-timestamp': timestampDiscont,
                            'imperfect-offset': offsetDiscont}
                if s.get_name() in handlers:
                    handlers[s.get_name()]()

        # we know that there is a signal watch already installed
        bus = self.pipeline.get_bus()
        # never gets cleaned up; does that matter?
        bus.connect("message::element", on_element_message)
    def install_eater_event_probes(self, eater):
        """
        Add event probes to the given eater's fdsrc and depay src pads:
        the fdsrc probe swallows EOS so the pipeline survives a dropped
        feed connection, and the depay probe drops repeated newsegment
        events.
        """

        def fdsrc_event(pad, event):
            # An event probe used to consume unwanted EOS events on eaters.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_EOS:
                self.info('End of stream for eater %s, disconnect will be '
                          'triggered', eater.eaterAlias)
                # We swallow it because otherwise our component acts on the EOS
                # and we can't recover from that later. Instead, fdsrc will be
                # taken out and given a new fd on the next eatFromFD call.
                return False
            return True

        def depay_event(pad, event):
            # An event probe used to consume unwanted duplicate
            # newsegment events.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_NEWSEGMENT:
                # We do this because we know gdppay/gdpdepay screw up on 2nd
                # newsegments (unclear what the original reason for this
                # was, perhaps #349204)
                # Other elements might also have problems with repeated
                # newsegments coming in, so we just drop them all. Flumotion
                # operates in single segment space, so dropping newsegments
                # should be fine.
                if getattr(eater, '_gotFirstNewSegment', False):
                    self.info("Subsequent new segment event received on "
                              "depay on eater %s", eater.eaterAlias)
                    # swallow (gulp)
                    eater.streamheader = []
                    if self.swallowNewSegment:
                        return False
                else:
                    eater._gotFirstNewSegment = True
            return True

        self.debug('adding event probe for eater %s', eater.eaterAlias)
        fdsrc = self.get_element(eater.elementName)
        fdsrc.get_pad("src").add_event_probe(fdsrc_event)
        depay = self.get_element(eater.depayName)
        depay.get_pad("src").add_event_probe(depay_event)
399 - def _setup_pipeline(self):
400 self.debug('setup_pipeline()') 401 assert self.bus_signal_id == None 402 403 self.pipeline.set_name('pipeline-' + self.getName()) 404 bus = self.pipeline.get_bus() 405 bus.add_signal_watch() 406 self.bus_signal_id = bus.connect('message', 407 self.bus_message_received_cb) 408 sig_id = self.pipeline.connect('deep-notify', 409 gstreamer.verbose_deep_notify_cb, self) 410 self.pipeline_signals.append(sig_id) 411 412 # set to ready so that multifdsinks can always receive fds, even 413 # if the pipeline has a delayed start due to clock slaving 414 self.pipeline.set_state(gst.STATE_READY) 415 416 # start checking feeders, if we have a sufficiently recent multifdsink 417 if self._get_stats_supported: 418 self._feeder_probe_cl = reactor.callLater( 419 self.FEEDER_STATS_UPDATE_FREQUENCY, 420 self._feeder_probe_calllater) 421 else: 422 self.warning("Feeder statistics unavailable, your " 423 "gst-plugins-base is too old") 424 m = messages.Warning(T_(N_( 425 "Your gst-plugins-base is too old, so " 426 "feeder statistics will be unavailable.")), 427 mid='multifdsink') 428 m.add(T_(N_( 429 "Please upgrade '%s' to version %s."), 'gst-plugins-base', 430 '0.10.11')) 431 self.addMessage(m) 432 433 for eater in self.eaters.values(): 434 self.install_eater_event_probes(eater) 435 pad = self.get_element(eater.elementName).get_pad('src') 436 name = "%s:%s" % (self.name, eater.elementName) 437 self._pad_monitors.attach(pad, name, 438 padmonitor.EaterPadMonitor, 439 self.reconnectEater, 440 eater.eaterAlias) 441 eater.setPadMonitor(self._pad_monitors[name])
442
443 - def stop_pipeline(self):
444 if not self.pipeline: 445 return 446 447 if self.clock_provider: 448 self.clock_provider.set_property('active', False) 449 self.clock_provider = None 450 retval = self.pipeline.set_state(gst.STATE_NULL) 451 if retval != gst.STATE_CHANGE_SUCCESS: 452 self.warning('Setting pipeline to NULL failed')
453
454 - def cleanup(self):
455 self.debug("cleaning up") 456 457 assert self.pipeline != None 458 459 self.stop_pipeline() 460 # Disconnect signals 461 map(self.pipeline.disconnect, self.pipeline_signals) 462 self.pipeline_signals = [] 463 if self.bus_signal_id: 464 self.pipeline.get_bus().disconnect(self.bus_signal_id) 465 self.pipeline.get_bus().remove_signal_watch() 466 self.bus_signal_id = None 467 self.pipeline = None 468 469 if self._feeder_probe_cl: 470 self._feeder_probe_cl.cancel() 471 self._feeder_probe_cl = None 472 473 # clean up checkEater callLaters 474 for eater in self.eaters.values(): 475 self._pad_monitors.remove("%s:%s" % (self.name, eater.elementName)) 476 eater.setPadMonitor(None)
477
478 - def do_stop(self):
479 self.debug('Stopping') 480 if self.pipeline: 481 self.cleanup() 482 self.debug('Stopped') 483 return defer.succeed(None)
484
    def set_master_clock(self, ip, port, base_time):
        """
        Slave the pipeline to the network clock at ip:port with the given
        base time, then (re)try starting the pipeline. Only valid on
        clock-slaved components.

        NOTE(review): returns a deferred only on the unchanged-info early
        exit, None otherwise -- confirm callers don't rely on the result.
        """
        self.debug("Master clock set to %s:%d with base_time %s", ip, port,
                   gst.TIME_ARGS(base_time))

        assert self._clock_slaved
        if self._master_clock_info == (ip, port, base_time):
            self.debug("Same master clock info, returning directly")
            return defer.succeed(None)
        elif self._master_clock_info:
            # clock info changed: the running pipeline still uses the old
            # clock, so stop it before re-slaving
            self.stop_pipeline()

        self._master_clock_info = ip, port, base_time

        clock = gst.NetClientClock(None, ip, port, base_time)
        # disable the pipeline's management of base_time -- we're going
        # to set it ourselves.
        self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
        self.pipeline.set_base_time(base_time)
        self.pipeline.use_clock(clock)

        self.try_start_pipeline()
507 - def get_master_clock(self):
508 """ 509 Return the connection details for the network clock provided by 510 this component, if any. 511 """ 512 if self.clock_provider: 513 ip, port, base_time = self._master_clock_info 514 return ip, port, base_time 515 else: 516 return None
517
    def provide_master_clock(self, port):
        """
        Tell the component to provide a master clock on the given port.

        @returns: a deferred firing a (ip, port, base_time) triple.
        """

        def pipelinePaused(r):
            clock = self.pipeline.get_clock()
            # make sure the pipeline sticks with this clock
            self.pipeline.use_clock(clock)

            self.clock_provider = gst.NetTimeProvider(clock, None, port)
            # the provider may bind a different port than requested
            realport = self.clock_provider.get_property('port')

            base_time = self.pipeline.get_base_time()

            self.debug('provided master clock from %r, base time %s',
                       clock, gst.TIME_ARGS(base_time))

            if self.medium:
                # FIXME: This isn't always correct. We need a more
                # flexible API, and a proper network map, to do this.
                # Even then, it's not always going to be possible.
                ip = self.medium.getIP()
            else:
                ip = "127.0.0.1"

            self._master_clock_info = (ip, realport, base_time)
            return self.get_master_clock()

        assert self.pipeline
        assert not self._clock_slaved
        # non-blocking state query; provide the clock only once we have
        # reached at least PAUSED
        (ret, state, pending) = self.pipeline.get_state(0)
        if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
            self.debug("pipeline still spinning up: %r", state)
            d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
            d.addCallback(pipelinePaused)
            return d
        elif self.clock_provider:
            self.debug("returning existing master clock info")
            return defer.succeed(self.get_master_clock())
        else:
            return defer.maybeDeferred(pipelinePaused, None)
563 - def dump_gstreamer_debug_dot_file(self, filename, with_timestamp=False):
564 """ 565 Dumps a graphviz dot file of the pipeline's current state to disk. 566 This will only actually do anything if the environment variable 567 GST_DEBUG_DUMP_DOT_DIR is set. 568 569 @param filename: filename to store 570 @param with_timestamp: if True, then timestamp will be prepended to 571 filename 572 """ 573 if hasattr(gst, "DEBUG_BIN_TO_DOT_FILE"): 574 method = gst.DEBUG_BIN_TO_DOT_FILE 575 if with_timestamp: 576 method = gst.DEBUG_BIN_TO_DOT_FILE_WITH_TS 577 method(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filename)
578 579 ### BaseComponent interface implementation 580
    def try_start_pipeline(self, force=False):
        """
        Tell the component to start.
        Whatever is using the component is responsible for making sure all
        eaters have received their file descriptor to eat from.

        Silently defers (returns without starting) while we are missing
        master clock info or while any eater has no fd yet.

        @param force: set the pipeline to PLAYING even if it already is
        """
        # non-blocking state query
        (ret, state, pending) = self.pipeline.get_state(0)
        if state == gst.STATE_PLAYING:
            self.log('already PLAYING')
            if not force:
                return
            self.debug('pipeline PLAYING, but starting anyway as requested')

        if self._clock_slaved and not self._master_clock_info:
            self.debug("Missing master clock info, deferring set to PLAYING")
            return

        for eater in self.eaters.values():
            if not eater.fd:
                self.debug('eater %s not yet connected, deferring set to '
                           'PLAYING', eater.eaterAlias)
                return

        self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
        self.pipeline.set_state(gst.STATE_PLAYING)
    def _feeder_probe_calllater(self):
        """
        Periodic poll: emit 'get-stats' on each feeder's element for every
        connected client fd, push the stats into the client objects, then
        reschedule itself.
        """
        for feedId, feeder in self.feeders.items():
            feederElement = self.get_element(feeder.elementName)
            for client in feeder.getClients():
                # a currently disconnected client will have fd None
                if client.fd is not None:
                    array = feederElement.emit('get-stats', client.fd)
                    if len(array) == 0:
                        # There is an unavoidable race here: we can't know
                        # whether the fd has been removed from multifdsink.
                        # However, if we call get-stats on an fd that
                        # multifdsink doesn't know about, we just get a
                        # 0-length array. We ensure that we don't reuse
                        # the FD too soon so this can't result in calling
                        # this on a valid but WRONG fd
                        self.debug('Feeder element for feed %s does not know '
                                   'client fd %d' % (feedId, client.fd))
                    else:
                        client.setStats(array)
        self._feeder_probe_cl = reactor.callLater(
            self.FEEDER_STATS_UPDATE_FREQUENCY,
            self._feeder_probe_calllater)
630 - def unblock_eater(self, eaterAlias):
631 """ 632 After this function returns, the stream lock for this eater must have 633 been released. If your component needs to do something here, override 634 this method. 635 """ 636 pass
637
638 - def get_element(self, element_name):
639 """Get an element out of the pipeline. 640 641 If it is possible that the component has not yet been set up, 642 the caller needs to check if self.pipeline is actually set. 643 """ 644 assert self.pipeline 645 self.log('Looking up element %r in pipeline %r', 646 element_name, self.pipeline) 647 element = self.pipeline.get_by_name(element_name) 648 if not element: 649 self.warning("No element named %r in pipeline", element_name) 650 return element
651
    def get_element_property(self, element_name, property):
        """Gets a property of an element in the GStreamer pipeline.

        @param element_name: name of the element in the pipeline
        @type element_name: str
        @param property: name of the property to read
        @type property: str
        @raises errors.PropertyError: if the element or property does not
            exist
        @returns: the property's value; GEnum values are converted to int
        """
        self.debug("%s: getting property %s of element %s" % (
            self.getName(), property, element_name))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('getting property %s on element %s' % (
            property, element_name))
        try:
            value = element.get_property(property)
        except (ValueError, TypeError):
            msg = "Property '%s' on element '%s' does not exist" % (
                property, element_name)
            self.warning(msg)
            raise errors.PropertyError(msg)

        # param enums and enums need to be returned by integer value
        if isinstance(value, gobject.GEnum):
            value = int(value)

        return value
    def modify_element_property(self, element_name, property_name, value,
                                mutable_state=gst.STATE_READY,
                                needs_reset=False):
        '''
        Sets a property on the fly on a gstreamer element

        @param element_name: Name of the gstreamer element
        @type element_name: str
        @param property_name: Name of the property to change
        @type property_name: str
        @param value: Value to set
        @param mutable_state: Minimum state required to set the property
        @type mutable_state: L{gst.Enum}
        @param needs_reset: Whether setting this property requires sending a
            'flumotion-reset' event
        @type needs_reset: bool
        '''

        def drop_stream_headers(pad, buf):
            # buffer probe: returning False drops the buffer; removes
            # itself once the first non-streamheader buffer passes
            if buf.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
                return False
            pad.remove_buffer_probe(probes[pad])
            return True

        probes = {}
        element = self.get_element(element_name)
        if not element:
            self.warning("The property %s cannot be set because the "
                         "element %s could not be found",
                         property_name, element_name)
            return

        # current pipeline state (non-blocking query)
        state = self.pipeline.get_state(0)[1]

        # get the peer pad for each sink pad
        sink_pads = [p.get_peer() for p in element.pads()
                     if p.get_direction() == gst.PAD_SINK]
        src_pads = [p for p in element.pads()
                    if p.get_direction() == gst.PAD_SRC]

        # Iterate over all the sink pads and block them
        for pad in sink_pads:
            pad.set_blocked(True)

        # If the state of the element is above the mutable state of the
        # property, we need to change its state to the mutable one, block the
        # sink pads, set the property, unblock the sink pads and set the
        # element's state back to its original state
        if state > mutable_state:
            element.set_state(mutable_state)
            element.get_state(0)

        element.set_property(property_name, value)

        # If the property change cause creating new streamheaders,
        # sending a 'flumotion-reset' event is needed to instruct the
        # downstream elements like muxers to reset them self
        if needs_reset:
            for pad in src_pads:
                pad.push_event(gstreamer.flumotion_reset_event())

        # If a reset is not needed we must make sure to drop the duplicated
        # streamheaders, iterating over all the src pads and installing a
        # pad_probe to drop them. (eg: theora restarted after a bitrate change
        # re-sending the headers again)
        else:
            for pad in src_pads:
                probes[pad] = pad.add_buffer_probe(drop_stream_headers)

        if state > mutable_state:
            element.set_state(state)

        # Unblock all sink pads
        for pad in sink_pads:
            pad.set_blocked(False)
754 - def set_element_property(self, element_name, property, value):
755 'Sets a property on an element in the GStreamer pipeline.' 756 self.debug("%s: setting property %s of element %s to %s" % ( 757 self.getName(), property, element_name, value)) 758 element = self.get_element(element_name) 759 if not element: 760 msg = "Element '%s' does not exist" % element_name 761 self.warning(msg) 762 raise errors.PropertyError(msg) 763 764 self.debug('setting property %s on element %r to %s' % 765 (property, element_name, value)) 766 pygobject.gobject_set_property(element, property, value)
767 768 ### methods to connect component eaters and feeders 769
770 - def reconnectEater(self, eaterAlias):
771 if not self.medium: 772 self.debug("Can't reconnect eater %s, running " 773 "without a medium", eaterAlias) 774 return 775 776 self.eaters[eaterAlias].disconnected() 777 self.medium.connectEater(eaterAlias)
778
    def feedToFD(self, feedName, fd, cleanup, eaterId=None):
        """
        Feed the given feed to the given file descriptor by adding the fd
        to the feeder's multifdsink.

        @param feedName: name of the feed to feed to the given fd.
        @type feedName: str
        @param fd: the file descriptor to feed to
        @type fd: int
        @param cleanup: the function to call when the FD is no longer feeding
        @type cleanup: callable
        @param eaterId: optional client id; defaults to 'client-<fd>'

        NOTE(review): the unknown-feeder path returns False while the
        not-running path returns None -- confirm callers don't rely on
        the distinction.
        """
        self.debug('FeedToFD(%s, %d)', feedName, fd)

        # We must have a pipeline in READY or above to do this. Do a
        # non-blocking (zero timeout) get_state.
        if (not self.pipeline or
            self.pipeline.get_state(0)[1] == gst.STATE_NULL):
            self.warning('told to feed %s to fd %d, but pipeline not '
                         'running yet', feedName, fd)
            cleanup(fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            return

        if feedName not in self.feeders:
            msg = "Cannot find feeder named '%s'" % feedName
            mid = "feedToFD-%s" % feedName
            m = messages.Warning(T_(N_("Internal Flumotion error.")),
                                 debug=msg, mid=mid, priority=40)
            self.state.append('messages', m)
            self.warning(msg)
            cleanup(fd)
            return False

        feeder = self.feeders[feedName]
        element = self.get_element(feeder.elementName)
        assert element
        clientId = eaterId or ('client-%d' % fd)
        element.emit('add', fd)
        feeder.clientConnected(clientId, fd, cleanup)
    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Tell the component to eat the given feedId from the given fd.
        The component takes over the ownership of the fd, closing it when
        no longer eating.

        @param eaterAlias: the alias of the eater
        @type eaterAlias: str
        @param feedId: feed id (componentName:feedName) to eat from through
            the given fd
        @type feedId: str
        @param fd: the file descriptor to eat from
        @type fd: int
        """
        self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)

        if not self.pipeline:
            self.warning('told to eat %s from fd %d, but pipeline not '
                         'running yet', feedId, fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            os.close(fd)
            return

        if eaterAlias not in self.eaters:
            self.warning('Unknown eater alias: %s', eaterAlias)
            os.close(fd)
            return

        eater = self.eaters[eaterAlias]
        element = self.get_element(eater.elementName)
        if not element:
            self.warning('Eater element %s not found', eater.elementName)
            os.close(fd)
            return

        # fdsrc only switches to the new fd in ready or below
        (result, current, pending) = element.get_state(0L)
        pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
        if pipeline_playing:
            self.debug('eater %s in state %r, kidnapping it',
                       eaterAlias, current)

            # we unlink fdsrc from its peer, take it out of the pipeline
            # so we can set it to READY without having it send EOS,
            # then switch fd and put it back in.
            # To do this safely, we first block fdsrc:src, then let the
            # component do any neccesary unlocking (needed for multi-input
            # elements)
            srcpad = element.get_pad('src')

            def _block_cb(pad, blocked):
                # we only need the block itself, nothing to do on callback
                pass
            srcpad.set_blocked_async(True, _block_cb)
            # add buffer probe to drop buffers that are flagged as IN_CAPS
            # needs to be done to gdpdepay's src pad
            depay = self.get_element(eater.depayName)

            def remove_in_caps_buffers(pad, buffer, eater):
                # buffer probe: returning False drops streamheader buffers;
                # removes itself once a normal buffer passes through
                if buffer.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
                    self.info("We got streamheader buffer which we are "
                              "dropping because we do not want this just "
                              "after a reconnect because it breaks "
                              "everything ")
                    return False
                # now we have a buffer with no flag set
                # we should remove the handler
                self.log("We got buffer with no in caps flag set on "
                         "eater %r", eater)

                if eater.streamheaderBufferProbeHandler:
                    self.log("Removing buffer probe on depay src pad on "
                             "eater %r", eater)
                    pad.remove_buffer_probe(
                        eater.streamheaderBufferProbeHandler)
                    eater.streamheaderBufferProbeHandler = None
                else:
                    self.warning("buffer probe handler is None, bad news on "
                                 "eater %r", eater)

                return True

            if not eater.streamheaderBufferProbeHandler:
                if self.dropStreamHeaders:
                    self.log("Adding buffer probe on depay src pad on "
                             "eater %r", eater)
                    eater.streamheaderBufferProbeHandler = \
                        depay.get_pad("src").add_buffer_probe(
                            remove_in_caps_buffers, eater)

            # let the component release any stream lock it holds
            self.unblock_eater(eaterAlias)

            # Now, we can switch FD with this mess
            sinkpad = srcpad.get_peer()
            srcpad.unlink(sinkpad)
            parent = element.get_parent()
            parent.remove(element)
            self.log("setting to ready")
            element.set_state(gst.STATE_READY)
            self.log("setting to ready complete!!!")
            old = element.get_property('fd')
            self.log("Closing old fd %d", old)
            os.close(old)
            element.set_property('fd', fd)
            parent.add(element)
            srcpad.link(sinkpad)
            element.set_state(gst.STATE_PLAYING)
            # We're done; unblock the pad
            srcpad.set_blocked_async(False, _block_cb)
        else:
            element.set_property('fd', fd)

        # update our eater uiState, saying that we are eating from a
        # possibly new feedId
        eater.connected(fd, feedId)

        if not pipeline_playing:
            self.try_start_pipeline()