Package flumotion :: Package component :: Package base :: Module http
[hide private]

Source Code for Module flumotion.component.base.http

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import struct 
 19  import socket 
 20   
 21  from twisted.web import http 
 22  from twisted.internet import reactor, defer 
 23  from twisted.python import failure 
 24   
 25  from flumotion.configure import configure 
 26  from flumotion.common import errors 
 27  from flumotion.twisted.credentials import cryptChallenge 
 28   
 29  from flumotion.common import log, keycards 
 30   
 31  #__all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
# NOTE(review): "$Rev$" looks like a version-control keyword placeholder
# that is expanded on checkout - confirm.
__version__ = "$Rev$"


# The two components of the server identification string built below.
HTTP_SERVER_NAME = 'FlumotionHTTPServer'
HTTP_SERVER_VERSION = configure.version

# HTML body sent with error responses.  It is %-formatted with a mapping
# that provides 'code' (an integer, rendered with %d) and 'error' (the
# reason phrase, rendered with %s).
ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
<html>
<head>
  <title>%(code)d %(error)s</title>
</head>
<body>
<h2>%(code)d %(error)s</h2>
</body>
</html>
"""

# Server identification in "name/version" form.
HTTP_SERVER = '%s/%s' % (HTTP_SERVER_NAME, HTTP_SERVER_VERSION)
 50   
 51  ### This is new Issuer code that eventually should move to e.g. 
 52  ### flumotion.common.keycards or related 
 53   
 54   
class HTTPGenericIssuer(log.Loggable):
    """
    I build a L{flumotion.common.keycards.KeycardGeneric} out of an HTTP
    request.

    The keycard carries everything the request offers for an
    authentication decision: HTTP Auth user and password, the client IP
    address, the 'token' GET parameter, the complete set of request
    arguments and the request path.
    """

    def issue(self, request):
        """
        Create a keycard describing the given request.

        @param request: the request to issue a keycard for; it must offer
                        getUser(), getPassword(), getClientIP(), args
                        and path.

        @returns: the filled-in keycard
        @rtype:   L{flumotion.common.keycards.KeycardGeneric}
        """
        card = keycards.KeycardGeneric()

        # client credentials and origin
        card.username = request.getUser()
        card.password = request.getPassword()
        card.address = request.getClientIP()

        query = request.args
        tokenValue = query.get('token', '')
        if isinstance(tokenValue, str):
            # no token given (the '' default) or a plain string value
            card.token = tokenValue
        else:
            # argument values can be lists when the parameter was given
            # more than once; the first value wins
            card.token = tokenValue[0]

        card.arguments = query
        card.path = request.path

        self.debug("Asking for authentication, generic HTTP")
        return card
# Plug socket identifiers.
# NOTE(review): these look like the dotted names of the plug base
# classes - confirm against the plug registry.
# HTTPAuthentication.__init__ looks BOUNCER_SOCKET up in component.plugs to
# find an in-process bouncer plug; BUS_SOCKET is not referenced in the
# visible part of this module.
BOUNCER_SOCKET = 'flumotion.component.bouncers.plug.BouncerPlug'
BUS_SOCKET = 'flumotion.component.plugs.bus.BusPlug'
class HTTPAuthentication(log.Loggable):
    """
    Helper object for handling HTTP authentication for twisted.web
    Resources, using issuers and bouncers.

    A keycard is issued for every request and is then, depending on the
    configuration:
      - authenticated by a bouncer plug found on the component,
      - sent to the remote bouncer named with L{setBouncerName}, or
      - accepted as is when neither a plug nor a bouncer name is set.

    I keep track of authenticated keycards by request fd and by keycard
    id so that clients can be expired and keycards cleaned up later.
    """

    logCategory = 'httpauth'

    # ttl set on keycards sent to a remote bouncer and passed along with
    # every keepAlive call (presumably seconds - confirm with the bouncer)
    KEYCARD_TTL = 60 * 60
    # delay in seconds before the next keepAlive in normal operation
    KEYCARD_KEEPALIVE_INTERVAL = 20 * 60
    # delay in seconds before retrying after a failed keepAlive
    KEYCARD_TRYAGAIN_INTERVAL = 1 * 60

    def __init__(self, component):
        """
        @param component: the component to authenticate requests for.  Its
                          getName(), plugs, medium and remove_client() are
                          used.
        """
        self.component = component
        self._fdToKeycard = {} # request fd -> Keycard
        self._idToKeycard = {} # keycard id -> Keycard
        self._fdToDurationCall = {} # request fd -> IDelayedCall
                                    # for duration
        self._domain = None # used for auth challenge and on keycard
        self._issuer = HTTPGenericIssuer() # issues keycards;default for compat
        self.bouncerName = None
        self.setRequesterId(component.getName())
        self._defaultDuration = None # default duration to use if the keycard
                                     # doesn't specify one.
        self._allowDefault = False # whether failures communicating with
                                   # the bouncer should result in HTTP 500
                                   # or with allowing the connection
        self._pendingCleanups = [] # (bouncerName, keycard) pairs whose
                                   # removal failed and must be retried
        self._keepAlive = None # IDelayedCall for the next keepAlive

        # use an in-process bouncer plug when the component has one
        if (BOUNCER_SOCKET in self.component.plugs
            and self.component.plugs[BOUNCER_SOCKET]):
            assert len(self.component.plugs[BOUNCER_SOCKET]) == 1
            self.plug = self.component.plugs[BOUNCER_SOCKET][0]
            self.plug.set_expire_function(self.expireKeycards)
        else:
            self.plug = None

    def scheduleKeepAlive(self, tryingAgain=False):
        """
        Schedule the next keepAlive towards the remote bouncer.

        When the timer fires and a bouncer name is set, keepAlive is called
        and, once its deferred fires, the next round is scheduled: after
        KEYCARD_KEEPALIVE_INTERVAL on success, after
        KEYCARD_TRYAGAIN_INTERVAL on failure.  Without a bouncer name the
        timer is simply re-armed with the regular interval.

        @param tryingAgain: whether the previous keepAlive failed, in which
                            case the shorter retry interval is used.
        @type  tryingAgain: bool
        """

        def timeout():

            def reschedule(res):
                # used as both callback and errback; as an errback it
                # returns None, so a failure is consumed here
                if isinstance(res, failure.Failure):
                    self.info('keepAlive failed, rescheduling in %d '
                              'seconds', self.KEYCARD_TRYAGAIN_INTERVAL)
                    self._keepAlive = None
                    self.scheduleKeepAlive(tryingAgain=True)
                else:
                    self.info('keepAlive successful')
                    self._keepAlive = None
                    self.scheduleKeepAlive(tryingAgain=False)

            if self.bouncerName is not None:
                self.debug('calling keepAlive on bouncer %s',
                           self.bouncerName)
                d = self.keepAlive(self.bouncerName, self.issuerName,
                                   self.KEYCARD_TTL)
                d.addCallbacks(reschedule, reschedule)
            else:
                self.scheduleKeepAlive()

        if tryingAgain:
            self._keepAlive = reactor.callLater(
                self.KEYCARD_TRYAGAIN_INTERVAL, timeout)
        else:
            self._keepAlive = reactor.callLater(
                self.KEYCARD_KEEPALIVE_INTERVAL, timeout)

    def stopKeepAlive(self):
        """
        Cancel the pending keepAlive timer, if there is one.

        A keepAlive request that is already in flight is not interrupted;
        it schedules the next round when its deferred fires.
        """
        delayedCall = self._keepAlive
        self._keepAlive = None
        # While a keepAlive request is in flight the stored call has
        # already fired; cancelling it would raise AlreadyCalled.
        if delayedCall is not None and delayedCall.active():
            delayedCall.cancel()

    def setDomain(self, domain):
        """
        Set a domain name on the resource, used in HTTP auth challenges and
        on the keycard.

        @type domain: string
        """
        self._domain = domain

    def setBouncerName(self, bouncerName):
        """
        Set the name of the remote bouncer to authenticate against.

        @param bouncerName: name of the bouncer, or None for no bouncer.
        """
        self.bouncerName = bouncerName

    def setRequesterId(self, requesterId):
        """
        Set the requester id put on issued keycards and derive a fresh
        issuer name from it.

        @param requesterId: the id identifying me as a requester.
        """
        self.requesterId = requesterId
        # make something uniquey
        self.issuerName = str(self.requesterId) + '-' + cryptChallenge()

    def setDefaultDuration(self, defaultDuration):
        """
        Set the duration to apply to connections whose keycard does not
        specify one.

        @param defaultDuration: duration in seconds (it is handed to
                                reactor.callLater), or None for no limit.
        """
        self._defaultDuration = defaultDuration

    def setAllowDefault(self, allowDefault):
        """
        Set whether clients are let in when the authentication request
        itself fails, instead of getting an HTTP 500 response.

        @type allowDefault: bool
        """
        self._allowDefault = allowDefault

    def authenticate(self, request):
        """
        Issue a keycard for the request and have it authenticated.

        @param request: the request to authenticate.

        @returns: a deferred returning a keycard or None
        """
        keycard = self._issuer.issue(request)
        if not keycard:
            self.debug('no keycard from issuer, firing None')
            return defer.succeed(None)

        keycard.requesterId = self.requesterId
        keycard.issuerName = self.issuerName
        # remembered so that expireKeycard can find the connection
        keycard._fd = request.transport.fileno()
        keycard.domain = self._domain

        if self.plug:
            self.debug('authenticating against plug')
            return self.plug.authenticate(keycard)
        elif self.bouncerName is None:
            self.debug('no bouncer, accepting')
            return defer.succeed(keycard)
        else:
            keycard.ttl = self.KEYCARD_TTL
            self.debug('sending keycard to remote bouncer %r',
                       self.bouncerName)
            return self.authenticateKeycard(self.bouncerName, keycard)

    def authenticateKeycard(self, bouncerName, keycard):
        """
        Ask the named bouncer, through the component's medium, to
        authenticate the keycard.

        @returns: whatever the medium's authenticate() returns;
                  L{authenticate} hands it back as its deferred.
        """
        return self.component.medium.authenticate(bouncerName, keycard)

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        Forward a keepAlive for my issuer name to the named bouncer through
        the component's medium.

        @returns: whatever the medium's keepAlive() returns;
                  L{scheduleKeepAlive} treats it as a deferred.
        """
        return self.component.medium.keepAlive(bouncerName, issuerName, ttl)

    def cleanupKeycard(self, bouncerName, keycard):
        """
        Ask the named bouncer, through the component's medium, to remove
        the keycard with the given keycard's id.

        @returns: whatever the medium's removeKeycardId() returns;
                  L{doCleanupKeycard} treats it as a deferred.
        """
        return self.component.medium.removeKeycardId(bouncerName, keycard.id)

    # FIXME: check this

    def clientDone(self, fd):
        """
        Tell the component to remove the client connected on fd.
        """
        return self.component.remove_client(fd)

    def doCleanupKeycard(self, bouncerName, keycard):
        """
        Clean up this one keycard, and take the opportunity to retry
        previously failed cleanups.

        A cleanup that fails is logged and queued again, to be retried on
        the next call.
        """

        def cleanup(bouncerName, keycard):

            def cleanupLater(res, pair):
                self.log('failed to clean up keycard %r, will do '
                         'so later', keycard)
                self._pendingCleanups.append(pair)
            d = self.cleanupKeycard(bouncerName, keycard)
            d.addErrback(cleanupLater, (bouncerName, keycard))

        # swap the queue out first so that new failures do not end up in
        # the list being iterated
        pending = self._pendingCleanups
        self._pendingCleanups = []
        cleanup(bouncerName, keycard)
        for (pendingBouncerName, pendingKeycard) in pending:
            cleanup(pendingBouncerName, pendingKeycard)

    # public

    def cleanupAuth(self, fd):
        """
        Forget about the authentication state of the connection on fd.

        With a remote bouncer configured, the bouncer is asked to remove
        the keycard as well.
        """
        if self.bouncerName and fd in self._fdToKeycard:
            keycard = self._fdToKeycard[fd]
            self.debug('[fd %5d] asking bouncer %s to remove keycard id %s',
                       fd, self.bouncerName, keycard.id)
            self.doCleanupKeycard(self.bouncerName, keycard)
        self._removeKeycard(fd)

    def _removeKeycard(self, fd):
        """
        Drop the keycard bookkeeping for fd and cancel its pending duration
        expiration, if any.
        """
        if (self.bouncerName or self.plug) and fd in self._fdToKeycard:
            keycard = self._fdToKeycard[fd]
            del self._fdToKeycard[fd]
            del self._idToKeycard[keycard.id]
        if fd in self._fdToDurationCall:
            self.debug('[fd %5d] canceling later expiration call' % fd)
            self._fdToDurationCall[fd].cancel()
            del self._fdToDurationCall[fd]

    def _durationCallLater(self, fd):
        """
        Expire a client due to a duration expiration.
        """
        self.debug('[fd %5d] duration exceeded, expiring client' % fd)

        # we're called from a callLater, so we've already run; just delete
        if fd in self._fdToDurationCall:
            del self._fdToDurationCall[fd]

        self.debug('[fd %5d] asking streamer to remove client' % fd)
        self.clientDone(fd)

    def expireKeycard(self, keycardId):
        """
        Expire a client's connection associated with the keycard Id.

        @raises KeyError: if no keycard with that id is known.
        """
        keycard = self._idToKeycard[keycardId]

        fd = keycard._fd

        self.debug('[fd %5d] expiring client' % fd)

        self._removeKeycard(fd)

        self.debug('[fd %5d] asking streamer to remove client' % fd)
        self.clientDone(fd)

    def expireKeycards(self, keycardIds):
        """
        Expire client's connections associated with the keycard Ids.

        Ids that cannot be expired because of a KeyError are logged and
        skipped.

        @returns: the number of keycards that were expired
        @rtype:   int
        """
        expired = 0
        for keycardId in keycardIds:
            try:
                self.expireKeycard(keycardId)
            except KeyError as e:
                self.warning("Failed to expire keycard %r: %s",
                             keycardId, log.getExceptionMessage(e))
            else:
                expired += 1
        return expired

    ### resource.Resource methods

    def startAuthentication(self, request):
        """
        Authenticate the request and register the resulting keycard.

        Error responses for failed authentication are written to the
        request by the errbacks.

        @returns: a deferred firing None when the client may proceed, or
                  failing when it was refused.
        """
        d = self.authenticate(request)
        d.addCallback(self._authenticatedCallback, request)
        d.addErrback(self._authenticatedErrback, request)
        d.addErrback(self._defaultErrback, request)

        return d

    def _authenticatedCallback(self, keycard, request):
        """
        Register an authenticated keycard for the request's connection and
        schedule the expiration of the connection if a duration applies.
        Bookkeeping is only done for GET requests.

        @raises errors.NotAuthenticatedError: if there is no keycard, or
                                              its id is already registered.
        """
        # !: since we are a callback, the incoming fd might have gone away
        # and closed
        self.debug('_authenticatedCallback: keycard %r' % keycard)
        if not keycard:
            raise errors.NotAuthenticatedError()

        # properly authenticated
        if request.method == 'GET':
            fd = request.transport.fileno()

            if self.bouncerName or self.plug:
                # the request was finished before the callback was executed
                if fd == -1:
                    self.debug('Request interrupted before authentification '
                               'was finished: asking bouncer %s to remove '
                               'keycard id %s', self.bouncerName, keycard.id)
                    self.doCleanupKeycard(self.bouncerName, keycard)
                    return None
                if keycard.id in self._idToKeycard:
                    self.warning("Duplicate keycard id: refusing")
                    raise errors.NotAuthenticatedError()

                self._fdToKeycard[fd] = keycard
                self._idToKeycard[keycard.id] = keycard

            duration = keycard.duration or self._defaultDuration

            if duration:
                self.debug('new connection on %d will expire in %f seconds' % (
                    fd, duration))
                self._fdToDurationCall[fd] = reactor.callLater(
                    duration, self._durationCallLater, fd)

        return None

    def _authenticatedErrback(self, failure, request):
        """
        Answer a NotAuthenticatedError with a 401 response.  Any other
        failure is passed on untouched by trap().

        @returns: the failure, so that the deferred stays failed.
        """
        failure.trap(errors.NotAuthenticatedError)
        self._handleUnauthorized(request, http.UNAUTHORIZED)
        return failure

    def _defaultErrback(self, failure, request):
        """
        Handle failures other than NotAuthenticatedError, which has already
        been answered by L{_authenticatedErrback}.

        @returns: None if the client is allowed in anyway, the failure
                  otherwise.
        """
        if failure.check(errors.NotAuthenticatedError) is None:
            # If something else went wrong, we want to either disconnect the
            # client and give them a 500 Internal Server Error or just allow
            # them, depending on the configuration.
            self.debug("Authorization request failed: %s",
                       log.getFailureMessage(failure))
            if self._allowDefault:
                self.debug("Authorization failed, but allowing anyway")
                return None
            self._handleUnauthorized(request, http.INTERNAL_SERVER_ERROR)
        return failure

    def _handleUnauthorized(self, request, code):
        """
        Write a complete error response with the given status code to the
        request and finish it.

        @param code: the HTTP status code to respond with; for
                     http.UNAUTHORIZED a Basic auth challenge is added when
                     a domain is set.
        @type  code: int
        """
        self.debug('client from %s is unauthorized, returning code %r' %
                   (request.getClientIP(), code))
        request.setHeader('content-type', 'text/html')
        # identify ourselves as "name/version", not with the bare version
        request.setHeader('server', HTTP_SERVER)
        request.setHeader('Connection', 'close')
        if self._domain and code == http.UNAUTHORIZED:
            request.setHeader('WWW-Authenticate',
                              'Basic realm="%s"' % self._domain)

        request.setResponseCode(code)

        # we have to write data ourselves,
        # since we already returned NOT_DONE_YET
        html = ERROR_TEMPLATE % {'code': code,
                                 'error': http.RESPONSES[code]}
        request.write(html)
        request.finish()
