Package flumotion :: Package component :: Module eater
[hide private]

Source Code for Module flumotion.component.eater

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  # This module should not import or depend on gst 
 19  # All timestamps should be in float seconds 
 20   
 21  import time 
 22   
 23  from twisted.internet import reactor 
 24   
 25  from flumotion.common import componentui 
 26   
 27  __version__ = "$Rev$" 
 28   
 29   
class Eater:
    """
    This class groups eater-related information as used by a Feed Component.

    The UI state contains the following keys:
     - eater-alias:    str
     - eater-name:     str
     - connection:     dict of
       - feed-id
       - time-timestamp-discont
       - timestamp-timestamp-discont
       - last-timestamp-discont
       - total-timestamp-discont
       - count-timestamp-discont
       - time-offset-discont
       - offset-offset-discont
       - last-offset-discont
       - total-offset-discont
       - count-offset-discont
     - last-connect
     - last-disconnect
     - total-connections
     - count-timestamp-discont
     - count-offset-discont
     - total-timestamp-discont
     - total-offset-discont
     - fd

    The keys inside 'connection' describe the current connection and are
    reset by L{connected}; the top-level count-/total- keys accumulate
    over the lifetime of this object.

    @ivar eaterAlias:  the alias of this eater (e.g. "default", "video",
                       ...)
    @ivar eaterName:   the name of this eater, as passed to the constructor
    @ivar feedId:      id of the feed this is eating from
    @ivar fd:          file descriptor given to the last L{connected} call;
                       reset to None after a disconnect
    @ivar elementName: 'eater:' followed by the eater alias
    @ivar depayName:   elementName followed by '-depay'
    @ivar uiState:     the serializable UI State for this eater
    """

    def __init__(self, eaterAlias, eaterName):
        """
        @param eaterAlias: the alias of this eater
        @param eaterName:  the name of this eater
        """
        self.eaterAlias = eaterAlias
        self.eaterName = eaterName
        self.feedId = None
        self.fd = None
        self.elementName = 'eater:' + eaterAlias
        self.depayName = self.elementName + '-depay'
        # for use to detect duplicate streamheader buffers
        self.streamheaderBufferProbeHandler = None
        self.setPadMonitor(None)
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('eater-alias')
        self.uiState.set('eater-alias', eaterAlias)
        self.uiState.addKey('eater-name')
        self.uiState.set('eater-name', eaterName)
        # dict for the current connection
        connectionDict = {
            "feed-id": None,
            "time-timestamp-discont": None,
            "timestamp-timestamp-discont": 0.0,  # ts of buffer after discont,
                                                 # in float seconds
            "last-timestamp-discont": 0.0,
            "total-timestamp-discont": 0.0,
            "count-timestamp-discont": 0,
            "time-offset-discont": None,
            "offset-offset-discont": 0,  # offset of buffer
                                         # after discont
            "last-offset-discont": 0,
            "total-offset-discont": 0,
            "count-offset-discont": 0}
        self.uiState.addDictKey('connection', connectionDict)

        # counters accumulated over all connections; integer defaults
        for key in (
            'last-connect',             # last client connection, in epoch sec
            'last-disconnect',          # last client disconnect, in epoch sec
            'total-connections',        # number of connections by this client
            'count-timestamp-discont',  # number of timestamp disconts seen
            'count-offset-discont',     # number of offset disconts seen
            ):
            self.uiState.addKey(key, 0)
        # totals accumulated over all connections; float defaults
        for key in (
            'total-timestamp-discont',  # total timestamp discontinuity
            'total-offset-discont',     # total offset discontinuity
            ):
            self.uiState.addKey(key, 0.0)
        self.uiState.addKey('fd', None)

    def __repr__(self):
        # An eater without a feedId has nothing to eat from.  The previous
        # "feedId and '(disconnected)' or ..." expression had the two
        # branches swapped, reporting connected eaters as disconnected and
        # unconnected ones as "eating from None".
        if self.feedId:
            status = 'eating from %s' % self.feedId
        else:
            status = '(disconnected)'
        return '<Eater %s %s>' % (self.eaterAlias, status)

    def connected(self, fd, feedId, when=None):
        """
        The eater has been connected.
        Update related stats.

        @param fd:     file descriptor of the new connection
        @param feedId: id of the feed this eater now eats from
        @param when:   time of the connection, in epoch seconds; any false
                       value (None, 0) means "now"
        """
        if not when:
            when = time.time()

        self.feedId = feedId
        self.fd = fd

        self.uiState.set('last-connect', when)
        self.uiState.set('fd', fd)
        self.uiState.set('total-connections',
            self.uiState.get('total-connections', 0) + 1)

        # start the per-connection statistics afresh
        # NOTE(review): 'timestamp-timestamp-discont' and
        # 'offset-offset-discont' are not reset here - confirm intentional.
        self.uiState.setitem("connection", 'feed-id', feedId)
        self.uiState.setitem("connection", "count-timestamp-discont", 0)
        self.uiState.setitem("connection", "time-timestamp-discont", None)
        self.uiState.setitem("connection", "last-timestamp-discont", 0.0)
        self.uiState.setitem("connection", "total-timestamp-discont", 0.0)
        self.uiState.setitem("connection", "count-offset-discont", 0)
        self.uiState.setitem("connection", "time-offset-discont", None)
        self.uiState.setitem("connection", "last-offset-discont", 0)
        self.uiState.setitem("connection", "total-offset-discont", 0)

    def disconnected(self, when=None):
        """
        The eater has been disconnected.
        Update related stats.

        The update itself is not done synchronously; it is handed to the
        reactor through reactor.callFromThread.

        @param when: time of the disconnect, in epoch seconds; any false
                     value (None, 0) means "now"
        """
        if not when:
            when = time.time()

        def updateUIState():
            self.uiState.set('last-disconnect', when)
            self.fd = None
            self.uiState.set('fd', None)
            # NOTE(review): self.feedId and the 'feed-id' connection item
            # are left untouched here - confirm callers rely on that.

        reactor.callFromThread(updateUIState)

    def setPadMonitor(self, monitor):
        """
        Set the pad monitor used by L{isActive} and L{addWatch}.

        @param monitor: the monitor object, or None to clear it
        """
        self._padMonitor = monitor

    def isActive(self):
        """
        @returns: a false value (None) when no pad monitor is set,
                  otherwise whatever the pad monitor's isActive() returns.
        """
        return self._padMonitor and self._padMonitor.isActive()

    def addWatch(self, setActive, setInactive):
        """
        Register watch callbacks on the pad monitor.

        Both callables get invoked with this eater's alias as their only
        argument; the argument passed in by the pad monitor is dropped.
        A pad monitor must have been set with L{setPadMonitor} first.

        @param setActive:   callable taking the eater alias
        @param setInactive: callable taking the eater alias
        """
        self._padMonitor.addWatch(lambda _: setActive(self.eaterAlias),
                                  lambda _: setInactive(self.eaterAlias))

    def timestampDiscont(self, seconds, timestamp):
        """
        @param seconds:   discont duration in seconds
        @param timestamp: GStreamer timestamp of new buffer, in seconds.

        Inform the eater of a timestamp discontinuity.
        This is called from a bus message handler, so in the main thread.
        """
        uiState = self.uiState

        c = uiState.get('connection') # dict
        uiState.setitem('connection', 'count-timestamp-discont',
            c.get('count-timestamp-discont', 0) + 1)
        uiState.set('count-timestamp-discont',
            uiState.get('count-timestamp-discont', 0) + 1)

        uiState.setitem('connection', 'time-timestamp-discont', time.time())
        uiState.setitem('connection', 'timestamp-timestamp-discont', timestamp)
        uiState.setitem('connection', 'last-timestamp-discont', seconds)
        uiState.setitem('connection', 'total-timestamp-discont',
            c.get('total-timestamp-discont', 0) + seconds)
        uiState.set('total-timestamp-discont',
            uiState.get('total-timestamp-discont', 0) + seconds)

    def offsetDiscont(self, units, offset):
        """
        @param units:  size of the discontinuity, in offset units
        @param offset: offset of the buffer after the discontinuity

        Inform the eater of an offset discontinuity.
        This is called from a bus message handler, so in the main thread.
        """
        uiState = self.uiState

        c = uiState.get('connection') # dict
        uiState.setitem('connection', 'count-offset-discont',
            c.get('count-offset-discont', 0) + 1)
        uiState.set('count-offset-discont',
            uiState.get('count-offset-discont', 0) + 1)

        uiState.setitem('connection', 'time-offset-discont', time.time())
        uiState.setitem('connection', 'offset-offset-discont', offset)
        uiState.setitem('connection', 'last-offset-discont', units)
        uiState.setitem('connection', 'total-offset-discont',
            c.get('total-offset-discont', 0) + units)
        uiState.set('total-offset-discont',
            uiState.get('total-offset-discont', 0) + units)
211