Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

   1  # -*- Mode: Python -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3   
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
   6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
   7  # All rights reserved. 
   8  # 
   9  # This file may be distributed and/or modified under the terms of 
  10  # the GNU Lesser General Public License version 2.1 as published by 
  11  # the Free Software Foundation. 
  12  # This file is distributed without any warranty; without even the implied 
  13  # warranty of merchantability or fitness for a particular purpose. 
  14  # See "LICENSE.LGPL" in the source distribution for more information. 
  15  # 
  16  # Headers in this file shall remain intact. 
  17   
  18  """ 
  19  Feed components, participating in the stream 
  20  """ 
  21   
  22  import os 
  23   
  24  import gst 
  25  import gst.interfaces 
  26  import gobject 
  27   
  28  from twisted.internet import reactor, defer 
  29  from twisted.spread import pb 
  30  from zope.interface import implements 
  31   
  32  from flumotion.component import component as basecomponent 
  33  from flumotion.component import feed 
  34  from flumotion.common import common, interfaces, errors, log, pygobject, \ 
  35       messages 
  36  from flumotion.common import gstreamer 
  37  from flumotion.common.i18n import N_, gettexter 
  38  from flumotion.common.planet import moods 
  39  from flumotion.common.pygobject import gsignal 
  40   
  41  __version__ = "$Rev$" 
  42  T_ = gettexter() 
  43   
  44   
class FeedComponentMedium(basecomponent.BaseComponentMedium):
    """
    I am a component-side medium for a FeedComponent to interface with
    the manager-side ComponentAvatar.

    My C{remote_*} methods are the ones the manager can call; most of them
    simply forward to the component I wrap (C{self.comp}).
    """
    implements(interfaces.IComponentMedium)
    # log category and remote log name used for this medium
    logCategory = 'feedcompmed'
    remoteLogName = 'feedserver'
def __init__(self, component):
    """
    Initialise the medium with empty feed-connection bookkeeping.

    @param component: the component this medium is the medium of
    @type  component: L{flumotion.component.feedcomponent.FeedComponent}
    """
    basecomponent.BaseComponentMedium.__init__(self, component)

    # Remote feeders our eaters eat from:
    #   eaterAlias -> (fullFeedId, host, port) tuple
    self._feederFeedServer = dict()
    #   eaterAlias -> cancel thunk of the connection attempt in progress
    self._feederPendingConnections = dict()

    # Remote eaters our feeders feed to:
    #   fullFeedId -> (host, port) tuple
    self._eaterFeedServer = dict()
    #   feederName -> cancel thunk of the connection attempt in progress
    self._eaterPendingConnections = dict()

    self.logName = component.name
67 68 ### Referenceable remote methods which can be called from manager 69
def remote_attachPadMonitorToFeeder(self, feederName):
    """
    Forward a request to attach a pad monitor to one of the component's
    feeders.

    @param feederName: name of the feeder, handed to the component as is
    """
    component = self.comp
    component.attachPadMonitorToFeeder(feederName)
72
def remote_setGstDebug(self, debug):
    """
    Sets the GStreamer debugging levels based on the passed debug string.

    The string is a comma-separated list of parts.  Each part is either
    C{level}, which sets the default threshold, or C{glob:level}, which
    sets the threshold for the categories matching C{glob}.  A part that
    cannot be parsed (too many colons, or a level that is not an integer)
    is logged as a warning and skipped; the remaining parts are still
    applied.

    The complete string is stored under the C{gst-debug} key of the
    component's UI state, whether or not every part could be applied.

    @param debug: the debug string; nothing is done when it is empty
    @type  debug: str

    @since: 0.4.2
    """
    self.debug('Setting GStreamer debug level to %s' % debug)
    if not debug:
        return

    for part in debug.split(','):
        pair = part.split(':')
        if len(pair) > 2:
            self.warning("Cannot parse GStreamer debug setting '%s'." %
                part)
            continue

        # a lone value has no glob and sets the default threshold
        glob = None
        if len(pair) == 2:
            glob = pair[0]

        try:
            value = int(pair[-1])
        except ValueError:
            # a non-numeric level used to abort the whole remote call;
            # treat it like any other unparseable part instead
            self.warning("Cannot parse GStreamer debug setting '%s'." %
                part)
            continue

        if glob:
            try:
                # value has to be an integer
                gst.debug_set_threshold_for_name(glob, value)
            except TypeError:
                self.warning("Cannot set glob %s to value %s" % (
                    glob, value))
        else:
            gst.debug_set_default_threshold(value)

    self.comp.uiState.set('gst-debug', debug)
109
def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
    """
    Tell the component the host and port for the FeedServer through which
    it can connect a local eater to a remote feeder to eat the given
    fullFeedId.

    A request that repeats the feed server already recorded for the eater
    is ignored.

    Called on by the manager-side ComponentAvatar.

    @returns: whatever L{connectEater} returns, or None when the request
              was ignored
    """
    requested = (fullFeedId, host, port)
    current = self._feederFeedServer.get(eaterAlias)
    if current and current == requested:
        self.debug("Feed:%r is the same as the current one. "
                   "Request ignored.", requested)
        return

    self._feederFeedServer[eaterAlias] = requested
    return self.connectEater(eaterAlias)
