Package flumotion :: Package component :: Package common :: Package fgdp :: Module fgdp
[hide private]

Source Code for Module flumotion.component.common.fgdp.fgdp

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gobject 
 19  import gst 
 20   
 21  from twisted.internet import reactor 
 22   
 23  from flumotion.component.common.fgdp import protocol as fgdp 
 24   
 25  GDP_TYPE_PRODUCER = "producer-type" 
 26  GDP_TYPE_CONSUMER = "consumer-type" 
 27   
 28   
class FDHandler(object):
    """
    Base class for elements handling file descriptors

    Subclasses wrap a gstreamer element and must override L{connectFd}
    and L{disconnectFd}.

    @type fdelement: L{gst.Element}
    """

    def __init__(self, fdelement):
        """
        @param fdelement: element the file descriptors are attached to
        @type  fdelement: L{gst.Element}
        """
        self.fdelement = fdelement

    ### FDHandler interface for subclasses

    def connectFd(self, fd):
        '''
        Connects a file descriptor to the gstreamer element that will be
        writing to it or reading from it

        @type fd: int
        @raise NotImplementedError: always, subclasses must override it
        '''
        # NotImplementedError is the exception; NotImplemented is a constant
        # that cannot be called, so raising it would end in a TypeError.
        raise NotImplementedError("subclass must implement connectFD")

    def disconnectFd(self, fd):
        '''
        Disconnects a file descriptor from the gstreamer element that is
        writing to it or reading from it

        @type fd: int
        @raise NotImplementedError: always, subclasses must override it
        '''
        raise NotImplementedError("subclass must implement disconnectFD")


class FDSrc(FDHandler):
    """
    File descriptors handler based on fdsrc

    @ivar protocol: object whose loseConnection() is scheduled in the
                    reactor once EOS is seen; None when nothing is attached
    @type protocol: L{flumotion.common.gdp_protocol.FGDPBaseProtocol}
    """

    logCategory = 'gdp-producer'

    protocol = None
    # id of the event probe installed by connectFd()
    _handler_id = None

    def __init__(self, fdelement):
        FDHandler.__init__(self, fdelement)

    def _check_eos(self, pad, event):
        """
        Event probe installed on the 'src' pad of the element.

        @returns: False for an EOS event, True for any other event
        """
        if event.type != gst.EVENT_EOS:
            return True

        # An EOS shows up after a disconnection, when reading from the socket
        # returns 0 bytes: the probe is no longer needed and the connection
        # has to be closed. The probe does not run in the reactor thread,
        # hence callFromThread.
        pad.remove_event_probe(self._handler_id)
        if self.protocol is not None:
            reactor.callFromThread(self.protocol.loseConnection)
        return False

    def connectFd(self, fd):
        """
        Hands the file descriptor to the element and sets it to PLAYING.

        @type fd: int
        """
        element = self.fdelement
        # The element is expected to be waiting in READY with its state
        # locked: unlock it before giving it the fd.
        element.set_locked_state(False)
        element.set_property('fd', fd)
        # Watch the events on the src pad to detect disconnections
        pad = element.get_pad("src")
        self._handler_id = pad.add_event_probe(self._check_eos)
        element.set_state(gst.STATE_PLAYING)

    def disconnectFd(self, _):
        """
        Brings the element back to READY and locks its state.

        The argument is ignored.
        """
        element = self.fdelement
        # READY is the state in which a fd can be added/changed; the state
        # is locked again once the element is there.
        element.set_state(gst.STATE_READY)
        element.get_state(0)
        element.set_locked_state(True)


class MultiFDSink(FDHandler):
    """
    File descriptors handler based on multifdsink

    It drives the element through its 'add' and 'remove' action signals and
    listens to 'client-fd-removed'.

    @ivar protocol: object whose loseConnection() is scheduled in the
                    reactor when a client is removed; None when nothing is
                    attached
    @type protocol: L{flumotion.common.gdp_protocol.FGDPBaseProtocol}
    """

    logCategory = 'gdp-consumer'

    protocol = None
    # fd handed over by the last connectFd(), None after disconnectFd()
    _activeFD = None
    # id of the 'client-fd-removed' signal connection
    _handler_id = None

    def __init__(self, fdelement):
        FDHandler.__init__(self, fdelement)

    def _on_client_removed(self, a, fd):
        """
        Callback for the 'client-fd-removed' signal.

        Neither argument is used: whatever fd was removed, the connection
        is closed and the signal handler is disconnected.
        """
        proto = self.protocol
        if proto is not None:
            # scheduled through callFromThread instead of called directly
            reactor.callFromThread(proto.loseConnection)
        self.fdelement.handler_disconnect(self._handler_id)

    def connectFd(self, fd):
        """
        Adds the file descriptor to the element and remembers it as the
        active one.

        @type fd: int
        """
        sink = self.fdelement
        sink.emit('add', fd)
        self._activeFD = fd
        self._handler_id = sink.connect('client-fd-removed',
                                        self._on_client_removed)

    def disconnectFd(self, fd):
        """
        Removes the file descriptor from the element, but only if it is the
        one currently active.

        @type fd: int
        """
        if self._activeFD != fd:
            return
        self.fdelement.emit('remove', fd)
        self._activeFD = None


