Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Feed components, participating in the stream 
 24  """ 
 25   
 26  import os 
 27   
 28  import gst 
 29  import gst.interfaces 
 30  import gobject 
 31   
 32  from twisted.internet import reactor, defer 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.component import component as basecomponent 
 38  from flumotion.component import feed 
 39  from flumotion.common import common, interfaces, errors, log, pygobject, \ 
 40       messages 
 41  from flumotion.common import gstreamer 
 42  from flumotion.common.i18n import N_, gettexter 
 43  from flumotion.common.planet import moods 
 44  from flumotion.common.pygobject import gsignal 
 45   
 46  __version__ = "$Rev: 7970 $" 
 47  T_ = gettexter() 
 48   
 49   
50 -class FeedComponentMedium(basecomponent.BaseComponentMedium):
51 """ 52 I am a component-side medium for a FeedComponent to interface with 53 the manager-side ComponentAvatar. 54 """ 55 implements(interfaces.IComponentMedium) 56 logCategory = 'feedcompmed' 57 remoteLogName = 'feedserver' 58
59 - def __init__(self, component):
60 """ 61 @param component: L{flumotion.component.feedcomponent.FeedComponent} 62 """ 63 basecomponent.BaseComponentMedium.__init__(self, component) 64 65 self._feederFeedServer = {} # eaterAlias -> (fullFeedId, host, port) 66 # tuple for remote feeders 67 self._feederPendingConnections = {} # eaterAlias -> cancel thunk 68 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple 69 # for remote eaters 70 self._eaterPendingConnections = {} # feederName -> cancel thunk 71 self.logName = component.name
72 73 ### Referenceable remote methods which can be called from manager 74
75 - def remote_attachPadMonitorToFeeder(self, feederName):
76 self.comp.attachPadMonitorToFeeder(feederName)
77
78 - def remote_setGstDebug(self, debug):
79 """ 80 Sets the GStreamer debugging levels based on the passed debug string. 81 82 @since: 0.4.2 83 """ 84 self.debug('Setting GStreamer debug level to %s' % debug) 85 if not debug: 86 return 87 88 for part in debug.split(','): 89 glob = None 90 value = None 91 pair = part.split(':') 92 if len(pair) == 1: 93 # assume only the value 94 value = int(pair[0]) 95 elif len(pair) == 2: 96 glob, value = pair 97 value = int(value) 98 else: 99 self.warning("Cannot parse GStreamer debug setting '%s'." % 100 part) 101 continue 102 103 if glob: 104 try: 105 # value has to be an integer 106 gst.debug_set_threshold_for_name(glob, value) 107 except TypeError: 108 self.warning("Cannot set glob %s to value %s" % ( 109 glob, value)) 110 else: 111 gst.debug_set_default_threshold(value)
112
113 - def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
114 """ 115 Tell the component the host and port for the FeedServer through which 116 it can connect a local eater to a remote feeder to eat the given 117 fullFeedId. 118 119 Called on by the manager-side ComponentAvatar. 120 """ 121 if self._feederFeedServer.get(eaterAlias): 122 if self._feederFeedServer[eaterAlias] == (fullFeedId, host, port): 123 self.debug("Feed:%r is the same as the current one. "\ 124 "Request ignored.", (fullFeedId, host, port)) 125 return 126 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port) 127 return self.connectEater(eaterAlias)
128
129 - def _getAuthenticatorForFeed(self, eaterAliasOrFeedName):
130 # The avatarId on the keycards issued by the authenticator will 131 # identify us to the remote component. Attempt to use our 132 # fullFeedId, for debugging porpoises. 133 if hasattr(self.authenticator, 'copy'): 134 tup = common.parseComponentId(self.authenticator.avatarId) 135 flowName, componentName = tup 136 fullFeedId = common.fullFeedId(flowName, componentName, 137 eaterAliasOrFeedName) 138 return self.authenticator.copy(fullFeedId) 139 else: 140 return self.authenticator
141
142 - def connectEater(self, eaterAlias):
143 """ 144 Connect one of the medium's component's eaters to a remote feed. 145 Called by the component, both on initial connection and for 146 reconnecting. 147 148 @returns: deferred that will fire with a value of None 149 """ 150 # FIXME: There's no indication if the connection was made or not 151 152 def gotFeed((feedId, fd)): 153 self._feederPendingConnections.pop(eaterAlias, None) 154 self.comp.eatFromFD(eaterAlias, feedId, fd)
155 156 if eaterAlias not in self._feederFeedServer: 157 self.debug("eatFrom() hasn't been called yet for eater %s", 158 eaterAlias) 159 # unclear if this function should have a return value at 160 # all... 161 return defer.succeed(None) 162 163 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias] 164 165 cancel = self._feederPendingConnections.pop(eaterAlias, None) 166 if cancel: 167 self.debug('cancelling previous connection attempt on %s', 168 eaterAlias) 169 cancel() 170 171 client = feed.FeedMedium(logName=self.comp.name) 172 173 d = client.requestFeed(host, port, 174 self._getAuthenticatorForFeed(eaterAlias), 175 fullFeedId) 176 self._feederPendingConnections[eaterAlias] = client.stopConnecting 177 d.addCallback(gotFeed) 178 return d
179
180 - def remote_feedTo(self, feederName, fullFeedId, host, port):
181 """ 182 Tell the component to feed the given feed to the receiving component 183 accessible through the FeedServer on the given host and port. 184 185 Called on by the manager-side ComponentAvatar. 186 """ 187 self._eaterFeedServer[fullFeedId] = (host, port) 188 self.connectFeeder(feederName, fullFeedId)
189
190 - def connectFeeder(self, feederName, fullFeedId):
191 """ 192 Tell the component to feed the given feed to the receiving component 193 accessible through the FeedServer on the given host and port. 194 195 Called on by the manager-side ComponentAvatar. 196 """ 197 198 def gotFeed((fullFeedId, fd)): 199 self._eaterPendingConnections.pop(feederName, None) 200 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
201 202 if fullFeedId not in self._eaterFeedServer: 203 self.debug("feedTo() hasn't been called yet for feeder %s", 204 feederName) 205 # unclear if this function should have a return value at 206 # all... 207 return defer.succeed(None) 208 209 host, port = self._eaterFeedServer[fullFeedId] 210 211 # probably should key on feederName as well 212 cancel = self._eaterPendingConnections.pop(fullFeedId, None) 213 if cancel: 214 self.debug('cancelling previous connection attempt on %s', 215 feederName) 216 cancel() 217 218 client = feed.FeedMedium(logName=self.comp.name) 219 220 d = client.sendFeed(host, port, 221 self._getAuthenticatorForFeed(feederName), 222 fullFeedId) 223 self._eaterPendingConnections[feederName] = client.stopConnecting 224 d.addCallback(gotFeed) 225 return d 226
227 - def remote_provideMasterClock(self, port):
228 """ 229 Tells the component to start providing a master clock on the given 230 UDP port. 231 Can only be called if setup() has been called on the component. 232 233 The IP address returned is the local IP the clock is listening on. 234 235 @returns: (ip, port, base_time) 236 @rtype: tuple of (str, int, long) 237 """ 238 self.debug('remote_provideMasterClock(port=%r)' % port) 239 return self.comp.provide_master_clock(port)
240
241 - def remote_getMasterClockInfo(self):
242 """ 243 Return the clock master info created by a previous call 244 to provideMasterClock. 245 246 @returns: (ip, port, base_time) 247 @rtype: tuple of (str, int, long) 248 """ 249 return self.comp.get_master_clock()
250
251 - def remote_setMasterClock(self, ip, port, base_time):
252 return self.comp.set_master_clock(ip, port, base_time)
253
254 - def remote_effect(self, effectName, methodName, *args, **kwargs):
255 """ 256 Invoke the given methodName on the given effectName in this component. 257 The effect should implement effect_(methodName) to receive the call. 258 """ 259 self.debug("calling %s on effect %s" % (methodName, effectName)) 260 if not effectName in self.comp.effects: 261 raise errors.UnknownEffectError(effectName) 262 effect = self.comp.effects[effectName] 263 if not hasattr(effect, "effect_%s" % methodName): 264 raise errors.NoMethodError("%s on effect %s" % (methodName, 265 effectName)) 266 method = getattr(effect, "effect_%s" % methodName) 267 try: 268 result = method(*args, **kwargs) 269 except TypeError: 270 msg = "effect method %s did not accept %s and %s" % ( 271 methodName, args, kwargs) 272 self.debug(msg) 273 raise errors.RemoteRunError(msg) 274 self.debug("effect: result: %r" % result) 275 return result
276 277 from feedcomponent010 import FeedComponent 278 279 FeedComponent.componentMediumClass = FeedComponentMedium 280 281
282 -class ParseLaunchComponent(FeedComponent):
283 """A component using gst-launch syntax 284 285 @cvar checkTimestamp: whether to check continuity of timestamps for eaters 286 @cvar checkOffset: whether to check continuity of offsets for 287 eaters 288 """ 289 290 DELIMITER = '@' 291 292 # can be set by subclasses 293 checkTimestamp = False 294 checkOffset = False 295 296 # keep these as class variables for the tests 297 FDSRC_TMPL = 'fdsrc name=%(name)s' 298 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay' 299 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\ 300 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\ 301 'recover-policy=1' 302 EATER_TMPL = None 303
304 - def init(self):
305 if not gstreamer.get_plugin_version('coreelements'): 306 raise errors.MissingElementError('identity') 307 if not gstreamer.element_factory_has_property('identity', 308 'check-imperfect-timestamp'): 309 self.checkTimestamp = False 310 self.checkOffset = False 311 self.addMessage( 312 messages.Info(T_(N_( 313 "You will get more debugging information " 314 "if you upgrade to GStreamer 0.10.13 or later.")))) 315 316 self.EATER_TMPL = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL 317 if self.checkTimestamp or self.checkOffset: 318 self.EATER_TMPL += " ! identity name=%(name)s-identity silent=TRUE" 319 if self.checkTimestamp: 320 self.EATER_TMPL += " check-imperfect-timestamp=1" 321 if self.checkOffset: 322 self.EATER_TMPL += " check-imperfect-offset=1"
323 324 ### FeedComponent interface implementations 325
326 - def create_pipeline(self):
327 try: 328 unparsed = self.get_pipeline_string(self.config['properties']) 329 except errors.MissingElementError, e: 330 self.warning('Missing %s element' % e.args[0]) 331 m = messages.Error(T_(N_( 332 "The worker does not have the '%s' element installed.\n" 333 "Please install the necessary plug-in and restart " 334 "the component.\n"), e.args[0])) 335 self.addMessage(m) 336 raise errors.ComponentSetupHandledError(e) 337 338 self.pipeline_string = self.parse_pipeline(unparsed) 339 340 try: 341 pipeline = gst.parse_launch(self.pipeline_string) 342 except gobject.GError, e: 343 self.warning('Could not parse pipeline: %s' % e.message) 344 m = messages.Error(T_(N_( 345 "GStreamer error: could not parse component pipeline.")), 346 debug=e.message) 347 self.addMessage(m) 348 raise errors.PipelineParseError(e.message) 349 350 return pipeline
351
352 - def set_pipeline(self, pipeline):
353 FeedComponent.set_pipeline(self, pipeline) 354 if self.checkTimestamp or self.checkOffset: 355 watchElements = dict([ 356 (e.elementName + '-identity', e) 357 for e in self.eaters.values()]) 358 self.install_eater_continuity_watch(watchElements) 359 self.configure_pipeline(self.pipeline, self.config['properties'])
360 361 ### ParseLaunchComponent interface for subclasses 362
363 - def get_pipeline_string(self, properties):
364 """ 365 Method that must be implemented by subclasses to produce the 366 gstparse string for the component's pipeline. Subclasses should 367 not chain up; this method raises a NotImplemented error. 368 369 Returns: a new pipeline string representation. 370 """ 371 raise NotImplementedError('subclasses should implement ' 372 'get_pipeline_string')
373
374 - def configure_pipeline(self, pipeline, properties):
375 """ 376 Method that can be implemented by subclasses if they wish to 377 interact with the pipeline after it has been created and set 378 on the component. 379 380 This could include attaching signals and bus handlers. 381 """ 382 pass
383 384 ### private methods 385
386 - def add_default_eater_feeder(self, pipeline):
387 if len(self.eaters) == 1: 388 eater = 'eater:' + self.eaters.keys()[0] 389 if eater not in pipeline: 390 pipeline = '@' + eater + '@ ! ' + pipeline 391 if len(self.feeders) == 1: 392 feeder = 'feeder:' + self.feeders.keys()[0] 393 if feeder not in pipeline: 394 pipeline = pipeline + ' ! @' + feeder + '@' 395 return pipeline
396
397 - def parse_tmpl(self, pipeline, templatizers):
398 """ 399 Expand the given pipeline string representation by substituting 400 blocks between '@' with a filled-in template. 401 402 @param pipeline: a pipeline string representation with variables 403 @param templatizers: A dict of prefix => procedure. Template 404 blocks in the pipeline will be replaced 405 with the result of calling the procedure 406 with what is left of the template after 407 taking off the prefix. 408 @returns: a new pipeline string representation. 409 """ 410 assert pipeline != '' 411 412 # verify the template has an even number of delimiters 413 if pipeline.count(self.DELIMITER) % 2 != 0: 414 raise TypeError("'%s' contains an odd number of '%s'" 415 % (pipeline, self.DELIMITER)) 416 417 out = [] 418 for i, block in enumerate(pipeline.split(self.DELIMITER)): 419 # when splitting, the even-indexed members will remain, and 420 # the odd-indexed members are the blocks to be substituted 421 if i % 2 == 0: 422 out.append(block) 423 else: 424 block = block.strip() 425 try: 426 pos = block.index(':') 427 except ValueError: 428 raise TypeError("Template %r has no colon" % (block, )) 429 prefix = block[:pos+1] 430 if prefix not in templatizers: 431 raise TypeError("Template %r has invalid prefix %r" 432 % (block, prefix)) 433 out.append(templatizers[prefix](block[pos+1:])) 434 return ''.join(out)
435
436 - def parse_pipeline(self, pipeline):
437 pipeline = " ".join(pipeline.split()) 438 self.debug('Creating pipeline, template is %s', pipeline) 439 440 if pipeline == '' and not self.eaters: 441 raise TypeError("Need a pipeline or a eater") 442 443 if pipeline == '': 444 # code of dubious value 445 assert self.eaters 446 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink' 447 448 pipeline = self.add_default_eater_feeder(pipeline) 449 pipeline = self.parse_tmpl(pipeline, 450 {'eater:': self.get_eater_template, 451 'feeder:': self.get_feeder_template}) 452 453 self.debug('pipeline is %s', pipeline) 454 assert self.DELIMITER not in pipeline 455 456 return pipeline
457
458 - def get_eater_template(self, eaterAlias):
459 queue = self.get_queue_string(eaterAlias) 460 elementName = self.eaters[eaterAlias].elementName 461 462 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
463
464 - def get_feeder_template(self, feederName):
465 elementName = self.feeders[feederName].elementName 466 return self.FEEDER_TMPL % {'name': elementName}
467
468 - def get_queue_string(self, eaterAlias):
469 """ 470 Return a parse-launch string to join the fdsrc eater element and 471 the depayer, for example '!' or '! queue !'. The string may have 472 no format strings. 473 """ 474 return '!'
475 476
477 -class Effect(log.Loggable):
478 """ 479 I am a part of a feed component for a specific group 480 of functionality. 481 482 @ivar name: name of the effect 483 @type name: string 484 @ivar component: component owning the effect 485 @type component: L{FeedComponent} 486 """ 487 logCategory = "effect" 488
489 - def __init__(self, name):
490 """ 491 @param name: the name of the effect 492 """ 493 self.name = name 494 self.setComponent(None)
495
496 - def setComponent(self, component):
497 """ 498 Set the given component as the effect's owner. 499 500 @param component: the component to set as an owner of this effect 501 @type component: L{FeedComponent} 502 """ 503 self.component = component 504 self.setUIState(component and component.uiState or None)
505
506 - def setUIState(self, state):
507 """ 508 Set the given UI state on the effect. This method is ideal for 509 adding keys to the UI state. 510 511 @param state: the UI state for the component to use. 512 @type state: L{flumotion.common.componentui.WorkerComponentUIState} 513 """ 514 self.uiState = state
515
516 - def getComponent(self):
517 """ 518 Get the component owning this effect. 519 520 @rtype: L{FeedComponent} 521 """ 522 return self.component
523 524
525 -class MultiInputParseLaunchComponent(ParseLaunchComponent):
526 """ 527 This class provides for multi-input ParseLaunchComponents, such as muxers, 528 with a queue attached to each input. 529 """ 530 QUEUE_SIZE_BUFFERS = 16 531
532 - def get_muxer_string(self, properties):
533 """ 534 Return a gst-parse description of the muxer, which 535 must be named 'muxer' 536 """ 537 raise errors.NotImplementedError("Implement in a subclass")
538
539 - def get_queue_string(self, eaterAlias):
540 name = self.eaters[eaterAlias].elementName 541 return ("! queue name=%s-queue max-size-buffers=%d !" 542 % (name, self.QUEUE_SIZE_BUFFERS))
543
544 - def get_pipeline_string(self, properties):
545 eaters = self.config.get('eater', {}) 546 sources = self.config.get('source', []) 547 if eaters == {} and sources != []: 548 # for upgrade without manager restart 549 feeds = [] 550 for feed in sources: 551 if not ':' in feed: 552 feed = '%s:default' % feed 553 feeds.append(feed) 554 eaters = {'default': [(x, 'default') for x in feeds]} 555 556 pipeline = '' 557 for e in eaters: 558 for feed, alias in eaters[e]: 559 pipeline += '@ eater:%s @ ! muxer. ' % alias 560 561 pipeline += self.get_muxer_string(properties) + ' ' 562 563 return pipeline
564
565 - def unblock_eater(self, eaterAlias):
566 # Firstly, ensure that any push in progress is guaranteed to return, 567 # by temporarily enlarging the queue 568 queuename = self.eaters[eaterAlias].elementName + '-queue' 569 queue = self.pipeline.get_by_name(queuename) 570 571 size = queue.get_property("max-size-buffers") 572 queue.set_property("max-size-buffers", size + 1) 573 574 # So, now it's guaranteed to return. However, we want to return the 575 # queue size to its original value. Doing this in a thread-safe manner 576 # is rather tricky... 577 578 def _block_cb(pad, blocked): 579 # This is called from streaming threads, but we don't do anything 580 # here so it's safe. 581 pass
582 583 def _underrun_cb(element): 584 # Called from a streaming thread. The queue element does not hold 585 # the queue lock when this is called, so we block our sinkpad, 586 # then re-check the current level. 587 pad = element.get_pad("sink") 588 pad.set_blocked_async(True, _block_cb) 589 level = element.get_property("current-level-buffers") 590 if level < self.QUEUE_SIZE_BUFFERS: 591 element.set_property('max-size-buffers', 592 self.QUEUE_SIZE_BUFFERS) 593 element.disconnect(signalid) 594 pad.set_blocked_async(False, _block_cb)
595 596 signalid = queue.connect("underrun", _underrun_cb) 597