Package flumotion :: Package component :: Package misc :: Package porter :: Module porterclient
[hide private]

Source Code for Module flumotion.component.misc.porter.porterclient

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  from twisted.internet.protocol import Protocol, Factory 
 19  from twisted.internet.tcp import Port, Connection 
 20  from twisted.internet import reactor, address 
 21   
 22  from flumotion.common import medium, log 
 23  from flumotion.twisted import defer, fdserver 
 24  from flumotion.twisted import pb as fpb 
 25   
 26  import socket 
 27   
 28  __version__ = "$Rev$" 
 29   
 30   
 31  # Very similar to tcp.Server, but we need to call things in a different order 
 32   
 33   
34 -class FDPorterServer(Connection):
35 """ 36 A connection class for use with passed FDs. 37 Similar to tcp.Server, but gets the initial FD from a different source, 38 obviously, and also passes along some data with the original connection. 39 """ 40
41 - def __init__(self, sock, protocol, addr, additionalData):
42 Connection.__init__(self, sock, protocol) 43 self.client = addr 44 45 # Inform the protocol we've made a connection. 46 protocol.makeConnection(self) 47 48 # Now, we want to feed in the extra data BEFORE the reactor reads 49 # anything additional from the socket. However, if we call this in 50 # the other order, and the socket gets closed (or passed to something 51 # non-twisted) after just the initial chunk, we'll be calling 52 # startReading() on something we've already stopped reading. That won't 53 # work too well... Fortunately, the reactor runs in this thread, so 54 # merely adding it (with startReading()) can't cause a read to happen 55 # immediately. 56 self.startReading() 57 self.connected = 1 58 59 protocol.dataReceived(additionalData)
60
61 - def getHost(self):
62 return address.IPv4Address('TCP', *( 63 self.socket.getsockname() + ('INET', )))
64
65 - def getPeer(self):
66 return address.IPv4Address('TCP', *(self.client + ('INET', )))
67 68
69 -class PorterMedium(medium.BaseMedium):
70 """ 71 A medium we use to talk to the porter. 72 Mostly, we use this to say what mountpoints (or perhaps, later, 73 (hostname, mountpoint) pairs?) we expect to receive requests for. 74 """ 75
76 - def registerPath(self, path):
77 return self.callRemote("registerPath", path)
78
79 - def deregisterPath(self, path):
80 return self.callRemote("deregisterPath", path)
81
82 - def registerPrefix(self, prefix):
83 return self.callRemote("registerPrefix", prefix)
84
85 - def deregisterPrefix(self, prefix):
86 return self.callRemote("deregisterPrefix", prefix)
87
88 - def getPort(self):
89 90 def handle_error(failure): 91 self.warning('Error getting port from old porter: %r', failure) 92 return 80
93 94 d = self.callRemote("getPort") 95 d.addErrback(handle_error) 96 return d
97 98
99 -class PorterClientFactory(fpb.ReconnectingPBClientFactory):
100 """ 101 A PB client factory that knows how to log into a Porter. 102 Lives in streaming components, and accepts FDs passed over this connection. 103 """ 104
105 - def __init__(self, childFactory):
106 """ 107 Create a PorterClientFactory that will use childFactory to create 108 protocol instances for clients attached to the FDs received over this 109 connection. 110 """ 111 fpb.ReconnectingPBClientFactory.__init__(self) 112 113 self.medium = PorterMedium() 114 115 self.protocol = fdserver.FDPassingBroker 116 self._childFactory = childFactory
117
118 - def buildProtocol(self, addr):
119 p = self.protocol(self._childFactory, FDPorterServer) 120 p.factory = self 121 return p
122
123 - def registerPath(self, path):
124 return self.medium.registerPath(path)
125
126 - def deregisterPath(self, path):
127 return self.medium.deregisterPath(path)
128
129 - def registerPrefix(self, prefix):
130 return self.medium.registerPrefix(prefix)
131
132 - def deregisterPrefix(self, prefix):
133 return self.medium.deregisterPrefix(prefix)
134
135 - def registerDefault(self):
136 return self.medium.registerPrefix("/")
137
138 - def deregisterDefault(self):
139 return self.medium.deregisterPrefix("/")
140 141
142 -class HTTPPorterClientFactory(PorterClientFactory):
143
144 - def __init__(self, childFactory, mountPoints, do_start_deferred, 145 prefixes=None):
146 """ 147 @param mountPoints: a list of mountPoint strings that should be 148 registered to the porter 149 """ 150 PorterClientFactory.__init__(self, childFactory) 151 self._mountPoints = mountPoints 152 self._prefixes = prefixes or [] 153 self._do_start_deferred = do_start_deferred 154 self.remote_port = None
155
156 - def _fireDeferred(self, r):
157 # If we still have the deferred, fire it (this happens after we've 158 # completed log in the _first_ time, not subsequent times) 159 if self._do_start_deferred: 160 self.debug("Firing initial deferred: should indicate " 161 "that login is complete") 162 self._do_start_deferred.callback(None) 163 self._do_start_deferred = None
164
165 - def gotDeferredLogin(self, deferred):
166 # This is called when we start logging in to give us the deferred for 167 # the login process. Once we're logged in, we want to set our 168 # remote ref, then register our path with the porter, then (possibly) 169 # fire a different deferred 170 self.debug("Got deferred login, adding callbacks") 171 deferred.addCallback(self.medium.setRemoteReference) 172 deferred.addCallback(lambda r: self.medium.getPort()) 173 deferred.addCallback(self._setRemotePort) 174 for mount in self._mountPoints: 175 self.debug("Registering mount point %s with porter", mount) 176 deferred.addCallback(lambda r, m: self.registerPath(m), mount) 177 for mount in self._prefixes: 178 self.debug("Registering mount prefix %s with porter", mount) 179 deferred.addCallback(lambda r, m: self.registerPrefix(m), mount) 180 deferred.addCallback(self._fireDeferred)
181
182 - def _setRemotePort(self, port):
183 self.remote_port = port 184 return port
185