1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25 from twisted.cred import credentials
26 from twisted.internet import reactor, error, defer
27 from twisted.web import server
28 from zope.interface import implements
29
30 from flumotion.common import gstreamer, errors
31 from flumotion.common import messages, netutils, interfaces
32 from flumotion.common.format import formatStorage, formatTime
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.component import feedcomponent
35 from flumotion.component.base import http
36 from flumotion.component.component import moods
37 from flumotion.component.consumers.httpstreamer import resources
38 from flumotion.component.misc.porter import porterclient
39 from flumotion.twisted import fdserver
40
41 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
42 __version__ = "$Rev: 8058 $"
43 T_ = gettexter()
44 STATS_POLL_INTERVAL = 10
45 UI_UPDATE_THROTTLE_PERIOD = 2.0
46
47
48
49
50
51
53
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten. `sink` is presumably a parameter -- confirm.

# Element whose 'bytes-served' / 'bytes-to-serve' properties are read later.
self.sink = sink

# Connection counters, all starting from zero.
self.no_clients = 0
self.clients_added_count = self.clients_removed_count = 0
self.start_time = time.time()

# Highest client count seen so far and when it was reached.
self.peak_client_number = 0
self.peak_epoch = self.start_time

# Added/removed client rates plus the snapshot [time, added, removed]
# they are computed against.
self.load_deltas = [0, 0]
self._load_deltas_period = 10
self._load_deltas_ongoing = [time.time(), 0, 0]

# -1 marks "no measurement yet" for both values.
self._currentBitrate = -1
self._lastBytesReceived = -1

# Running average of connected clients and the time it was last updated.
self.average_client_number = 0
self.average_time = self.start_time

# Pieces of the URL built as "http://%s:%d%s".
self.hostname = "localhost"
self.port = 0
self.mountPoint = "/"
77
79
80 now = time.time()
81
82 dt1 = self.average_time - self.start_time
83 dc1 = self.average_client_number
84 dt2 = now - self.average_time
85 dc2 = self.no_clients
86 self.average_time = now
87 if dt1 == 0:
88
89 self.average_client_number = 0
90 else:
91 dt = dt1 + dt2
92 before = (dc1 * dt1) / dt
93 after = dc2 * dt2 / dt
94 self.average_client_number = before + after
95
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten.

# Update the average before the count changes.
self._updateAverage()

self.no_clients += 1
self.clients_added_count += 1

# A tie with the previous peak also refreshes the peak timestamp (>=).
connected = self.no_clients
if connected >= self.peak_client_number:
    self.peak_epoch = time.time()
    self.peak_client_number = connected
106
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the Stats.clientRemoved(self) call site -- confirm.
def clientRemoved(self):
    """Account for one disconnected client."""
    # Update the average before the count changes.
    self._updateAverage()
    self.no_clients = self.no_clients - 1
    self.clients_removed_count = self.clients_removed_count + 1
111
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the reactor.callLater(..., self._updateStats) reference
# -- confirm.
def _updateStats(self):
    """
    Periodically, update our statistics on load deltas, and update the
    UIState with new values for total bytes, bitrate, etc.

    Re-arms itself through reactor.callLater after STATS_POLL_INTERVAL
    seconds and stores the pending call in self._updateCallLaterId.
    """
    previous_time, previous_added, previous_removed = \
        self._load_deltas_ongoing
    added = self.clients_added_count
    removed = self.clients_removed_count
    now = time.time()
    elapsed = float(now - previous_time)

    # time.time() is wall-clock time: it can stand still or step backwards.
    # Keep the previous deltas in that case instead of dividing by zero or
    # publishing negative rates.
    if elapsed > 0:
        self.load_deltas = [(added - previous_added) / elapsed,
                            (removed - previous_removed) / elapsed]
    self._load_deltas_ongoing = [now, added, removed]

    bytes_received = self.getBytesReceived()
    if self._lastBytesReceived >= 0:
        # Bits received since the previous poll, spread over the nominal
        # polling interval.
        self._currentBitrate = ((bytes_received - self._lastBytesReceived) *
                                8 / STATS_POLL_INTERVAL)
    self._lastBytesReceived = bytes_received

    self.update_ui_state()

    self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
                                                self._updateStats)
