1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import time
19
20 import gst
21
22 from twisted.cred import credentials
23 from twisted.internet import reactor, error, defer
24 from twisted.web import server
25
26 from zope.interface import implements
27
28 from flumotion.common import errors
29 from flumotion.common import messages, netutils, interfaces
30 from flumotion.common import format as formatting
31 from flumotion.component import feedcomponent
32 from flumotion.component.base import http
33 from flumotion.component.component import moods
34 from flumotion.component.misc.porter import porterclient
35 from flumotion.twisted import fdserver
36
37 from flumotion.common.i18n import N_, gettexter
38
39 __all__ = ['HTTPMedium']
40 __version__ = "$Rev$"
41
42 STATS_POLL_INTERVAL = 10
43 UI_UPDATE_THROTTLE_PERIOD = 2.0
44
45 T_ = gettexter()
46
47
49
51 self.no_clients = 0
52 self.clients_added_count = 0
53 self.clients_removed_count = 0
54 self.start_time = time.time()
55
56 self.peak_client_number = 0
57 self.peak_epoch = self.start_time
58 self.load_deltas = [0, 0]
59 self._load_deltas_period = 10
60 self._load_deltas_ongoing = [time.time(), 0, 0]
61 self._currentBitrate = -1
62 self._lastBytesReceived = -1
63
64
65 self.average_client_number = 0
66 self.average_time = self.start_time
67
69
70 now = time.time()
71
72 dt1 = self.average_time - self.start_time
73 dc1 = self.average_client_number
74 dt2 = now - self.average_time
75 dc2 = self.no_clients
76 self.average_time = now
77 if dt1 == 0:
78
79 self.average_client_number = 0
80 else:
81 dt = dt1 + dt2
82 before = (dc1 * dt1) / dt
83 after = dc2 * dt2 / dt
84 self.average_client_number = before + after
85
87 self._updateAverage()
88
89 self.no_clients += 1
90 self.clients_added_count +=1
91
92
93 if self.no_clients >= self.peak_client_number:
94 self.peak_epoch = time.time()
95 self.peak_client_number = self.no_clients
96
98 self._updateAverage()
99 self.no_clients -= 1
100 self.clients_removed_count +=1
101
103 """
104 Periodically, update our statistics on load deltas, and update the
105 UIState with new values for total bytes, bitrate, etc.
106 """
107
108 oldtime, oldadd, oldremove = self._load_deltas_ongoing
109 add, remove = self.clients_added_count, self.clients_removed_count
110 now = time.time()
111 diff = float(now - oldtime)
112
113 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
114 self._load_deltas_ongoing = [now, add, remove]
115
116 bytesReceived = self.getBytesReceived()
117 if self._lastBytesReceived >= 0:
118 self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
119 8 / STATS_POLL_INTERVAL)
120 self._lastBytesReceived = bytesReceived
121
122 self.update_ui_state()
123
124 self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
125 self._updateStats)
126
128 if self._currentBitrate >= 0:
129 return self._currentBitrate
130 else:
131 return self.getBytesReceived() * 8 / self.getUptime()
132
134 raise NotImplemented("getBytesSent must be implemented by subclasses")
135
137 raise NotImplemented("getBytesReceived must be implemented by "
138 "subclasses")
139
141 raise NotImplementedError("getUrl must be implemented by subclasses")
142
144 return time.time() - self.start_time
145
147 return self.no_clients
148
150 return self.peak_client_number
151
153 return self.peak_epoch
154
156 return self.average_client_number
157
159 return self.load_deltas
160
162 c = self
163
164 bytes_sent = c.getBytesSent()
165 bytes_received = c.getBytesReceived()
166 uptime = c.getUptime()
167
168 set('stream-mime', c.get_mime())
169 set('stream-url', c.getUrl())
170 set('stream-uptime', formatting.formatTime(uptime))
171 bitspeed = bytes_received * 8 / uptime
172 currentbitrate = self.getCurrentBitrate()
173 set('stream-bitrate', formatting.formatStorage(bitspeed) + 'bit/s')
174 set('stream-current-bitrate',
175 formatting.formatStorage(currentbitrate) + 'bit/s')
176 set('stream-totalbytes',
177 formatting.formatStorage(bytes_received) + 'Byte')
178 set('stream-bitrate-raw', bitspeed)
179 set('stream-totalbytes-raw', bytes_received)
180
181 set('clients-current', str(c.getClients()))
182 set('clients-max', str(c.getMaxClients()))
183 set('clients-peak', str(c.getPeakClients()))
184 set('clients-peak-time', c.getPeakEpoch())
185 set('clients-average', str(int(c.getAverageClients())))
186
187 bitspeed = bytes_sent * 8 / uptime
188 set('consumption-bitrate',
189 formatting.formatStorage(bitspeed) + 'bit/s')
190 set('consumption-bitrate-current',
191 formatting.formatStorage(
192 currentbitrate * c.getClients()) + 'bit/s')
193 set('consumption-totalbytes',
194 formatting.formatStorage(bytes_sent) + 'Byte')
195 set('consumption-bitrate-raw', bitspeed)
196 set('consumption-totalbytes-raw', bytes_sent)
197
198
199 -class HTTPMedium(feedcomponent.FeedComponentMedium):
200
206
208 """
209 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
210 """
211 d = self.callRemote('authenticate', bouncerName, keycard)
212 return d
213
214 - def keepAlive(self, bouncerName, issuerName, ttl):
215 """
216 @rtype: L{twisted.internet.defer.Deferred}
217 """
218 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
219
221 """
222 @rtype: L{twisted.internet.defer.Deferred}
223 """
224 return self.callRemote('removeKeycardId', bouncerName, keycardId)
225
226
227
230
233
236
239
242
245
248
251
252
253 -class Streamer(feedcomponent.ParseLaunchComponent, Stats):
254 implements(interfaces.IStreamingComponent)
255
256 checkOffset = True
257
258
259 logCategory = 'cons-http'
260
261 pipe_template = 'multifdsink name=sinksync=false recover-policy=3'
262
263 componentMediumClass = HTTPMedium
264 siteClass = server.Site
265 multi_files = False
266
268 reactor.debug = True
269 self.debug("HTTP streamer initialising")
270
271 self.caps = None
272 self.resource = None
273 self.httpauth = None
274 self.mountPoint = None
275 self.burst_on_connect = False
276 self.timeout = 0L
277
278 self.description = None
279
280 self.type = None
281
282
283 self._pbclient = None
284 self._porterUsername = None
285 self._porterPassword = None
286 self._porterPath = None
287
288
289
290 self.port = None
291
292 self.iface = None
293
294 self._tport = None
295
296 self._updateCallLaterId = None
297 self._lastUpdate = 0
298 self._updateUI_DC = None
299
300 self._pending_removals = {}
301
302 for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
303 'stream-bitrate', 'stream-totalbytes', 'clients-current',
304 'clients-max', 'clients-peak', 'clients-peak-time',
305 'clients-average', 'consumption-bitrate',
306 'consumption-bitrate-current',
307 'consumption-totalbytes', 'stream-bitrate-raw',
308 'stream-totalbytes-raw', 'consumption-bitrate-raw',
309 'consumption-totalbytes-raw', 'stream-url'):
310 self.uiState.addKey(i, None)
311
314
317
319
320 if props.get('type', 'master') == 'slave':
321 for k in 'socket-path', 'username', 'password':
322 if not 'porter-' + k in props:
323 raise errors.ConfigError("slave mode, missing required"
324 " property 'porter-%s'" % k)
325
326 if 'burst-size' in props and 'burst-time' in props:
327 raise errors.ConfigError('both burst-size and burst-time '
328 'set, cannot satisfy')
329
333
418
423
425 return self.resource.maxclients
426
428
429 sinkHasCaps = map(lambda sink: sink.caps is not None, self.sinks)
430 return None not in sinkHasCaps
431
433 raise NotImplemented("get_mime must be implemented by subclasses")
434
436 raise NotImplemented("get_content_type must be implemented by "
437 "subclasses")
438
440 port = self.port
441
442 if self.type == 'slave' and self._pbclient:
443 if not self._pbclient.remote_port:
444 return ""
445 port = self._pbclient.remote_port
446
447 if (not port) or (port == 80):
448 port_str = ""
449 else:
450 port_str = ":%d" % port
451
452 return "http://%s%s%s" % (self.hostname, port_str, self.mountPoint)
453
455 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
456 if self.plugs[socket]:
457 plug = self.plugs[socket][-1]
458 return plug.getStreamData()
459 else:
460 return {'protocol': 'HTTP',
461 'description': self.description,
462 'url': self.getUrl()}
463
465 """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
466 current_clients, current_load) of our current bandwidth and
467 user values.
468 The deltas are estimates of how much bitrate is added, removed
469 due to client connections, disconnections, per second.
470 """
471
472
473 deltaadded, deltaremoved = self.getLoadDeltas()
474
475 bytes_received = self.getBytesReceived()
476 uptime = self.getUptime()
477 bitrate = bytes_received * 8 / uptime
478
479 bytes_sent = self.getBytesSent()
480 clients_connected = self.getClients()
481 current_load = bitrate * clients_connected
482
483 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
484 clients_connected, current_load)
485
487 """Update the uiState object.
488 Such updates (through this function) are throttled to a maximum rate,
489 to avoid saturating admin clients with traffic when many clients are
490 connecting/disconnecting.
491 """
492
493 def setIfChanged(k, v):
494 if self.uiState.get(k) != v:
495 self.uiState.set(k, v)
496
497 def update_ui_state_later():
498 self._updateUI_DC = None
499 self.update_ui_state()
500
501 now = time.time()
502
503
504 if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
505 if self._updateUI_DC:
506 self._updateUI_DC.cancel()
507 self._updateUI_DC = None
508
509 self._lastUpdate = now
510
511
512 self.updateState(setIfChanged)
513 elif not self._updateUI_DC:
514
515
516 self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
517 update_ui_state_later)
518
520 if self._updateCallLaterId:
521 self._updateCallLaterId.cancel()
522 self._updateCallLaterId = None
523
524 if self.httpauth:
525 self.httpauth.stopKeepAlive()
526
527 if self._tport:
528 self._tport.stopListening()
529
530 l = []
531
532
533 clients = self.remove_all_clients()
534 if clients:
535 l.append(clients)
536
537 if self.type == 'slave' and self._pbclient:
538 l.append(self._pbclient.deregisterPath(self.mountPoint))
539 return defer.DeferredList(l)
540
542 """Provide a new set of porter login information, for when we're
543 in slave mode and the porter changes.
544 If we're currently connected, this won't disconnect - it'll just change
545 the information so that next time we try and connect we'll use the
546 new ones
547 """
548 if self.type == 'slave':
549 self._porterUsername = username
550 self._porterPassword = password
551
552 creds = credentials.UsernamePassword(self._porterUsername,
553 self._porterPassword)
554
555 self._pbclient.startLogin(creds, self._pbclient.medium)
556
557
558 if path != self._porterPath:
559 self.debug("Changing porter login to use \"%s\"", path)
560 self._porterPath = path
561 self._pbclient.stopTrying()
562
563 self._pbclient.resetDelay()
564 reactor.connectWith(
565 fdserver.FDConnector, self._porterPath,
566 self._pbclient, 10, checkPID=False)
567 else:
568 raise errors.WrongStateError(
569 "Can't specify porter details in master mode")
570
583
585 """ Remove all the the clients
586
587 Returns a deferred fired once all clients have been removed
588 """
589 if self.resource:
590 return self.resource.removeAllClients()
591
593 raise NotImplementedError("getRoot must be implemented by subclasses")
594
596 root = self._get_root()
597 self._site = self.siteClass(resource=root)
598 if self.type == 'slave':
599
600
601
602
603
604
605
606
607
608
609
610
611
612 self._porterDeferred = d = defer.Deferred()
613 mountpoints = [self.mountPoint]
614 if self.multi_files:
615 self._pbclient = porterclient.HTTPPorterClientFactory(
616 self._site, [], d, prefixes=mountpoints)
617 else:
618 self._pbclient = porterclient.HTTPPorterClientFactory(
619 self._site, mountpoints, d)
620
621 creds = credentials.UsernamePassword(self._porterUsername,
622 self._porterPassword)
623 self._pbclient.startLogin(creds, self._pbclient.medium)
624
625 self.info("Starting porter login at \"%s\"", self._porterPath)
626
627 reactor.connectWith(
628 fdserver.FDConnector, self._porterPath,
629 self._pbclient, 10, checkPID=False)
630 else:
631
632 try:
633 iface = self.iface or ""
634 self.info('Listening on port %d, interface=%r',
635 self.port, iface)
636 self._tport = reactor.listenTCP(
637 self.port, self._site, interface=iface)
638 except error.CannotListenError:
639 t = 'Port %d is not available.' % self.port
640 self.warning(t)
641 m = messages.Error(T_(N_(
642 "Network error: TCP port %d is not available."),
643 self.port))
644 self.addMessage(m)
645 self.setMood(moods.sad)
646 return defer.fail(errors.ComponentSetupHandledError(t))
647