125
126 - def _getAuthenticatorForFeed(self, eaterAliasOrFeedName):
127 # The avatarId on the keycards issued by the authenticator will 128 # identify us to the remote component. Attempt to use our 129 # fullFeedId, for debugging porpoises. 130 if hasattr(self.authenticator, 'copy'): 131 tup = common.parseComponentId(self.authenticator.avatarId) 132 flowName, componentName = tup 133 fullFeedId = common.fullFeedId(flowName, componentName, 134 eaterAliasOrFeedName) 135 return self.authenticator.copy(fullFeedId) 136 else: 137 return self.authenticator
138
def connectEater(self, eaterAlias):
    """
    Connect one of the medium's component's eaters to a remote feed.
    Called by the component, both on initial connection and for
    reconnecting.

    Any connection attempt still pending for the same eater is cancelled
    before the new one is started.

    @returns: deferred that will fire with a value of None
    """
    # FIXME: There's no indication if the connection was made or not

    def gotFeed(result):
        feedId, fd = result
        self._feederPendingConnections.pop(eaterAlias, None)
        self.comp.eatFromFD(eaterAlias, feedId, fd)

    if eaterAlias not in self._feederFeedServer:
        self.debug("eatFrom() hasn't been called yet for eater %s",
                   eaterAlias)
        # unclear if this function should have a return value at
        # all...
        return defer.succeed(None)

    fullFeedId, host, port = self._feederFeedServer[eaterAlias]

    cancelPrevious = self._feederPendingConnections.pop(eaterAlias, None)
    if cancelPrevious:
        self.debug('cancelling previous connection attempt on %s',
                   eaterAlias)
        cancelPrevious()

    client = feed.FeedMedium(logName=self.comp.name)
    authenticator = self._getAuthenticatorForFeed(eaterAlias)

    d = client.requestFeed(host, port, authenticator, fullFeedId)
    # remember how to abort this attempt until gotFeed clears it
    self._feederPendingConnections[eaterAlias] = client.stopConnecting
    d.addCallback(gotFeed)
    return d
176
def remote_feedTo(self, feederName, fullFeedId, host, port):
    """
    Tell the component to feed the given feed to the receiving component
    accessible through the FeedServer on the given host and port.

    The feed server is recorded under fullFeedId before the connection
    is started.  Nothing is returned to the caller.

    Called on by the manager-side ComponentAvatar.
    """
    feedServer = (host, port)
    self._eaterFeedServer[fullFeedId] = feedServer
    self.connectFeeder(feederName, fullFeedId)
186
def connectFeeder(self, feederName, fullFeedId):
    """
    Connect one of the medium's component's feeders to the remote feed
    server recorded for fullFeedId by L{remote_feedTo}, and hand the
    resulting file descriptor to the component.

    Any connection attempt still pending for the same feeder is cancelled
    before the new one is started.

    @param feederName: name of the local feeder to connect
    @param fullFeedId: full feed id to send the feed to

    @returns: deferred that fires with None, immediately if no feed
              server is known yet for fullFeedId
    """

    def gotFeed(result):
        # the feed id comes from the reply, not from our argument
        connectedFeedId, fd = result
        self._eaterPendingConnections.pop(feederName, None)
        self.comp.feedToFD(feederName, fd, os.close, connectedFeedId)

    if fullFeedId not in self._eaterFeedServer:
        self.debug("feedTo() hasn't been called yet for feeder %s",
                   feederName)
        # unclear if this function should have a return value at
        # all...
        return defer.succeed(None)

    host, port = self._eaterFeedServer[fullFeedId]

    # Pending connections are stored and cleared under feederName, so the
    # lookup has to use the same key; it used to look under fullFeedId,
    # which never matched and left the previous attempt running.
    cancel = self._eaterPendingConnections.pop(feederName, None)
    if cancel:
        self.debug('cancelling previous connection attempt on %s',
                   feederName)
        cancel()

    client = feed.FeedMedium(logName=self.comp.name)

    d = client.sendFeed(host, port,
                        self._getAuthenticatorForFeed(feederName),
                        fullFeedId)
    self._eaterPendingConnections[feederName] = client.stopConnecting
    d.addCallback(gotFeed)
    return d
def remote_provideMasterClock(self, port):
    """
    Tells the component to start providing a master clock on the given
    UDP port.
    Can only be called if setup() has been called on the component.

    The IP address returned is the local IP the clock is listening on.

    @returns: (ip, port, base_time)
    @rtype:   tuple of (str, int, long)
    """
    self.debug('remote_provideMasterClock(port=%r)' % port)
    clockInfo = self.comp.provide_master_clock(port)
    return clockInfo
237
def remote_getMasterClockInfo(self):
    """
    Return the clock master info created by a previous call
    to provideMasterClock.

    @returns: (ip, port, base_time)
    @rtype:   tuple of (str, int, long)
    """
    component = self.comp
    return component.get_master_clock()
247
def remote_setMasterClock(self, ip, port, base_time):
    """
    Forward the given master clock parameters to the component.

    @returns: whatever the component's set_master_clock returns
    """
    component = self.comp
    return component.set_master_clock(ip, port, base_time)
250
def remote_effect(self, effectName, methodName, *args, **kwargs):
    """
    Invoke the given methodName on the given effectName in this component.
    The effect should implement effect_(methodName) to receive the call.

    @param effectName: key of the effect in the component's effects
    @param methodName: method name, without the C{effect_} prefix

    @returns: the result of the effect method

    @raises errors.UnknownEffectError: if the component has no such effect
    @raises errors.NoMethodError:      if the effect lacks the method
    @raises errors.RemoteRunError:     if calling the method raised a
                                       TypeError
    """
    self.debug("calling %s on effect %s" % (methodName, effectName))
    if effectName not in self.comp.effects:
        raise errors.UnknownEffectError(effectName)
    effect = self.comp.effects[effectName]

    attributeName = "effect_%s" % methodName
    if not hasattr(effect, attributeName):
        raise errors.NoMethodError("%s on effect %s" % (methodName,
            effectName))
    method = getattr(effect, attributeName)

    try:
        result = method(*args, **kwargs)
    except TypeError:
        # NOTE: this also catches a TypeError raised inside the method,
        # not only a mismatch of the arguments
        msg = "effect method %s did not accept %s and %s" % (
            methodName, args, kwargs)
        self.debug(msg)
        raise errors.RemoteRunError(msg)

    # wrap in a 1-tuple: a bare tuple result would be taken as the
    # argument list of the % operator and raise TypeError
    self.debug("effect: result: %r" % (result, ))
    return result
