Package flumotion :: Package component :: Package common :: Package streamer :: Module multifdsinkstreamer
[hide private]

Source Code for Module flumotion.component.common.streamer.multifdsinkstreamer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19   
 20  from twisted.internet import reactor 
 21   
 22  from flumotion.common import gstreamer 
 23  from flumotion.common import messages 
 24  from flumotion.component.base import http 
 25  from flumotion.component.common.streamer import streamer 
 26  from flumotion.component.common.streamer.mfdsresources import \ 
 27      MultiFdSinkStreamingResource, HTTPRoot 
 28   
 29  from flumotion.common.i18n import N_, gettexter 
 30   
 31  __all__ = ['MultifdSinkStreamer'] 
 32  __version__ = "$Rev$" 
 33   
 34  T_ = gettexter() 
 35   
 36   
 37  ### the actual component is a streamer using multifdsink 
 38   
 39   
40 -class Stats(streamer.Stats):
41
42 - def __init__(self, sinks):
43 streamer.Stats.__init__(self) 44 if not isinstance(sinks, list): 45 sinks = [sinks] 46 self.sinks = sinks
47
48 - def getBytesSent(self):
49 return sum(map( 50 lambda sink: sink.get_property('bytes-served'), self.sinks))
51
52 - def getBytesReceived(self):
53 return max(map( 54 lambda sink: sink.get_property('bytes-to-serve'), self.sinks))
55 56
57 -class MultifdSinkStreamer(streamer.Streamer, Stats):
58 pipe_template = 'multifdsink name=sink ' + \ 59 'sync=false ' + \ 60 'recover-policy=3' 61 defaultSyncMethod = 0 62
63 - def setup_burst_mode(self, sink):
64 if self.burst_on_connect: 65 if self.burst_time and \ 66 gstreamer.element_factory_has_property('multifdsink', 67 'units-max'): 68 self.debug("Configuring burst mode for %f second burst", 69 self.burst_time) 70 # Set a burst for configurable minimum time, plus extra to 71 # start from a keyframe if needed. 72 sink.set_property('sync-method', 4) # burst-keyframe 73 sink.set_property('burst-unit', 2) # time 74 sink.set_property('burst-value', 75 long(self.burst_time * gst.SECOND)) 76 77 # We also want to ensure that we have sufficient data available 78 # to satisfy this burst; and an appropriate maximum, all 79 # specified in units of time. 80 sink.set_property('time-min', 81 long((self.burst_time + 5) * gst.SECOND)) 82 83 sink.set_property('unit-type', 2) # time 84 sink.set_property('units-soft-max', 85 long((self.burst_time + 8) * gst.SECOND)) 86 sink.set_property('units-max', 87 long((self.burst_time + 10) * gst.SECOND)) 88 elif self.burst_size: 89 self.debug("Configuring burst mode for %d kB burst", 90 self.burst_size) 91 # If we have a burst-size set, use modern 92 # needs-recent-multifdsink behaviour to have complex bursting. 93 # In this mode, we burst a configurable minimum, plus extra 94 # so we start from a keyframe (or less if we don't have a 95 # keyframe available) 96 sink.set_property('sync-method', 'burst-keyframe') 97 sink.set_property('burst-unit', 'bytes') 98 sink.set_property('burst-value', self.burst_size * 1024) 99 100 # To use burst-on-connect, we need to ensure that multifdsink 101 # has a minimum amount of data available - assume 512 kB beyond 102 # the burst amount so that we should have a keyframe available 103 sink.set_property('bytes-min', (self.burst_size + 512) * 1024) 104 105 # And then we need a maximum still further above that - the 106 # exact value doesn't matter too much, but we want it 107 # reasonably small to limit memory usage. multifdsink doesn't 108 # give us much control here, we can only specify the max 109 # values in buffers. We assume each buffer is close enough 110 # to 4kB - true for asf and ogg, at least 111 sink.set_property('buffers-soft-max', 112 (self.burst_size + 1024) / 4) 113 sink.set_property('buffers-max', 114 (self.burst_size + 2048) / 4) 115 116 else: 117 # Old behaviour; simple burst-from-latest-keyframe 118 self.debug("simple burst-on-connect, setting sync-method 2") 119 sink.set_property('sync-method', 2) 120 121 sink.set_property('buffers-soft-max', 250) 122 sink.set_property('buffers-max', 500) 123 else: 124 self.debug("no burst-on-connect, setting sync-method 0") 125 sink.set_property('sync-method', self.defaultSyncMethod) 126 127 sink.set_property('buffers-soft-max', 250) 128 sink.set_property('buffers-max', 500)
129
130 - def parseExtraProperties(self, properties):
131 # check how to set client sync mode 132 self.burst_on_connect = properties.get('burst-on-connect', False) 133 self.burst_size = properties.get('burst-size', 0) 134 self.burst_time = properties.get('burst-time', 0.0)
135
136 - def _configure_sink(self, sink):
137 self.setup_burst_mode(sink) 138 139 if gstreamer.element_factory_has_property('multifdsink', 140 'resend-streamheader'): 141 sink.set_property('resend-streamheader', False) 142 else: 143 self.debug("resend-streamheader property not available, " 144 "resending streamheader when it changes in the caps") 145 146 sink.set_property('timeout', self.timeout) 147 148 sink.connect('deep-notify::caps', self._notify_caps_cb) 149 150 # these are made threadsafe using idle_add in the handler 151 sink.connect('client-added', self._client_added_handler) 152 153 # We now require a sufficiently recent multifdsink anyway that we can 154 # use the new client-fd-removed signal 155 sink.connect('client-fd-removed', self._client_fd_removed_cb) 156 sink.connect('client-removed', self._client_removed_cb) 157 158 sink.caps = None
159
160 - def check_properties(self, props, addMessage):
161 streamer.Streamer.check_properties(self, props, addMessage) 162 163 # tcp is where multifdsink is 164 version = gstreamer.get_plugin_version('tcp') 165 if version < (0, 10, 9, 1): 166 m = messages.Error(T_(N_( 167 "Version %s of the '%s' GStreamer plug-in is too old.\n"), 168 ".".join(map(str, version)), 'multifdsink')) 169 m.add(T_(N_("Please upgrade '%s' to version %s."), 170 'gst-plugins-base', '0.10.10')) 171 addMessage(m)
172
174 self.httpauth = http.HTTPAuthentication(self) 175 self.resource = MultiFdSinkStreamingResource(self, self.httpauth)
176
177 - def configure_pipeline(self, pipeline, properties):
178 sink = self.get_element('sink') 179 Stats.__init__(self, sink) 180 181 streamer.Streamer.configure_pipeline(self, pipeline, properties) 182 self.parseExtraProperties(properties) 183 self._configure_sink(sink)
184
185 - def _get_root(self):
186 root = HTTPRoot() 187 # TwistedWeb wants the child path to not include the leading / 188 mount = self.mountPoint[1:] 189 root.putChild(mount, self.resource) 190 return root
191
192 - def __repr__(self):
193 return '<MultifdSinkStreamer (%s)>' % self.name
194
195 - def getMaxClients(self):
196 return self.resource.maxclients
197
198 - def get_mime(self):
199 if self.sinks[0].caps: 200 return self.sinks[0].caps[0].get_name()
201
202 - def get_content_type(self):
203 mime = self.get_mime() 204 if mime == 'multipart/x-mixed-replace': 205 mime += ";boundary=ThisRandomString" 206 return mime
207
208 - def add_client(self, fd, request):
209 sink = self.get_element('sink') 210 sink.emit('add', fd)
211
212 - def remove_client(self, fd):
213 sink = self.get_element('sink') 214 sink.emit('remove', fd)
215
216 - def remove_all_clients(self):
217 """Remove all the clients. 218 219 Returns a deferred fired once all clients have been removed. 220 """ 221 if self.resource: 222 # can be None if we never went happy 223 self.debug("Asking for all clients to be removed") 224 return self.resource.removeAllClients()
225
226 - def _client_added_handler(self, sink, fd):
227 self.log('[fd %5d] client_added_handler', fd) 228 Stats.clientAdded(self) 229 self.update_ui_state()
230
231 - def _client_removed_handler(self, sink, fd, reason, stats):
232 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason) 233 if reason.value_name == 'GST_CLIENT_STATUS_ERROR': 234 self.warning('[fd %5d] Client removed because of write error' % fd) 235 236 self.resource.clientRemoved(sink, fd, reason, stats) 237 Stats.clientRemoved(self) 238 self.update_ui_state()
239 240 ### START OF THREAD-AWARE CODE (called from non-reactor threads) 241
242 - def _notify_caps_cb(self, element, pad, param):
243 # We store caps in sink objects as 244 # each sink might (and will) serve different content-type 245 caps = pad.get_negotiated_caps() 246 if caps == None: 247 return 248 249 caps_str = gstreamer.caps_repr(caps) 250 self.debug('Got caps: %s' % caps_str) 251 252 if not element.caps == None: 253 self.warning('Already had caps: %s, replacing' % caps_str) 254 255 self.debug('Storing caps: %s' % caps_str) 256 element.caps = caps 257 258 reactor.callFromThread(self.update_ui_state)
259 260 # We now use both client-removed and client-fd-removed. We call get-stats 261 # from the first callback ('client-removed'), but don't actually start 262 # removing the client until we get 'client-fd-removed'. This ensures that 263 # there's no window in which multifdsink still knows about the fd, 264 # but we've actually closed it, so we no longer get spurious duplicates. 265 # this can be called from both application and streaming thread ! 266
267 - def _client_removed_cb(self, sink, fd, reason):
268 stats = sink.emit('get-stats', fd) 269 self._pending_removals[fd] = (stats, reason)
270 271 # this can be called from both application and streaming thread ! 272
273 - def _client_fd_removed_cb(self, sink, fd):
274 (stats, reason) = self._pending_removals.pop(fd) 275 276 reactor.callFromThread(self._client_removed_handler, sink, fd, 277 reason, stats)
278 279 ### END OF THREAD-AWARE CODE 280