Package flumotion :: Package component :: Package common :: Package streamer :: Module fragmentedresource
[hide private]

Source Code for Module flumotion.component.common.streamer.fragmentedresource

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19  import base64 
 20  import hmac 
 21  import uuid 
 22  from datetime import datetime, timedelta 
 23   
 24  from twisted.internet import reactor 
 25  from twisted.web import server 
 26   
 27  try: 
 28      from twisted.web import http 
 29  except ImportError: 
 30      from twisted.protocols import http 
 31   
 32  from flumotion.common import log 
 33  from flumotion.component.common.streamer.resources import\ 
 34      HTTPStreamingResource, ERROR_TEMPLATE, HTTP_VERSION 
 35   
 36  __version__ = "$Rev: $" 
 37   
# Name of the cookie that carries the signed session token.
COOKIE_NAME = 'flumotion-session'
# Cookie states returned by FragmentedResource._cookieIsValid():
# the cookie is malformed, badly signed or bound to a different session ID.
NOT_VALID = 0
# the signature matches and the authentication has not expired.
VALID = 1
# the signature matches but the authentication has expired.
RENEW_AUTH = 2
 42   
 43   
class FragmentNotFound(Exception):
    """The requested fragment is not found."""
46 47
class FragmentNotAvailable(Exception):
    """The requested fragment is not available."""
50 51
class PlaylistNotFound(Exception):
    """The requested playlist is not found."""
54 55
class KeyNotFound(Exception):
    """The requested key is not found."""
58 59
class Session(server.Session):
    """
    Session that schedules its own expiration with one reactor delayed call.

    startCheckingExpiration() arms the timer, touch() pushes it back by
    C{sessionTimeout} seconds and expire() removes the session from its
    site and runs the registered expiration callbacks.
    """

    # Seconds without a touch() after which the armed timer calls expire().
    sessionTimeout = 900
    # Delayed call returned by reactor.callLater(), or None when not armed.
    _expireCall = None

    def __init__(self, site, uid, *args, **kwargs):
        """
        Initialise the session through the base class.

        The method used to be spelled C{_init_}, so it was never run as the
        constructor. Extra arguments are forwarded untouched so that any
        signature accepted by the base class keeps working.

        @param site: the site this session belongs to
        @param uid:  the identifier of this session
        """
        server.Session.__init__(self, site, uid, *args, **kwargs)

    def startCheckingExpiration(self):
        """
        Start expiration tracking.

        Schedules expire() to run C{sessionTimeout} seconds from now.

        @return: C{None}
        """
        self._expireCall = reactor.callLater(
            self.sessionTimeout, self.expire)

    def notifyOnExpire(self, callback):
        """
        Call this callback when the session expires or logs out.

        @param callback: callable invoked without arguments by expire()
        """
        # NOTE(review): expireCallbacks is not created in this class;
        # presumably the base class constructor does it - confirm.
        self.expireCallbacks.append(callback)

    def expire(self):
        """
        Expire/logout of the session.

        Removes the session from C{self.site.sessions}, runs every
        registered callback once and cancels the pending timer, if any.

        @raise KeyError: if the session is not registered in the site
        """
        del self.site.sessions[self.uid]
        for c in self.expireCallbacks:
            c()
        self.expireCallbacks = []
        if self._expireCall and self._expireCall.active():
            self._expireCall.cancel()
            # Break reference cycle.
            self._expireCall = None

    def touch(self):
        """
        Notify session modification.

        Records the current time in C{lastModified} and, when the timer is
        armed, restarts it with the full C{sessionTimeout}.
        """
        self.lastModified = time.time()
        if self._expireCall is not None:
            self._expireCall.reset(self.sessionTimeout)
103 104
105 -class FragmentedResource(HTTPStreamingResource, log.Loggable):
106 107 HTTP_NAME = 'FlumotionAppleHTTPLiveServer' 108 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION) 109 110 logCategory = 'fragmented-resource' 111
def __init__(self, streamer, httpauth, secretKey, sessionTimeout):
    """
    Set up the resource and reset its byte counters.

    @param streamer:       L{FragmentedStreamer}
    @param httpauth:       passed to the base class together with the
                           streamer
    @param secretKey:      key used to sign the session tokens
    @param sessionTimeout: timeout copied onto every session created by
                           this resource
    """
    HTTPStreamingResource.__init__(self, streamer, httpauth)
    # Both counters start from zero.
    self.bytesSent = self.bytesReceived = 0
    self.sessionTimeout = sessionTimeout
    self.secretKey = secretKey
