Package flumotion :: Package component :: Module feeder
[hide private]

Source Code for Module flumotion.component.feeder

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19   
 20  import gst 
 21   
 22  from twisted.internet import reactor 
 23   
 24  from flumotion.common import componentui 
 25   
 26  __version__ = "$Rev$" 
 27   
 28   
29 -class Feeder:
30 """ 31 This class groups feeder-related information as used by a Feed Component. 32 33 @ivar feederName: name of the feeder 34 @ivar uiState: the serializable UI State for this feeder 35 """ 36
37 - def __init__(self, feederName):
38 self.feederName = feederName 39 self.elementName = 'feeder:' + feederName 40 self.payName = self.elementName + '-pay' 41 self.uiState = componentui.WorkerComponentUIState() 42 self.uiState.addKey('feederName') 43 self.uiState.set('feederName', feederName) 44 self.uiState.addListKey('clients') 45 self._fdToClient = {} # fd -> (FeederClient, cleanupfunc) 46 self._clients = {} # id -> FeederClient
47
48 - def __repr__(self):
49 return ('<Feeder %s (%d client(s))>' 50 % (self.feederName, len(self._clients)))
51
52 - def clientConnected(self, clientId, fd, cleanup):
53 """ 54 The given client has connected on the given file descriptor, and is 55 being added to multifdsink. This is called solely from the reactor 56 thread. 57 58 @param clientId: id of the client of the feeder 59 @param fd: file descriptor representing the client 60 @param cleanup: callable to be called when the given fd is removed 61 """ 62 if clientId not in self._clients: 63 # first time we see this client, create an object 64 client = FeederClient(clientId) 65 self._clients[clientId] = client 66 self.uiState.append('clients', client.uiState) 67 68 client = self._clients[clientId] 69 self._fdToClient[fd] = (client, cleanup) 70 71 client.connected(fd) 72 73 return client
74
75 - def clientDisconnected(self, fd):
76 """ 77 The client has been entirely removed from multifdsink, and we may 78 now close its file descriptor. 79 The client object stays around so we can track over multiple 80 connections. 81 82 Called from GStreamer threads. 83 84 @type fd: file descriptor 85 """ 86 (client, cleanup) = self._fdToClient.pop(fd) 87 client.disconnected(fd=fd) 88 89 # To avoid races between this thread (a GStreamer thread) closing the 90 # FD, and the reactor thread reusing this FD, we only actually perform 91 # the close in the reactor thread. 92 reactor.callFromThread(cleanup, fd)
93
94 - def getClients(self):
95 """ 96 @rtype: list of all L{FeederClient}s ever seen, including currently 97 disconnected clients 98 """ 99 return self._clients.values()
100 101
102 -class FeederClient:
103 """ 104 This class groups information related to the client of a feeder. 105 The client is identified by an id. 106 The information remains valid for the lifetime of the feeder, so it 107 can track reconnects of the client. 108 109 @ivar clientId: id of the client of the feeder 110 @ivar fd: file descriptor the client is currently using, or None. 111 """ 112
113 - def __init__(self, clientId):
114 self.uiState = componentui.WorkerComponentUIState() 115 self.uiState.addKey('client-id', clientId) 116 self.fd = None 117 self.uiState.addKey('fd', None) 118 119 # these values can be set to None, which would mean 120 # Unknown, not supported 121 # these are supported 122 for key in ( 123 'bytes-read-current', # bytes read over current connection 124 'bytes-read-total', # bytes read over all connections 125 'reconnects', # number of connections made by this client 126 'last-connect', # last client connection, in epoch seconds 127 'last-disconnect', # last client disconnect, in epoch seconds 128 'last-activity', # last time client read or connected 129 ): 130 self.uiState.addKey(key, 0) 131 # these are possibly unsupported 132 for key in ( 133 'buffers-dropped-current', # buffers dropped over current conn 134 'buffers-dropped-total', # buffers dropped over all connections 135 ): 136 self.uiState.addKey(key, None) 137 138 # internal state allowing us to track global numbers 139 self._buffersDroppedBefore = 0 140 self._bytesReadBefore = 0
141
142 - def setStats(self, stats):
143 """ 144 @type stats: list 145 """ 146 bytesSent = stats[0] 147 #timeAdded = stats[1] 148 #timeRemoved = stats[2] 149 #timeActive = stats[3] 150 timeLastActivity = float(stats[4]) / gst.SECOND 151 if len(stats) > 5: 152 # added in gst-plugins-base 0.10.11 153 buffersDropped = stats[5] 154 else: 155 # We don't know, but we cannot use None 156 # since that would break integer addition below 157 buffersDropped = 0 158 159 self.uiState.set('bytes-read-current', bytesSent) 160 self.uiState.set('buffers-dropped-current', buffersDropped) 161 self.uiState.set('bytes-read-total', self._bytesReadBefore + bytesSent) 162 self.uiState.set('last-activity', timeLastActivity) 163 if buffersDropped is not None: 164 self.uiState.set('buffers-dropped-total', 165 self._buffersDroppedBefore + buffersDropped)
166
167 - def connected(self, fd, when=None):
168 """ 169 The client has connected on this fd. 170 Update related stats. 171 172 Called only from the reactor thread. 173 """ 174 if not when: 175 when = time.time() 176 177 if self.fd: 178 # It's normal to receive a reconnection before we notice 179 # that an old connection has been closed. Perform the 180 # disconnection logic for the old FD if necessary. See #591. 181 self._updateUIStateForDisconnect(self.fd, when) 182 183 self.fd = fd 184 self.uiState.set('fd', fd) 185 self.uiState.set('last-connect', when) 186 self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)
187
188 - def _updateUIStateForDisconnect(self, fd, when):
189 if self.fd == fd: 190 self.fd = None 191 self.uiState.set('fd', None) 192 self.uiState.set('last-disconnect', when) 193 194 # update our internal counters and reset current counters to 0 195 self._bytesReadBefore += self.uiState.get('bytes-read-current') 196 self.uiState.set('bytes-read-current', 0) 197 if self.uiState.get('buffers-dropped-current') is not None: 198 self._buffersDroppedBefore += self.uiState.get( 199 'buffers-dropped-current') 200 self.uiState.set('buffers-dropped-current', 0)
201
202 - def disconnected(self, when=None, fd=None):
203 """ 204 The client has disconnected. 205 Update related stats. 206 207 Called from GStreamer threads. 208 """ 209 if self.fd != fd: 210 # assume that connected() already called 211 # _updateUIStateForDisconnect for us 212 return 213 214 if not when: 215 when = time.time() 216 217 reactor.callFromThread(self._updateUIStateForDisconnect, fd, 218 when)
219