Package flumotion :: Package component :: Module feed
[hide private]

Source Code for Module flumotion.component.feed

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  implementation of a PB Client to interface with feedserver.py 
 20  """ 
 21   
 22  import socket 
 23  import os 
 24   
 25  from twisted.internet import reactor, main, defer, tcp 
 26  from twisted.python import failure 
 27  from zope.interface import implements 
 28   
 29  from flumotion.common import log, common, interfaces 
 30  from flumotion.twisted import pb as fpb 
 31   
 32  __version__ = "$Rev$" 
 33   
 34   
 35  # copied from fdserver.py so that it can be bundled 
 36   
 37   
class _SocketMaybeCloser(tcp._SocketCloser):
    """
    I am a socket-closer mixin that can release a transport's socket
    object without shutting down the underlying TCP channel.

    @cvar keepSocketAlive: when True, L{_closeSocket} only close()s the
                           socket object; when False (the default) the
                           stock Twisted close logic is used.
    @type keepSocketAlive: bool
    """
    keepSocketAlive = False

    def _closeSocket(self, orderly=False):
        """
        Close the transport's socket.

        @param orderly: forwarded to the Twisted implementation when the
                        installed Twisted version accepts it; ignored
                        when keepSocketAlive is set.
        """
        # We override this (from tcp._SocketCloser) so that we can close
        # sockets properly in the normal case, but once we've passed our
        # socket on via the FD-channel, we just close() it (not calling
        # shutdown() which will close the TCP channel without closing
        # the FD itself)
        if self.keepSocketAlive:
            try:
                self.socket.close()
            except socket.error:
                # best effort: the socket has been handed over already,
                # so a failure to close our copy is not interesting
                pass
            return

        # The 'orderly' argument is only passed on for Twisted versions
        # newer than 11.0.0; older ones take no argument here.
        from twisted import version as v
        args = []
        if (v.major, v.minor, v.micro) > (11, 0, 0):
            args.append(orderly)

        # Chain up to our real base class.  Going through tcp.Server
        # fails for client transports: under Python 2 an unbound method
        # insists on an instance of the class it was looked up on.
        tcp._SocketCloser._closeSocket(self, *args)
58 59
class PassableClientConnection(_SocketMaybeCloser, tcp.Client):
    """
    I am a TCP client transport that closes its socket the way
    L{_SocketMaybeCloser} does; I add no behaviour of my own.
    """
62 63
class PassableClientConnector(tcp.Connector):
    """
    I am a TCP connector that creates L{PassableClientConnection}
    transports.
    """

    # It is unfortunate, but it seems that either we override this
    # private-ish method or reimplement BaseConnector.connect(). This is
    # the path that tcp.py takes, so we take it too.

    def _makeTransport(self):
        """
        Build the transport for this connection attempt.

        @returns: a new L{PassableClientConnection} set up with this
                  connector's host, port, bind address and reactor.
        """
        transport = PassableClientConnection(
            self.host, self.port, self.bindAddress, self, self.reactor)
        return transport
73 74
class FeedClientFactory(fpb.FPBClientFactory, log.Loggable):
    """
    I am the client factory through which a feed component's medium
    logs in to a worker in order to exchange feeds.

    @ivar medium: the medium I was created for
    """
    perspectiveInterface = interfaces.IFeedMedium
    logCategory = 'feedclient'

    def __init__(self, medium):
        """
        @param medium: the medium on whose behalf I connect; stored as
                       self.medium
        """
        fpb.FPBClientFactory.__init__(self)
        self.medium = medium
86 87 # not a BaseMedium because we are going to do strange things to the transport 88 89
class FeedMedium(fpb.Referenceable):
    """
    I am a client for a Feed Server.

    I am used as the remote interface between a component and another
    component.

    @ivar component: the component this is a feed client for
    @type component: L{flumotion.component.feedcomponent.FeedComponent}
    @ivar remote:    a reference to a
                     L{flumotion.worker.feedserver.FeedAvatar}
    @type remote:    L{twisted.spread.pb.RemoteReference}
    """
    logCategory = 'feedmedium'
    remoteLogName = 'feedserver'
    implements(interfaces.IFeedMedium)

    remote = None

    def __init__(self, logName=None):
        """
        @param logName: optional name to log under
        @type  logName: str
        """
        if logName:
            assert isinstance(logName, str)
        self.logName = logName
        # client factory of the connection started by startConnecting()
        self._factory = None
        # fired by _doFeedTo() with (feedId, fd) once the feed server
        # has replied to a requestFeed()
        self._feedToDeferred = defer.Deferred()

    def startConnecting(self, host, port, authenticator, timeout=30,
                        bindAddress=None):
        """Optional helper method to connect to a remote feed server.

        This method starts a client factory connecting via a
        L{PassableClientConnector}. It offers the possibility of
        cancelling an in-progress connection via the stopConnecting()
        method.

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param timeout:       connection timeout handed to the
                              L{PassableClientConnector}
        @param bindAddress:   local address handed to the
                              L{PassableClientConnector}

        @returns: a deferred that will fire with the remote reference,
                  once we have authenticated.
        """
        assert self._factory is None
        self._factory = FeedClientFactory(self)
        c = PassableClientConnector(host, port, self._factory, timeout,
                                    bindAddress, reactor=reactor)
        c.connect()

        return self._factory.login(authenticator)

    def requestFeed(self, host, port, authenticator, fullFeedId):
        """Request a feed from a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id (/flow/component:feed)
                              offered by the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (feedId, fd). In an error case it will errback and
                  close the remote connection.
        """

        def connected(remote):
            self.setRemoteReference(remote)
            return remote.callRemote('sendFeed', fullFeedId)

        def feedSent(res):
            # res is None
            # either just before or just after this, we received a
            # sendFeedReply call from the feedserver. so now we're
            # waiting on the component to get its fd
            return self._feedToDeferred

        def error(f):
            self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(f))
            self.debug('closing connection')
            self.stopConnecting()
            return f

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def sendFeed(self, host, port, authenticator, fullFeedId):
        """Send a feed to a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, asks the remote side to receive
        the feed, and then hands back a duplicate of the connection's
        file descriptor. A pending connection attempt can be cancelled
        via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id
                              (/flow/component:eaterAlias) to feed to
                              on the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (fullFeedId, fd). In an error case it will errback
                  and close the remote connection.
        """

        def connected(remote):
            assert isinstance(remote.broker.transport, _SocketMaybeCloser)
            self.setRemoteReference(remote)
            return remote.callRemote('receiveFeed', fullFeedId)

        def feedSent(res):
            t = self.remote.broker.transport
            self.debug('stop reading from transport')
            t.stopReading()

            self.debug('flushing PB write queue')
            t.doWrite()
            self.debug('stop writing to transport')
            t.stopWriting()

            # make sure shutdown() is not called on the socket, then
            # take our own copy of the fd before the transport goes away
            t.keepSocketAlive = True
            fd = os.dup(t.fileno())

            # avoid refcount cycles
            self.setRemoteReference(None)

            d = defer.Deferred()

            def loseConnection():
                t.connectionLost(failure.Failure(main.CONNECTION_DONE))
                d.callback((fullFeedId, fd))

            # tear the transport down from a fresh reactor iteration
            # rather than from inside this callback
            reactor.callLater(0, loseConnection)
            return d

        def error(f):
            self.warning('failed to send %s to %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(f))
            self.debug('closing connection')
            self.stopConnecting()
            return f

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def stopConnecting(self):
        """Stop a pending or established connection made via
        startConnecting().

        Stops any established or pending connection to a remote feed
        server started via the startConnecting() method. Safe to call
        even if connection has not been started.
        """
        if self._factory:
            self._factory.disconnect()
            self._factory = None
        # not sure if this is necessary; call it just in case, so we
        # don't leave a lingering reference cycle
        self.setRemoteReference(None)

    ### IMedium methods

    def setRemoteReference(self, remoteReference):
        """
        Store the reference to the remote side, or None to drop it.
        """
        self.remote = remoteReference

    def hasRemoteReference(self):
        """
        @returns: whether a remote reference is currently set
        @rtype:   bool
        """
        return self.remote is not None

    def callRemote(self, name, *args, **kwargs):
        """
        Call the named method on the remote reference.

        The positional and keyword arguments are forwarded unchanged.

        @param name: name of the remote method
        @type  name: str

        @returns: whatever the remote reference's callRemote() returns
        """
        return self.remote.callRemote(name, *args, **kwargs)

    def remote_sendFeedReply(self, fullFeedId):
        """
        Called by the feed server in reply to our 'sendFeed' request.

        Stops reading from the transport and schedules L{_doFeedTo}.

        @param fullFeedId: the full feed id the reply is about
        @type  fullFeedId: str
        """
        t = self.remote.broker.transport
        # make sure we stop receiving PB messages
        self.debug('stop reading from transport')
        t.stopReading()
        reactor.callLater(0, self._doFeedTo, fullFeedId, t)

    def _doFeedTo(self, fullFeedId, t):
        """
        Take the file descriptor over from the PB transport and fire
        the requestFeed() deferred with (feedId, fd).

        @param fullFeedId: the full feed id being delivered
        @type  fullFeedId: str
        @param t:          the transport of the PB connection
        """
        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # make sure shutdown() is not called on the socket
        t.keepSocketAlive = True

        fd = os.dup(t.fileno())
        # Similar to feedserver._sendFeedReplyCb, but since we are in a
        # callLater, not doReadOrWrite, we call connectionLost directly
        # on the transport.
        t.connectionLost(failure.Failure(main.CONNECTION_DONE))

        # This medium object is of no use any more; drop our reference
        # to the remote so we can avoid cycles.
        self.setRemoteReference(None)

        (flowName, componentName, feedName) = common.parseFullFeedId(
            fullFeedId)
        feedId = common.feedId(componentName, feedName)

        self.debug('firing deferred with feedId %s on fd %d', feedId,
                   fd)
        self._feedToDeferred.callback((feedId, fd))
320