121
def setMountPoint(self, mountPoint):
    """
    Store the mount point, making sure it starts and ends with a slash.

    @param mountPoint: the mount point, with or without the surrounding
                       slashes
    """
    leading = '' if mountPoint.startswith('/') else '/'
    normalized = leading + mountPoint
    trailing = '' if normalized.endswith('/') else '/'
    self.mountPoint = normalized + trailing
128
def isReady(self):
    """
    Tell whether the streamer behind this resource is ready.

    @return: the value returned by the streamer's own isReady()
    """
    streamer = self.streamer
    return streamer.isReady()
131
132 - def _addClient(self, id):
135
def _removeClient(self, uid):
    """
    Remove the client bound to an expired session.

    When a removal is pending for this uid its deferred is fired with
    C{None} first; then the base class removal runs and the streamer is
    told that a client went away.

    @param uid: identifier of the session that expired
    """
    pending = self._removing
    if uid in pending:
        self.debug("client is removed; firing deferred")
        pending.pop(uid).callback(None)
    HTTPStreamingResource._removeClient(self, uid)
    self.log("session %s expired", uid)
    self.streamer.clientRemoved()
144
def _renewAuthentication(self, request, sessionID, authResponse):
    """
    Replace the session cookie with one carrying the renewed expiration.

    @param request:      the request being answered
    @param sessionID:    session identifier stored in the new token
    @param authResponse: result of the authentication; when it is empty or
                         its C{duration} is 0 the token gets an expiration
                         of 0, which _cookieIsValid() never treats as
                         expired
    """
    # Delete, if it's present, the 'flumotion-session' cookie.
    # The prefix used to be built from the cookie itself, so it never
    # matched, and the list was modified while being iterated.
    cookiePrefix = '%s=' % COOKIE_NAME
    kept = [c for c in request.cookies if not c.startswith(cookiePrefix)]
    if len(kept) != len(request.cookies):
        self.log("delete old cookie for session ID=%s", sessionID)
        # Slice-assign so the request keeps using the same list object.
        request.cookies[:] = kept

    if authResponse and authResponse.duration != 0:
        # NOTE(review): a UTC time tuple is fed to time.mktime(), which
        # interprets it as local time; _cookieIsValid() computes "now" the
        # same way, so the two values stay comparable.
        authExpiracy = time.mktime((datetime.utcnow() +
            timedelta(seconds=authResponse.duration)).timetuple())
    else:
        authExpiracy = 0
    # Create a new token with the same Session ID and the renewed
    # authentication's expiration time
    token = self._generateToken(sessionID, request.getClientIP(),
                                authExpiracy)
    request.addCookie(COOKIE_NAME, token, path=self.mountPoint)
162
def _handleNotReady(self, request):
    """
    Answer a request that arrived before there was data to send.

    @param request: the request to answer
    @return: the body built by _errorMessage() for SERVICE_UNAVAILABLE
    """
    self.debug("Not sending data, it's not ready")
    status = http.SERVICE_UNAVAILABLE
    request.code = status
    return self._errorMessage(request, status)
167
168 - def _getExtraLogArgs(self, request):
169 uid = request.session and request.session.uid or None 170 return {'uid': uid}
171
def _checkSession(self, request):
    """
    From t.w.s.Request.getSession()
    Associates the request to a session using the 'flumotion-session'
    cookie and updates the session's timeout.
    If the authentication has expired, re-authenticates the session and
    updates the cookie with the new authentication's expiracy time.
    If the cookie is not valid (bad IP or bad signature) or the session
    has expired, it creates a new session.

    @param request: the incoming request
    @return: the deferred of the authentication when one had to be
             started, C{None} when the existing session was simply touched
    """

    def processAuthentication(response):
        # Turn the authentication response into an absolute expiration
        # time (0 when there is no response or its duration is 0) and
        # create the session with it.
        if response is None or response.duration == 0:
            authExpiracy = 0
        else:
            authExpiracy = time.mktime((datetime.utcnow() +
                timedelta(seconds=response.duration)).timetuple())
        self._createSession(request, authExpiracy)

    if not request.session:
        cookie = request.getCookie(COOKIE_NAME)
        if cookie:
            # The request has a flumotion cookie
            cookieState, sessionID, authExpiracy = \
                self._cookieIsValid(cookie, request.getClientIP(),
                                    request.args.get('GKID', [None])[0])
            if cookieState != NOT_VALID:
                # The cookie is valid: retrieve or create a session
                try:
                    # The session exists in this streamer
                    request.session = request.site.getSession(sessionID)
                except KeyError:
                    # The session doesn't exist in this streamer
                    # NOTE(review): for a VALID cookie _cookieIsValid()
                    # returns None as expiracy, so the replicated session
                    # gets a token with expiracy 0 - confirm intended.
                    self._createSession(request, authExpiracy, sessionID)
                    self.log("replicating session %s.", sessionID)
                if cookieState == RENEW_AUTH:
                    # The authentication has expired, renew it
                    self.debug('renewing authentication')
                    d = self.httpauth.startAuthentication(request)
                    d.addCallback(lambda res:
                        self._renewAuthentication(request, sessionID, res))
                    # NOTE(review): _delClient is not defined in this
                    # class; presumably inherited - confirm.
                    d.addErrback(lambda x: self._delClient(sessionID))
                    return d

    # if it still hasn't been set, fix it up.
    if not request.session:
        self.debug('asked for authentication')
        d = self.httpauth.startAuthentication(request)
        d.addCallback(lambda res: processAuthentication(res))
        # A failed authentication is swallowed here, which leaves
        # request.session unset for the caller to deal with.
        d.addErrback(lambda x: None)
        return d

    request.session.touch()