273
def remote_dumpGstreamerDotFile(self, filename):
    """
    Forward a request to dump the GStreamer debug dot file to the
    component.

    @param filename: file name, handed to the component as is
    """
    component = self.comp
    component.dump_gstreamer_debug_dot_file(filename)
# NOTE(review): this import sits after FeedComponentMedium instead of at
# the top of the module, presumably to avoid an import cycle with
# feedcomponent010 -- confirm before moving it.
from feedcomponent010 import FeedComponent

# Register FeedComponentMedium as FeedComponent's componentMediumClass.
FeedComponent.componentMediumClass = FeedComponentMedium
class ParseLaunchComponent(FeedComponent):
    """A component using gst-launch syntax

    @cvar checkTimestamp: whether to check continuity of timestamps for eaters
    @cvar checkOffset:    whether to check continuity of offsets for
                          eaters
    @cvar DELIMITER:      character enclosing a template block in a
                          pipeline string
    @cvar FDSRC_TMPL:     parse-launch template for the source element of
                          an eater
    @cvar DEPAY_TMPL:     parse-launch template for the depayloader of an
                          eater
    @cvar FEEDER_TMPL:    parse-launch template for a feeder
    @cvar EATER_TMPL:     parse-launch template for an eater; None until
                          init() has assembled it
    """

    DELIMITER = '@'

    # can be set by subclasses
    checkTimestamp = False
    checkOffset = False

    # keep these as class variables for the tests
    FDSRC_TMPL = 'fdsrc name=%(name)s'
    DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
    FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
                      'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
                      'recover-policy=1'
    EATER_TMPL = None
def init(self):
    """
    Check the GStreamer installation and assemble EATER_TMPL.

    When the identity element cannot check for imperfect timestamps,
    both continuity checks are switched off and an informational message
    is added to the component.

    @raises errors.MissingElementError: if the coreelements plugin is
                                        not found
    """
    if not gstreamer.get_plugin_version('coreelements'):
        raise errors.MissingElementError('identity')

    canCheck = gstreamer.element_factory_has_property(
        'identity', 'check-imperfect-timestamp')
    if not canCheck:
        self.checkTimestamp = False
        self.checkOffset = False
        self.addMessage(
            messages.Info(T_(N_(
                "You will get more debugging information "
                "if you upgrade to GStreamer 0.10.13 or later."))))

    template = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL
    if self.checkTimestamp or self.checkOffset:
        # a checking identity element is only added when asked for
        template += " ! identity name=%(name)s-identity silent=TRUE"
        if self.checkTimestamp:
            template += " check-imperfect-timestamp=1"
        if self.checkOffset:
            template += " check-imperfect-offset=1"
    self.EATER_TMPL = template
323 324 ### FeedComponent interface implementations 325
326 - def create_pipeline(self):
327 try: 328 unparsed = self.get_pipeline_string(self.config['properties']) 329 except errors.MissingElementError, e: 330 self.warning('Missing %s element' % e.args[0]) 331 m = messages.Error(T_(N_( 332 "The worker does not have the '%s' element installed.\n" 333 "Please install the necessary plug-in and restart " 334 "the component.\n"), e.args[0])) 335 self.addMessage(m) 336 raise errors.ComponentSetupHandledError(e) 337 338 self.pipeline_string = self.parse_pipeline(unparsed) 339 340 try: 341 pipeline = gst.parse_launch(self.pipeline_string) 342 except gobject.GError, e: 343 self.warning('Could not parse pipeline: %s' % e.message) 344 m = messages.Error(T_(N_( 345 "GStreamer error: could not parse component pipeline.")), 346 debug="Reason: %s\nPipeline: %s" % ( 347 e.message, self.pipeline_string)) 348 self.addMessage(m) 349 raise errors.PipelineParseError(e.message) 350 351 return pipeline
352
def set_pipeline(self, pipeline):
    """
    Set the pipeline on the component, install the eater continuity
    watch if timestamp or offset checking is enabled, and let
    configure_pipeline() finish the setup.
    """
    FeedComponent.set_pipeline(self, pipeline)

    if self.checkTimestamp or self.checkOffset:
        # name of each eater's identity element -> eater
        watchElements = {}
        for eater in self.eaters.values():
            watchElements[eater.elementName + '-identity'] = eater
        self.install_eater_continuity_watch(watchElements)

    properties = self.config['properties']
    self.configure_pipeline(self.pipeline, properties)
361 362 ### ParseLaunchComponent interface for subclasses 363
def get_pipeline_string(self, properties):
    """
    Produce the gstparse string for the component's pipeline.

    Must be implemented by subclasses, which should not chain up: this
    implementation only raises NotImplementedError.

    Returns: a new pipeline string representation.
    """
    message = 'subclasses should implement ' 'get_pipeline_string'
    raise NotImplementedError(message)
374
def configure_pipeline(self, pipeline, properties):
    """
    Hook for subclasses that wish to interact with the pipeline after it
    has been created and set on the component, for instance to attach
    signals and bus handlers.

    The default implementation does nothing.
    """
    return None
