Package flumotion :: Package component :: Package producers :: Package icecast :: Module icecast
[hide private]

Source Code for Module flumotion.component.producers.icecast.icecast

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  from twisted.internet import defer 
 20  from flumotion.component import feedcomponent 
 21  from flumotion.twisted.defer import RetryingDeferred 
 22  from flumotion.common import errors 
 23   
 24  __version__ = "$Rev$" 
 25   
 26   
27 -class Icecast(feedcomponent.ParseLaunchComponent):
28 29 configured = False 30
def get_pipeline_string(self, properties):
    """Return the launch description of the initial pipeline.

    The description is a fixed string: an HTTP source element named
    ``src`` linked to a typefind element named ``tf``.  ``properties`` is
    accepted for interface compatibility but is not consulted here.
    """
    return "souphttpsrc name=src ! typefind name=tf"
33
def _typefind_have_caps_cb(self, tf, prob, caps):
    """Plug the remaining elements once typefind has detected the type.

    Based on the detected capabilities, additional GStreamer elements are
    plugged after the typefind element:

      1. pure audio (the HTTP source did not deliver ICY): plug the parser;
      2. application/x-icy: plug the icydemux element and then the parser.

    ``tf`` is the typefind element, ``prob`` is passed along with the
    signal and is not used here, and ``caps`` is the detected caps; only
    the name of its first structure is examined.
    """
    capsname = caps[0].get_name()
    tf_src_pad = tf.get_pad('src')
    gdp_sink_pad = tf_src_pad.get_peer()
    # Unlink the typefind from the gdp pad so that we can put another
    # component in its place.
    tf_src_pad.unlink(gdp_sink_pad)

    if capsname == 'application/x-icy':
        demuxer = gst.element_factory_make("icydemux")
        demuxer.set_state(gst.STATE_PLAYING)
        # Remember the element name so that it can be looked up later.
        self._demuxer_name = demuxer.get_name()
        self.pipeline.add(demuxer)
        tf.link(demuxer)
        # The demuxer src pad is dynamic, so we need to register a
        # callback that links the parser once the pad shows up.
        demuxer.connect('pad-added', self._link_parser, gdp_sink_pad)
    else:
        self._demuxer_name = None
        self._link_parser(tf, tf_src_pad, gdp_sink_pad)
56 89
def configure_pipeline(self, pipeline, properties):
    """Configure the source and typefind elements of the pipeline.

    Reads ``url`` (required) and ``passthrough`` (optional, defaults to
    False) from ``properties``, points the ``src`` element at the URL with
    ``iradio-mode`` enabled and connects to the ``have-type`` signal of the
    ``tf`` element.  One-time setup is guarded by ``self.configured`` so
    that this method can be called again (``_reset`` does so).
    """
    # Later, when the typefind element has successfully found the type
    # of the data, we'll rebuild the pipeline.
    self.src = pipeline.get_by_name('src')
    self.url = properties['url']
    self.passthrough = properties.get('passthrough', False)
    self.src.set_property('location', self.url)
    self.src.set_property('iradio-mode', True)

    typefind = pipeline.get_by_name('tf')
    self.signal_id = typefind.connect('have-type',
                                      self._typefind_have_caps_cb)

    # NOTE(review): the listing this was recovered from had lost its
    # indentation; the extent of this "if" body is reconstructed on the
    # assumption that all one-time setup belongs in it -- confirm against
    # the repository copy.
    if not self.configured:
        self.attachPadMonitorToElement('src',
                                       self._src_connected,
                                       self._src_disconnected)
        self.reconnecting = False
        self.reconnector = RetryingDeferred(self.connect)
        self.reconnector.initialDelay = 1.0
        self.attemptD = None

        def _drop_eos(pad, event):
            # Returning False from the probe drops the event, so EOS from
            # the source never travels downstream.
            self.debug('Swallowing event %r', event)
            if event.type == gst.EVENT_EOS:
                return False
            return True

        self.configured = True
        self.src.get_pad('src').add_event_probe(_drop_eos)
119
def bus_message_received_cb(self, bus, message):
    """Handle error messages posted by the source element.

    An error coming from ``self.src`` is logged as a warning and, when a
    reconnection is in progress, fails the current attempt through
    ``_retry``.  Every other message is passed on to the
    ``ParseLaunchComponent`` implementation.

    Returns True when the message was an error from the source element;
    otherwise returns None (the parent's result is not propagated).
    """
    if message.type == gst.MESSAGE_ERROR and message.src == self.src:
        gerror, debug = message.parse_error()
        self.warning('element %s error %s %s',
                     message.src.get_path_string(), gerror, debug)
        if self.reconnecting:
            self._retry()
        # NOTE(review): placement of this return (outer "if" rather than
        # the "if self.reconnecting" body) is reconstructed from a listing
        # that had lost its indentation -- confirm.
        return True
    feedcomponent.ParseLaunchComponent.bus_message_received_cb(
        self, bus, message)
