Trees | Indices | Help |
---|
|
1 # -*- Mode: Python -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 6 # Copyright (C) 2010,2011 Flumotion Services, S.A. 7 # All rights reserved. 8 # 9 # This file may be distributed and/or modified under the terms of 10 # the GNU Lesser General Public License version 2.1 as published by 11 # the Free Software Foundation. 12 # This file is distributed without any warranty; without even the implied 13 # warranty of merchantability or fitness for a particular purpose. 14 # See "LICENSE.LGPL" in the source distribution for more information. 15 # 16 # Headers in this file shall remain intact. 17 18 import gst 19 from twisted.internet import defer 20 from flumotion.component import feedcomponent 21 from flumotion.twisted.defer import RetryingDeferred 22 from flumotion.common import errors 23 24 __version__ = "$Rev$" 25 2628 29 configured = False 30 3311935 # Based on the capabilities, plug additional gst components: 36 # 1. If we have pure audio (http src doesn't support ICY) plug the parser 37 # 2. 
If we have application/x-icy plug the icydemuxer and then the parser 38 capsname = caps[0].get_name() 39 tf_src_pad = tf.get_pad('src') 40 gdp_sink_pad = tf_src_pad.get_peer() 41 # unlink the typefind from the gdp pad so that we can put another 42 # component in its place 43 tf_src_pad.unlink(gdp_sink_pad) 44 45 if capsname == 'application/x-icy': 46 demuxer = gst.element_factory_make("icydemux") 47 demuxer.set_state(gst.STATE_PLAYING) 48 self._demuxer_name = demuxer.get_name() 49 self.pipeline.add(demuxer) 50 tf.link(demuxer) 51 # demuxer src pad is dynamic, we need to register a callback 52 demuxer.connect('pad-added', self._link_parser, gdp_sink_pad) 53 else: 54 self._demuxer_name = None 55 self._link_parser(tf, tf_src_pad, gdp_sink_pad)5658 # Append the audio parser to the end of the pipeline 59 caps = pad.get_caps() 60 capsname = caps.get_structure(0).get_name() 61 self._parser_name = None 62 parser = None 63 if self.passthrough: 64 self.info("Acting in passthrough mode, not parsing the audio") 65 pad.link(gdp_sink_pad) 66 return 67 if capsname == 'application/ogg': 68 parser = gst.element_factory_make('oggparse') 69 elif capsname == 'audio/mpeg': 70 mpegversion = caps[0]['mpegversion'] 71 if mpegversion == 1: 72 self.info("Detecting MP3 stream. Adding 'mp3parse'") 73 parser = gst.element_factory_make('mp3parse') 74 elif mpegversion in [2, 4]: 75 self.info("Detecting AAC stream. Adding 'aacparse'") 76 parser = gst.element_factory_make('aacparse') 77 if parser: 78 self._parser_name = parser.get_name() 79 parser.set_state(gst.STATE_PLAYING) 80 self.pipeline.add(parser) 81 element.link(parser) 82 parser.get_pad('src').link(gdp_sink_pad) 83 else: 84 # in case we got something other than mp3 or ogg, just connect the 85 # gdp back 86 self.warning("Couldn't find the correct parser for caps: %s",\ 87 capsname) 88 pad.link(gdp_sink_pad)8991 # Later, when the typefind element has successfully found the type 92 # of the data, we'll rebuild the pipeline. 
93 self.src = pipeline.get_by_name('src') 94 self.url = properties['url'] 95 self.passthrough = properties.get('passthrough', False) 96 self.src.set_property('location', self.url) 97 self.src.set_property('iradio-mode', True) 98 99 typefind = pipeline.get_by_name('tf') 100 self.signal_id = typefind.connect('have-type',\ 101 self._typefind_have_caps_cb) 102 103 if not self.configured: 104 self.attachPadMonitorToElement('src', 105 self._src_connected, 106 self._src_disconnected) 107 self.reconnecting = False 108 self.reconnector = RetryingDeferred(self.connect) 109 self.reconnector.initialDelay = 1.0 110 self.attemptD = None 111 112 def _drop_eos(pad, event): 113 self.debug('Swallowing event %r', event) 114 if event.type == gst.EVENT_EOS: 115 return False 116 return True117 self.configured = True 118 self.src.get_pad('src').add_event_probe(_drop_eos)121 if message.type == gst.MESSAGE_ERROR and message.src == self.src: 122 gerror, debug = message.parse_error() 123 self.warning('element %s error %s %s', 124 message.src.get_path_string(), gerror, debug) 125 if self.reconnecting: 126 self._retry() 127 return True 128 feedcomponent.ParseLaunchComponent.bus_message_received_cb( 129 self, bus, message)130132 self.info('Connecting to icecast server on %s', self.url) 133 self.src.set_state(gst.STATE_READY) 134 # can't just self.src.set_state(gst.STATE_PLAYING), 135 # because the pipeline might NOT be in PLAYING, 136 # if we never connected to Icecast and never went to PLAYING 137 self.try_start_pipeline(force=True) 138 self.attemptD = defer.Deferred() 139 return self.attemptD140142 self.info('Connected to icecast server on %s', self.url) 143 if self.reconnecting: 144 assert self.attemptD 145 self.attemptD.callback(None) 146 self.reconnecting = False147149 # remove all the elements downstream of souphttpsrc. 
150 if not self._parser_name: 151 self.reconnecting = True 152 self.reconnector.start() 153 return 154 155 tf = self.get_element('tf') 156 pad.unlink(tf.get_pad('sink')) 157 158 parser = self.get_element(self._parser_name) 159 tf.get_pad('src').unlink(parser.get_pad('sink')) 160 peer = parser.get_pad('src').get_peer() 161 parser.get_pad('src').unlink(peer) 162 163 parser.set_state(gst.STATE_NULL) 164 self.pipeline.remove(parser) 165 self._parser_name = None 166 tf.set_state(gst.STATE_NULL) 167 self.pipeline.remove(tf) 168 if self._demuxer_name is not None: 169 demuxer = self.get_element(self._demuxer_name) 170 demuxer.set_state(gst.STATE_NULL) 171 self.pipeline.remove(demuxer) 172 self._demuxer_name = None 173 174 # recreate the typefind element in order to be in the same state as 175 # when the component was first initiated 176 tf = gst.element_factory_make('typefind', 'tf') 177 self.pipeline.add(tf) 178 tf.set_state(gst.STATE_PLAYING) 179 pad.link(tf.get_pad('sink')) 180 tf.get_pad('src').link(peer) 181 182 # reconfigure the pipeline 183 self.configure_pipeline(self.pipeline, self.config['properties']) 184 self.pipeline.set_state(gst.STATE_PLAYING) 185 self.reconnecting = True 186 self.reconnector.start()187189 self.info('Disconnected from icecast server on %s', self.url) 190 if not self.reconnecting: 191 src = self.get_element('src') 192 pad = src.get_pad('src') 193 self._reset(pad)194196 assert self.attemptD 197 self.debug('Retrying connection to icecast server on %s', self.url) 198 self.attemptD.errback(errors.ConnectionError)199
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Aug 13 06:17:15 2013 | http://epydoc.sourceforge.net |