1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import gst
19
20 from twisted.internet import reactor
21
22 from flumotion.common import gstreamer
23 from flumotion.common import messages
24 from flumotion.component.base import http
25 from flumotion.component.common.streamer import streamer
26 from flumotion.component.common.streamer.mfdsresources import \
27 MultiFdSinkStreamingResource, HTTPRoot
28
29 from flumotion.common.i18n import N_, gettexter
30
# Public API of this module: only MultifdSinkStreamer is exported.
31 __all__ = ['MultifdSinkStreamer']
# NOTE(review): "$Rev$" looks like a VCS keyword placeholder -- confirm it is
# still expanded by the current version-control setup.
32 __version__ = "$Rev$"
33
# Translation helper; combined with N_ below to build user-visible messages.
34 T_ = gettexter()
35
36
37
38
39
# Statistics helper derived from streamer.Stats that reads its byte counters
# from the elements in self.sinks.
# NOTE(review): the method headers (and whatever assigns self.sinks) are not
# visible in this extract; the two return statements below are method bodies.
40 -class Stats(streamer.Stats):
41
42 47
# Sum of the 'bytes-served' property over every sink in self.sinks.
43 49 return sum(map(
44 50 lambda sink: sink.get_property('bytes-served'), self.sinks))
45 51
# Largest 'bytes-to-serve' property among the sinks in self.sinks.
# NOTE(review): max() raises ValueError on an empty sequence -- confirm
# self.sinks can never be empty when this is called.
46 53 return max(map(
47 54 lambda sink: sink.get_property('bytes-to-serve'), self.sinks))
55
56
# Pipeline description fragment: one multifdsink element named "sink" with
# sync=false and recover-policy=3.
# NOTE(review): the meaning of recover-policy=3 is not shown here -- confirm
# against the multifdsink element documentation.
50 58 pipe_template = 'multifdsink name=sink ' + \
51 59 'sync=false ' + \
52 60 'recover-policy=3'
# sync-method applied to the sink when burst-on-connect is disabled.
53 61 defaultSyncMethod = 0
62
# Configure how much queued data a newly connected client receives, driven by
# self.burst_on_connect, self.burst_time and self.burst_size.
# NOTE(review): the enclosing method header is not visible in this extract;
# `sink` is presumably its parameter (the multifdsink element) -- confirm.
55 64 if self.burst_on_connect:
# Time-based burst: taken only when burst_time is set and the multifdsink
# factory exposes the 'units-max' property.
56 65 if self.burst_time and \
57 66 gstreamer.element_factory_has_property('multifdsink',
58 67 'units-max'):
59 68 self.debug("Configuring burst mode for %f second burst",
60 69 self.burst_time)
61 70
62 71
# Numeric enum values are used in this branch.
# NOTE(review): presumably 4 == 'burst-keyframe' (used by name in the
# size-based branch) and 2 == time units -- TODO confirm.
# burst-value is burst_time scaled by gst.SECOND.
63 72 sink.set_property('sync-method', 4)
64 73 sink.set_property('burst-unit', 2)
65 74 sink.set_property('burst-value',
66 75 long(self.burst_time * gst.SECOND))
67 76
68 77
69 78
70 79
# Minimum amount of queued time to keep: burst_time + 5, scaled by gst.SECOND.
71 80 sink.set_property('time-min',
72 81 long((self.burst_time + 5) * gst.SECOND))
73 82
# Soft and hard queue limits are burst_time + 8 and burst_time + 10, in the
# unit selected by 'unit-type'.
74 83 sink.set_property('unit-type', 2)
75 84 sink.set_property('units-soft-max',
76 85 long((self.burst_time + 8) * gst.SECOND))
77 86 sink.set_property('units-max',
78 87 long((self.burst_time + 10) * gst.SECOND))
# Size-based burst: burst_size is logged as kB and converted to bytes (* 1024).
79 88 elif self.burst_size:
80 89 self.debug("Configuring burst mode for %d kB burst",
81 90 self.burst_size)
82 91
83 92
84 93
85 94
86 95
87 96 sink.set_property('sync-method', 'burst-keyframe')
88 97 sink.set_property('burst-unit', 'bytes')
89 98 sink.set_property('burst-value', self.burst_size * 1024)
90 99
91 100
92 101
93 102
# Minimum amount of queued bytes to keep: burst_size + 512 kB.
94 103 sink.set_property('bytes-min', (self.burst_size + 512) * 1024)
95 104
96 105
97 106
98 107
99 108
100 109
101 110
# Queue limits here are expressed in buffers: (kB + margin) / 4.
# NOTE(review): presumably assumes roughly 4 kB per buffer -- TODO confirm.
# NOTE(review): `/` yields an integer only under Python 2 integer division
# with an integer burst_size.
102 111 sink.set_property('buffers-soft-max',
103 112 (self.burst_size + 1024) / 4)
104 113 sink.set_property('buffers-max',
105 114 (self.burst_size + 2048) / 4)
106 115
# Burst requested but neither a usable burst_time nor a burst_size was given:
# use sync-method 2 with fixed buffer limits.
107 116 else:
108 117
109 118 self.debug("simple burst-on-connect, setting sync-method 2")
110 119 sink.set_property('sync-method', 2)
111 120
112 121 sink.set_property('buffers-soft-max', 250)
113 122 sink.set_property('buffers-max', 500)
# No burst on connect: apply the class-level default sync method and the same
# fixed buffer limits.
114 123 else:
115 124 self.debug("no burst-on-connect, setting sync-method 0")
116 125 sink.set_property('sync-method', self.defaultSyncMethod)
117 126
118 127 sink.set_property('buffers-soft-max', 250)
119 128 sink.set_property('buffers-max', 500)
129
135
159
# Run the base-class property check first, then verify that the GStreamer
# plug-in providing multifdsink is recent enough.
# NOTE(review): the method header is not visible in this extract; `props` and
# `addMessage` are presumably its parameters.
123 161 streamer.Streamer.check_properties(self, props, addMessage)
124 162
125 163
# The version is looked up for the plug-in named 'tcp', while the error text
# below names 'multifdsink'.
# NOTE(review): presumably multifdsink ships in the 'tcp' plug-in -- confirm.
126 164 version = gstreamer.get_plugin_version('tcp')
# NOTE(review): the threshold is (0, 10, 9, 1) but the upgrade hint says
# '0.10.10'; presumably 0.10.9.1 is a pre-release of 0.10.10 -- confirm.
# NOTE(review): if get_plugin_version() can return None, the comparison is
# true under Python 2 and map(str, version) would raise -- verify.
127 165 if version < (0, 10, 9, 1):
128 166 m = messages.Error(T_(N_(
129 167 "Version %s of the '%s' GStreamer plug-in is too old.\n"),
130 168 ".".join(map(str, version)), 'multifdsink'))
131 169 m.add(T_(N_("Please upgrade '%s' to version %s."),
132 170 'gst-plugins-base', '0.10.10'))
# Report the problem through the caller-supplied callback.
133 171 addMessage(m)
172
176
184
# Build an HTTPRoot and attach self.resource under the mount point, then
# return the root.
# NOTE(review): the method header is not visible in this extract.
137 186 root = HTTPRoot()
138 187
# Drop the first character of the mount point before registering the child.
# NOTE(review): presumably this strips a leading '/' -- confirm mountPoint
# always starts with one.
139 188 mount = self.mountPoint[1:]
140 189 root.putChild(mount, self.resource)
141 190 return root
191
# Short textual representation that embeds self.name.
# NOTE(review): the method header is not visible in this extract.
143 193 return '<MultifdSinkStreamer (%s)>' % self.name
194
# Expose the resource's maxclients attribute.
# NOTE(review): the method header is not visible in this extract; this raises
# AttributeError if self.resource is None -- confirm it is always set first.
145 196 return self.resource.maxclients
197
# Return the name reported by the first entry of the first sink's caps.
# When that sink has no caps, the visible lines contain no explicit return.
# NOTE(review): the method header is not visible in this extract.
147 199 if self.sinks[0].caps:
148 200 return self.sinks[0].caps[0].get_name()
201
# Return the value of get_mime(), adding a boundary parameter when the type is
# exactly 'multipart/x-mixed-replace'; any other value is returned unchanged.
# NOTE(review): the method header is not visible in this extract.
150 203 mime = self.get_mime()
151 204 if mime == 'multipart/x-mixed-replace':
152 205 mime += ";boundary=ThisRandomString"
153 206 return mime
207
211
215
# NOTE(review): the method header is not visible in this extract.
# NOTE(review): the docstring promises a deferred, but the visible lines have
# no return when self.resource is falsy -- confirm callers handle that case.
157 217 """Remove all the clients.
158 218
159 219 Returns a deferred fired once all clients have been removed.
160 220 """
161 221 if self.resource:
162 222
# Delegate to the resource and hand its result back to the caller.
163 223 self.debug("Asking for all clients to be removed")
164 224 return self.resource.removeAllClients()
225
230
# Handle the removal of one client file descriptor: log it, warn on write
# errors, notify the resource, update the statistics and refresh the UI state.
# NOTE(review): the method header is not visible in this extract; `sink`,
# `fd`, `reason` and `stats` are presumably its parameters.
167 232 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
# A reason named GST_CLIENT_STATUS_ERROR is reported as a write error.
168 233 if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
169 234 self.warning('[fd %5d] Client removed because of write error' % fd)
170 235
171 236 self.resource.clientRemoved(sink, fd, reason, stats)
# Called through the Stats class explicitly rather than via self.
172 237 Stats.clientRemoved(self)
173 238 self.update_ui_state()
239
240
241
243
244
# Store the pad's negotiated caps on the element and schedule a UI refresh.
# NOTE(review): the method header is not visible in this extract; `pad` and
# `element` are presumably its parameters.
179 245 caps = pad.get_negotiated_caps()
# Nothing negotiated yet: leave the element untouched.
# NOTE(review): `== None` works but `is None` is the idiomatic identity test.
180 246 if caps == None:
181 247 return
182 248
183 249 caps_str = gstreamer.caps_repr(caps)
184 250 self.debug('Got caps: %s' % caps_str)
185 251
# NOTE(review): this warning prints caps_str, i.e. the NEW caps, although the
# text reads as if it shows the caps being replaced -- confirm intent.
186 252 if not element.caps == None:
187 253 self.warning('Already had caps: %s, replacing' % caps_str)
188 254
189 255 self.debug('Storing caps: %s' % caps_str)
190 256 element.caps = caps
191 257
# update_ui_state is handed to the reactor instead of being called directly.
# NOTE(review): presumably this callback runs outside the reactor thread --
# confirm.
192 258 reactor.callFromThread(self.update_ui_state)
259
260
261
262
263
264
265
266
# Fetch the client's statistics from the sink via the 'get-stats' action and
# remember them, together with the removal reason, keyed by fd.
# NOTE(review): the method header is not visible in this extract; `sink`,
# `fd` and `reason` are presumably its parameters. Presumably the entry is
# consumed later when the fd is finally released -- confirm against the
# elided code.
201 268 stats = sink.emit('get-stats', fd)
202 269 self._pending_removals[fd] = (stats, reason)
270
271
272
278
279
280