Package flumotion :: Package component :: Package producers :: Package playlist :: Module singledecodebin
[hide private]

Source Code for Module flumotion.component.producers.playlist.singledecodebin

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  # Originally part of PiTiVi, 
 19  # Copyright (C) 2005-2007 Edward Hervey <bilboed@bilboed.com>, 
 20   
 21  """ 
 22  Single-stream queue-less decodebin 
 23  """ 
 24   
 25  import gobject 
 26  import gst 
 27   
 28  __version__ = "$Rev$" 
 29   
 30   
def find_upstream_demuxer_and_pad(pad):
    """
    Walk upstream from pad until an element whose factory klass contains
    'Demuxer' is found.

    Source ghost pads are resolved to their target and sink pads are
    replaced by their peer before the owning element is inspected.

    Returns a (demuxer, pad) tuple where pad is the demuxer's pad that was
    reached, or (None, None) if the walk ran out of pads or elements.

    Raises Exception if an element other than a multiqueue has more than
    one sink pad, since there is no way to pick which one to follow.
    """
    current = pad
    while current:
        direction = current.props.direction

        if direction == gst.PAD_SRC and isinstance(current, gst.GhostPad):
            # step inside the bin the ghost pad belongs to
            current = current.get_target()
        elif direction == gst.PAD_SINK:
            # cross the link to whatever feeds this sink pad
            current = current.get_peer()
        else:
            owner = current.get_parent()
            if isinstance(owner, gst.Pad):
                # pad is a proxy pad
                owner = owner.get_parent()

            if owner is None:
                break

            factory = owner.get_factory()
            if 'Demuxer' in factory.get_klass():
                return owner, current

            sinks = list(owner.sink_pads())
            if not sinks:
                current = None
            elif len(sinks) == 1:
                current = sinks[0]
            elif factory.get_name() == 'multiqueue':
                # follow the sink pad whose name mirrors this source pad
                sink_name = current.get_name().replace('src', 'sink')
                current = owner.get_pad(sink_name)
            else:
                raise Exception('boom!')

    return None, None
70 71
def get_type_from_decoder(decoder):
    """
    Return the lower-cased third component (and anything after it) of the
    decoder factory's '/'-separated klass string, or None when the klass
    has fewer than three components.
    """
    pieces = decoder.get_factory().get_klass().split('/', 2)
    return pieces[2].lower() if len(pieces) == 3 else None
79 80
def get_pad_id(pad):
    """
    Build a list of [demuxer factory name, demuxer pad name] pairs, one for
    each demuxer found while walking upstream from pad.

    Returns an empty list when no demuxer is found.
    """
    chain = []
    current = pad
    while current:
        demuxer, current = find_upstream_demuxer_and_pad(current)
        if (demuxer, current) == (None, None):
            # nothing further upstream; current is None so the loop ends
            continue

        chain.append([demuxer.get_factory().get_name(), current.get_name()])

        # FIXME: we always follow back the first sink
        sinks = list(demuxer.sink_pads())
        current = sinks[0] if sinks else None

    return chain
95 96
def is_raw(caps):
    """ returns True if the string form of the caps starts with a RAW type """
    raw_prefixes = ("video/x-raw", "audio/x-raw", "text/plain",
                    "text/x-pango-markup")
    return caps.to_string().startswith(raw_prefixes)
105 106
class SingleDecodeBin(gst.Bin):
    """
    A variant of decodebin.

    * Only outputs one stream
    * Doesn't contain any internal queue
      (NOTE(review): _plugDecodingQueue does add a queue after elements
      that _isDemuxer accepts, so this bullet looks outdated - confirm)
    """

    # Value assigned to max_size_time of the queues created by
    # _plugDecodingQueue.
    QUEUE_SIZE = 1 * gst.SECOND

    # One always-present sink pad and one sometimes-present source pad,
    # both accepting any caps.
    __gsttemplates__ = (
        gst.PadTemplate("sinkpadtemplate", gst.PAD_SINK, gst.PAD_ALWAYS,
                        gst.caps_new_any()),
        gst.PadTemplate("srcpadtemplate", gst.PAD_SRC, gst.PAD_SOMETIMES,
                        gst.caps_new_any()))
