Package flumotion :: Package twisted :: Module fdserver
[hide private]

Source Code for Module flumotion.twisted.fdserver

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  from flumotion.common import log 
 19  from flumotion.extern.fdpass import fdpass 
 20   
 21  from twisted.internet import unix, main, address, tcp 
 22  from twisted.spread import pb 
 23   
 24  import errno 
 25  import os 
 26  import socket 
 27  import struct 
 28  import time 
 29   
 30  __version__ = "$Rev$" 
 31   
 32   
 33  # Heavily based on 
 34  # http://twistedmatrix.com/trac/browser/sandbox/exarkun/copyover/server.py 
 35  # and client.py 
 36  # Thanks for the inspiration! 
 37   
 38  # Since we're doing this over a stream socket, our file descriptor messages 
 39  # aren't guaranteed to be received alone; they could arrive along with some 
 40  # unrelated data. 
 41  # So, we prefix the message with a 16 byte randomly generated magic signature, 
 42  # and a length, and if we receive file descriptors decode based on this. 
 43  # 
 44  # map() instead of a string to workaround gettext encoding problems. 
 45  # 
 46  MAGIC_SIGNATURE = ''.join(map(chr, [253, 252, 142, 127, 7, 71, 185, 234, 
 47                                      161, 117, 238, 216, 220, 54, 200, 163])) 
 48   
 49   
50 -class FDServer(unix.Server):
51
52 - def sendFileDescriptor(self, fileno, data=""):
53 message = struct.pack("@16sI", MAGIC_SIGNATURE, len(data)) + data 54 return fdpass.writefds(self.fileno(), [fileno], message)
55 56
57 -class FDPort(unix.Port):
58 transport = FDServer
59 60
61 -class FDClient(unix.Client): #, log.Loggable):
62
63 - def doRead(self):
64 if not self.connected: 65 return 66 try: 67 (fds, message) = fdpass.readfds(self.fileno(), 64 * 1024) 68 except OSError, e: 69 if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): 70 return 71 else: 72 return main.CONNECTION_LOST 73 else: 74 if not message: 75 return main.CONNECTION_DONE 76 77 if len(fds) > 0: 78 # Look for our magic cookie in (possibly) the midst of other 79 # data. Pass surrounding chunks, if any, onto dataReceived(), 80 # which (undocumentedly) must return None unless a failure 81 # occurred. 82 # Pass the actual FDs and their message to 83 # fileDescriptorsReceived() 84 offset = message.find(MAGIC_SIGNATURE) 85 if offset < 0: 86 # Old servers did not send this; be hopeful that this 87 # doesn't have bits of other protocol (i.e. PB) mixed up 88 # in it. 89 return self.protocol.fileDescriptorsReceived(fds, message) 90 elif offset > 0: 91 ret = self.protocol.dataReceived(message[0:offset]) 92 if ret: 93 return ret 94 95 msglen = struct.unpack("@I", message[offset+16:offset+20])[0] 96 offset += 20 97 ret = self.protocol.fileDescriptorsReceived(fds, 98 message[offset:offset+msglen]) 99 if ret: 100 return ret 101 102 if offset+msglen < len(message): 103 return self.protocol.dataReceived(message[offset+msglen:]) 104 return ret 105 else: 106 # self.debug("No FDs, passing to dataReceived") 107 return self.protocol.dataReceived(message)
108 109
110 -class FDConnector(unix.Connector):
111
112 - def _makeTransport(self):
113 return FDClient(self.address, self, self.reactor)
114 115
116 -class FDPassingBroker(pb.Broker, log.Loggable):
117 """ 118 A pb.Broker subclass that handles FDs being passed to it (with associated 119 data) over the same connection as the normal PB data stream. 120 When an FD is seen, it creates new protocol objects for them from the 121 childFactory attribute. 122 """ 123 # FIXME: looks like we can only use our own subclasses that take 124 # three __init__ args 125
126 - def __init__(self, childFactory, connectionClass, **kwargs):
127 """ 128 @param connectionClass: subclass of L{twisted.internet.tcp.Connection} 129 """ 130 pb.Broker.__init__(self, **kwargs) 131 132 self.childFactory = childFactory 133 self._connectionClass = connectionClass
134 135 # This is the complex bit. If our underlying transport receives a file 136 # descriptor, this gets called - along with the data we got with the FD. 137 # We create an appropriate protocol object, and attach it to the reactor. 138
139 - def fileDescriptorsReceived(self, fds, message):
140 if len(fds) == 1: 141 fd = fds[0] 142 143 # Note that we hardcode IPv4 here! 144 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 145 146 # PROBE: received fd; see porter.py 147 self.debug("[fd %5d] (ts %f) received fd from %d, created socket", 148 sock.fileno(), time.time(), fd) 149 150 # Undocumentedly (other than a comment in 151 # Python/Modules/socketmodule.c), socket.fromfd() calls dup() on 152 # the passed FD before it actually wraps it in a socket object. 153 # So, we need to close the FD that we originally had... 154 os.close(fd) 155 156 try: 157 peeraddr = sock.getpeername() 158 except socket.error: 159 self.info("Socket disconnected before being passed to client") 160 sock.close() 161 return 162 163 # Based on bits in tcp.Port.doRead() 164 addr = address._ServerFactoryIPv4Address('TCP', 165 peeraddr[0], peeraddr[1]) 166 protocol = self.childFactory.buildProtocol(addr) 167 168 self._connectionClass(sock, protocol, peeraddr, message) 169 else: 170 self.warning("Unexpected: FD-passing message with len(fds) != 1")
171 172
173 -class _SocketMaybeCloser(tcp._SocketCloser):
174 keepSocketAlive = False 175
176 - def _closeSocket(self):
177 # We override this (from tcp._SocketCloser) so that we can close 178 # sockets properly in the normal case, but once we've passed our 179 # socket on via the FD-channel, we just close() it (not calling 180 # shutdown() which will close the TCP channel without closing 181 # the FD itself) 182 if self.keepSocketAlive: 183 try: 184 self.socket.close() 185 except socket.error: 186 pass 187 else: 188 tcp.Server._closeSocket(self)
189 190
191 -class PassableServerConnection(_SocketMaybeCloser, tcp.Server):
192 """ 193 A subclass of tcp.Server that permits passing the FDs used to other 194 processes (by just calling close(2) rather than shutdown(2) on them) 195 """ 196 pass
197 198
199 -class PassableServerPort(tcp.Port):
200 transport = PassableServerConnection
201