class _ProtocolMixin(object):
    """
    Abstracts starting and stopping the FGDP endpoint: a client connecting
    to host:port when 'mode' is 'push', a server listening on port for any
    other value of 'mode'.

    The class using the mixin must provide an info() logging method.

    @type mode: str
    @type host: str
    @type port: int
    """

    mode = ''
    host = None
    port = None
    # value returned by reactor.listenTCP() in _start_pull()
    _listener = None
    # value returned by reactor.connectTCP() in _start_push()
    _connector = None

    def start(self):
        """
        Starts a server/client using the FGDP protocol when the element
        is ready.
        """
        if self.mode != 'push':
            self._start_pull()
            return
        self._start_push()

    def stop(self):
        """
        Stops the server/client using the FGDP protocol.

        The stored listener and connector are left in place afterwards.
        """
        listener = self._listener
        if listener is not None:
            listener.stopListening()

        connector = self._connector
        if connector is not None:
            connector.disconnect()

    def _start_push(self):
        # push mode: connect to host:port as a client
        self.info("Starting fgdp client")
        client_factory = fgdp.FGDPClientFactory(self)
        self._connector = reactor.connectTCP(self.host, self.port,
                                             client_factory)

    def _start_pull(self):
        # pull mode: listen on port as a server
        self.info("Starting fgdp server")
        server_factory = fgdp.FGDPServerFactory(self)
        self._listener = reactor.listenTCP(self.port, server_factory)


class FGDPBase(gst.Bin, _ProtocolMixin):
    """
    Base class for gstreamer elements using the FGDP protocol

    The connection settings are exposed as GObject properties, and the FGDP
    client/server is started on the READY to PAUSED state change and
    stopped on the PAUSED to READY one.

    Signals declared here (nothing in this class emits them):
      - connected: no arguments
      - disconnected: one string argument
    """

    mode = 'pull'
    host = 'localhost'
    port = 15000
    username = 'user'
    password = 'test'
    maxDelay = 5
    version = '0.1'

    # Properties stored in an attribute named exactly like the property;
    # 'max-reconnection-delay' is the exception and lives in maxDelay.
    _attributeProperties = ('mode', 'host', 'username', 'password', 'port',
                            'version')

    __gproperties__ = {
        'mode': (gobject.TYPE_STRING, 'mode',
                 "Connection mode: 'pull' or 'push'",
                 'pull', gobject.PARAM_READWRITE),
        'host': (gobject.TYPE_STRING, 'host',
                 'Name of the host to connect (in push mode)',
                 'localhost', gobject.PARAM_READWRITE),
        'port': (gobject.TYPE_INT, 'port',
                 'Connection port',
                 1, 64000, 15000, gobject.PARAM_READWRITE),
        'username': (gobject.TYPE_STRING, 'user name',
                     'Username for the authentication',
                     'user', gobject.PARAM_READWRITE),
        'password': (gobject.TYPE_STRING, 'password',
                     'Password for the authentication',
                     'test', gobject.PARAM_READWRITE),
        'version': (gobject.TYPE_STRING, 'version',
                    'Protocol version',
                    '0.1', gobject.PARAM_READWRITE),
        'max-reconnection-delay': (gobject.TYPE_FLOAT,
            'maximum delay between reconnections in seconds',
            'Maximum delay between reconnections in seconds (for push mode)',
            1, 100, 5, gobject.PARAM_READWRITE)}

    __gsignals__ = {"connected": (gobject.SIGNAL_RUN_LAST,
                                  gobject.TYPE_NONE, []),
                    "disconnected": (gobject.SIGNAL_RUN_LAST,
                                     gobject.TYPE_NONE,
                                     (gobject.TYPE_STRING, ))}

    def _handle_error(self, message):
        """
        Posts an error message built from the given text and logs it.

        @type message: str
        """
        err = gst.GError(gst.RESOURCE_ERROR,
                         gst.RESOURCE_ERROR_FAILED, message)
        m = gst.message_new_error(self, err, message)
        self.post_message(m)
        self.error(message)

    def do_change_state(self, transition):
        """
        Starts the FGDP client/server when going from READY to PAUSED and
        stops it when going from PAUSED to READY, then chains up to
        L{gst.Bin}.

        @returns: gst.STATE_CHANGE_FAILURE if prepare() or start() raised,
                  otherwise the result of gst.Bin.do_change_state()
        """
        if transition == gst.STATE_CHANGE_READY_TO_PAUSED:
            try:
                self.prepare()
                self.start()
            except Exception as e:
                # Turn any failure into an error message plus a failed state
                # change, stopping whatever had been started so far.
                self._handle_error(str(e))
                self.stop()
                return gst.STATE_CHANGE_FAILURE
        elif transition == gst.STATE_CHANGE_PAUSED_TO_READY:
            self.stop()
        return gst.Bin.do_change_state(self, transition)

    def do_set_property(self, prop, value):
        """
        GObject property setter.

        @raise AttributeError: if the property is not known
        """
        if prop.name in self._attributeProperties:
            setattr(self, prop.name, value)
        elif prop.name == 'max-reconnection-delay':
            self.maxDelay = float(value)
        else:
            raise AttributeError('unknown property %s' % prop.name)

    def do_get_property(self, prop):
        """
        GObject property getter.

        @raise AttributeError: if the property is not known
        """
        if prop.name in self._attributeProperties:
            return getattr(self, prop.name)
        if prop.name == 'max-reconnection-delay':
            return self.maxDelay
        raise AttributeError('unknown property %s' % prop.name)

    def prepare(self):
        """
        Should be implemented by subclasses that need to do something
        before starting the server/client
        """
        pass