225
def _createSession(self, request, authExpiracy=None, sessionID=None):
    """
    From t.w.s.Site.makeSession()
    Attach a session to the request, creating and registering a new one
    when the site does not know the session ID, and add the session cookie
    to the response.

    @param request:      the request that needs a session
    @param authExpiracy: expiration time stored in the token; C{None} is
                         treated as 0
    @param sessionID:    identifier of the session; when C{None} the first
                         'GKID' request argument is used, falling back to
                         a new uuid1 hex string
    """
    if authExpiracy is None:
        authExpiracy = 0
    if sessionID is None:
        sessionID = request.args.get('GKID', [uuid.uuid1().hex])[0]
    token = self._generateToken(
        sessionID, request.getClientIP(), authExpiracy)
    try:
        # Check if the session already exists
        request.session = request.site.getSession(sessionID)
    except KeyError:
        # Only an unknown session ID may lead to a new session; the former
        # bare except also hid unrelated errors.
        request.session = request.site.sessions[sessionID] =\
            Session(request.site, sessionID)
        request.session.sessionTimeout = self.sessionTimeout
        request.session.startCheckingExpiration()
        request.session.notifyOnExpire(lambda:
            self._removeClient(sessionID))
        self._addClient(request.session.uid)
    else:
        self.log("session already exists, the client is not using cookies "
                 "or the IP changed.")
    request.addCookie(COOKIE_NAME, token, path=self.mountPoint)

    self.debug('added new client with session id: "%s"' %
        request.session.uid)
254
def _generateToken(self, sessionID, clientIP, authExpiracy):
    """
    Generate a cryptographic token:
    PAYLOAD = SESSION_ID||:||AUTH_EXPIRACY
    PRIVATE = CLIENT_IP||:||MOUNT_POINT
    SIG=HMAC(SECRET,PAYLOAD||:||PRIVATE)
    TOKEN=BASE64(PAYLOAD||:||SIG)

    @param sessionID:    identifier of the session
    @param clientIP:     address of the client; signed but not included in
                         the token
    @param authExpiracy: expiration time of the authentication
    @return: the base64 encoded token
    """
    publicPart = ':'.join([sessionID, str(authExpiracy)])
    # The client IP and the mount point are only part of the signed data.
    signedData = ':'.join([publicPart, clientIP, self.mountPoint])
    signature = hmac.new(self.secretKey, signedData).hexdigest()
    return base64.b64encode(':'.join([publicPart, signature]))