130
def connect(self):
    """Start one connection attempt to the icecast server.

    Resets the source element to READY, asks for the pipeline to be
    started and returns a fresh Deferred, which is also stored in
    ``self.attemptD``.  Within this class that Deferred is fired by
    ``_src_connected`` (callback) or ``_retry`` (errback).
    """
    self.info('Connecting to icecast server on %s', self.url)
    self.src.set_state(gst.STATE_READY)
    # We can't just self.src.set_state(gst.STATE_PLAYING), because the
    # pipeline might NOT be in PLAYING if we never connected to Icecast
    # and never went to PLAYING.
    self.try_start_pipeline(force=True)
    self.attemptD = defer.Deferred()
    return self.attemptD
140
def _src_connected(self, name):
    """React to the source element becoming connected.

    Registered in ``configure_pipeline`` as the first callback of the pad
    monitor on ``src``.  When a reconnection is in progress, the pending
    attempt Deferred is fired with None and the reconnecting flag is
    cleared.  ``name`` is supplied by the caller and is not used here.
    """
    self.info('Connected to icecast server on %s', self.url)
    if self.reconnecting:
        # connect() must have created the attempt Deferred by now.
        assert self.attemptD
        self.attemptD.callback(None)
        self.reconnecting = False
147
def _reset(self, pad):
    """Tear down what was plugged after the source and start reconnecting.

    ``pad`` is the src pad of the source element.  When no parser has been
    recorded in ``self._parser_name`` there is nothing to tear down and
    only the reconnection is started.  Otherwise the typefind, parser and
    (if present) demuxer elements are removed, a fresh typefind is put in
    their place, the pipeline is reconfigured and the reconnection is
    started.
    """
    # Remove all the elements downstream of souphttpsrc.
    if not self._parser_name:
        self.reconnecting = True
        self.reconnector.start()
        return

    tf = self.get_element('tf')
    pad.unlink(tf.get_pad('sink'))

    parser = self.get_element(self._parser_name)
    tf.get_pad('src').unlink(parser.get_pad('sink'))
    # Keep the parser's downstream peer: the new typefind is linked to it.
    peer = parser.get_pad('src').get_peer()
    parser.get_pad('src').unlink(peer)

    parser.set_state(gst.STATE_NULL)
    self.pipeline.remove(parser)
    self._parser_name = None
    tf.set_state(gst.STATE_NULL)
    self.pipeline.remove(tf)
    if self._demuxer_name is not None:
        demuxer = self.get_element(self._demuxer_name)
        demuxer.set_state(gst.STATE_NULL)
        self.pipeline.remove(demuxer)
        self._demuxer_name = None

    # Recreate the typefind element in order to be in the same state as
    # when the component was first initiated.
    tf = gst.element_factory_make('typefind', 'tf')
    self.pipeline.add(tf)
    tf.set_state(gst.STATE_PLAYING)
    pad.link(tf.get_pad('sink'))
    tf.get_pad('src').link(peer)

    # Reconfigure the pipeline.
    self.configure_pipeline(self.pipeline, self.config['properties'])
    self.pipeline.set_state(gst.STATE_PLAYING)
    self.reconnecting = True
    self.reconnector.start()
187
def _src_disconnected(self, name):
    """React to the source element losing its connection.

    Registered in ``configure_pipeline`` as the second callback of the pad
    monitor on ``src``.  Unless a reconnection is already in progress, the
    pipeline is reset starting from the src pad of the ``src`` element.
    ``name`` is supplied by the caller and is not used here.
    """
    self.info('Disconnected from icecast server on %s', self.url)
    if not self.reconnecting:
        src = self.get_element('src')
        pad = src.get_pad('src')
        self._reset(pad)
194
def _retry(self):
    """Fail the pending connection attempt.

    Errbacks ``self.attemptD`` with a ``ConnectionError`` instance;
    presumably the RetryingDeferred wrapping ``connect`` then schedules
    another attempt -- TODO confirm against RetryingDeferred.

    An instance is passed rather than the exception class: handing the
    class to ``errback`` makes Twisted wrap the class object itself as the
    failure value, so ``failure.check(errors.ConnectionError)`` would not
    match.
    """
    assert self.attemptD
    self.debug('Retrying connection to icecast server on %s', self.url)
    self.attemptD.errback(errors.ConnectionError(
        'Connection to icecast server on %s failed' % self.url))
199