384 385 ### private methods 386
def add_default_eater_feeder(self, pipeline):
    """
    Add the template block of the only eater and/or the only feeder to
    a pipeline template that does not mention it yet.

    With exactly one eater, its block is linked in front of the
    pipeline; with exactly one feeder, its block is linked behind it.
    Components with several eaters or feeders are left alone.

    The blocks are enclosed in self.DELIMITER, the same delimiter
    parse_tmpl() splits on, so that a subclass overriding DELIMITER
    still gets its default blocks expanded.

    @param pipeline: a pipeline string representation with variables
    @returns: the pipeline template, possibly extended
    """
    delimiter = self.DELIMITER

    if len(self.eaters) == 1:
        eater = 'eater:' + list(self.eaters)[0]
        if eater not in pipeline:
            pipeline = delimiter + eater + delimiter + ' ! ' + pipeline

    if len(self.feeders) == 1:
        feeder = 'feeder:' + list(self.feeders)[0]
        if feeder not in pipeline:
            pipeline = pipeline + ' ! ' + delimiter + feeder + delimiter

    return pipeline
397
def parse_tmpl(self, pipeline, templatizers):
    """
    Expand the given pipeline string representation by substituting
    blocks between '@' with a filled-in template.

    @param pipeline:     a pipeline string representation with variables
    @param templatizers: A dict of prefix => procedure. Template
                         blocks in the pipeline will be replaced
                         with the result of calling the procedure
                         with what is left of the template after
                         taking off the prefix.
    @returns: a new pipeline string representation.

    @raises TypeError: if the delimiters are unbalanced, or a block has
                       no colon or an unknown prefix
    """
    assert pipeline != ''

    delimiter = self.DELIMITER

    # every block needs an opening and a closing delimiter
    if pipeline.count(delimiter) % 2 != 0:
        raise TypeError("'%s' contains an odd number of '%s'"
                        % (pipeline, delimiter))

    # after splitting, the pieces at odd positions are the blocks to be
    # substituted; the others are kept as they are
    pieces = pipeline.split(delimiter)
    for position in range(1, len(pieces), 2):
        template = pieces[position].strip()
        if ':' not in template:
            raise TypeError("Template %r has no colon" % (template, ))
        prefixEnd = template.index(':') + 1
        prefix = template[:prefixEnd]
        if prefix not in templatizers:
            raise TypeError("Template %r has invalid prefix %r"
                            % (template, prefix))
        pieces[position] = templatizers[prefix](template[prefixEnd:])

    return ''.join(pieces)
436
def parse_pipeline(self, pipeline):
    """
    Parse the pipeline template into a fully expanded pipeline string.

    Whitespace is normalised first.  An empty template is only accepted
    when the component has eaters, in which case a fakesink is used.

    @type pipeline: str

    @rtype: str

    @raises TypeError: if the template is empty and there are no eaters
    """
    template = " ".join(pipeline.split())
    self.debug('Creating pipeline, template is %s', template)

    if template == '':
        if not self.eaters:
            raise TypeError("Need a pipeline or a eater")
        # code of dubious value
        assert self.eaters
        template = 'fakesink signal-handoffs=1 silent=1 name=sink'

    template = self.add_default_eater_feeder(template)
    templatizers = {
        'eater:': self.get_eater_template,
        'feeder:': self.get_feeder_template,
    }
    expanded = self.parse_tmpl(template, templatizers)

    self.debug('pipeline is %s', expanded)
    # every template block must have been substituted by now
    assert self.DELIMITER not in expanded

    return expanded
465
def get_eater_template(self, eaterAlias):
    """
    Fill in EATER_TMPL for the given eater.

    @param eaterAlias: alias of the eater, a key of self.eaters
    @returns: the parse-launch string for the eater
    """
    values = {'queue': self.get_queue_string(eaterAlias)}
    values['name'] = self.eaters[eaterAlias].elementName
    return self.EATER_TMPL % values
471
def get_feeder_template(self, feederName):
    """
    Fill in FEEDER_TMPL for the given feeder.

    @param feederName: name of the feeder, a key of self.feeders
    @returns: the parse-launch string for the feeder
    """
    feeder = self.feeders[feederName]
    values = {'name': feeder.elementName}
    return self.FEEDER_TMPL % values
475
def get_queue_string(self, eaterAlias):
    """
    Return a parse-launch string to join the fdsrc eater element and
    the depayer, for example '!' or '! queue !'. The string may have
    no format strings.

    This implementation links the two directly.
    """
    directLink = '!'
    return directLink
483
def get_eater_srcpad(self, eaterAlias):
    """
    Method that returns the source pad of the final element in an eater.

    That element is the eater's identity element when there is one, and
    its depayloader otherwise.

    @returns: the GStreamer source pad of the final element in an eater
    @rtype:   L{gst.Pad}
    """
    eater = self.eaters[eaterAlias]
    identity = self.get_element(eater.elementName + '-identity')
    depay = self.get_element(eater.depayName)

    depaySrcPad = depay.get_pad("src")
    if not identity:
        return depaySrcPad
    return identity.get_pad("src")
498
def get_feeder_sinkpad(self, feederAlias):
    """
    Method that returns the sink pad of the first element in a feeder,
    which is the feeder's payloader.

    @returns: the GStreamer sink pad of the first element in a feeder
    @rtype:   L{gst.Pad}
    """
    feeder = self.feeders[feederAlias]
    payloader = self.get_element(feeder.elementName + '-pay')
    sinkPad = payloader.get_static_pad("sink")
    return sinkPad