136
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the self.getCurrentBitrate() call site -- confirm.
def getCurrentBitrate(self):
    """Return the last measured bitrate, or the lifetime average if none.

    A negative self._currentBitrate means no measurement exists yet; the
    fallback is total bytes received * 8 divided by the uptime.
    """
    measured = self._currentBitrate
    if measured < 0:
        return self.getBytesReceived() * 8 / self.getUptime()
    return measured
142
# NOTE(review): the def line is missing from this listing; the name is
# inferred from the getBytesSent() call sites and the 'bytes-served'
# property -- confirm.
def getBytesSent(self):
    """Return the sink's 'bytes-served' property."""
    served = self.sink.get_property('bytes-served')
    return served
145
# NOTE(review): the def line is missing from this listing; the name is
# inferred from the getBytesReceived() call sites and the 'bytes-to-serve'
# property -- confirm.
def getBytesReceived(self):
    """Return the sink's 'bytes-to-serve' property."""
    to_serve = self.sink.get_property('bytes-to-serve')
    return to_serve
148
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getUptime() call sites -- confirm.
def getUptime(self):
    """Return the seconds elapsed since self.start_time."""
    now = time.time()
    return now - self.start_time
151
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getClients() call sites -- confirm.
def getClients(self):
    """Return the current value of the no_clients counter."""
    connected = self.no_clients
    return connected
154
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getPeakClients() call site -- confirm.
def getPeakClients(self):
    """Return the highest client count recorded so far."""
    peak = self.peak_client_number
    return peak
157
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getPeakEpoch() call site -- confirm.
def getPeakEpoch(self):
    """Return the time.time() value stored when the peak was last reached."""
    epoch = self.peak_epoch
    return epoch
160
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getAverageClients() call site -- confirm.
def getAverageClients(self):
    """Return the running average of connected clients."""
    average = self.average_client_number
    return average
163
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getUrl() call sites -- confirm.
def getUrl(self):
    """Return "http://<hostname>:<port><mountPoint>" for this instance."""
    parts = (self.hostname, self.port, self.mountPoint)
    return "http://%s:%d%s" % parts
166
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the self.getLoadDeltas() call site -- confirm.
def getLoadDeltas(self):
    """Return the [added, removed] client rates list (not a copy)."""
    deltas = self.load_deltas
    return deltas
169
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the self.updateState(setIfChanged) call site and the
# set(key, value) calls in the body -- confirm.
def updateState(self, set):
    """Publish the current stream, client and consumption figures.

    @param set: callable invoked as set(key, value) once per published key.
    """

    def as_rate(value):
        # Human-readable rate, e.g. formatStorage(...) + 'bit/s'.
        return formatStorage(value) + 'bit/s'

    sent = self.getBytesSent()
    received = self.getBytesReceived()
    uptime = self.getUptime()

    # Incoming stream figures.
    set('stream-mime', self.get_mime())
    set('stream-url', self.getUrl())
    set('stream-uptime', formatTime(uptime))
    stream_rate = received * 8 / uptime
    current_rate = self.getCurrentBitrate()
    set('stream-bitrate', as_rate(stream_rate))
    set('stream-current-bitrate', as_rate(current_rate))
    set('stream-totalbytes', formatStorage(received) + 'Byte')
    set('stream-bitrate-raw', stream_rate)
    set('stream-totalbytes-raw', received)

    # Client counts; all but the peak time are published as strings.
    set('clients-current', str(self.getClients()))
    set('clients-max', str(self.getMaxClients()))
    set('clients-peak', str(self.getPeakClients()))
    set('clients-peak-time', self.getPeakEpoch())
    set('clients-average', str(int(self.getAverageClients())))

    # Outgoing figures; the "current" rate is the stream's current bitrate
    # multiplied by the number of connected clients.
    consumption_rate = sent * 8 / uptime
    set('consumption-bitrate', as_rate(consumption_rate))
    set('consumption-bitrate-current',
        as_rate(current_rate * self.getClients()))
    set('consumption-totalbytes', formatStorage(sent) + 'Byte')
    set('consumption-bitrate-raw', consumption_rate)
    set('consumption-totalbytes-raw', sent)