268
def _cookieIsValid(self, cookie, clientIP, urlSessionID):
    """
    Check a session cookie against its signature (and so, implicitly,
    against the client IP and the mount point), the session ID given in
    the URL and the authentication expiracy time.

    @param cookie:       value of the session cookie
    @param clientIP:     address of the client that sent the cookie
    @param urlSessionID: session ID taken from the URL, or C{None}
    @return: a (state, sessionID, authExpiracy) tuple where state is
             VALID (signature and expiracy are OK; authExpiracy is None),
             RENEW_AUTH (signature OK but the authentication has expired)
             or NOT_VALID (sessionID and authExpiracy are None)
    """
    rejected = (NOT_VALID, None, None)
    private = ':'.join([clientIP, self.mountPoint])
    try:
        decoded = base64.b64decode(cookie)
        payload, sig = decoded.rsplit(':', 1)
        sessionID, authExpiracy = payload.split(':')
    except (TypeError, ValueError):
        self.debug("cookie is not valid. reason: malformed cookie")
        return rejected

    self.log("cheking cookie for client_ip=%s auth_expiracy:%s",
             clientIP, authExpiracy)

    # Check signature
    expectedSig = hmac.new(
        self.secretKey, ':'.join([payload, private])).hexdigest()
    if expectedSig != sig:
        self.debug("cookie is not valid. reason: invalid signature")
        return rejected

    # Check sessionID
    if urlSessionID is not None and urlSessionID != sessionID:
        self.debug("cookie is not valid. reason: different sessions")
        return rejected

    # Check authentication expiracy; 0 means it never expires
    now = time.mktime(datetime.utcnow().timetuple())
    expiresAt = float(authExpiracy)
    if expiresAt != 0 and expiresAt < now:
        self.debug("cookie is not valid. reason: authentication expired")
        return (RENEW_AUTH, sessionID, authExpiracy)

    self.log("cookie is valid")
    return (VALID, sessionID, None)
306
def _errorMessage(self, request, error_code):
    """
    Set the headers and status of an error response and build its body.

    @param request:    the request to answer
    @param error_code: HTTP status code of the error
    @return: ERROR_TEMPLATE filled with the code and its standard reason
             phrase
    """
    # 'html' alone is not a valid media type.
    request.setHeader('content-type', 'text/html')
    # Advertise the same name/version product string as _writeHeaders().
    request.setHeader('server', self.HTTP_SERVER)
    request.setResponseCode(error_code)

    return ERROR_TEMPLATE % {'code': error_code,
                             'error': http.RESPONSES[error_code]}
314
def _renderNotFoundResponse(self, failure, request):
    """
    Errback answering with a NOT_FOUND error page.

    Only FragmentNotAvailable, FragmentNotFound, PlaylistNotFound and
    KeyNotFound failures are handled; anything else is re-raised by trap().

    @param failure: the failure describing what was not found
    @param request: the request to answer and finish
    @return: an empty string
    """
    handled = (FragmentNotAvailable, FragmentNotFound,
               PlaylistNotFound, KeyNotFound)
    failure.trap(*handled)
    body = self._errorMessage(request, http.NOT_FOUND)
    request.write(body)
    request.finish()
    return ''
321
def _renderForbidden(self, request):
    """
    Answer the request with a FORBIDDEN error page and finish it.

    @param request: the request to answer and finish
    @return: an empty string
    """
    body = self._errorMessage(request, http.FORBIDDEN)
    request.write(body)
    request.finish()
    return ''
326
def _writeHeaders(self, request, content, code=200):
    """
    Write out the HTTP headers for the incoming HTTP request.

    @param request: the request whose response headers are set
    @param content: value of the Content-type header; the header is left
                    out when this is empty
    @param code:    HTTP status code of the response
    """
    request.setResponseCode(code)
    headers = [
        ('Server', self.HTTP_SERVER),
        ('Date', http.datetimeToString()),
        ('Cache-Control', 'no-cache'),
    ]
    if content:
        headers.append(('Content-type', content))
    for name, value in headers:
        request.setHeader(name, value)

    # Give every registered modifier a chance to alter the request
    for modifier in self.modifiers:
        modifier.modify(request)
341
def getBytesSent(self):
    """
    @return: the current value of the C{bytesSent} counter
    """
    sent = self.bytesSent
    return sent
344
def getBytesReceived(self):
    """
    @return: the current value of the C{bytesReceived} counter
    """
    received = self.bytesReceived
    return received
347
def render(self, request):
    """
    Log the incoming connection, arrange for the request to be logged once
    it is over and let the base class render it.

    @param request: the incoming request
    @return: whatever HTTPStreamingResource.render() returns
    """
    self.debug('Incoming client connection from %s: %s',
               request.getClientIP(), request)
    # addBoth instead of addCallback: _logRequest() has a branch for
    # failures, and it can only run if it is also hooked as errback.
    request.notifyFinish().addBoth(self._logRequest, request)
    return HTTPStreamingResource.render(self, request)
353
354 - def _logWrite(self, request):
355 return self.logWrite(request, request.getBytesSent(), 356 request.getDuration())
357
358 - def _logRequest(self, error, request):
359 if error: 360 self.info("%s %s error:%s", request.getClientIP(), request, error) 361 else: 362 uid = request.session and request.session.uid or None 363 self.info("%s %s %s %s %s %s", request.getClientIP(), request, 364 request.code, request.getBytesSent(), 365 request.getDuration(), uid)
366