509 510
class Effect(log.Loggable):
    """
    I am a part of a feed component for a specific group
    of functionality.

    @ivar name:      name of the effect
    @type name:      string
    @ivar component: component owning the effect
    @type component: L{FeedComponent}
    @ivar uiState:   UI state of the owning component, or None
    """
    logCategory = "effect"

    def __init__(self, name):
        """
        Create an effect that is not owned by any component yet.

        @param name: the name of the effect
        """
        self.name = name
        self.setComponent(None)

    def setComponent(self, component):
        """
        Set the given component as the effect's owner, and take over its
        UI state.

        @param component: the component to set as an owner of this effect
        @type  component: L{FeedComponent}
        """
        self.component = component
        if component:
            state = component.uiState or None
        else:
            state = None
        self.setUIState(state)

    def setUIState(self, state):
        """
        Set the given UI state on the effect. This method is ideal for
        adding keys to the UI state.

        @param state: the UI state for the component to use.
        @type  state: L{flumotion.common.componentui.WorkerComponentUIState}
        """
        self.uiState = state

    def getComponent(self):
        """
        Get the component owning this effect.

        @rtype: L{FeedComponent}
        """
        owner = self.component
        return owner
557 558
class PostProcEffect (Effect):
    """
    I am an effect that is plugged in the pipeline to do a post processing
    job and can be chained to other effect of the same class.

    @ivar name:      name of the effect
    @type name:      string
    @ivar component: component owning the effect
    @type component: L{FeedComponent}
    @ivar sourcePad: pad of the source after which I'm plugged
    @type sourcePad: L{GstPad}
    @ivar effectBin: gstreamer bin doing the post processing effect
    @type effectBin: L{GstBin}
    @ivar pipeline:  pipeline holding the gstreamer elements
    @type pipeline:  L{GstPipeline}
    @ivar plugged:   whether plug() has already linked me in
    @type plugged:   bool
    """
    logCategory = "effect"

    def __init__(self, name, sourcePad, effectBin, pipeline):
        """
        @param name:      the name of the effect
        @param sourcePad: pad of the source after which I'm plugged
        @param effectBin: gstreamer bin doing the post processing effect
        @param pipeline:  pipeline holding the gstreamer elements
        """
        Effect.__init__(self, name)
        self.sourcePad = sourcePad
        self.effectBin = effectBin
        self.pipeline = pipeline
        self.plugged = False

    def plug(self):
        """
        Plug the effect in the pipeline, unlinking the source element
        from its downstream peer and linking the effect bin in between.

        Does nothing when the effect is already plugged.
        """
        if self.plugged:
            return

        # Detach the source pad from the pad it currently feeds
        upstreamPad = self.sourcePad
        downstreamPad = upstreamPad.get_peer()
        upstreamPad.unlink(downstreamPad)

        # Add the effect bin to the pipeline
        effectBin = self.effectBin
        effectBin.set_state(gst.STATE_PLAYING)
        self.pipeline.add(effectBin)

        # Link it between the source pad and its former peer
        upstreamPad.link(effectBin.get_pad('sink'))
        effectBin.get_pad('src').link(downstreamPad)
        self.plugged = True
612 613
class MultiInputParseLaunchComponent(ParseLaunchComponent):
    """
    This class provides for multi-input ParseLaunchComponents, such as muxers,
    with a queue attached to each input.

    @cvar QUEUE_SIZE_BUFFERS: max-size-buffers of each input queue
    @cvar LINK_MUXER:         whether get_pipeline_string() links every
                              eater to the element named 'muxer'
    """
    QUEUE_SIZE_BUFFERS = 16
    LINK_MUXER = True

    def get_muxer_string(self, properties):
        """
        Return a gst-parse description of the muxer, which
        must be named 'muxer'
        """
        raise errors.NotImplementedError("Implement in a subclass")

    def get_queue_string(self, eaterAlias):
        """
        Return the parse-launch string that puts a queue, named after the
        eater's element, between the eater's source and depayloader.
        """
        elementName = self.eaters[eaterAlias].elementName
        template = "! queue name=%s-queue max-size-buffers=%d !"
        return template % (elementName, self.QUEUE_SIZE_BUFFERS)

    def get_pipeline_string(self, properties):
        """
        Build the pipeline template: one eater block per configured
        feed, each linked to the muxer when LINK_MUXER is set, followed
        by the muxer description.
        """
        eaters = self.config.get('eater', {})
        sources = self.config.get('source', [])
        if eaters == {} and sources != []:
            # for upgrade without manager restart
            defaultFeeds = []
            for source in sources:
                if ':' not in source:
                    source = '%s:default' % source
                defaultFeeds.append((source, 'default'))
            eaters = {'default': defaultFeeds}

        parts = []
        for eaterName in eaters:
            for _feedId, alias in eaters[eaterName]:
                parts.append('@ eater:%s @ ' % alias)
                if self.LINK_MUXER:
                    parts.append(' ! muxer. ')

        parts.append(self.get_muxer_string(properties) + ' ')

        return ''.join(parts)

    def unblock_eater(self, eaterAlias):
        """
        Let a push that may be blocked on the eater's full queue return,
        by growing the queue by one buffer until it next underruns.
        """
        # Firstly, ensure that any push in progress is guaranteed to return,
        # by temporarily enlarging the queue
        queueName = self.eaters[eaterAlias].elementName + '-queue'
        queue = self.pipeline.get_by_name(queueName)

        currentMax = queue.get_property("max-size-buffers")
        queue.set_property("max-size-buffers", currentMax + 1)

        # So, now it's guaranteed to return. However, we want to return the
        # queue size to its original value. Doing this in a thread-safe manner
        # is rather tricky...

        def _block_cb(pad, blocked):
            # This is called from streaming threads, but we don't do anything
            # here so it's safe.
            pass

        def _underrun_cb(element):
            # Called from a streaming thread. The queue element does not hold
            # the queue lock when this is called, so we block our sinkpad,
            # then re-check the current level.
            sinkPad = element.get_pad("sink")
            sinkPad.set_blocked_async(True, _block_cb)
            level = element.get_property("current-level-buffers")
            if level < self.QUEUE_SIZE_BUFFERS:
                # restore the configured size; our job is done
                element.set_property('max-size-buffers',
                                     self.QUEUE_SIZE_BUFFERS)
                element.disconnect(signalid)
            sinkPad.set_blocked_async(False, _block_cb)

        signalid = queue.connect("underrun", _underrun_cb)