202
203
204 -class HTTPMedium(feedcomponent.FeedComponentMedium):
205
211
213 """
214 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
215 """
216 d = self.callRemote('authenticate', bouncerName, keycard)
217 return d
218
def keepAlive(self, bouncerName, issuerName, ttl):
    """
    Issue the 'keepAlive' remote call with the given arguments.

    @rtype: L{twisted.internet.defer.Deferred}
    """
    deferred = self.callRemote('keepAlive', bouncerName, issuerName, ttl)
    return deferred
224
226 """
227 @rtype: L{twisted.internet.defer.Deferred}
228 """
229 return self.callRemote('removeKeycardId', bouncerName, keycardId)
230
231
232
235
238
241
244
247
250
253
254
255
256
257
# NOTE(review): the class statement is missing from this listing, so only the
# visible class-level statements are rewritten.
implements(interfaces.IStreamingComponent)

checkOffset = True

logCategory = 'cons-http'

# Launch description for the sink element, assembled from adjacent literals.
pipe_template = ('multifdsink name=sink '
                 'sync=false '
                 'recover-policy=3')

componentMediumClass = HTTPMedium
271
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten.

# NOTE(review): this sets an attribute on the shared reactor object, so it
# affects the whole process -- confirm it is meant to stay enabled.
reactor.debug = True
self.debug("HTTP streamer initialising")

# Stream and HTTP-serving state, nothing configured yet.
self.caps = None
self.resource = None
self.httpauth = None
self.mountPoint = None
self.burst_on_connect = False
self.description = None
self.type = None

# Porter client and its login details.
self._pbclient = None
self._porterUsername = None
self._porterPassword = None
self._porterPath = None

# Listening port, interface and the listening port object.
self.port = None
self.iface = None
self._tport = None

# Pending delayed calls and the time of the last UI update.
self._updateCallLaterId = None
self._lastUpdate = 0
self._updateUI_DC = None

# Maps fd -> (stats, reason) between the two client-removal callbacks.
self._pending_removals = {}

# Register every UI state key up front with a None value.
ui_keys = (
    'stream-mime', 'stream-uptime', 'stream-current-bitrate',
    'stream-bitrate', 'stream-totalbytes', 'clients-current',
    'clients-max', 'clients-peak', 'clients-peak-time',
    'clients-average', 'consumption-bitrate',
    'consumption-bitrate-current',
    'consumption-totalbytes', 'stream-bitrate-raw',
    'stream-totalbytes-raw', 'consumption-bitrate-raw',
    'consumption-totalbytes-raw', 'stream-url',
)
for key in ui_keys:
    self.uiState.addKey(key, None)
315
318
321
323
324
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten. `props` and `addMessage` are presumably
# parameters -- confirm.

# Old underscore spellings and the names they are mapped to.
renamed = [
    ('issuer', 'issuer-class'),
    ('mount_point', 'mount-point'),
    ('porter_socket_path', 'porter-socket-path'),
    ('porter_username', 'porter-username'),
    ('porter_password', 'porter-password'),
    ('user_limit', 'client-limit'),
    ('bandwidth_limit', 'bandwidth-limit'),
    ('burst_on_connect', 'burst-on-connect'),
    ('burst_size', 'burst-size'),
]
self.fixRenamedProperties(props, renamed)

