Package flumotion :: Package component :: Package common :: Package streamer :: Module mfdsresources
[hide private]

Source Code for Module flumotion.component.common.streamer.mfdsresources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import os 
 19  import time 
 20  import errno 
 21  import fcntl 
 22  import string 
 23   
 24  import gst 
 25   
 26  from twisted.web import server, resource as web_resource 
 27  from twisted.internet import reactor, defer 
 28   
 29  from flumotion.common import log 
 30  from flumotion.component.common.streamer import resources 
 31   
 32   
33 -class MultiFdSinkStreamingResource(resources.HTTPStreamingResource, 34 log.Loggable):
35
36 - def __init__(self, streamer, httpauth):
38
def isReady(self):
    """
    Tell whether the streamer can serve clients yet.

    @rtype:   boolean
    @returns: True when the streamer reports having caps, False (after
              logging a debug message) when it does not.
    """
    if self.streamer.hasCaps():
        return True

    self.debug('We have no caps yet')
    return False
44
def clientRemoved(self, sink, fd, reason, stats):
    """
    React to a client having been removed.

    This is the callback attached to our flumotion component, not the
    GStreamer element.  C{sink} and C{reason} are accepted but not used.

    @param fd:    the file descriptor of the removed client
    @param stats: the statistics for the removed client; handed on to
                  L{_removeClient}
    """
    if fd not in self._requests:
        self.warning('[fd %5d] not found in _requests' % fd)
        return

    self._removeClient(self._requests[fd], fd, stats)
53
54 - def _logWrite(self, request, stats):
55 if stats: 56 bytes_sent = stats[0] 57 time_connected = int(stats[3] / gst.SECOND) 58 else: 59 bytes_sent = -1 60 time_connected = -1 61 return self.logWrite(request, bytes_sent, time_connected)
62
def _removeClient(self, request, fd, stats):
    """
    Removes a request and add logging.
    Note that it does not disconnect the client; it is called in reaction
    to a client disconnecting.

    The steps, in order: log the request (only when L{_logRequestFromIP}
    allows it for the client IP), clean up the authentication state for
    the fd, close the transport, and chain a callback that fires any
    deferred stored for the fd in C{self._removing} and then calls the
    base class L{resources.HTTPStreamingResource._removeClient}.

    @param request: the request
    @type request: L{twisted.protocols.http.Request}
    @param fd: the file descriptor for the client being removed
    @type fd: L{int}
    @param stats: the statistics for the removed client
    @type stats: GValueArray

    @returns: the object returned by L{_logWrite} (presumably a deferred,
              since a callback is added to it), or an already fired
              deferred when the request is not logged.
    """
    # PROBE: finishing request; see httpserver.httpserver
    self.debug('[fd %5d] (ts %f) finishing request %r',
               request.transport.fileno(), time.time(), request)

    ip = request.getClientIP()
    if self._logRequestFromIP(ip):
        d = self._logWrite(request, stats)
    else:
        # nothing to log; use an already fired deferred so that _done
        # below can be chained the same way in both cases
        d = defer.succeed(True)
    self.info('[fd %5d] Client from %s disconnected' % (fd, ip))

    # We can't call request.finish(), since we already "stole" the fd, we
    # just loseConnection on the transport directly, and delete the
    # Request object, after cleaning up the bouncer bits.
    self.httpauth.cleanupAuth(fd)

    self.debug('[fd %5d] (ts %f) closing transport %r', fd, time.time(),
               request.transport)
    # This will close the underlying socket.
    # NOTE(review): an earlier comment here said the request is removed
    # from the fd->request map *before* this call, because the fd might get
    # re-added the moment the socket is closed.  In the code as it stands
    # the removal only happens later, in _done below - confirm that this
    # ordering is intended.
    request.transport.loseConnection()

    self.debug('[fd %5d] closed transport %r' % (fd, request.transport))

    def _done(_):
        # fire the deferred of whoever is waiting for this fd to go away,
        # if there is one
        if fd in self._removing:
            self.debug("client is removed; firing deferred")
            removeD = self._removing.pop(fd)
            removeD.callback(None)

        # let the base class do its own bookkeeping for the fd
        resources.HTTPStreamingResource._removeClient(self, fd)
        # PROBE: finished request; see httpserver.httpserver
        self.debug('[fd %5d] (ts %f) finished request %r',
                   fd, time.time(), request)

    d.addCallback(_done)
    return d