123 - def __init__(self, caps=None, uri=None, stream=None, *args, **kwargs):
124 gst.Bin.__init__(self, *args, **kwargs) 125 126 if not caps: 127 caps = gst.caps_new_any() 128 self.caps = caps 129 self.stream = stream 130 self.typefind = gst.element_factory_make("typefind", 131 "internal-typefind") 132 self.add(self.typefind) 133 134 self.uri = uri 135 if self.uri and gst.uri_is_valid(self.uri): 136 self.urisrc = gst.element_make_from_uri(gst.URI_SRC, uri, "urisrc") 137 self.log("created urisrc %s / %r" % (self.urisrc.get_name(), 138 self.urisrc)) 139 self.add(self.urisrc) 140 self.urisrc.link(self.typefind) 141 else: 142 self._sinkpad = gst.GhostPad("sink", self.typefind.get_pad("sink")) 143 self._sinkpad.set_active(True) 144 self.add_pad(self._sinkpad) 145 146 self.typefind.connect("have_type", self._typefindHaveTypeCb) 147 148 self._srcpad = None 149 150 self._dynamics = [] 151 152 self._validelements = [] #added elements 153 154 self._factories = self._getSortedFactoryList()
155 156 157 ## internal methods 158
159 - def _controlDynamicElement(self, element):
160 self.log("element:%s" % element.get_name()) 161 self._dynamics.append(element) 162 element.connect("pad-added", self._dynamicPadAddedCb) 163 element.connect("no-more-pads", self._dynamicNoMorePadsCb)
164
165 - def _getSortedFactoryList(self):
166 """ 167 Returns the list of demuxers, decoders and parsers available, sorted 168 by rank 169 """ 170 171 def _myfilter(fact): 172 if fact.get_rank() < 64: 173 return False 174 klass = fact.get_klass() 175 if not ("Demuxer" in klass or "Decoder" in klass \ 176 or "Parse" in klass): 177 return False 178 return True
179 reg = gst.registry_get_default() 180 res = [x for x in reg.get_feature_list(gst.ElementFactory) \ 181 if _myfilter(x)] 182 res.sort(lambda a, b: int(b.get_rank() - a.get_rank())) 183 return res
184
185 - def _findCompatibleFactory(self, caps):
186 """ 187 Returns a list of factories (sorted by rank) which can take caps as 188 input. Returns empty list if none are compatible 189 """ 190 self.debug("caps:%s" % caps.to_string()) 191 res = [] 192 for factory in self._factories: 193 for template in factory.get_static_pad_templates(): 194 if template.direction == gst.PAD_SINK: 195 intersect = caps.intersect(template.static_caps.get()) 196 if not intersect.is_empty(): 197 res.append(factory) 198 break 199 self.debug("returning %r" % res) 200 return res
201 233
234 - def _isDemuxer(self, element):
235 if not 'Demux' in element.get_factory().get_klass(): 236 return False 237 238 potential_src_pads = 0 239 for template in element.get_pad_template_list(): 240 if template.direction != gst.PAD_SRC: 241 continue 242 243 if template.presence == gst.PAD_REQUEST or \ 244 "%" in template.name_template: 245 potential_src_pads += 2 246 break 247 else: 248 potential_src_pads += 1 249 250 return potential_src_pads > 1
251
252 - def _plugDecodingQueue(self, pad):
253 queue = gst.element_factory_make("queue") 254 queue.props.max_size_time = self.QUEUE_SIZE 255 self.add(queue) 256 queue.sync_state_with_parent() 257 pad.link(queue.get_pad("sink")) 258 pad = queue.get_pad("src") 259 260 return pad
261
262 - def _tryToLink1(self, source, pad, factories):
263 """ 264 Tries to link one of the factories' element to the given pad. 265 266 Returns the element that was successfully linked to the pad. 267 """ 268 self.debug("source:%s, pad:%s , factories:%r" % (source.get_name(), 269 pad.get_name(), 270 factories)) 271 272 if self._isDemuxer(source): 273 pad = self._plugDecodingQueue(pad) 274 275 result = None 276 for factory in factories: 277 element = factory.create() 278 if not element: 279 self.warning("weren't able to create element from %r" % ( 280 factory)) 281 continue 282 283 sinkpad = element.get_pad("sink") 284 if not sinkpad: 285 continue 286 287 self.add(element) 288 element.set_state(gst.STATE_READY) 289 try: 290 pad.link(sinkpad) 291 except: 292 element.set_state(gst.STATE_NULL) 293 self.remove(element) 294 continue 295 296 self._closeLink(element) 297 element.set_state(gst.STATE_PAUSED) 298 299 result = element 300 break 301 302 return result
303 339
340 - def _wrapUp(self, element, pad):
341 """ 342 Ghost the given pad of element. 343 Remove non-used elements. 344 """ 345 346 if self._srcpad: 347 return 348 self._markValidElements(element) 349 self._removeUnusedElements(self.typefind) 350 self.log("ghosting pad %s" % pad.get_name()) 351 self._srcpad = gst.GhostPad("src", pad) 352 self._srcpad.set_active(True) 353 self.add_pad(self._srcpad) 354 self.post_message(gst.message_new_state_dirty(self))
355
356 - def _markValidElements(self, element):
357 """ 358 Mark this element and upstreams as valid 359 """ 360 self.log("element:%s" % element.get_name()) 361 if element == self.typefind: 362 return 363 self._validelements.append(element) 364 # find upstream element 365 pad = list(element.sink_pads())[0] 366 parent = pad.get_peer().get_parent() 367 self._markValidElements(parent)
368
369 - def _removeUnusedElements(self, element):
370 """ 371 Remove unused elements connected to srcpad(s) of element 372 """ 373 self.log("element:%r" % element) 374 for pad in element.src_pads(): 375 if pad.is_linked(): 376 peer = pad.get_peer().get_parent() 377 self._removeUnusedElements(peer) 378 if not peer in self._validelements: 379 self.log("removing %s" % peer.get_name()) 380 pad.unlink(pad.get_peer()) 381 peer.set_state(gst.STATE_NULL) 382 self.remove(peer)
383
384 - def _cleanUp(self):
385 self.log("") 386 if self._srcpad: 387 self.remove_pad(self._srcpad) 388 self._srcpad = None 389 for element in self._validelements: 390 element.set_state(gst.STATE_NULL) 391 self.remove(element) 392 self._validelements = []
393 394 ## Overrides 395
396 - def do_change_state(self, transition):
397 self.debug("transition:%r" % transition) 398 res = gst.Bin.do_change_state(self, transition) 399 if transition == gst.STATE_CHANGE_PAUSED_TO_READY: 400 self._cleanUp() 401 return res
402 403 ## Signal callbacks 404
405 - def _typefindHaveTypeCb(self, typefind, probability, caps):
406 self.debug("probability:%d, caps:%s" % (probability, caps.to_string())) 407 self._closePadLink(typefind, typefind.get_pad("src"), caps)
408 409 ## Dynamic element Callbacks 410
411 - def _dynamicPadAddedCb(self, element, pad):
412 self.log("element:%s, pad:%s" % (element.get_name(), pad.get_name())) 413 if not self._srcpad: 414 self._closePadLink(element, pad, pad.get_caps())
415
416 - def _dynamicNoMorePadsCb(self, element):
417 self.log("element:%s" % element.get_name())


# Register SingleDecodeBin with the GObject type system.
# NOTE(review): presumably required for __gsttemplates__ and the
# do_change_state override to take effect - confirm against PyGST docs.
gobject.type_register(SingleDecodeBin)