# Slave mode needs all three porter-* properties.
in_slave_mode = props.get('type', 'master') == 'slave'
if in_slave_mode:
    for suffix in ('socket-path', 'username', 'password'):
        if 'porter-' + suffix not in props:
            raise errors.ConfigError(
                "slave mode, missing required property 'porter-%s'" % suffix)

# The two burst settings are mutually exclusive.
if 'burst-size' in props and 'burst-time' in props:
    raise errors.ConfigError('both burst-size and burst-time '
                             'set, cannot satisfy')

# Report, via addMessage, a 'tcp' plug-in older than 0.10.9.1.
version = gstreamer.get_plugin_version('tcp')
if version < (0, 10, 9, 1):
    version_text = ".".join(map(str, version))
    m = messages.Error(T_(N_(
        "Version %s of the '%s' GStreamer plug-in is too old.\n"),
        version_text, 'multifdsink'))
    m.add(T_(N_("Please upgrade '%s' to version %s."),
             'gst-plugins-base', '0.10.10'))
    addMessage(m)
356
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the self.time_bursting_supported(sink) call site --
# confirm.
def time_bursting_supported(self, sink):
    """Return True if reading the sink's 'units-max' property works.

    A TypeError from get_property is taken to mean the property does not
    exist, in which case False is returned.
    """
    try:
        sink.get_property('units-max')
    except TypeError:
        return False
    else:
        return True
363
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten. Branch nesting is reconstructed from the
# statement order because the indentation was lost -- confirm.

# Each branch collects (property, value) pairs; they are applied in order
# at the end.
if self.burst_on_connect:
    if self.burst_time and self.time_bursting_supported(sink):
        self.debug("Configuring burst mode for %f second burst",
                   self.burst_time)
        # NOTE(review): 4 and 2 are presumably the numeric forms of the
        # enum values spelled out in the size-based branch -- confirm
        # against the multifdsink documentation.
        settings = [
            ('sync-method', 4),
            ('burst-unit', 2),
            ('burst-value', long(self.burst_time * gst.SECOND)),
            # Minimum kept and the soft/hard limits are the burst time
            # plus 5, 8 and 10 seconds.
            ('time-min', long((self.burst_time + 5) * gst.SECOND)),
            ('unit-type', 2),
            ('units-soft-max', long((self.burst_time + 8) * gst.SECOND)),
            ('units-max', long((self.burst_time + 10) * gst.SECOND)),
        ]
    elif self.burst_size:
        self.debug("Configuring burst mode for %d kB burst",
                   self.burst_size)
        settings = [
            ('sync-method', 'burst-keyframe'),
            ('burst-unit', 'bytes'),
            # burst_size is logged as kB, hence the * 1024.
            ('burst-value', self.burst_size * 1024),
            ('bytes-min', (self.burst_size + 512) * 1024),
            # Buffer-count limits derived from the kB size.
            ('buffers-soft-max', (self.burst_size + 1024) / 4),
            ('buffers-max', (self.burst_size + 2048) / 4),
        ]
    else:
        self.debug("simple burst-on-connect, setting sync-method 2")
        settings = [
            ('sync-method', 2),
            ('buffers-soft-max', 250),
            ('buffers-max', 500),
        ]
else:
    self.debug("no burst-on-connect, setting sync-method 0")
    settings = [
        ('sync-method', 0),
        ('buffers-soft-max', 250),
        ('buffers-max', 500),
    ]

for prop_name, prop_value in settings:
    sink.set_property(prop_name, prop_value)
428
535
537 return '<MultifdSinkStreamer (%s)>' % self.name
538
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getMaxClients() call site -- confirm.
def getMaxClients(self):
    """Return the maxclients attribute of self.resource."""
    limit = self.resource.maxclients
    return limit
541
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the get_mime() call sites -- confirm.
def get_mime(self):
    """Return the name of the first structure of the stored caps.

    Returns None while self.caps is unset or falsy.
    """
    if not self.caps:
        return None
    first_structure = self.caps.get_structure(0)
    return first_structure.get_name()