114 115 ### resource.Resource methods 116
def handleAuthenticatedRequest(self, res, request):
    """
    Serve a request once authentication has gone through.

    A GET request is handed to L{_handleNewClient}; for a HEAD request
    only the headers are written and the request is finished.

    @param res:     the result of the previous callback; returned as is
    @param request: the authenticated request

    @raises AssertionError: when the request method is neither GET nor
                            HEAD.
    """
    # PROBE: authenticated request; see httpserver.httpfile
    self.debug('[fd %5d] (ts %f) authenticated request %r',
               request.transport.fileno(), time.time(), request)

    method = request.method
    if method not in ('GET', 'HEAD'):
        raise AssertionError

    if method == 'GET':
        self._handleNewClient(request)
    else:
        self.debug('handling HEAD request')
        self._writeHeaders(request)
        request.finish()

    return res
132
133 - def _formatHeaders(self, request):
134 # Mimic Twisted as close as possible 135 headers = [] 136 for name, value in request.headers.items(): 137 headers.append('%s: %s\r\n' % (name, value)) 138 for cookie in request.cookies: 139 headers.append('%s: %s\r\n' % ("Set-Cookie", cookie)) 140 return headers
141
142 - def _writeHeaders(self, request):
143 """ 144 Write out the HTTP headers for the incoming HTTP request. 145 146 @rtype: boolean 147 @returns: whether or not the file descriptor can be used further. 148 """ 149 fd = request.transport.fileno() 150 fdi = request.fdIncoming 151 152 # the fd could have been closed, in which case it will be -1 153 if fd == -1: 154 self.info('[fd %5d] Client gone before writing header' % fdi) 155 # FIXME: do this ? del request 156 return False 157 if fd != request.fdIncoming: 158 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd)) 159 # FIXME: do this ? del request 160 return False 161 162 self._setRequestHeaders(request) 163 164 # Call request modifiers 165 for modifier in self.modifiers: 166 modifier.modify(request) 167 168 headers = self._formatHeaders(request) 169 170 ### FIXME: there's a window where Twisted could have removed the 171 # fd because the client disconnected. Catch EBADF correctly here. 172 try: 173 # TODO: This is a non-blocking socket, we really should check 174 # return values here, or just let twisted handle all of this 175 # normally, and not hand off the fd until after twisted has 176 # finished writing the headers. 177 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers)) 178 # tell TwistedWeb we already wrote headers ourselves 179 request.startedWriting = True 180 return True 181 except OSError, (no, s): 182 if no == errno.EBADF: 183 self.info('[fd %5d] client gone before writing header' % fd) 184 elif no == errno.ECONNRESET: 185 self.info( 186 '[fd %5d] client reset connection writing header' % fd) 187 else: 188 self.info( 189 '[fd %5d] unhandled write error when writing header: %s' 190 % (fd, s)) 191 # trigger cleanup of request 192 del request 193 return False
194
def _handleNewClient(self, request):
    """
    Start streaming to the client that made the request.

    Writes the HTTP headers, removes the request's transport from the
    Twisted reactor, checks that the file descriptor is still open,
    registers the client through L{_addClient} and finally hands the file
    descriptor to the streamer.  Returns without adding the client when
    the headers cannot be written or the file descriptor check fails.

    @param request: the request; its C{fdIncoming} attribute must already
                    be set (L{_render} does this)
    """
    # everything fulfilled, serve to client
    fdi = request.fdIncoming
    if not self._writeHeaders(request):
        self.debug("[fd %5d] not adding as a client" % fdi)
        return

    # take over the file descriptor from Twisted by removing them from
    # the reactor
    # spiv told us to remove* on request.transport, and that works
    # then we figured out that a new request is only a Reader, so we
    # only remove the reader and leave the removeWriter call commented
    # out - this is because we never write to the
    # socket through twisted, only with direct os.write() calls from
    # _writeHeaders.

    # see http://twistedmatrix.com/trac/ticket/1796 for a guarantee
    # that this is a supported way of stealing the socket
    fd = fdi
    self.debug("[fd %5d] taking away from Twisted" % fd)
    reactor.removeReader(request.transport)
    #reactor.removeWriter(request.transport)

    # check if it's really an open fd (i.e. that twisted didn't close it
    # before the removeReader() call)
    try:
        fcntl.fcntl(fd, fcntl.F_GETFL)
    except IOError, e:
        if e.errno == errno.EBADF:
            self.warning("[fd %5d] is not actually open, ignoring" % fd)
        else:
            self.warning("[fd %5d] error during check: %s (%d)" % (
                fd, e.strerror, e.errno))
        # in both cases the fd is unusable, so the client is not added
        return

    self._addClient(fd, request)

    # hand it to multifdsink
    self.streamer.add_client(fd, request)
    ip = request.getClientIP()

    # PROBE: started request; see httpfile.httpfile
    self.debug('[fd %5d] (ts %f) started request %r',
               fd, time.time(), request)

    self.info('[fd %5d] Started streaming to %s' % (fd, ip))