class ReconfigurableComponent(ParseLaunchComponent):
    """
    I am a ParseLaunchComponent that can remove the elements between my
    input queues and my feeders' payloaders and rebuild them while the
    pipeline keeps running.

    @cvar disconnectedPads:  while True, caps notifications and in-caps
                             buffers seen by my probes are ignored
    @cvar dropStreamHeaders: NOTE(review): not read by the code visible
                             here; presumably used by subclasses --
                             confirm
    """

    disconnectedPads = False
    dropStreamHeaders = False
696 - def _get_base_pipeline_string(self):
697 """Should be overrided by subclasses to provide the pipeline the 698 component uses. 699 """ 700 return ""
701
def init(self):
    """
    Extend the eater template with a named input queue and initialise
    the reset bookkeeping.

    EATER_TMPL must already be a string when this runs.
    """
    inputQueue = ' ! queue name=input-%(name)s'
    self.EATER_TMPL = self.EATER_TMPL + inputQueue

    # number of outputs still to see the reset event
    self._reset_count = 0

    self.uiState.addKey('reset-count', 0)
    self.not_dropping = False
708
709 - def setup_completed(self):
712 713 # Public methods 714
def get_output_elements(self):
    """
    Return the payloader element of every feeder.

    @rtype: list
    """
    payloaders = []
    for feeder in self.feeders.values():
        payloaders.append(self.get_element(feeder.elementName + '-pay'))
    return payloaders
718
def get_input_elements(self):
    """
    Return the input queue element of every eater.

    @rtype: list
    """
    inputQueues = []
    for eater in self.eaters.values():
        inputQueues.append(self.get_element('input-' + eater.elementName))
    return inputQueues
722
def get_base_pipeline_string(self):
    """
    Placeholder to be implemented by subclasses; always raises here.

    @raises NotImplementedError: always
    """
    message = 'Subclasses should implement ' 'get_base_pipeline_string'
    raise NotImplementedError(message)
726
def get_eater_srcpad(self, eaterAlias):
    """
    Return the source pad of the eater's input queue, which is the final
    element of an eater in this kind of component.

    @param eaterAlias: alias of the eater, a key of self.eaters
    """
    eater = self.eaters[eaterAlias]
    inputQueue = self.get_element('input-' + eater.elementName)
    srcPad = inputQueue.get_pad('src')
    return srcPad
731 732 # Private methods 733
def _install_changes_probes(self):
    """
    Add the probes that will check for a caps change.

    Those will trigger the pipeline's blocking and posterior reload:
    a caps notification on an input queue blocks the eaters, and once
    every output has seen the resulting flush-start event the pipeline
    is torn down and rebuilt from the reactor thread.
    """
    # FIXME: Add documentation

    def output_reset_event(pad, event):
        # Event probe on the sink pad of each output element: lets every
        # event but flush-start through, and swallows flush-start.
        if event.type != gst.EVENT_FLUSH_START:
            return True

        self.debug('RESET: out reset event received on output pad %r', pad)
        # TODO: Can we use EVENT_FLUSH_{START,STOP} for the same purpose?
        # The component only waits for the first eos to come. After that
        # all the elements inside the pipeline will be down and won't
        # process any more events.
        # Pads cannot be blocked from the streaming thread. They have to be
        # manipulated from outside according gstreamer's documentation
        # NOTE(review): the comments here talk about eos, but the event
        # actually handled is flush-start.
        self._reset_count -= 1
        if self._reset_count > 0:
            # other outputs have not seen the event yet
            return False

        reactor.callFromThread(self._on_pipeline_drained)
        # Do not let the eos pass.
        return False

    def got_new_caps(pad, args):
        # notify::caps handler on the sink pad of each input queue
        caps = pad.get_negotiated_caps()
        if not caps:
            self.debug("RESET: Caps unset! Looks like we're stopping")
            return
        self.debug("Got new caps at %s: %s",
                   pad.get_name(), caps.to_string())

        if self.disconnectedPads:
            return

        # FIXME: Only reset when the caps change and prevent the headers to
        # propagate when they are the same
        #if not self._capsChanged(caps):
        #    return

        self.debug('RESET: caps changed on input pad %r', pad)
        # one reset event is expected on every feeder
        self._reset_count = len(self.feeders)
        # Block all the eaters and send an eos downstream the pipeline to
        # drain all the elements. It will also unlink the pipeline from the
        # input queues.
        self._block_eaters()

    def got_new_buffer(pad, buff, element):
        # Buffer probe on the pad feeding each input queue; always lets
        # the buffer through.
        if self.disconnectedPads:
            self.info("INCAPS: Got buffer but we're still disconnected.")
            return True

        if not buff.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
            return True

        self.info("INCAPS: Got buffer with caps of len %d", buff.size)
        if buff.caps:
            # stamp the current reset count on the caps of the buffer
            newcaps = buff.caps[0].copy()
            resets = self.uiState.get('reset-count')
            newcaps['count'] = resets
            buff.set_caps(gst.Caps(newcaps))
        return True

    self.log('RESET: installing event probes for detecting changes')
    # Listen for incoming flumotion-reset events on eaters
    for elem in self.get_input_elements():
        self.debug('RESET: Add caps monitor for %s', elem.get_name())
        sink = elem.get_pad('sink')
        sink.get_peer().add_buffer_probe(got_new_buffer, elem)
        sink.connect("notify::caps", got_new_caps)

    for elem in self.get_output_elements():
        self.debug('RESET: adding event probe for %s', elem.get_name())
        elem.get_pad('sink').add_event_probe(output_reset_event)