545
547 mime = self.get_mime()
548 if mime == 'multipart/x-mixed-replace':
549 mime += ";boundary=ThisRandomString"
550 return mime
551
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the getUrl() call sites -- confirm.
def getUrl(self):
    """Return "http://<hostname>:<port><mountPoint>" for this streamer."""
    parts = (self.hostname, self.port, self.mountPoint)
    return "http://%s:%d%s" % parts
554
556 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
557 if self.plugs[socket]:
558 plug = self.plugs[socket][-1]
559 return plug.getStreamData()
560 else:
561 return {'protocol': 'HTTP',
562 'description': self.description,
563 'url': self.getUrl()}
564
566 """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
567 current_clients, current_load) of our current bandwidth and
568 user values.
569 The deltas are estimates of how much bitrate is added, removed
570 due to client connections, disconnections, per second.
571 """
572
573
574 deltaadded, deltaremoved = self.getLoadDeltas()
575
576 bytes_received = self.getBytesReceived()
577 uptime = self.getUptime()
578 bitrate = bytes_received * 8 / uptime
579
580 bytes_sent = self.getBytesSent()
581 clients_connected = self.getClients()
582 current_load = bitrate * clients_connected
583
584 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
585 clients_connected, current_load)
586
590
594
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the self.remove_all_clients() call site -- confirm.
def remove_all_clients(self):
    """Remove all the clients.

    Returns the result of self.resource.removeAllClients(), described as a
    deferred fired once all clients have been removed, or None when no
    resource is set.
    """
    if not self.resource:
        return None

    self.debug("Asking for all clients to be removed")
    return self.resource.removeAllClients()
604
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the self.update_ui_state() call sites -- confirm.
def update_ui_state(self):
    """Update the uiState object.
    Such updates (through this function) are throttled to a maximum rate,
    to avoid saturating admin clients with traffic when many clients are
    connecting/disconnecting.
    """

    def store_if_different(key, value):
        # Only write keys whose value actually changed.
        if self.uiState.get(key) != value:
            self.uiState.set(key, value)

    def run_deferred_update():
        # The delayed call has fired; forget it, then update again.
        self._updateUI_DC = None
        self.update_ui_state()

    now = time.time()
    too_soon = now - self._lastUpdate < UI_UPDATE_THROTTLE_PERIOD

    if too_soon:
        # Schedule one delayed update unless one is already pending.
        if not self._updateUI_DC:
            self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
                                                  run_deferred_update)
        return

    # Updating now makes any pending delayed update redundant.
    if self._updateUI_DC:
        self._updateUI_DC.cancel()
        self._updateUI_DC = None

    self._lastUpdate = now
    self.updateState(store_if_different)
637
642
# NOTE(review): the def line is missing from this listing; the signature is
# reconstructed from the reactor.callFromThread(self._client_removed_handler,
# sink, fd, reason, stats) call site -- confirm.
def _client_removed_handler(self, sink, fd, reason, stats):
    """Handle a removed client: log it, notify the resource, update stats."""
    self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)

    write_failed = reason.value_name == 'GST_CLIENT_STATUS_ERROR'
    if write_failed:
        self.warning('[fd %5d] Client removed because of write error' % fd)

    self.resource.clientRemoved(sink, fd, reason, stats)
    # Called through the Stats class explicitly rather than via self.
    Stats.clientRemoved(self)
    self.update_ui_state()
651
652
653
655 caps = pad.get_negotiated_caps()
656 if caps == None:
657 return
658
659 caps_str = gstreamer.caps_repr(caps)
660 self.debug('Got caps: %s' % caps_str)
661
662 if not self.caps == None:
663 self.warning('Already had caps: %s, replacing' % caps_str)
664
665 self.debug('Storing caps: %s' % caps_str)
666 self.caps = caps
667
668 reactor.callFromThread(self.update_ui_state)
669
670
671
672
673
674
675
676
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten. `sink`, `fd` and `reason` are presumably
# parameters -- confirm.