240
241 - def _render(self, request):
242 fd = request.transport.fileno() 243 # we store the fd again in the request using it as an id for later 244 # on, so we can check when an fd went away (being -1) inbetween 245 request.fdIncoming = fd 246 247 # PROBE: incoming request; see httpserver.httpfile 248 self.debug('[fd %5d] (ts %f) incoming request %r', 249 fd, time.time(), request) 250 251 self.info('[fd %5d] Incoming client connection from %s' % ( 252 fd, request.getClientIP())) 253 self.debug('[fd %5d] _render(): request %s' % ( 254 fd, request)) 255 256 if not self.isReady(): 257 return self._handleNotReady(request) 258 elif self.reachedServerLimits(): 259 return self._handleServerFull(request) 260 261 self.debug('_render(): asked for (possible) authentication') 262 d = self.httpauth.startAuthentication(request) 263 d.addCallback(self.handleAuthenticatedRequest, request) 264 # Authentication has failed and we've written a response; nothing 265 # more to do 266 d.addErrback(lambda x: None) 267 268 # we MUST return this from our _render. 269 return server.NOT_DONE_YET
# GET and HEAD requests share one entry point; handleAuthenticatedRequest
# tells them apart by looking at request.method.
render_GET = _render
render_HEAD = _render
class HTTPRoot(web_resource.Resource, log.Loggable):
    """
    Root resource that looks up its children by the full path of the
    request instead of one path segment at a time.
    """
    logCategory = "httproot"

    def getChildWithDefault(self, path, request):
        """
        Look up the child resource for the complete request path.

        @param path:    the first path segment of the request
        @param request: the incoming request; any segments left in its
                        C{postpath} are appended to C{path}, separated by
                        slashes, before the lookup

        @returns: the resource returned by the base class lookup for the
                  full path.
        """
        # we override this method so that we can look up tree resources
        # directly without having their parents.
        # There's probably a more Twisted way of doing this, but ...
        fullPath = path
        if request.postpath:
            # str.join replaces the deprecated string.join() function
            fullPath += '/' + '/'.join(request.postpath)
        self.debug("[fd %5d] Incoming request %r for path %s",
                   request.transport.fileno(), request, fullPath)
        r = web_resource.Resource.getChildWithDefault(self, fullPath, request)
        # pass the resource as a log argument, like the call above, rather
        # than formatting it into the message ourselves
        self.debug("Returning resource %r", r)
        return r
290