812 - def _block_eaters(self):
813 """ 814 Function that blocks all the identities of the eaters 815 """ 816 for elem in self.get_input_elements(): 817 pad = elem.get_pad('src') 818 self.debug("RESET: Blocking pad %s", pad) 819 pad.set_blocked_async(True, self._on_eater_blocked)
820
821 - def _unblock_eaters(self):
822 for elem in self.get_input_elements(): 823 pad = elem.get_pad('src') 824 self.debug("RESET: Unblocking pad %s", pad) 825 pad.set_blocked_async(False, self._on_eater_blocked)
826 840
def _remove_pipeline(self, pipeline, element, end, done=None):
    """
    Recursively remove element, and everything linked downstream of it,
    from the pipeline.

    Downstream elements are removed first; each is unlinked from its
    upstream element, set to the NULL state and taken out of the
    pipeline.

    @param pipeline: the pipeline to remove the elements from
    @param element:  the element to start from; nothing is done if it is
                     false, already in done, or in end
    @param end:      elements at which to stop; they are left alone
    @param done:     list of elements already handled, shared between
                     the recursive calls
    """
    if done is None:
        done = []
    if not element:
        return
    if element in done:
        return
    if element in end:
        return

    for src in element.src_pads():
        self.log('going to start by pad %r', src)
        if not src.get_peer():
            continue
        peer = src.get_peer().get_parent()
        # depth first: take care of what is downstream before unlinking
        self._remove_pipeline(pipeline, peer, end, done)
        done.append(peer)
        element.unlink(peer)

    self.log("RESET: removing old element %s from pipeline", element)
    element.set_state(gst.STATE_NULL)
    pipeline.remove(element)
863
def _rebuild_pipeline(self):
    """
    Create fresh elements from the base pipeline string, move them into
    the running pipeline, link them to the eaters and feeders, and
    restart the data flow.

    Increments the 'reset-count' key of the UI state when done.
    """
    # TODO: Probably this would be easier and clearer if we used a bin to
    # wrap the component's functionality.Then the component would only need
    # to reset the bin and connect the resulting pads to the {eat,feed}ers.

    self.log('RESET: Going to rebuild the pipeline')

    # NOTE(review): this reads the private _get_base_pipeline_string(),
    # not the public get_base_pipeline_string() -- confirm which one
    # subclasses are meant to override.
    base_pipe = self._get_base_pipeline_string()

    # Place a fakesrc element so we can know from where to start
    # rebuilding the pipeline.
    fake_pipeline = 'fakesrc name=start ! %s' % base_pipe
    pipeline = gst.parse_launch(fake_pipeline)

    def move_element(element, orig, dest):
        # Move element and, recursively, everything downstream of it
        # from orig to dest, relinking each element to its former peers.
        # Every element visited is recorded in done, upstream first.
        if not element:
            return
        if element in done:
            return

        to_link = []
        done.append(element)
        self.log("RESET: going to remove %s", element)
        for src in element.src_pads():
            self.log("RESET: got src pad element %s", src)
            if not src.get_peer():
                continue
            peer = src.get_peer().get_parent()
            to_link.append(peer)

            move_element(to_link[-1], orig, dest)

        self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK])
        orig.remove(element)
        dest.add(element)

        self.log("RESET: new element %s added to the pipeline", element)
        for peer in to_link:
            self.log("RESET: linking peers %s -> %s", element, peer)
            element.link(peer)

    # filled in by move_element(); done[0] is the first element after
    # the fakesrc and done[-1] the last element visited
    done = []
    start = pipeline.get_by_name('start').get_pad('src').get_peer()
    move_element(start.get_parent(), pipeline, self.pipeline)

    # Link eaters to the first element in the pipeline
    # By now we there can only be two situations:
    # 1. Encoders, where there is only one eater connected to the encoder
    # 2. Muxers, where multiple eaters are connected directly to the muxer
    # TODO: Probably we'd like the link process to check the caps
    if len(self.get_input_elements()) == 1:
        elem = self.get_input_elements()[0]
        self.log("RESET: linking eater %r to %r", elem, done[0])
        elem.link(done[0])

    # Link the last element in the pipeline to the feeders.
    if len(self.get_output_elements()) == 1:
        elem = self.get_output_elements()[0]
        self.log("RESET: linking %r to feeder %r", done[-1], elem)
        done[-1].link(elem)

    self.configure_pipeline(self.pipeline, self.config['properties'])
    self.pipeline.set_state(gst.STATE_PLAYING)
    self._unblock_eaters()

    resets = self.uiState.get('reset-count')
    self.uiState.set('reset-count', resets+1)
