Package flumotion :: Package component :: Module feed
[hide private]

Source Code for Module flumotion.component.feed

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  implementation of a PB Client to interface with feedserver.py 
 20  """ 
 21   
 22  import socket 
 23  import os 
 24   
 25  from twisted.internet import reactor, main, defer, tcp 
 26  from twisted.python import failure 
 27  from zope.interface import implements 
 28   
 29  from flumotion.common import log, common, interfaces 
 30  from flumotion.twisted import pb as fpb 
 31   
 32  __version__ = "$Rev$" 
 33   
 34   
 35  # copied from fdserver.py so that it can be bundled 
 36   
 37   
class _SocketMaybeCloser(tcp._SocketCloser):
    """
    Socket closer that can be told to leave the TCP channel alone.

    Normally sockets are closed the regular tcp._SocketCloser way.  Once
    the socket has been passed on via the FD-channel, keepSocketAlive is
    set and the socket is merely close()d, without going through the
    parent's shutdown() path.
    """

    # Set to True once the underlying FD has been handed to someone else.
    keepSocketAlive = False

    def _closeSocket(self):
        # Overrides tcp._SocketCloser._closeSocket.
        if not self.keepSocketAlive:
            # normal case: let Twisted close the socket properly
            tcp._SocketCloser._closeSocket(self)
            return

        # The FD lives on elsewhere: only drop our own handle, and ignore
        # socket errors while doing so.
        try:
            self.socket.close()
        except socket.error:
            pass
54 55
class PassableClientConnection(_SocketMaybeCloser, tcp.Client):
    """
    A tcp.Client transport that closes its socket through
    L{_SocketMaybeCloser}.
    """
58 59
class PassableClientConnector(tcp.Connector):
    """
    A tcp.Connector whose transports are L{PassableClientConnection}s.
    """

    # Overriding this private-ish method is unfortunate, but the only
    # alternative seems to be reimplementing BaseConnector.connect().
    # tcp.py itself goes through _makeTransport, so we do the same.
    def _makeTransport(self):
        transport = PassableClientConnection(
            self.host, self.port, self.bindAddress, self, self.reactor)
        return transport
69 70
class FeedClientFactory(fpb.FPBClientFactory, log.Loggable):
    """
    I am the client factory that a feed component's medium uses to log
    into a worker and exchange feeds.

    @ivar medium: the medium this factory was created for
    """
    perspectiveInterface = interfaces.IFeedMedium
    logCategory = 'feedclient'

    def __init__(self, medium):
        """
        @param medium: the medium to remember on this factory
        """
        fpb.FPBClientFactory.__init__(self)
        self.medium = medium
# not a BaseMedium because we are going to do strange things to the transport
class FeedMedium(fpb.Referenceable):
    """
    I am a client for a Feed Server.

    I am used as the remote interface between a component and another
    component.

    @ivar component: the component this is a feed client for
    @type component: L{flumotion.component.feedcomponent.FeedComponent}
    @ivar remote:    a reference to a
                     L{flumotion.worker.feedserver.FeedAvatar}
    @type remote:    L{twisted.spread.pb.RemoteReference}
    """
    logCategory = 'feedmedium'
    remoteLogName = 'feedserver'
    implements(interfaces.IFeedMedium)

    remote = None

    def __init__(self, logName=None):
        """
        @param logName: optional name to log under
        @type  logName: str
        """
        if logName:
            assert isinstance(logName, str)
        self.logName = logName
        # client factory of the connection started by startConnecting()
        self._factory = None
        # fired by _doFeedTo() with (feedId, fd) once the fd is available
        self._feedToDeferred = defer.Deferred()

    def startConnecting(self, host, port, authenticator, timeout=30,
                        bindAddress=None):
        """Optional helper method to connect to a remote feed server.

        This method starts a client factory connecting via a
        L{PassableClientConnector}. It offers the possibility of
        cancelling an in-progress connection via the stopConnecting()
        method.

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param timeout:       connection timeout, handed on to the
                              connector
        @param bindAddress:   local address to bind to, handed on to the
                              connector

        @returns: a deferred that will fire with the remote reference,
                  once we have authenticated.
        """
        assert self._factory is None
        self._factory = FeedClientFactory(self)
        reactor.connectWith(PassableClientConnector, host, port,
                            self._factory, timeout, bindAddress)
        return self._factory.login(authenticator)

    def requestFeed(self, host, port, authenticator, fullFeedId):
        """Request a feed from a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id (/flow/component:feed)
                              offered by the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (feedId, fd). In an error case it will errback and
                  close the remote connection.
        """

        def connected(remote):
            self.setRemoteReference(remote)
            return remote.callRemote('sendFeed', fullFeedId)

        def feedSent(res):
            # res is None
            # either just before or just after this, we received a
            # sendFeedReply call from the feedserver. so now we're
            # waiting on the component to get its fd
            return self._feedToDeferred

        # the argument is deliberately not called 'failure', so that it
        # does not shadow the twisted.python.failure module
        def error(f):
            self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(f))
            self.debug('closing connection')
            self.stopConnecting()
            return f

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def sendFeed(self, host, port, authenticator, fullFeedId):
        """Send a feed to a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id
                              (/flow/component:eaterAlias) to feed to on
                              the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (fullFeedId, fd). In an error case it will errback and
                  close the remote connection.
        """

        def connected(remote):
            assert isinstance(remote.broker.transport, _SocketMaybeCloser)
            self.setRemoteReference(remote)
            return remote.callRemote('receiveFeed', fullFeedId)

        def feedSent(res):
            t = self.remote.broker.transport
            self.debug('stop reading from transport')
            t.stopReading()

            self.debug('flushing PB write queue')
            t.doWrite()
            self.debug('stop writing to transport')
            t.stopWriting()

            # make sure shutdown() is not called on the socket
            t.keepSocketAlive = True
            fd = os.dup(t.fileno())

            # avoid refcount cycles
            self.setRemoteReference(None)

            fdDeferred = defer.Deferred()

            def loseConnection():
                t.connectionLost(failure.Failure(main.CONNECTION_DONE))
                fdDeferred.callback((fullFeedId, fd))

            # tear down the transport from a later reactor iteration
            reactor.callLater(0, loseConnection)
            return fdDeferred

        # the argument is deliberately not called 'failure', so that it
        # does not shadow the twisted.python.failure module
        def error(f):
            self.warning('failed to send %s to %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(f))
            self.debug('closing connection')
            self.stopConnecting()
            return f

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def stopConnecting(self):
        """Stop a pending or established connection made via
        startConnecting().

        Stops any established or pending connection to a remote feed
        server started via the startConnecting() method. Safe to call
        even if connection has not been started.
        """
        if self._factory:
            self._factory.disconnect()
            self._factory = None
        # not sure if this is necessary; call it just in case, so we
        # don't leave a lingering reference cycle
        self.setRemoteReference(None)

    ### IMedium methods

    def setRemoteReference(self, remoteReference):
        """
        Store the reference to the remote side; None drops it.
        """
        self.remote = remoteReference

    def hasRemoteReference(self):
        """
        @returns: whether a remote reference is currently set
        @rtype:   bool
        """
        return self.remote is not None

    def callRemote(self, name, *args, **kwargs):
        """
        Call the given remote method through the remote reference.

        @param name: name of the remote method
        @type  name: str

        @returns: whatever the remote reference's callRemote returns
        """
        # Unpack the arguments again: handing over 'args' and 'kwargs'
        # as-is would deliver a tuple and a dict as two positional
        # arguments to the remote method.
        return self.remote.callRemote(name, *args, **kwargs)

    def remote_sendFeedReply(self, fullFeedId):
        """
        Called remotely when the feed identified by fullFeedId is about
        to arrive on our transport; schedules the hand-over of the fd.
        """
        t = self.remote.broker.transport
        # make sure we stop receiving PB messages
        self.debug('stop reading from transport')
        t.stopReading()
        reactor.callLater(0, self._doFeedTo, fullFeedId, t)

    def _doFeedTo(self, fullFeedId, t):
        """
        Take the file descriptor out of transport t and fire the
        deferred handed out by requestFeed() with (feedId, fd).
        """
        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # make sure shutdown() is not called on the socket
        t.keepSocketAlive = True

        fd = os.dup(t.fileno())
        # Similar to feedserver._sendFeedReplyCb, but since we are in a
        # callLater, not doReadOrWrite, we call connectionLost directly
        # on the transport.
        t.connectionLost(failure.Failure(main.CONNECTION_DONE))

        # This medium object is of no use any more; drop our reference
        # to the remote so we can avoid cycles.
        self.setRemoteReference(None)

        (flowName, componentName, feedName) = common.parseFullFeedId(
            fullFeedId)
        feedId = common.feedId(componentName, feedName)

        self.debug('firing deferred with feedId %s on fd %d', feedId,
                   fd)
        self._feedToDeferred.callback((feedId, fd))
314