382 383
class LogFilter:
    """
    I hold a list of IPv4 networks and can tell whether an address falls
    inside any of them.
    """

    def __init__(self):
        self.filters = [] # list of (network, mask)

    def addIPFilter(self, filter):
        """
        Add an IP filter of the form IP/prefix-length (CIDR syntax), or just
        a single IP address.

        @param filter: the filter definition, e.g. '192.168.1.0/24' or
                       '10.0.0.1'.
        @type  filter: str

        @raises errors.ConfigError: if the definition cannot be parsed, the
                                    prefix length is not an integer between
                                    0 and 32, or the address is not a valid
                                    IPv4 address.
        """
        definition = filter.split('/')
        if len(definition) == 2:
            (net, prefixlen) = definition
            try:
                prefixlen = int(prefixlen)
            except ValueError:
                # report a bad prefix like every other malformed filter
                raise errors.ConfigError(
                    "Invalid prefix length %s" % prefixlen)
        elif len(definition) == 1:
            # a single address is a network of exactly one host
            net = definition[0]
            prefixlen = 32
        else:
            raise errors.ConfigError(
                "Cannot parse filter definition %s" % filter)

        if prefixlen < 0 or prefixlen > 32:
            raise errors.ConfigError("Invalid prefix length")

        # all bits above the host part set; a negative Python integer, which
        # behaves correctly when and-ed with unsigned 32 bit addresses
        mask = ~((1 << (32 - prefixlen)) - 1)
        try:
            net = struct.unpack(">I", socket.inet_pton(socket.AF_INET, net))[0]
        except socket.error:
            raise errors.ConfigError(
                "Failed to parse network address %s" % net)
        net = net & mask # just in case

        self.filters.append((net, mask))

    def isInRange(self, ip):
        """
        Return true if ip is in any of the defined network(s) for this filter

        @param ip: the address to check, in dotted-quad notation.
        @type  ip: str

        @returns: True if the address is inside one of the networks; False
                  otherwise, including when ip is not a valid IPv4 address.
        @rtype:   bool
        """
        # Handles IPv4 only: anything else cannot be inside an IPv4 network.
        try:
            packed = socket.inet_pton(socket.AF_INET, ip)
        except socket.error:
            return False

        realip = struct.unpack(">I", packed)[0]
        for (net, mask) in self.filters:
            if (realip & mask) == net:
                return True
        return False
428