Package flumotion :: Package component :: Package common :: Package streamer :: Module resources
[hide private]

Source Code for Module flumotion.component.common.streamer.resources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import socket 
 19  import resource 
 20  import time 
 21   
 22  try: 
 23      from twisted.web import http 
 24  except ImportError: 
 25      from twisted.protocols import http 
 26   
 27  from twisted.web import server, resource as web_resource 
 28  from twisted.internet import defer 
 29   
 30  from flumotion.configure import configure 
 31  from flumotion.common import log 
 32   
 33  # register serializable 
 34  from flumotion.common import keycards 
 35   
 36   
# Names exported by "from <this module> import *".
__all__ = ['HTTPStreamingResource']
__version__ = "$Rev$"


# HTML page used as the body of error/redirect responses.  It is filled in
# with a mapping that provides 'code' (the numeric HTTP status) and 'error'
# (the matching reason phrase).
ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
<html>
<head>
  <title>%(code)d %(error)s</title>
</head>
<body>
<h2>%(code)d %(error)s</h2>
</body>
</html>
"""


### the Twisted resource that handles the base URL

# Version string taken from the flumotion configure module; it is combined
# with HTTP_NAME to form HTTPStreamingResource.HTTP_SERVER.
HTTP_VERSION = configure.version
 56   
 57   
58 -class HTTPStreamingResource(web_resource.Resource, log.Loggable):
59 HTTP_NAME = 'FlumotionHTTPServer' 60 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION) 61 62 __reserve_fds__ = 50 # number of fd's to reserve for non-streaming 63 64 logCategory = 'httpstreamer' 65 66 # IResource interface variable; True means it will not chain requests 67 # further down the path to other resource providers through 68 # getChildWithDefault 69 isLeaf = True 70
def __init__(self, streamer, httpauth):
    """
    Prepare the bookkeeping needed to serve a streamer over HTTP.

    @param streamer: L{Streamer} being served; its C{plugs} mapping is
                     queried for the request logger and request modifier
                     plugs.
    @param httpauth: authentication helper; it is only stored here.
    """
    self.streamer = streamer
    self.httpauth = httpauth

    # request fd -> Request
    self._requests = {}
    # Optional deferred notification of client removals
    self._removing = {}

    # -1 asks for whatever the current fd soft limit allows.
    self.maxclients = self.getMaxAllowedClients(-1)
    # not limited by default
    self.maxbandwidth = -1

    # If set, a URL to redirect a user to when the limits above are reached
    self._redirectOnFull = None

    # Plugs are looked up by the name of the plug type they implement; a
    # missing entry simply means "no plugs of that kind".
    plugs = streamer.plugs
    logger_type = 'flumotion.component.plugs.request.RequestLoggerPlug'
    modifier_type = \
        'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
    self.loggers = plugs.get(logger_type, [])
    self.modifiers = plugs.get(modifier_type, [])

    self.logfilter = None

    web_resource.Resource.__init__(self)
97
def removeAllClients(self):
    """
    Ask the streamer to drop every client currently being tracked.

    For each tracked client id a new Deferred is stored in
    C{self._removing} and C{streamer.remove_client} is called.  Nothing
    in this method fires those Deferreds.

    @rtype:   L{twisted.internet.defer.DeferredList}
    @returns: a DeferredList over the per-client removal Deferreds
    """
    pending = []
    # Work on a snapshot of the ids: removing a client may end up deleting
    # entries from self._requests while this loop is still running, which
    # is an error when iterating over a live dictionary view.
    for fd in list(self._requests.keys()):
        removal = defer.Deferred()
        self._removing[fd] = removal
        pending.append(removal)
        self.streamer.remove_client(fd)

    return defer.DeferredList(pending)
106
def setRoot(self, path):
    """
    Register this resource as its own child under C{path}.

    @param path: path segment to register this resource under
    """
    child = self
    self.putChild(path, child)
109
def setLogFilter(self, logfilter):
    """
    Install the filter consulted by L{_logRequestFromIP}.

    @param logfilter: object offering C{isInRange(ip)}, or a false value
                      to log requests from every address
    """
    setattr(self, 'logfilter', logfilter)
112
def rotateLogs(self):
    """
    Tell every request logger plug to rotate its log.

    Each plug's C{rotate()} is called in turn, after a debug message
    naming the plug.
    """
    for plug in self.loggers:
        message = 'rotating logger %r' % plug
        self.debug(message)
        plug.rotate()
120
def logWrite(self, request, bytes_sent, time_connected):
    """
    Report a finished HTTP session to every request logger plug.

    One dictionary describing the session is built and the same
    dictionary is handed to each plug's C{event} method under the event
    name C{'http_session_completed'}.

    @param request:        the request that has finished
    @param bytes_sent:     value logged as C{'bytes-sent'}
    @param time_connected: value logged as C{'time-connected'}
    @rtype:   L{twisted.internet.defer.DeferredList}
    @returns: a DeferredList over the results of all logger calls
    """
    headers = request.getAllHeaders()

    session = {
        'ip': request.getClientIP(),
        'time': time.gmtime(),
        'method': request.method,
        'uri': request.uri,
        'username': '-',  # FIXME: put the httpauth name
        'get-parameters': request.args,
        'clientproto': request.clientproto,
        'response': request.code,
        'bytes-sent': bytes_sent,
        'referer': headers.get('referer', None),
        'user-agent': headers.get('user-agent', None),
        'time-connected': time_connected,
    }
    # Subclasses contribute additional fields through _getExtraLogArgs.
    session.update(self._getExtraLogArgs(request))

    # maybeDeferred lets plugs answer either synchronously or with a
    # Deferred.
    return defer.DeferredList(
        [defer.maybeDeferred(plug.event, 'http_session_completed', session)
         for plug in self.loggers])
145
def setUserLimit(self, limit):
    """
    Change the maximum number of simultaneous clients.

    The requested value is passed through L{getMaxAllowedClients}, so
    the limit that ends up in C{self.maxclients} may differ from
    C{limit}; both values are logged.

    @param limit: requested maximum number of clients
    @type  limit: int
    """
    self.info('setting maxclients to %d' % limit)
    granted = self.getMaxAllowedClients(limit)
    self.maxclients = granted
    # Log what we actually managed to set it to.
    self.info('set maxclients to %d' % self.maxclients)
151
def setBandwidthLimit(self, limit):
    """
    Change the bandwidth limit checked by L{reachedServerLimits}.

    @param limit: new limit; a negative value disables the check
    @type  limit: int
    """
    self.maxbandwidth = limit
    current = self.maxbandwidth
    self.info("set maxbandwidth to %d", current)
155
def setRedirectionOnLimits(self, url):
    """
    Choose where clients are redirected once the server is full.

    @param url: URL to redirect to, or a false value to refuse clients
                instead of redirecting them
    """
    setattr(self, '_redirectOnFull', url)
158
def isReady(self):
    """
    Tell whether the resource can serve data; subclasses must override.

    @raises NotImplementedError: always, in this base implementation
    """
    message = "isReady must be implemented by " "subclasses"
    raise NotImplementedError(message)
162
def getMaxAllowedClients(self, maxclients):
    """
    maximum number of allowed clients based on soft limit for number of
    open file descriptors and fd reservation. Increases soft limit to
    hard limit if possible.

    C{self.__reserve_fds__} descriptors are always kept aside for
    non-streaming use.

    @param maxclients: requested number of clients, or -1 to ask for
                       whatever the current soft limit allows
    @type  maxclients: int
    @rtype:   int
    @returns: the number of clients that can be accepted.  -1 is
              returned when -1 was requested and the soft limit is
              unlimited.
    """
    import sys

    softmax, hardmax = resource.getrlimit(resource.RLIMIT_NOFILE)
    # RLIM_INFINITY is a marker, not a descriptor count (its numeric value
    # is platform dependent and can be negative), so it must never take
    # part in the arithmetic below.
    unlimited = resource.RLIM_INFINITY
    reserved = self.__reserve_fds__

    if maxclients == -1:
        if softmax == unlimited:
            return -1
        return softmax - reserved

    neededfds = maxclients + reserved

    # Bug in python 2.4.3, see
    # http://sourceforge.net/tracker/index.php?func=detail&
    # aid=1494314&group_id=5470&atid=105470
    if sys.version_info[:3] == (2, 4, 3) and \
            not hasattr(socket, "has_2_4_3_patch"):
        self.warning(
            'Setting hardmax to 1024 due to python 2.4.3 bug')
        hardmax = 1024

    # The current soft limit already covers the request.
    if softmax == unlimited or neededfds <= softmax:
        return maxclients

    # Raise the soft limit as far as needed, but never past the hard limit.
    if hardmax == unlimited:
        lim = neededfds
    else:
        lim = min(neededfds, hardmax)
    resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
    return lim - reserved
193
def reachedServerLimits(self):
    """
    Check whether the server has reached its client or bandwidth limit.

    A negative C{maxclients} or C{maxbandwidth} disables the
    corresponding check.

    @rtype:   bool
    @returns: True if no further client should be accepted
    """
    clients = len(self._requests)

    if self.maxclients >= 0 and clients >= self.maxclients:
        return True

    if self.maxbandwidth < 0:
        return False

    # Reject if adding one more client would take us over the limit.
    projected = (clients + 1) * self.streamer.getCurrentBitrate()
    if projected >= self.maxbandwidth:
        return True
    return False
206
207 - def _getExtraLogArgs(self, request):
208 """ 209 Extra arguments for logging. Should be overriden by subclasses 210 that provide extra arguments for logging 211 212 @rtype: dict 213 @returns: A dictionary with the extra arguments 214 """ 215 return {}
216
def _setRequestHeaders(self, request):
    """
    Set the response headers sent with a stream.

    @param request: the request whose response headers are set
    """
    content = self.streamer.get_content_type()
    request.setHeader('Server', self.HTTP_SERVER)
    request.setHeader('Date', http.datetimeToString())
    request.setHeader('Connection', 'close')
    # setHeader() replaces any value set earlier for the same header, so
    # two separate calls would leave only the last directive.  Send both
    # directives in a single Cache-Control value instead.
    request.setHeader('Cache-Control', 'no-cache, private')
    request.setHeader('Content-type', content)
225
226 - def _addClient(self, id, request=None):
227 """ 228 Add a request, so it can be used for statistics. 229 230 @param id: the of the client (fd or session id) 231 @type request: int 232 """ 233 self._requests[id] = request and request or id
234
235 - def _removeClient(self, id):
236 """ 237 Delete a request from the list 238 239 @param request: the id of the client 240 @type request: int 241 """ 242 try: 243 del self._requests[id] 244 except Exception: 245 self.warning("Error removing request: %s", id)
246
247 - def _logRequestFromIP(self, ip):
248 """ 249 Returns whether we want to log a request from this IP; allows us to 250 filter requests from automated monitoring systems. 251 """ 252 if self.logfilter: 253 return not self.logfilter.isInRange(ip) 254 else: 255 return True
256 257 ### resource.Resource methods 258
def _handleNotReady(self, request):
    """
    Handle a request made while there is no data to send yet.

    Nothing is written to the request; only a debug message is logged.

    @param request: the incoming request
    @returns: C{server.NOT_DONE_YET}
    """
    self.debug("Not sending data, it's not ready")
    outcome = server.NOT_DONE_YET
    return outcome
262
def _handleServerFull(self, request):
    """
    Turn a client away because the server limits have been reached.

    If a redirection URL has been configured the client is redirected
    to it (302 Found); otherwise the response is 503 Service
    Unavailable.

    @param request: the request being turned away
    @rtype:   str
    @returns: the HTML body of the response, built from ERROR_TEMPLATE
    """
    if self._redirectOnFull:
        self.debug("Redirecting client, client limit %d reached",
                   self.maxclients)
        error_code = http.FOUND
        request.setHeader('location', self._redirectOnFull)
    else:
        self.debug("Refusing clients, client limit %d reached",
                   self.maxclients)
        error_code = http.SERVICE_UNAVAILABLE

    request.setHeader('content-type', 'text/html')

    # Identify the server the same way _setRequestHeaders does
    # (name/version) rather than with the bare version number.
    request.setHeader('server', self.HTTP_SERVER)
    request.setResponseCode(error_code)

    return ERROR_TEMPLATE % {'code': error_code,
                             'error': http.RESPONSES[error_code]}
281