1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import time
19
20 import gst
21
22 from twisted.internet import reactor
23
24 from flumotion.common import componentui
25
26 __version__ = "$Rev$"
27
28
30 """
31 This class groups feeder-related information as used by a Feed Component.
32
33 @ivar feederName: name of the feeder
34 @ivar uiState: the serializable UI State for this feeder
35 """
36
38 self.feederName = feederName
39 self.elementName = 'feeder:' + feederName
40 self.payName = self.elementName + '-pay'
41 self.uiState = componentui.WorkerComponentUIState()
42 self.uiState.addKey('feederName')
43 self.uiState.set('feederName', feederName)
44 self.uiState.addListKey('clients')
45 self._fdToClient = {}
46 self._clients = {}
47
49 return ('<Feeder %s (%d client(s))>'
50 % (self.feederName, len(self._clients)))
51
53 """
54 The given client has connected on the given file descriptor, and is
55 being added to multifdsink. This is called solely from the reactor
56 thread.
57
58 @param clientId: id of the client of the feeder
59 @param fd: file descriptor representing the client
60 @param cleanup: callable to be called when the given fd is removed
61 """
62 if clientId not in self._clients:
63
64 client = FeederClient(clientId)
65 self._clients[clientId] = client
66 self.uiState.append('clients', client.uiState)
67
68 client = self._clients[clientId]
69 self._fdToClient[fd] = (client, cleanup)
70
71 client.connected(fd)
72
73 return client
74
76 """
77 The client has been entirely removed from multifdsink, and we may
78 now close its file descriptor.
79 The client object stays around so we can track over multiple
80 connections.
81
82 Called from GStreamer threads.
83
84 @type fd: file descriptor
85 """
86 (client, cleanup) = self._fdToClient.pop(fd)
87 client.disconnected(fd=fd)
88
89
90
91
92 reactor.callFromThread(cleanup, fd)
93
95 """
96 @rtype: list of all L{FeederClient}s ever seen, including currently
97 disconnected clients
98 """
99 return self._clients.values()
100
101
103 """
104 This class groups information related to the client of a feeder.
105 The client is identified by an id.
106 The information remains valid for the lifetime of the feeder, so it
107 can track reconnects of the client.
108
109 @ivar clientId: id of the client of the feeder
110 @ivar fd: file descriptor the client is currently using, or None.
111 """
112
114 self.uiState = componentui.WorkerComponentUIState()
115 self.uiState.addKey('client-id', clientId)
116 self.fd = None
117 self.uiState.addKey('fd', None)
118
119
120
121
122 for key in (
123 'bytes-read-current',
124 'bytes-read-total',
125 'reconnects',
126 'last-connect',
127 'last-disconnect',
128 'last-activity',
129 ):
130 self.uiState.addKey(key, 0)
131
132 for key in (
133 'buffers-dropped-current',
134 'buffers-dropped-total',
135 ):
136 self.uiState.addKey(key, None)
137
138
139 self._buffersDroppedBefore = 0
140 self._bytesReadBefore = 0
141
143 """
144 @type stats: list
145 """
146 bytesSent = stats[0]
147
148
149
150 timeLastActivity = float(stats[4]) / gst.SECOND
151 if len(stats) > 5:
152
153 buffersDropped = stats[5]
154 else:
155
156
157 buffersDropped = 0
158
159 self.uiState.set('bytes-read-current', bytesSent)
160 self.uiState.set('buffers-dropped-current', buffersDropped)
161 self.uiState.set('bytes-read-total', self._bytesReadBefore + bytesSent)
162 self.uiState.set('last-activity', timeLastActivity)
163 if buffersDropped is not None:
164 self.uiState.set('buffers-dropped-total',
165 self._buffersDroppedBefore + buffersDropped)
166
168 """
169 The client has connected on this fd.
170 Update related stats.
171
172 Called only from the reactor thread.
173 """
174 if not when:
175 when = time.time()
176
177 if self.fd:
178
179
180
181 self._updateUIStateForDisconnect(self.fd, when)
182
183 self.fd = fd
184 self.uiState.set('fd', fd)
185 self.uiState.set('last-connect', when)
186 self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)
187
189 if self.fd == fd:
190 self.fd = None
191 self.uiState.set('fd', None)
192 self.uiState.set('last-disconnect', when)
193
194
195 self._bytesReadBefore += self.uiState.get('bytes-read-current')
196 self.uiState.set('bytes-read-current', 0)
197 if self.uiState.get('buffers-dropped-current') is not None:
198 self._buffersDroppedBefore += self.uiState.get(
199 'buffers-dropped-current')
200 self.uiState.set('buffers-dropped-current', 0)
201
203 """
204 The client has disconnected.
205 Update related stats.
206
207 Called from GStreamer threads.
208 """
209 if self.fd != fd:
210
211
212 return
213
214 if not when:
215 when = time.time()
216
217 reactor.callFromThread(self._updateUIStateForDisconnect, fd,
218 when)
219