Package flumotion :: Package component :: Package decoders :: Package generic :: Module generic
[hide private]

Source Code for Module flumotion.component.decoders.generic.generic

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  import gobject 
 20  import threading 
 21   
 22  from flumotion.component import decodercomponent as dc 
 23  from flumotion.common import messages, gstreamer 
 24  from flumotion.common.i18n import N_, gettexter 
 25   
# Translation helper; wraps the user-visible messages built in this module.
T_ = gettexter()

# Revision keyword string for this module.
__version__ = "$Rev: 7162 $"

# Caps strings for raw audio/video. They are used both for the SyncKeeper
# pad templates and as the caps filter of the decoder feeders.
BASIC_AUDIO_CAPS = "audio/x-raw-int;audio/x-raw-float"
BASIC_VIDEO_CAPS = "video/x-raw-yuv;video/x-raw-rgb"

# FIXME: The GstAutoplugSelectResult enum has no bindings in gst-python.
# Replace this when the enum is exposed in the bindings.

# Values returned from the decodebin2 'autoplug-select' signal handler.
GST_AUTOPLUG_SELECT_TRY = 0
GST_AUTOPLUG_SELECT_SKIP = 2
 38   
 39   
class FeederInfo(object):
    """
    Description of one output feeder of a decoder component.

    @ivar name:   name of the feeder; also used as the feeder pad name
                  when the pipeline string is built.
    @ivar caps:   caps string inserted as a filter in front of the feeder.
    @ivar linked: value of the C{linked} constructor argument.
                  NOTE(review): nothing in this module reads it; confirm
                  the intended meaning with the callers.
    """

    def __init__(self, name, caps, linked=False):
        """
        @param name:   feeder name
        @param caps:   caps string describing the feeder output
        @param linked: optional flag, stored as-is (defaults to False)
        """
        self.name = name
        self.caps = caps
        # This argument used to be accepted and then silently discarded;
        # keep it so a caller that passes it can read it back.
        self.linked = linked

    def __repr__(self):
        return "<%s name=%r caps=%r linked=%r>" % (
            self.__class__.__name__, self.name, self.caps, self.linked)
