Package flumotion :: Package component :: Package consumers :: Package icystreamer :: Module icystreamer
[hide private]

Source Code for Module flumotion.component.consumers.icystreamer.icystreamer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19   
 20  import gst 
 21  from twisted.internet import reactor, defer 
 22  from zope.interface import implements 
 23   
 24  from flumotion.common import interfaces 
 25  from flumotion.component.base import http 
 26  from flumotion.component.common.streamer.multifdsinkstreamer import \ 
 27          MultifdSinkStreamer, Stats 
 28  from flumotion.component.consumers.icystreamer import resources 
 29   
 30  # this import registers the gstreamer icymux element, don't remove it 
 31  import icymux 
 32   
 33   
 34  __all__ = ['ICYStreamer'] 
 35  __version__ = "$Rev$" 
 36   
 37   
class ICYStreamer(MultifdSinkStreamer):
    """
    Streamer that serves the incoming stream either untouched or muxed
    with ICY metadata.

    The pipeline described by L{pipe_template} tees the input into two
    multifdsink elements: 'sink-without-id3' receives the stream as is,
    'sink-with-id3' receives the output of the icymux element. Each
    client is attached to one of the two, selected by the serveIcy
    attribute of its request (see L{add_client}).
    """
    implements(interfaces.IStreamingComponent)

    checkOffset = True

    logCategory = 'icy-http'

    # identity -> tee, then one plain branch and one branch through icymux
    pipe_template = (
        'identity name=input silent=true ! tee name=tee '
        'tee. ! queue ! multifdsink name=sink-without-id3 sync=false '
        'recover-policy=3 '
        'tee. ! queue ! icymux name=mux ! '
        'multifdsink name=sink-with-id3 sync=false recover-policy=3')

    # NOTE(review): not read in this class; presumably consumed by
    # MultifdSinkStreamer when configuring the sinks -- confirm.
    defaultSyncMethod = 2
    # fallback for the 'frame-size' property (see parseProperties)
    defaultFrameSize = 256
    # fallback for the 'metadata-interval' property (see parseProperties);
    # presumably expressed in seconds, given how configureMuxer uses it
    defaultMetadataInterval = 2

    def init(self):
        """
        Create the per-instance state used by this streamer.

        Chains up to MultifdSinkStreamer.init, then creates the client
        bookkeeping, the ICY header store, the UI state keys and the
        deferred that is fired once the muxer has been configured.
        """
        MultifdSinkStreamer.init(self)

        # fd -> sink the client was attached to
        self.sinkConnections = {}
        # headers to be included in HTTP response
        self.icyHeaders = {}

        for key in ('icy-title', 'icy-timestamp'):
            self.uiState.addKey(key, None)

        # fired after we receive first datablock and configure muxer
        self._muxerConfiguredDeferred = defer.Deferred()

    def configure_auth_and_resource(self):
        """
        Create the HTTP authentication helper and the streaming resource.

        Stores them as self.httpauth and self.resource.
        """
        self.httpauth = http.HTTPAuthentication(self)
        self.resource = resources.ICYStreamingResource(self,
                                                       self.httpauth)

    def configure_pipeline(self, pipeline, properties):
        """
        Hook this component up to its freshly created pipeline.

        Looks up both sinks, initializes the statistics, schedules the
        first stats update, creates the HTTP resource, parses the
        properties, configures every sink, installs the tag event probe
        and finally prepares the muxer.

        @param pipeline:   the pipeline built from L{pipe_template}
        @param properties: component properties; handed to
                           L{parseProperties}
        """
        # keyed by "does the client want ICY metadata?"
        self.sinksByID3 = {
            False: self.get_element('sink-without-id3'),
            True: self.get_element('sink-with-id3'),
        }
        Stats.__init__(self, self.sinksByID3.values())

        self._updateCallLaterId = reactor.callLater(10, self._updateStats)

        self.configure_auth_and_resource()
        self.parseProperties(properties)

        # self.sinks is not assigned in this class; presumably it is set
        # by Stats.__init__ from the sinks passed above -- confirm.
        for sink in self.sinks:
            self._configure_sink(sink)

        teeSinkPad = pipeline.get_by_name('tee').get_pad('sink')
        teeSinkPad.add_event_probe(self._tag_event_cb)

        self.configureMuxer(pipeline)

    def _tag_event_cb(self, pad, event):
        """
        Event probe that copies selected stream tags into the ICY headers.

        For tag events, the 'organization', 'genre' and 'location' tags
        are stored as 'icy-name', 'icy-genre' and 'icy-url' respectively.
        The actual update of self.icyHeaders is scheduled with
        reactor.callFromThread instead of being done in the probe itself.

        @param pad:   the pad the probe is installed on (unused)
        @param event: the event passing through the pad
        @returns:     always True
        """

        def store_tag(struc, headerKey, structureKey):
            # only overwrite a header when the tag is actually present
            if structureKey in struc.keys():
                self.icyHeaders[headerKey] = struc[structureKey]
                self.debug("Set header key %s = %s",
                           headerKey, struc[structureKey])

        # ICY header name -> tag name in the event structure
        mapping = {'icy-name': 'organization',
                   'icy-genre': 'genre',
                   'icy-url': 'location'}
        if event.type == gst.EVENT_TAG:
            struc = event.get_structure()
            self.debug('Structure keys of tag event: %r', struc.keys())
            for headerName, tagName in mapping.items():
                reactor.callFromThread(store_tag, struc, headerName, tagName)
        return True

    def parseProperties(self, properties):
        """
        Read the ICY specific properties on top of the base class ones.

        @param properties: mapping of component properties; 'frame-size'
                           and 'metadata-interval' fall back to
                           L{defaultFrameSize} and
                           L{defaultMetadataInterval} when absent
        """
        MultifdSinkStreamer.parseProperties(self, properties)

        self._frameSize = properties.get('frame-size', self.defaultFrameSize)
        self._metadataInterval = properties.get(
            'metadata-interval', self.defaultMetadataInterval)

    def configureMuxer(self, pipeline):
        """
        Configure the icymux element, partly from the measured bitrate.

        The frame size is set right away. The number of frames between
        metadata blocks depends on the bitrate, which is derived from the
        size and duration of the first buffer seen on the sink pad of the
        'input' element; once known, the muxer and the 'icy-br' and
        'icy-metaint' headers are set and L{_muxerConfiguredDeferred} is
        fired.

        @param pipeline: the pipeline containing the 'mux' element
        """
        self.muxer = pipeline.get_by_name('mux')
        self.muxer.set_property('frame-size', self._frameSize)

        def _setMuxerBitrate(bitrate):
            # bitrate / 8 gives bytes per unit of time; dividing by the
            # frame size turns the metadata interval into a frame count
            numFrames = int(self._metadataInterval * bitrate /
                            8 / self._frameSize)
            self.debug("Setting number of frames to %r", numFrames)
            self.muxer.set_property('num-frames', numFrames)

            self.icyHeaders['icy-br'] = bitrate / 1000
            self.icyHeaders['icy-metaint'] = \
                self.muxer.get_property("icy-metaint")
            self._muxerConfiguredDeferred.callback(None)

        def _calculateBitrate(pad, data):
            self.debug('Calculating bitrate of the stream')
            bitrate = 8 * data.size * gst.SECOND / data.duration
            self.debug('bitrate: %r', bitrate)
            # One measurement is enough. The probe was installed with
            # add_buffer_probe, so it has to be removed with the matching
            # remove_buffer_probe (not remove_event_probe).
            pad.remove_buffer_probe(handler_id)
            reactor.callFromThread(_setMuxerBitrate, bitrate)
            # NOTE(review): this probe returns None; confirm whether the
            # binding treats that as "drop this buffer".

        inputSinkPad = self.pipeline.get_by_name('input').get_pad('sink')
        handler_id = inputSinkPad.add_buffer_probe(_calculateBitrate)

    def get_content_type(self):
        """
        Return the content type to announce for the stream.

        The content type should always be the content type of the stream
        as some players do not understand "application/x-icy".

        @returns: 'audio/aacp' for audio/mpeg caps with mpegversion 2 or 4,
                  otherwise the name of the first caps structure of the
                  sink without metadata; None when that sink has no caps
        """
        sink = self.sinksByID3[False]
        if not sink.caps:
            return None

        self.debug('Caps: %r', sink.caps.to_string())
        cap = sink.caps[0]
        name = cap.get_name()
        # In audio/mpeg caps both mpegversion 2 and 4 denote AAC;
        # mpegversion 1 is MPEG-1 audio and keeps the plain caps name.
        if name == 'audio/mpeg' and cap['mpegversion'] in (2, 4):
            return 'audio/aacp'
        return name

    def add_client(self, fd, request):
        """
        Attach a client to the sink matching its request.

        @param fd:      file descriptor of the client connection
        @param request: the client's request; its serveIcy attribute
                        selects the sink with or without ICY metadata
        """
        sink = self.sinksByID3[request.serveIcy]
        self.debug("Adding client to sink: %r", sink)
        # remember the sink so remove_client can detach from the right one
        self.sinkConnections[fd] = sink

        if request.serveIcy:
            # FIXME: This sends the title to every connected client.
            # We should send it only to the newly connecting client, but
            # this requires patching multifdsink
            self.muxer.emit('broadcast-title')
        sink.emit('add', fd)

    def remove_client(self, fd):
        """
        Detach a client from the sink it was attached to by add_client.

        @param fd: file descriptor of the client connection
        @raises KeyError: if fd is not a known client
        """
        sink = self.sinkConnections[fd]
        sink.emit('remove', fd)
        del self.sinkConnections[fd]

    def get_icy_headers(self):
        """
        Return the ICY headers collected so far.

        @returns: the live self.icyHeaders dict (not a copy)
        """
        self.debug("Icy headers: %r", self.icyHeaders)
        return self.icyHeaders

    def updateState(self, set):
        """
        Publish the statistics plus the current ICY title and timestamp.

        @param set: callable taking (key, value); the name shadows the
                    builtin but is kept for interface compatibility
        """
        Stats.updateState(self, set)

        set('icy-title', self.muxer.get_property('iradio-title'))
        rawTimestamp = self.muxer.get_property('iradio-timestamp')
        timestamp = time.strftime("%c", time.localtime(rawTimestamp))
        set('icy-timestamp', timestamp)

    def do_pipeline_playing(self):
        """
        Delay completion until the muxer has been configured.

        Change the component mood to happy only after we receive the
        first data block, so that we can calculate the bitrate and
        configure the muxer.

        @returns: a DeferredList combining the base class deferred and
                  L{_muxerConfiguredDeferred}
        """
        d = MultifdSinkStreamer.do_pipeline_playing(self)
        return defer.DeferredList([d, self._muxerConfiguredDeferred])