# Fetch the stats and store them with the reason under the fd, for the
# follow-up callback to pop.
client_stats = sink.emit('get-stats', fd)
self._pending_removals[fd] = (client_stats, reason)
680
681
682
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten. `sink` and `fd` are presumably parameters
# -- confirm.

# Take back what was stored for this fd; a missing fd raises KeyError.
pending = self._pending_removals.pop(fd)
stats, reason = pending

# The handler is run on the reactor thread.
reactor.callFromThread(self._client_removed_handler, sink, fd,
                       reason, stats)
688
689
690
692 if self._updateCallLaterId:
693 self._updateCallLaterId.cancel()
694 self._updateCallLaterId = None
695
696 if self.httpauth:
697 self.httpauth.stopKeepAlive()
698
699 if self._tport:
700 self._tport.stopListening()
701
702 l = []
703
704
705 clients = self.remove_all_clients()
706 if clients:
707 l.append(clients)
708
709 if self.type == 'slave' and self._pbclient:
710 l.append(self._pbclient.deregisterPath(self.mountPoint))
711 return defer.DeferredList(l)
712
"""Provide a new set of porter login information, for when we're
in slave mode and the porter changes.
If we're currently connected, this won't disconnect - it'll just change
the information so that next time we try and connect we'll use the
new ones
"""
# NOTE(review): the enclosing def line is missing from this listing, so only
# the visible body is rewritten. `path`, `username` and `password` are
# presumably parameters -- confirm.
if self.type != 'slave':
    raise errors.WrongStateError(
        "Can't specify porter details in master mode")

self._porterUsername = username
self._porterPassword = password

login = credentials.UsernamePassword(self._porterUsername,
                                     self._porterPassword)
self._pbclient.startLogin(login, self._pbclient.medium)

# NOTE(review): indentation was lost in this listing; stopTrying, resetDelay
# and the reconnect are assumed to all belong to the "path changed" branch
# -- confirm against the original file.
if path != self._porterPath:
    self.debug("Changing porter login to use \"%s\"", path)
    self._porterPath = path
    self._pbclient.stopTrying()

    self._pbclient.resetDelay()
    reactor.connectWith(
        fdserver.FDConnector, self._porterPath,
        self._pbclient, 10, checkPID=False)
742
755
757 root = resources.HTTPRoot()
758
759 mount = self.mountPoint[1:]
760 root.putChild(mount, self.resource)
761 if self.type == 'slave':
762
763
764
765
766
767
768
769
770
771
772
773
774
775 self._porterDeferred = d = defer.Deferred()
776 mountpoints = [self.mountPoint]
777 self._pbclient = porterclient.HTTPPorterClientFactory(
778 server.Site(resource=root), mountpoints, d)
779
780 creds = credentials.UsernamePassword(self._porterUsername,
781 self._porterPassword)
782 self._pbclient.startLogin(creds, self._pbclient.medium)
783
784 self.debug("Starting porter login at \"%s\"", self._porterPath)
785
786 reactor.connectWith(
787 fdserver.FDConnector, self._porterPath,
788 self._pbclient, 10, checkPID=False)
789 else:
790
791 try:
792 self.debug('Listening on %d' % self.port)
793 iface = self.iface or ""
794 self._tport = reactor.listenTCP(
795 self.port, server.Site(resource=root),
796 interface=iface)
797 except error.CannotListenError:
798 t = 'Port %d is not available.' % self.port
799 self.warning(t)
800 m = messages.Error(T_(N_(
801 "Network error: TCP port %d is not available."),
802 self.port))
803 self.addMessage(m)
804 self.setMood(moods.sad)
805 return defer.fail(errors.ComponentSetupHandledError(t))
806