45 46
class SyncKeeper(gst.Element):
    """
    Element with one audio and one video sink pad and the matching
    source pads, that rewrites buffer timestamps so that the output is
    contiguous across upstream new-segment/reset events.

    Incoming new-segment events are not forwarded; they only move the
    sync point. The element pushes its own new-segment event on both
    source pads instead.
    """
    __gstdetails__ = ('SyncKeeper', 'Generic',
                      'Retimestamp the output to be contiguous and maintain '
                      'the sync', 'Xavier Queralt')
    _audiosink = gst.PadTemplate("audio-in",
                                 gst.PAD_SINK,
                                 gst.PAD_ALWAYS,
                                 gst.caps_from_string(BASIC_AUDIO_CAPS))
    _videosink = gst.PadTemplate("video-in",
                                 gst.PAD_SINK,
                                 gst.PAD_ALWAYS,
                                 gst.caps_from_string(BASIC_VIDEO_CAPS))
    _audiosrc = gst.PadTemplate("audio-out",
                                gst.PAD_SRC,
                                gst.PAD_ALWAYS,
                                gst.caps_from_string(BASIC_AUDIO_CAPS))
    _videosrc = gst.PadTemplate("video-out",
                                gst.PAD_SRC,
                                gst.PAD_ALWAYS,
                                gst.caps_from_string(BASIC_VIDEO_CAPS))

    def __init__(self):
        gst.Element.__init__(self)

        # create source pads
        self.audiosrc = gst.Pad(self._audiosrc, "audio-out")
        self.add_pad(self.audiosrc)
        self.videosrc = gst.Pad(self._videosrc, "video-out")
        self.add_pad(self.videosrc)

        # create the sink pads and set the chain and event functions;
        # each sink pad is bound to the source pad of the same media type
        self.audiosink = gst.Pad(self._audiosink, "audio-in")
        self.audiosink.set_chain_function(
            lambda pad, buf: self.chainfunc(pad, buf, self.audiosrc))
        self.audiosink.set_event_function(
            lambda pad, event: self.eventfunc(pad, event, self.audiosrc))
        self.add_pad(self.audiosink)
        self.videosink = gst.Pad(self._videosink, "video-in")
        self.videosink.set_chain_function(
            lambda pad, buf: self.chainfunc(pad, buf, self.videosrc))
        self.videosink.set_event_function(
            lambda pad, event: self.eventfunc(pad, event, self.videosrc))
        self.add_pad(self.videosink)

        # all these variables need to be protected with a lock!!!
        # (plain ints: Python 2 promotes them to long transparently)
        self._lock = threading.Lock()
        self._totalTime = 0        # highest output timestamp + duration
        self._syncTimestamp = 0    # output time the current segment maps to
        self._syncOffset = 0       # input time where the segment starts
        self._resetReceived = True
        self._sendNewSegment = True

    def _send_new_segment(self):
        """
        Push a new-segment event starting at the current sync timestamp
        on both source pads and clear the pending flag.
        """
        for pad in [self.videosrc, self.audiosrc]:
            pad.push_event(
                gst.event_new_new_segment(True, 1.0, gst.FORMAT_TIME,
                                          self._syncTimestamp, -1, 0))
        self._sendNewSegment = False

    def _update_sync_point(self, start, position):
        """
        Move the sync point after an upstream new-segment event.

        @param start:    start value of the received segment
        @param position: position value of the received segment
        """
        # Only update the sync point if we haven't received any buffer
        # (totalTime == 0) or we received a reset
        # NOTE(review): as written, the guard returns only when no buffer
        # has been seen AND no reset is pending, which is not what the
        # comment above says. Left untouched; confirm the intent.
        if not self._totalTime and not self._resetReceived:
            return
        self._syncTimestamp = self._totalTime
        # start + (position - start) is simply position, so the offset is
        # whichever of the two values is larger.
        self._syncOffset = max(start, position)
        self._resetReceived = False
        self.info("Update sync point to % r, offset to %r" %
                  (gst.TIME_ARGS(self._syncTimestamp),
                   (gst.TIME_ARGS(self._syncOffset))))

    def chainfunc(self, pad, buf, srcpad):
        """
        Chain function shared by both sink pads.

        Re-timestamps the buffer relative to the current sync point and
        pushes it on srcpad. Buffers whose timestamp is before the sync
        offset are dropped.

        @param pad:    sink pad the buffer arrived on
        @param buf:    the incoming buffer; its timestamp is modified
        @param srcpad: source pad to push the buffer on
        @returns: always gst.FLOW_OK
        """
        kind = srcpad is self.audiosrc and 'audio' or 'video'
        self.log("Input %s timestamp: %s, %s" %
                 (kind,
                  gst.TIME_ARGS(buf.timestamp),
                  gst.TIME_ARGS(buf.duration)))

        # The flag is True while a new segment is still pending (initially
        # and after a reset); _send_new_segment() clears it. The previous
        # "if not" test could never fire because the flag started True.
        if self._sendNewSegment:
            self._send_new_segment()

        # Acquire outside the try block so that the finally clause never
        # releases a lock that was not taken.
        self._lock.acquire()
        try:
            # Discard buffers outside the configured segment
            if buf.timestamp < self._syncOffset:
                self.warning("Could not clip buffer to segment")
                return gst.FLOW_OK
            # Get the input stream time of the buffer
            buf.timestamp -= self._syncOffset
            # Set the accumulated stream time
            buf.timestamp += self._syncTimestamp
            duration = 0
            if buf.duration != gst.CLOCK_TIME_NONE:
                duration = buf.duration
            self._totalTime = max(buf.timestamp + duration, self._totalTime)

            self.log("Output %s timestamp: %s, %s" %
                     (kind,
                      gst.TIME_ARGS(buf.timestamp),
                      gst.TIME_ARGS(buf.duration)))
        finally:
            self._lock.release()

        # NOTE(review): the flow return of push() is ignored, so downstream
        # errors are not reported upstream. Kept as in the original.
        srcpad.push(buf)
        return gst.FLOW_OK

    def eventfunc(self, pad, event, srcpad):
        """
        Event function shared by both sink pads.

        New-segment events update the sync point and are swallowed;
        flumotion-reset events arm a new sync point and a new outgoing
        segment. Every event other than new-segment is forwarded.

        @param pad:    sink pad the event arrived on
        @param event:  the incoming event
        @param srcpad: source pad to forward the event on
        @returns: True for swallowed new-segment events, otherwise the
                  result of pushing the event on srcpad
        """
        self.debug("Received event %r from %s" % (event, event.src))
        self._lock.acquire()
        try:
            if event.type == gst.EVENT_NEWSEGMENT:
                _u, _r, _f, start, _s, position = event.parse_new_segment()
                self._update_sync_point(start, position)
            if gstreamer.event_is_flumotion_reset(event):
                self._resetReceived = True
                # This used to assign to self._send_new_segment, which
                # replaced the method with a bool and left the flag alone.
                self._sendNewSegment = True
        finally:
            self._lock.release()

        # forward all the events except the new segment events
        if event.type != gst.EVENT_NEWSEGMENT:
            return srcpad.push_event(event)
        return True


