1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import gobject
19 import gst
20
21 from twisted.internet import reactor
22
23 from flumotion.component.common.fgdp import protocol as fgdp
24
25 GDP_TYPE_PRODUCER = "producer-type"
26 GDP_TYPE_CONSUMER = "consumer-type"
27
28
30 """
31 Base class for elements handling file descriptors
32
33 @type fdelement: L{gst.Element}
34 """
35
37 self.fdelement = fdelement
38
39
40
42 '''
43 Connects a file descriptor to the gstreamer element that will be
44 writting to it or reading from it
45
46 @type fd: int
47 '''
48 raise NotImplemented("subclass must implement connectFD")
49
51 '''
52 Disconnects a file descriptor from the gstreamer element that is
53 writting to it or reading from it
54
55 @type fd: int
56 '''
57 raise NotImplemented("subclass must implement disconnectFD")
58
59
61 """
62 File descriptors handler based on fdsrc
63
64 @type protocol: L{flumotion.common.gdp_protocol.FGDPBaseProtocol}
65 """
66
67 logCategory = 'gdp-producer'
68
69 protocol = None
70 _handler_id = None
71
74
84
86
87
88 self.fdelement.set_locked_state(False)
89 self.fdelement.set_property('fd', fd)
90 srcpad = self.fdelement.get_pad("src")
91 self._handler_id = srcpad.add_event_probe(self._check_eos)
92 self.fdelement.set_state(gst.STATE_PLAYING)
93
95
96
97 self.fdelement.set_state(gst.STATE_READY)
98 self.fdelement.get_state(0)
99 self.fdelement.set_locked_state(True)
100
101
103 """
104 File descriptors handler based on multifdsink
105
106 @type protocol: L{flumotion.common.gdp_protocol.FGDPBaseProtocol}
107 """
108
109 logCategory = 'gdp-consumer'
110
111 protocol = None
112 _activeFD = None
113 _handler_id = None
114
117
122
128
133
134
136 """
137 Provides an abstraction for the start and stop of a client or server using
138 the FGDP protocol, which depends on the 'mode' selected amongst 'push' or
139 'pull'
140
141 @type mode: str
142 @type host: str
143 @type port: int
144 """
145
146 mode = ''
147 host = None
148 port = None
149 _listener = None
150 _connector = None
151
153 """
154 Starts a server/client using the FGDP protocol when the element
155 is ready.
156 """
157 if self.mode == 'push':
158 self._start_push()
159 else:
160 self._start_pull()
161
171
176
181
182
183 -class FGDPBase(gst.Bin, _ProtocolMixin):
184 """
185 Base class for gstreamer elements using the FGDP protocol
186 """
187
188 mode = 'pull'
189 host = 'localhost'
190 port = 15000
191 username = 'user'
192 password = 'test'
193 maxDelay = 5
194 version = '0.1'
195
196 __gproperties__ = {
197 'mode': (gobject.TYPE_STRING, 'mode',
198 "Connection mode: 'pull' or 'push'",
199 'pull', gobject.PARAM_READWRITE),
200 'host': (gobject.TYPE_STRING, 'host',
201 'Name of the host to connect (in push mode)',
202 'localhost', gobject.PARAM_READWRITE),
203 'port': (gobject.TYPE_INT, 'port',
204 'Connection port',
205 1, 64000, 15000, gobject.PARAM_READWRITE),
206 'username': (gobject.TYPE_STRING, 'user name',
207 'Username for the authentication',
208 'user', gobject.PARAM_READWRITE),
209 'password': (gobject.TYPE_STRING, 'password',
210 'Password for the authentication',
211 'test', gobject.PARAM_READWRITE),
212 'version': (gobject.TYPE_STRING, 'version',
213 'Protocol version',
214 '0.1', gobject.PARAM_READWRITE),
215 'max-reconnection-delay': (gobject.TYPE_FLOAT,
216 'maximum delay between reconnections in seconds',
217 'Maximum delay between reconnections in seconds (for push mode)',
218 1, 100, 5, gobject.PARAM_READWRITE)}
219
220 __gsignals__ = {"connected": (gobject.SIGNAL_RUN_LAST,\
221 gobject.TYPE_NONE, []),
222 "disconnected": (gobject.SIGNAL_RUN_LAST,\
223 gobject.TYPE_NONE,
224 (gobject.TYPE_STRING, ))}
225
227 err = gst.GError(gst.RESOURCE_ERROR,
228 gst.RESOURCE_ERROR_FAILED, message)
229 m = gst.message_new_error(self, err, message)
230 self.post_message(m)
231 self.error(message)
232
234 if transition == gst.STATE_CHANGE_READY_TO_PAUSED:
235 try:
236 self.prepare()
237 self.start()
238 except Exception, e:
239 self._handle_error(str(e))
240 self.stop()
241 return gst.STATE_CHANGE_FAILURE
242 elif transition == gst.STATE_CHANGE_PAUSED_TO_READY:
243 self.stop()
244 return gst.Bin.do_change_state(self, transition)
245
247 if prop.name in ['mode', 'host', 'username', 'password', 'port',
248 'version']:
249 setattr(self, prop.name, value)
250 elif prop.name == 'max-reconnection-delay':
251 self.maxDelay = float(value)
252 else:
253 raise AttributeError('unknown property %s' % prop.name)
254
256 if prop.name in ['mode', 'host', 'username', 'password', 'port',
257 'version']:
258 return getattr(self, prop.name)
259 if prop.name == 'max-reconnection-delay':
260 return self.maxDelay
261 raise AttributeError('unknown property %s' % prop.name)
262
264 """
265 Should be implemented by subclasses that need to do something
266 before starting the server/client
267 """
268 pass
269
270
272 '''
273 GStreamer sink element using the FGDP protocol
274 '''
275
276 mode = 'push'
277
278 __gstdetails__ = ('FGDPsink', 'Sink',
279 'Flumotion GStreamer data protocol sink',
280 'Flumotion DevTeam')
281
283 FGDPBase.__init__(self)
284
285 gdppay = gst.element_factory_make('gdppay')
286 self.fdelement = gst.element_factory_make('multifdsink')
287
288 self.fdelement.set_property('sync', False)
289 self.fdelement.set_property('unit-type', 2)
290 self.fdelement.set_property('units-max', 1 * gst.SECOND)
291 self.fdelement.set_property('units-soft-max', 700 * gst.MSECOND)
292 self.fdelement.set_property('recover-policy', 1)
293
294 MultiFDSink.__init__(self, self.fdelement)
295
296 self.add(gdppay, self.fdelement)
297 gdppay.link(self.fdelement)
298
299 self._sink_pad = gst.GhostPad('sink', gdppay.get_pad('sink'))
300 self.add_pad(self._sink_pad)
301
302
304 '''
305 GStreamer source element using the FGDP protocol
306 '''
307
308 mode = 'pull'
309
310 __gstdetails__ = ('FGDPsrc', 'Source',
311 'Flumotion GStreamer data protocol source',
312 'Flumotion DevTeam')
313
315 FGDPBase.__init__(self)
316
317 self.fdelement = gst.element_factory_make('fdsrc')
318 gdpdepay = gst.element_factory_make('gdpdepay')
319
320 self.add(self.fdelement, gdpdepay)
321 self.fdelement.link(gdpdepay)
322
323 FDSrc.__init__(self, self.fdelement)
324
325 self._src_pad = gst.GhostPad('src', gdpdepay.get_pad('src'))
326 self.add_pad(self._src_pad)
327
329
330
331 self.fdelement.set_locked_state(True)
332
333
334 gobject.type_register(FGDPSink)
335 gst.element_register(FGDPSink, "fgdpsink", gst.RANK_MARGINAL)
336 gobject.type_register(FGDPSrc)
337 gst.element_register(FGDPSrc, "fgdpsrc", gst.RANK_MARGINAL)
338