class FGDPSink(FGDPBase, MultiFDSink):
    '''
    GStreamer sink element using the FGDP protocol

    A bin made of gdppay linked to multifdsink, with the sink pad of gdppay
    exposed as the 'sink' ghost pad of the bin. Defaults to 'push' mode.
    '''

    mode = 'push'

    __gstdetails__ = ('FGDPsink', 'Sink',
                      'Flumotion GStreamer data protocol sink',
                      'Flumotion DevTeam')

    def __init__(self):
        FGDPBase.__init__(self)

        # Elements of the bin
        payloader = gst.element_factory_make('gdppay')
        self.fdelement = gst.element_factory_make('multifdsink')

        # Default properties of the multifdsink.
        # NOTE(review): 'unit-type' 2 and 'recover-policy' 1 are raw enum
        # values; the limits are expressed in gst time units, so 2
        # presumably selects time -- confirm both against the multifdsink
        # documentation.
        defaults = (
            ('sync', False),
            ('unit-type', 2),
            ('units-max', 1 * gst.SECOND),
            ('units-soft-max', 700 * gst.MSECOND),
            ('recover-policy', 1),
        )
        for name, value in defaults:
            self.fdelement.set_property(name, value)

        # The fd handling part of the element works on the multifdsink
        MultiFDSink.__init__(self, self.fdelement)

        # Assemble the bin: gdppay ! multifdsink
        self.add(payloader, self.fdelement)
        payloader.link(self.fdelement)

        # Expose the sink pad of gdppay as the sink pad of the bin
        self._sink_pad = gst.GhostPad('sink', payloader.get_pad('sink'))
        self.add_pad(self._sink_pad)


class FGDPSrc(FGDPBase, FDSrc):
    '''
    GStreamer source element using the FGDP protocol

    A bin made of fdsrc linked to gdpdepay, with the src pad of gdpdepay
    exposed as the 'src' ghost pad of the bin. Defaults to 'pull' mode.
    '''

    mode = 'pull'

    __gstdetails__ = ('FGDPsrc', 'Source',
                      'Flumotion GStreamer data protocol source',
                      'Flumotion DevTeam')

    def __init__(self):
        FGDPBase.__init__(self)

        # Elements of the bin
        self.fdelement = gst.element_factory_make('fdsrc')
        depayloader = gst.element_factory_make('gdpdepay')

        # Assemble the bin: fdsrc ! gdpdepay
        self.add(self.fdelement, depayloader)
        self.fdelement.link(depayloader)

        # The fd handling part of the element works on the fdsrc
        FDSrc.__init__(self, self.fdelement)

        # Expose the src pad of gdpdepay as the src pad of the bin
        self._src_pad = gst.GhostPad('src', depayloader.get_pad('src'))
        self.add_pad(self._src_pad)

    def prepare(self):
        """
        Locks the state of the fdsrc before the server/client is started.
        """
        # Until the first connection provides a valid fd the fdsrc must not
        # follow the state of the bin, otherwise it would be reading from
        # stdin.
        self.fdelement.set_locked_state(True)


# Make both elements available to gstreamer as "fgdpsink" and "fgdpsrc"
gobject.type_register(FGDPSink)
gst.element_register(FGDPSink, "fgdpsink", gst.RANK_MARGINAL)
gobject.type_register(FGDPSrc)
gst.element_register(FGDPSrc, "fgdpsrc", gst.RANK_MARGINAL)