gobject.type_register(SyncKeeper)
gst.element_register(SyncKeeper, "synckeeper", gst.RANK_MARGINAL)
class GenericDecoder(dc.DecoderComponent):
    """
    Generic decoder component using decodebin2.

    It listens to the custom gstreamer event flumotion-reset,
    and resets the decoding pipeline by removing the old one
    and creating a new one.

    Sub-classes must override _get_feeders_info() and return
    a list of FeederInfo instances that describe the decoder
    output.

    When reset, if the new decoded pads do not match the
    previously negotiated caps, the feeder will not be connected,
    and the decoder will go sad.
    """

    logCategory = "gen-decoder"
    feeder_tmpl = ("identity name=%(ename)s single-segment=true "
                   "silent=true ! %(caps)s ! @feeder:%(pad)s@")

    ### Public Methods ###

    def init(self):
        self._feeders_info = None # {FEEDER_NAME: FeederInfo}

    def get_pipeline_string(self, properties):
        """
        Build the pipeline description: the base pipeline string followed
        by one feeder_tmpl expansion per feeder.

        Also stores the 'blacklist' property (default: empty list) for
        the autoplug-select callback.

        @param properties: component properties
        @returns: the pipeline string
        """
        # Retrieve feeder info and build a dict out of it
        finfo = self._get_feeders_info()
        assert finfo, "No feeder info specified"
        self._feeders_info = dict((i.name, i) for i in finfo)

        pipeline_parts = [self._get_base_pipeline_string()]

        for i in self._feeders_info.values():
            ename = self._get_output_element_name(i.name)
            pipeline_parts.append(
                self.feeder_tmpl % dict(ename=ename, caps=i.caps, pad=i.name))

        pipeline_str = " ".join(pipeline_parts)
        self.log("Decoder pipeline: %s", pipeline_str)

        self._blacklist = properties.get('blacklist', [])

        return pipeline_str

    def configure_pipeline(self, pipeline, properties):
        """
        Chain up to the base class and hook the blacklist filter on the
        element named "decoder".
        """
        dc.DecoderComponent.configure_pipeline(self, pipeline,
                                               properties)

        decoder = self.pipeline.get_by_name("decoder")
        decoder.connect('autoplug-select', self._autoplug_select_cb)

    ### Protected Methods ##

    def _get_base_pipeline_string(self):
        """
        Return the leading part of the pipeline string, which must
        contain an element named "decoder".
        """
        return 'decodebin2 name=decoder'

    def _get_feeders_info(self):
        """
        Must be overridden to return a tuple of FeederInfo.
        """
        return None

    ### Private Methods ###

    def _get_output_element_name(self, feed_name):
        return "%s-output" % feed_name

    ### Callbacks ###

    def _autoplug_select_cb(self, decoder, pad, caps, factory):
        """
        Tell the decoder to skip any element factory whose name is in
        the blacklist, and to try every other one.
        """
        if factory.get_name() in self._blacklist:
            self.log("Skipping element %s because it's in the blacklist",
                     factory.get_name())
            return GST_AUTOPLUG_SELECT_SKIP
        return GST_AUTOPLUG_SELECT_TRY
254 255
class SingleGenericDecoder(GenericDecoder):
    """
    Generic decoder with a single feeder named 'default', whose caps are
    chosen by the "media-type" property ("audio" or "video").
    """

    logCategory = "sgen-decoder"

    _caps_lookup = {'audio': BASIC_AUDIO_CAPS,
                    'video': BASIC_VIDEO_CAPS}

    def init(self):
        self._media_type = None

    def check_properties(self, properties, addMessage):
        """
        Validate the "media-type" property.

        A valid value is remembered for _get_feeders_info(); anything
        else is reported through addMessage as an error message.
        """
        requested = properties.get("media-type")
        if requested in ("audio", "video"):
            self._media_type = requested
            return

        msg = 'Property media-type can only be "audio" or "video"'
        error = messages.Error(T_(N_(msg)), mid="error-decoder-media-type")
        addMessage(error)

    def _get_feeders_info(self):
        """
        Return a one-element tuple describing the 'default' feeder.
        """
        feeder = FeederInfo('default', self._caps_lookup[self._media_type])
        return (feeder, )
278 279
class AVGenericDecoder(GenericDecoder):
    """
    Generic decoder with an 'audio' and a 'video' feeder, both routed
    through a synckeeper element named "sync".
    """

    logCategory = "avgen-decoder"
    feeder_tmpl = ("identity name=%(ename)s silent=true ! %(caps)s ! "
                   "sync.%(pad)s-in sync.%(pad)s-out ! @feeder:%(pad)s@")

    def _get_feeders_info(self):
        """
        Return the audio and video feeder descriptions.
        """
        return (FeederInfo('audio', BASIC_AUDIO_CAPS),
                FeederInfo('video', BASIC_VIDEO_CAPS))

    def _get_base_pipeline_string(self):
        """
        Return the base pipeline string: the decoder plus the "sync"
        element that feeder_tmpl links to.
        """
        return 'decodebin2 name=decoder synckeeper name=sync'
292