Package flumotion :: Package component :: Package common :: Package streamer :: Module streamer
[hide private]

Source Code for Module flumotion.component.common.streamer.streamer

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19   
 20  import gst 
 21   
 22  from twisted.cred import credentials 
 23  from twisted.internet import reactor, error, defer 
 24  from twisted.web import server 
 25   
 26  from zope.interface import implements 
 27   
 28  from flumotion.common import errors 
 29  from flumotion.common import messages, netutils, interfaces 
 30  from flumotion.common import format as formatting 
 31  from flumotion.component import feedcomponent 
 32  from flumotion.component.base import http 
 33  from flumotion.component.component import moods 
 34  from flumotion.component.misc.porter import porterclient 
 35  from flumotion.twisted import fdserver 
 36   
 37  from flumotion.common.i18n import N_, gettexter 
 38   
 39  __all__ = ['HTTPMedium'] 
 40  __version__ = "$Rev$" 
 41   
 42  STATS_POLL_INTERVAL = 10 
 43  UI_UPDATE_THROTTLE_PERIOD = 2.0 # Don't update UI more than once every two 
 44                                  # seconds 
 45  T_ = gettexter() 
 46   
 47   
48 -class Stats(object):
49
50 - def __init__(self):
51 self.no_clients = 0 52 self.clients_added_count = 0 53 self.clients_removed_count = 0 54 self.start_time = time.time() 55 # keep track of the highest number and the last epoch this was reached 56 self.peak_client_number = 0 57 self.peak_epoch = self.start_time 58 self.load_deltas = [0, 0] 59 self._load_deltas_period = 10 # seconds 60 self._load_deltas_ongoing = [time.time(), 0, 0] 61 self._currentBitrate = -1 # not known yet 62 self._lastBytesReceived = -1 # not known yet 63 64 # keep track of average clients by tracking last average and its time 65 self.average_client_number = 0 66 self.average_time = self.start_time
67
68 - def _updateAverage(self):
69 # update running average of clients connected 70 now = time.time() 71 # calculate deltas 72 dt1 = self.average_time - self.start_time 73 dc1 = self.average_client_number 74 dt2 = now - self.average_time 75 dc2 = self.no_clients 76 self.average_time = now # we can update now that we used self.a 77 if dt1 == 0: 78 # first measurement 79 self.average_client_number = 0 80 else: 81 dt = dt1 + dt2 82 before = (dc1 * dt1) / dt 83 after = dc2 * dt2 / dt 84 self.average_client_number = before + after
85
86 - def clientAdded(self):
87 self._updateAverage() 88 89 self.no_clients += 1 90 self.clients_added_count +=1 91 92 # >= so we get the last epoch this peak was achieved 93 if self.no_clients >= self.peak_client_number: 94 self.peak_epoch = time.time() 95 self.peak_client_number = self.no_clients
96
97 - def clientRemoved(self):
98 self._updateAverage() 99 self.no_clients -= 1 100 self.clients_removed_count +=1
101
102 - def _updateStats(self):
103 """ 104 Periodically, update our statistics on load deltas, and update the 105 UIState with new values for total bytes, bitrate, etc. 106 """ 107 108 oldtime, oldadd, oldremove = self._load_deltas_ongoing 109 add, remove = self.clients_added_count, self.clients_removed_count 110 now = time.time() 111 diff = float(now - oldtime) 112 113 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff] 114 self._load_deltas_ongoing = [now, add, remove] 115 116 bytesReceived = self.getBytesReceived() 117 if self._lastBytesReceived >= 0: 118 self._currentBitrate = ((bytesReceived - self._lastBytesReceived) * 119 8 / STATS_POLL_INTERVAL) 120 self._lastBytesReceived = bytesReceived 121 122 self.update_ui_state() 123 124 self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL, 125 self._updateStats)
126
127 - def getCurrentBitrate(self):
128 if self._currentBitrate >= 0: 129 return self._currentBitrate 130 else: 131 return self.getBytesReceived() * 8 / self.getUptime()
132
133 - def getBytesSent(self):
134 raise NotImplemented("getBytesSent must be implemented by subclasses")
135
136 - def getBytesReceived(self):
137 raise NotImplemented("getBytesReceived must be implemented by " 138 "subclasses")
139
140 - def getUrl(self):
141 raise NotImplementedError("getUrl must be implemented by subclasses")
142
143 - def getUptime(self):
144 return time.time() - self.start_time
145
146 - def getClients(self):
147 return self.no_clients
148
149 - def getPeakClients(self):
150 return self.peak_client_number
151
152 - def getPeakEpoch(self):
153 return self.peak_epoch
154
155 - def getAverageClients(self):
156 return self.average_client_number
157
158 - def getLoadDeltas(self):
159 return self.load_deltas
160
161 - def updateState(self, set):
162 c = self 163 164 bytes_sent = c.getBytesSent() 165 bytes_received = c.getBytesReceived() 166 uptime = c.getUptime() 167 168 set('stream-mime', c.get_mime()) 169 set('stream-url', c.getUrl()) 170 set('stream-uptime', formatting.formatTime(uptime)) 171 bitspeed = bytes_received * 8 / uptime 172 currentbitrate = self.getCurrentBitrate() 173 set('stream-bitrate', formatting.formatStorage(bitspeed) + 'bit/s') 174 set('stream-current-bitrate', 175 formatting.formatStorage(currentbitrate) + 'bit/s') 176 set('stream-totalbytes', 177 formatting.formatStorage(bytes_received) + 'Byte') 178 set('stream-bitrate-raw', bitspeed) 179 set('stream-totalbytes-raw', bytes_received) 180 181 set('clients-current', str(c.getClients())) 182 set('clients-max', str(c.getMaxClients())) 183 set('clients-peak', str(c.getPeakClients())) 184 set('clients-peak-time', c.getPeakEpoch()) 185 set('clients-average', str(int(c.getAverageClients()))) 186 187 bitspeed = bytes_sent * 8 / uptime 188 set('consumption-bitrate', 189 formatting.formatStorage(bitspeed) + 'bit/s') 190 set('consumption-bitrate-current', 191 formatting.formatStorage( 192 currentbitrate * c.getClients()) + 'bit/s') 193 set('consumption-totalbytes', 194 formatting.formatStorage(bytes_sent) + 'Byte') 195 set('consumption-bitrate-raw', bitspeed) 196 set('consumption-totalbytes-raw', bytes_sent)
197 198
199 -class HTTPMedium(feedcomponent.FeedComponentMedium):
200
201 - def __init__(self, comp):
202 """ 203 @type comp: L{Stats} 204 """ 205 feedcomponent.FeedComponentMedium.__init__(self, comp)
206
207 - def authenticate(self, bouncerName, keycard):
208 """ 209 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None. 210 """ 211 d = self.callRemote('authenticate', bouncerName, keycard) 212 return d
213
214 - def keepAlive(self, bouncerName, issuerName, ttl):
215 """ 216 @rtype: L{twisted.internet.defer.Deferred} 217 """ 218 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
219
220 - def removeKeycardId(self, bouncerName, keycardId):
221 """ 222 @rtype: L{twisted.internet.defer.Deferred} 223 """ 224 return self.callRemote('removeKeycardId', bouncerName, keycardId)
225 226 ### remote methods for manager to call on 227
228 - def remote_expireKeycard(self, keycardId):
229 return self.comp.httpauth.expireKeycard(keycardId)
230
231 - def remote_expireKeycards(self, keycardIds):
232 return self.comp.httpauth.expireKeycards(keycardIds)
233
234 - def remote_notifyState(self):
235 self.comp.update_ui_state()
236
237 - def remote_rotateLog(self):
238 self.comp.resource.rotateLogs()
239
240 - def remote_getStreamData(self):
241 return self.comp.getStreamData()
242
243 - def remote_getLoadData(self):
244 return self.comp.getLoadData()
245
246 - def remote_updatePorterDetails(self, path, username, password):
247 return self.comp.updatePorterDetails(path, username, password)
248
249 - def remote_removeAllClients(self):
250 return self.comp.remove_all_clients()
251 252
253 -class Streamer(feedcomponent.ParseLaunchComponent, Stats):
254 implements(interfaces.IStreamingComponent) 255 256 checkOffset = True 257 258 # this object is given to the HTTPMedium as comp 259 logCategory = 'cons-http' 260 261 pipe_template = 'multifdsink name=sinksync=false recover-policy=3' 262 263 componentMediumClass = HTTPMedium 264 siteClass = server.Site 265 multi_files = False 266
267 - def init(self):
268 reactor.debug = True 269 self.debug("HTTP streamer initialising") 270 271 self.caps = None 272 self.resource = None 273 self.httpauth = None 274 self.mountPoint = None 275 self.burst_on_connect = False 276 self.timeout = 0L 277 278 self.description = None 279 280 self.type = None 281 282 # Used if we've slaved to a porter. 283 self._pbclient = None 284 self._porterUsername = None 285 self._porterPassword = None 286 self._porterPath = None 287 288 # Or if we're a master, we open our own port here. Also used for URLs 289 # in the porter case. 290 self.port = None 291 # We listen on this interface, if set. 292 self.iface = None 293 294 self._tport = None 295 296 self._updateCallLaterId = None 297 self._lastUpdate = 0 298 self._updateUI_DC = None 299 300 self._pending_removals = {} 301 302 for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate', 303 'stream-bitrate', 'stream-totalbytes', 'clients-current', 304 'clients-max', 'clients-peak', 'clients-peak-time', 305 'clients-average', 'consumption-bitrate', 306 'consumption-bitrate-current', 307 'consumption-totalbytes', 'stream-bitrate-raw', 308 'stream-totalbytes-raw', 'consumption-bitrate-raw', 309 'consumption-totalbytes-raw', 'stream-url'): 310 self.uiState.addKey(i, None)
311
312 - def getDescription(self):
313 return self.description
314
315 - def get_pipeline_string(self, properties):
316 return self.pipe_template
317
318 - def check_properties(self, props, addMessage):
319 320 if props.get('type', 'master') == 'slave': 321 for k in 'socket-path', 'username', 'password': 322 if not 'porter-' + k in props: 323 raise errors.ConfigError("slave mode, missing required" 324 " property 'porter-%s'" % k) 325 326 if 'burst-size' in props and 'burst-time' in props: 327 raise errors.ConfigError('both burst-size and burst-time ' 328 'set, cannot satisfy')
329
331 raise NotImplementedError("configure_auth_and_resource must be " 332 "implemented by subclasses")
333
334 - def parseProperties(self, properties):
335 mountPoint = properties.get('mount-point', '') 336 if not mountPoint.startswith('/'): 337 mountPoint = '/' + mountPoint 338 self.mountPoint = mountPoint 339 340 # Hostname is used for a variety of purposes. We do a best-effort guess 341 # where nothing else is possible, but it's much preferable to just 342 # configure this 343 self.hostname = properties.get('hostname', None) 344 self.iface = self.hostname # We listen on this if explicitly 345 # configured, but not if it's only guessed 346 # at by the below code. 347 if not self.hostname: 348 # Don't call this nasty, nasty, probably flaky function unless we 349 # need to. 350 self.hostname = netutils.guess_public_hostname() 351 352 self.description = properties.get('description', None) 353 if self.description is None: 354 self.description = "Flumotion Stream" 355 356 # check how to set client sync mode 357 358 if 'client-limit' in properties: 359 limit = int(properties['client-limit']) 360 self.resource.setUserLimit(limit) 361 if limit != self.resource.maxclients: 362 m = messages.Info(T_(N_( 363 "Your system configuration does not allow the maximum " 364 "client limit to be set to %d clients."), 365 limit)) 366 m.description = T_(N_( 367 "Learn how to increase the maximum number of clients.")) 368 m.section = 'chapter-optimization' 369 m.anchor = 'section-configuration-system-fd' 370 self.addMessage(m) 371 372 if 'bandwidth-limit' in properties: 373 limit = int(properties['bandwidth-limit']) 374 if limit < 1000: 375 # The wizard used to set this as being in Mbps, oops. 376 self.debug("Bandwidth limit set to unreasonably low %d bps, " 377 "assuming this is meant to be Mbps", limit) 378 limit *= 1000000 379 self.resource.setBandwidthLimit(limit) 380 381 if 'redirect-on-overflow' in properties: 382 self.resource.setRedirectionOnLimits( 383 properties['redirect-on-overflow']) 384 385 if 'bouncer' in properties: 386 self.httpauth.setBouncerName(properties['bouncer']) 387 388 if 'allow-default' in properties: 389 self.httpauth.setAllowDefault(properties['allow-default']) 390 391 if 'duration' in properties: 392 self.httpauth.setDefaultDuration( 393 float(properties['duration'])) 394 395 if 'domain' in properties: 396 self.httpauth.setDomain(properties['domain']) 397 398 if 'avatarId' in self.config: 399 self.httpauth.setRequesterId(self.config['avatarId']) 400 401 if 'ip-filter' in properties: 402 logFilter = http.LogFilter() 403 for f in properties['ip-filter']: 404 logFilter.addIPFilter(f) 405 self.resource.setLogFilter(logFilter) 406 407 if 'timeout' in properties: 408 self.timeout = properties['timeout'] * gst.SECOND 409 410 self.type = properties.get('type', 'master') 411 if self.type == 'slave': 412 # already checked for these in do_check 413 self._porterPath = properties['porter-socket-path'] 414 self._porterUsername = properties['porter-username'] 415 self._porterPassword = properties['porter-password'] 416 417 self.port = int(properties.get('port', 8800))
418
419 - def configure_pipeline(self, pipeline, properties):
420 self._updateCallLaterId = reactor.callLater(10, self._updateStats) 421 self.configure_auth_and_resource() 422 self.parseProperties(properties)
423
424 - def getMaxClients(self):
425 return self.resource.maxclients
426
427 - def hasCaps(self):
428 # all the sinks should have caps set 429 sinkHasCaps = map(lambda sink: sink.caps is not None, self.sinks) 430 return None not in sinkHasCaps
431
432 - def get_mime(self):
433 raise NotImplemented("get_mime must be implemented by subclasses")
434
435 - def get_content_type(self):
436 raise NotImplemented("get_content_type must be implemented by " 437 "subclasses")
438
439 - def getUrl(self):
440 port = self.port 441 442 if self.type == 'slave' and self._pbclient: 443 if not self._pbclient.remote_port: 444 return "" 445 port = self._pbclient.remote_port 446 447 if (not port) or (port == 80): 448 port_str = "" 449 else: 450 port_str = ":%d" % port 451 452 return "http://%s%s%s" % (self.hostname, port_str, self.mountPoint)
453
454 - def getStreamData(self):
455 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug' 456 if self.plugs[socket]: 457 plug = self.plugs[socket][-1] 458 return plug.getStreamData() 459 else: 460 return {'protocol': 'HTTP', 461 'description': self.description, 462 'url': self.getUrl()}
463
464 - def getLoadData(self):
465 """Return a tuple (deltaadded, deltaremoved, bytes_transferred, 466 current_clients, current_load) of our current bandwidth and 467 user values. 468 The deltas are estimates of how much bitrate is added, removed 469 due to client connections, disconnections, per second. 470 """ 471 # We calculate the estimated clients added/removed per second, then 472 # multiply by the stream bitrate 473 deltaadded, deltaremoved = self.getLoadDeltas() 474 475 bytes_received = self.getBytesReceived() 476 uptime = self.getUptime() 477 bitrate = bytes_received * 8 / uptime 478 479 bytes_sent = self.getBytesSent() 480 clients_connected = self.getClients() 481 current_load = bitrate * clients_connected 482 483 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent, 484 clients_connected, current_load)
485
486 - def update_ui_state(self):
487 """Update the uiState object. 488 Such updates (through this function) are throttled to a maximum rate, 489 to avoid saturating admin clients with traffic when many clients are 490 connecting/disconnecting. 491 """ 492 493 def setIfChanged(k, v): 494 if self.uiState.get(k) != v: 495 self.uiState.set(k, v)
496 497 def update_ui_state_later(): 498 self._updateUI_DC = None 499 self.update_ui_state()
500 501 now = time.time() 502 503 # If we haven't updated too recently, do it immediately. 504 if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD: 505 if self._updateUI_DC: 506 self._updateUI_DC.cancel() 507 self._updateUI_DC = None 508 509 self._lastUpdate = now 510 # fixme: have updateState just update what changed itself 511 # without the hack above 512 self.updateState(setIfChanged) 513 elif not self._updateUI_DC: 514 # Otherwise, schedule doing this in a few seconds (unless an update 515 # was already scheduled) 516 self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD, 517 update_ui_state_later) 518
519 - def do_stop(self):
520 if self._updateCallLaterId: 521 self._updateCallLaterId.cancel() 522 self._updateCallLaterId = None 523 524 if self.httpauth: 525 self.httpauth.stopKeepAlive() 526 527 if self._tport: 528 self._tport.stopListening() 529 530 l = [] 531 # After we stop listening (so new connections aren't possible), 532 # disconnect (and thus log) all the old ones. 533 clients = self.remove_all_clients() 534 if clients: 535 l.append(clients) 536 537 if self.type == 'slave' and self._pbclient: 538 l.append(self._pbclient.deregisterPath(self.mountPoint)) 539 return defer.DeferredList(l)
540
541 - def updatePorterDetails(self, path, username, password):
542 """Provide a new set of porter login information, for when we're 543 in slave mode and the porter changes. 544 If we're currently connected, this won't disconnect - it'll just change 545 the information so that next time we try and connect we'll use the 546 new ones 547 """ 548 if self.type == 'slave': 549 self._porterUsername = username 550 self._porterPassword = password 551 552 creds = credentials.UsernamePassword(self._porterUsername, 553 self._porterPassword) 554 555 self._pbclient.startLogin(creds, self._pbclient.medium) 556 557 # If we've changed paths, we must do some extra work. 558 if path != self._porterPath: 559 self.debug("Changing porter login to use \"%s\"", path) 560 self._porterPath = path 561 self._pbclient.stopTrying() # Stop trying to connect with the 562 # old connector. 563 self._pbclient.resetDelay() 564 reactor.connectWith( 565 fdserver.FDConnector, self._porterPath, 566 self._pbclient, 10, checkPID=False) 567 else: 568 raise errors.WrongStateError( 569 "Can't specify porter details in master mode")
570
571 - def do_pipeline_playing(self):
572 # Override this to not set the component happy; instead do this once 573 # both the pipeline has started AND we've logged in to the porter. 574 if hasattr(self, '_porterDeferred'): 575 d = self._porterDeferred 576 else: 577 d = defer.succeed(None) 578 self.httpauth.scheduleKeepAlive() 579 d.addCallback(lambda res: 580 feedcomponent.ParseLaunchComponent.do_pipeline_playing( 581 self)) 582 return d
583
584 - def remove_all_clients(self):
585 """ Remove all the the clients 586 587 Returns a deferred fired once all clients have been removed 588 """ 589 if self.resource: 590 return self.resource.removeAllClients()
591
592 - def _get_root():
593 raise NotImplementedError("getRoot must be implemented by subclasses")
594
595 - def do_setup(self):
596 root = self._get_root() 597 self._site = self.siteClass(resource=root) 598 if self.type == 'slave': 599 # Streamer is slaved to a porter. 600 601 # We have two things we want to do in parallel: 602 # - ParseLaunchComponent.do_start() 603 # - log in to the porter, then register our mountpoint with 604 # the porter. 605 # So, we return a DeferredList with a deferred for each of 606 # these tasks. The second one's a bit tricky: we pass a dummy 607 # deferred to our PorterClientFactory that gets fired once 608 # we've done all of the tasks the first time (it's an 609 # automatically-reconnecting client factory, and we only fire 610 # this deferred the first time) 611 612 self._porterDeferred = d = defer.Deferred() 613 mountpoints = [self.mountPoint] 614 if self.multi_files: 615 self._pbclient = porterclient.HTTPPorterClientFactory( 616 self._site, [], d, prefixes=mountpoints) 617 else: 618 self._pbclient = porterclient.HTTPPorterClientFactory( 619 self._site, mountpoints, d) 620 621 creds = credentials.UsernamePassword(self._porterUsername, 622 self._porterPassword) 623 self._pbclient.startLogin(creds, self._pbclient.medium) 624 625 self.info("Starting porter login at \"%s\"", self._porterPath) 626 # This will eventually cause d to fire 627 reactor.connectWith( 628 fdserver.FDConnector, self._porterPath, 629 self._pbclient, 10, checkPID=False) 630 else: 631 # Streamer is standalone. 632 try: 633 iface = self.iface or "" 634 self.info('Listening on port %d, interface=%r', 635 self.port, iface) 636 self._tport = reactor.listenTCP( 637 self.port, self._site, interface=iface) 638 except error.CannotListenError: 639 t = 'Port %d is not available.' % self.port 640 self.warning(t) 641 m = messages.Error(T_(N_( 642 "Network error: TCP port %d is not available."), 643 self.port)) 644 self.addMessage(m) 645 self.setMood(moods.sad) 646 return defer.fail(errors.ComponentSetupHandledError(t))
647