934 - def _on_pad_blocked(self, pad, blocked):
935 self.log("RESET: Pad %s %s", pad, 936 (blocked and "blocked") or "unblocked")
937
938 - def _on_eater_blocked(self, pad, blocked):
939 self._on_pad_blocked(pad, blocked) 940 if blocked: 941 peer = pad.get_peer() 942 peer.send_event(gst.event_new_flush_start())
943 #peer.send_event(gst.event_new_eos()) 944 #self._unlink_pads(pad.get_parent(), [gst.PAD_SRC]) 945
946 - def _on_pipeline_drained(self):
947 self.debug('RESET: Proceed to unlink pipeline') 948 start = self.get_input_elements() 949 end = self.get_output_elements() 950 done = [] 951 for element in start: 952 element = element.get_pad('src').get_peer().get_parent() 953 self._remove_pipeline(self.pipeline, element, end, done) 954 self._rebuild_pipeline()
955 956
class EncoderComponent(ParseLaunchComponent):
    """
    Base class for encoder components.

    It adds nothing to ParseLaunchComponent at the moment.

    NOTE(review): it used to be described as a component that is
    reconfigured when changes arrive through the flumotion-reset event
    (sent by the fms producer), but it does not derive from
    ReconfigurableComponent -- confirm the intent.
    """
963 964
965 -class MuxerComponent(MultiInputParseLaunchComponent):
966 """ 967 This class provides for multi-input ParseLaunchComponents, such as muxers, 968 that handle flumotion-reset events for reconfiguration. 969 """ 970 971 LINK_MUXER = False 972 dropAudioKuEvents = True 973 976
def buffer_probe_cb(self, pad, buffer, depay, eaterAlias):
    """
    Buffer probe on the source pad of an eater's depayloader: link the
    eater to a compatible pad of the muxer once caps are negotiated.

    After linking, the probe removes itself and the eater's source pad
    gets blocked, so that is_blocked_cb() can release all eaters
    together.  When no compatible muxer pad is found, an error message
    is added and the pipeline is scheduled to go to NULL.

    @returns: False, dropping the buffer, while no caps are negotiated;
              True otherwise
    """
    depaySrcPad = depay.get_pad("src")
    caps = depaySrcPad.get_negotiated_caps()
    if not caps:
        return False

    eaterSrcPad = self.get_eater_srcpad(eaterAlias)
    muxer = self.pipeline.get_by_name("muxer")
    self.debug("Trying to get compatible pad for pad %r with caps %s",
               eaterSrcPad, caps)
    muxerPad = self.get_link_pad(muxer, eaterSrcPad, caps)
    if not muxerPad:
        message = messages.Error(T_(N_(
            "The incoming data is not compatible with this muxer.")),
            debug="Caps %s not compatible with this muxer." % (
                caps.to_string()))
        self.addMessage(message)
        # this is the streaming thread, cannot set state here
        # so we do it in the mainloop
        reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL)
        return True

    self.debug("Got link pad %r", muxerPad)
    eaterSrcPad.link(muxerPad)
    depay.get_pad("src").remove_buffer_probe(self._probes[eaterAlias])

    if eaterSrcPad.is_blocked():
        self.is_blocked_cb(eaterSrcPad, True)
    else:
        eaterSrcPad.set_blocked_async(True, self.is_blocked_cb)
    return True
1005
def event_probe_cb(self, pad, event, depay, eaterAlias):
    """
    Event probe on the source pad of an eater's depayloader: drop
    GstForceKeyUnit events on pads that push audio.

    As soon as the pad turns out not to carry audio, the probe removes
    itself and lets the event through.

    @param pad:        the pad the event was seen on
    @param event:      the event
    @param depay:      the depayloader element of the eater
    @param eaterAlias: alias of the eater, key of the probe id

    @returns: False to drop the event, True to let it pass
    """
    caps = pad.get_negotiated_caps()
    if caps is None:
        return True

    if 'audio' not in caps[0].to_string():
        # if this pad doesn't push audio, remove the probe; the id comes
        # from add_event_probe, so it has to be removed as an event
        # probe, and the event itself is none of our business
        depay.get_pad("src").remove_event_probe(self._eprobes[eaterAlias])
        return True

    # not every event carries a structure
    structure = event.get_structure()
    if structure is not None and structure.get_name() == 'GstForceKeyUnit':
        return False
    return True
1016
def configure_pipeline(self, pipeline, properties):
    """
    Method not overridable by muxer subclasses.

    Installs, on the source pad of every eater's depayloader, the buffer
    probe that links the eater to the muxer and, when dropAudioKuEvents
    is set, the event probe that drops GstForceKeyUnit events.
    """
    # link the muxers' sink pads when data comes in so we get compatible
    # sink pads with input data
    # gone are the days when we know we only have one pad template in
    # muxers
    self.fired_eaters = 0
    self._probes = {}   # eater alias -> buffer probe id
    self._eprobes = {}  # eater alias -> event probe id

    for alias in self.eaters:
        depay = self.get_element(self.eaters[alias].depayName)
        bufferProbeId = depay.get_pad("src").add_buffer_probe(
            self.buffer_probe_cb, depay, alias)
        self._probes[alias] = bufferProbeId
        # Add an event probe to drop GstForceKeyUnit events
        # in audio pads
        if self.dropAudioKuEvents:
            eventProbeId = depay.get_pad("src").add_event_probe(
                self.event_probe_cb, depay, alias)
            self._eprobes[alias] = eventProbeId
1040
def is_blocked_cb(self, pad, is_blocked):
    """
    Callback for the (un)blocking of an eater's source pad.

    Counts the pads that got blocked; when the count equals the number
    of eaters, disconnectedPads is reset and every eater's source pad is
    asked to unblock.  Unblock notifications are ignored.
    """
    if not is_blocked:
        return

    self.fired_eaters = self.fired_eaters + 1
    if self.fired_eaters != len(self.eaters):
        # still waiting for other eaters
        return

    self.debug("All pads are now blocked")
    self.disconnectedPads = False
    for alias in self.eaters:
        eaterSrcPad = self.get_eater_srcpad(alias)
        eaterSrcPad.set_blocked_async(False, self.is_blocked_cb)
1050