Package flumotion :: Package twisted :: Module rtsp
[hide private]

Source Code for Module flumotion.twisted.rtsp

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_rtsp -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  RTSP - Real Time Streaming Protocol. 
 20   
 21  See RFC 2326, and its Robin, RFC 2068. 
 22  """ 
 23   
 24  import sys 
 25  import re 
 26  import types 
 27   
 28  from twisted.web import http 
 29  from twisted.web import server, resource 
 30  from twisted.internet import defer 
 31   
 32  from twisted.python import log, failure, reflect 
 33   
 34  try: 
 35      from twisted.protocols._c_urlarg import unquote 
 36  except ImportError: 
 37      from urllib import unquote 
 38   
 39  from flumotion.common import log as flog 
 40   
 41  __version__ = "$Rev$" 
 42   
 43  SERVER_PROTOCOL = "RTSP/1.0" 
 44  # I can be overridden to add the version 
 45   
 46  SERVER_STRING = "Flumotion RTP" 
 47   
 48  # response codes 
 49  CONTINUE = 100 
 50   
 51  OK = 200 
 52  CREATED = 201 
 53  LOW_STORAGE = 250 
 54   
 55  MULTIPLE_CHOICE = 300 
 56  MOVED_PERMANENTLY = 301 
 57  MOVED_TEMPORARILY = 302 
 58  SEE_OTHER = 303 
 59  NOT_MODIFIED = 304 
 60  USE_PROXY = 305 
 61   
 62  BAD_REQUEST = 400 
 63  UNAUTHORIZED = 401 
 64  PAYMENT_REQUIRED = 402 
 65  FORBIDDEN = 403 
 66  NOT_FOUND = 404 
 67  NOT_ALLOWED = 405 
 68  NOT_ACCEPTABLE = 406 
 69  PROXY_AUTH_REQUIRED = 407 
 70  REQUEST_TIMEOUT = 408 
 71  GONE = 410 
 72  LENGTH_REQUIRED = 411 
 73  PRECONDITION_FAILED = 412 
 74  REQUEST_ENTITY_TOO_LARGE = 413 
 75  REQUEST_URI_TOO_LONG = 414 
 76  UNSUPPORTED_MEDIA_TYPE = 415 
 77   
 78  PARAMETER_NOT_UNDERSTOOD = 451 
 79  CONFERENCE_NOT_FOUND = 452 
 80  NOT_ENOUGH_BANDWIDTH = 453 
 81  SESSION_NOT_FOUND = 454 
 82  METHOD_INVALID_STATE = 455 
 83  HEADER_FIELD_INVALID = 456 
 84  INVALID_RANGE = 457 
 85  PARAMETER_READ_ONLY = 458 
 86  AGGREGATE_NOT_ALLOWED = 459 
 87  AGGREGATE_ONLY_ALLOWED = 460 
 88  UNSUPPORTED_TRANSPORT = 461 
 89  DESTINATION_UNREACHABLE = 462 
 90   
 91  INTERNAL_SERVER_ERROR = 500 
 92  NOT_IMPLEMENTED = 501 
 93  BAD_GATEWAY = 502 
 94  SERVICE_UNAVAILABLE = 503 
 95  GATEWAY_TIMEOUT = 504 
 96  RTSP_VERSION_NOT_SUPPORTED = 505 
 97  OPTION_NOT_SUPPORTED = 551 
 98   
 99  RESPONSES = { 
100      # 100 
101      CONTINUE: "Continue", 
102   
103      # 200 
104      OK: "OK", 
105      CREATED: "Created", 
106      LOW_STORAGE: "Low on Storage Space", 
107   
108      # 300 
109      MULTIPLE_CHOICE: "Multiple Choices", 
110      MOVED_PERMANENTLY: "Moved Permanently", 
111      MOVED_TEMPORARILY: "Moved Temporarily", 
112      SEE_OTHER: "See Other", 
113      NOT_MODIFIED: "Not Modified", 
114      USE_PROXY: "Use Proxy", 
115   
116      # 400 
117      BAD_REQUEST: "Bad Request", 
118      UNAUTHORIZED: "Unauthorized", 
119      PAYMENT_REQUIRED: "Payment Required", 
120      FORBIDDEN: "Forbidden", 
121      NOT_FOUND: "Not Found", 
122      NOT_ALLOWED: "Method Not Allowed", 
123      NOT_ACCEPTABLE: "Not Acceptable", 
124      PROXY_AUTH_REQUIRED: "Proxy Authentication Required", 
125      REQUEST_TIMEOUT: "Request Time-out", 
126      GONE: "Gone", 
127      LENGTH_REQUIRED: "Length Required", 
128      PRECONDITION_FAILED: "Precondition Failed", 
129      REQUEST_ENTITY_TOO_LARGE: "Request Entity Too Large", 
130      REQUEST_URI_TOO_LONG: "Request-URI Too Large", 
131      UNSUPPORTED_MEDIA_TYPE: "Unsupported Media Type", 
132   
133      PARAMETER_NOT_UNDERSTOOD: "Parameter Not Understood", 
134      CONFERENCE_NOT_FOUND: "Conference Not Found", 
135      NOT_ENOUGH_BANDWIDTH: "Not Enough Bandwidth", 
136      SESSION_NOT_FOUND: "Session Not Found", 
137      METHOD_INVALID_STATE: "Method Not Valid In This State", 
138      HEADER_FIELD_INVALID: "Header Field Not Valid for Resource", 
139      INVALID_RANGE: "Invalid Range", 
140      PARAMETER_READ_ONLY: "Parameter is Read-Only", 
141      AGGREGATE_NOT_ALLOWED: "Aggregate operation not allowed", 
142      AGGREGATE_ONLY_ALLOWED: "Only aggregate operation allowed", 
143      UNSUPPORTED_TRANSPORT: "Unsupported transport", 
144      DESTINATION_UNREACHABLE: "Destination unreachable", 
145   
146      # 500 
147      INTERNAL_SERVER_ERROR: "Internal Server Error", 
148      NOT_IMPLEMENTED: "Not Implemented", 
149      BAD_GATEWAY: "Bad Gateway", 
150      SERVICE_UNAVAILABLE: "Service Unavailable", 
151      GATEWAY_TIMEOUT: "Gateway Time-out", 
152      RTSP_VERSION_NOT_SUPPORTED: "RTSP Version not supported", 
153      OPTION_NOT_SUPPORTED: "Option not supported", 
154  } 
155   
156   
157 -class RTSPError(Exception):
158 """An exception with the RTSP status code and a str as arguments"""
159 160
161 -class RTSPRequest(http.Request, flog.Loggable):
162 logCategory = 'request' 163 code = OK 164 code_message = RESPONSES[OK] 165 host = None 166 port = None 167
168 - def delHeader(self, key):
169 if key.lower() in self.headers.keys(): 170 del self.headers[key.lower()]
171 172 # base method override 173 174 # copied from HTTP since we have our own set of RESPONSES 175
176 - def setResponseCode(self, code, message=None):
177 """ 178 Set the RTSP response code. 179 """ 180 self.code = code 181 if message: 182 self.code_message = message 183 else: 184 self.code_message = RESPONSES.get(code, "Unknown Status")
185
186 - def process(self):
187 # First check that we have a valid request. 188 if self.clientproto != SERVER_PROTOCOL: 189 e = ErrorResource(BAD_REQUEST) 190 self.render(e) 191 return 192 193 # process the request and render the resource or give a failure 194 first = "%s %s %s" % (self.method, self.path, SERVER_PROTOCOL) 195 self.debug('incoming request: %s' % first) 196 197 lines = [] 198 for key, value in self.received_headers.items(): 199 lines.append("%s: %s" % (key, value)) 200 201 self.debug('incoming headers:\n%s\n' % "\n".join(lines)) 202 203 #self.debug('user-agent: %s' % self.received_headers.get('user-agent', 204 # '[Unknown]')) 205 #self.debug('clientid: %s' % self.received_headers.get('clientid', 206 # '[Unknown]')) 207 208 # don't store site locally; we can't be sure every request has gone 209 # through our customized handlers 210 site = self.channel.site 211 ip = self.getClientIP() 212 site.logRequest(ip, first, lines) 213 214 if not self._processPath(): 215 return 216 217 try: 218 if self.path == "*": 219 resrc = site.resource 220 else: 221 resrc = site.getResourceFor(self) 222 self.debug("RTSPRequest.process(): got resource %r" % resrc) 223 try: 224 self.render(resrc) 225 except server.UnsupportedMethod: 226 e = ErrorResource(OPTION_NOT_SUPPORTED) 227 self.setHeader('Allow', ",".join(resrc.allowedMethods)) 228 self.render(e) 229 except RTSPError, e: 230 er = ErrorResource(e.args[0]) 231 self.render(er) 232 except Exception, e: 233 self.warning('failed to process %s: %s' % 234 (lines and lines[0] or "[No headers]", 235 flog.getExceptionMessage(e))) 236 self.processingFailed(failure.Failure())
237
238 - def _processPath(self):
239 # process self.path into components; return whether or not it worked 240 self.log("path %s" % self.path) 241 242 self.prepath = [] # used in getResourceFor 243 244 # check Request-URI; RFC 2326 6.1 says it's "*" or absolute URI 245 if self.path == '*': 246 self.log('Request-URI is *') 247 return True 248 249 # match the host:port 250 matcher = re.compile('rtspu?://([^/]*)') 251 m = matcher.match(self.path) 252 hostport = None 253 if m: 254 hostport = m.expand('\\1') 255 256 if not hostport: 257 # malformed Request-URI; 400 seems like a likely reply ? 258 self.log('Absolute rtsp URL required: %s' % self.path) 259 self.render(ErrorResource(BAD_REQUEST, 260 "Malformed Request-URI %s" % self.path)) 261 return False 262 263 # get the rest after hostport starting with '/' 264 rest = self.path.split(hostport)[1] 265 self.host = hostport 266 if ':' in hostport: 267 chunks = hostport.split(':') 268 self.host = chunks[0] 269 self.port = int(chunks[1]) 270 # if we got fed crap, they're in other chunks, and we ignore them 271 272 self.postpath = map(unquote, rest.split('/')) 273 self.log( 274 'split up self.path in host %s, port %r, pre %r and post %r' % ( 275 self.host, self.port, self.prepath, self.postpath)) 276 return True
277
278 - def processingFailed(self, reason):
279 self.warningFailure(reason) 280 # FIXME: disable tracebacks until we can reliably disable them 281 if not True: # self.site or self.site.displayTracebacks: 282 self.debug('sending traceback to client') 283 import traceback 284 tb = sys.exc_info()[2] 285 text = "".join(traceback.format_exception( 286 reason.type, reason.value, tb)) 287 else: 288 text = "RTSP server failed to process your request.\n" 289 290 self.setResponseCode(INTERNAL_SERVER_ERROR) 291 self.setHeader('Content-Type', "text/plain") 292 self.setHeader('Content-Length', str(len(text))) 293 self.write(text) 294 self.finish() 295 return reason
296
297 - def _error(self, code, *lines):
298 self.setResponseCode(code) 299 self.setHeader('content-type', "text/plain") 300 body = "\n".join(lines) 301 return body
302
303 - def render(self, resrc):
304 self.log('%r.render(%r)' % (resrc, self)) 305 result = resrc.render(self) 306 self.log('%r.render(%r) returned result %r' % (resrc, self, result)) 307 if isinstance(result, defer.Deferred): 308 result.addCallback(self._renderCallback, resrc) 309 result.addErrback(self._renderErrback, resrc) 310 else: 311 self._renderCallback(result, resrc)
312 313 # TODO: Refactor this and renderCallback to be cleaner and share code. 314
315 - def _renderErrback(self, failure, resrc):
316 body = self._error(INTERNAL_SERVER_ERROR, 317 "Request failed: %r" % failure) 318 self.setHeader('Content-Length', str(len(body))) 319 lines = [] 320 for key, value in self.headers.items(): 321 lines.append("%s: %s" % (key, value)) 322 323 self.channel.site.logReply(self.code, self.code_message, lines, body) 324 325 self.write(body) 326 self.finish()
327
328 - def _renderCallback(self, result, resrc):
329 body = result 330 if type(body) is not types.StringType: 331 self.warning('request did not return a string but %r' % 332 type(body)) 333 body = self._error(INTERNAL_SERVER_ERROR, 334 "Request did not return a string", 335 "Request: " + reflect.safe_repr(self), 336 "Resource: " + reflect.safe_repr(resrc), 337 "Value: " + reflect.safe_repr(body)) 338 self.setHeader('Content-Length', str(len(body))) 339 340 lines = [] 341 for key, value in self.headers.items(): 342 lines.append("%s: %s" % (key, value)) 343 # FIXME: debug response code 344 self.debug('responding to %s %s with %s (%d)' % ( 345 self.method, self.path, self.code_message, self.code)) 346 self.debug('outgoing headers:\n%s\n' % "\n".join(lines)) 347 if body: 348 self.debug('body:\n%s\n' % body) 349 self.log('RTSPRequest._renderCallback(): outgoing response:\n%s\n' % 350 "\n".join(lines)) 351 self.log("\n".join(lines)) 352 self.log("\n") 353 self.log(body) 354 355 self.channel.site.logReply(self.code, self.code_message, lines, body) 356 357 self.write(body) 358 self.finish()
359 360 # RTSP keeps the initial request alive, pinging it regularly. 361 # for now we just keep it persistent for ever 362 363
364 -class RTSPChannel(http.HTTPChannel):
365 366 requestFactory = RTSPRequest 367
368 - def checkPersistence(self, request, version):
369 if version == SERVER_PROTOCOL: 370 return 1 371 log.err('version %s not handled' % version) 372 return 0
373 374 #class RTSPFactory(http.HTTPFactory): 375 # protocol = RTSPChannel 376 # timeout = 60 377 378
379 -class RTSPSite(server.Site):
380 """ 381 I am a ServerFactory that can be used in 382 L{twisted.internet.interfaces.IReactorTCP}'s .listenTCP 383 Create me with an L{RTSPResource} object. 384 """ 385 protocol = RTSPChannel 386 requestFactory = RTSPRequest 387
388 - def logRequest(self, ip, requestLine, headerLines):
389 pass
390
391 - def logReply(self, code, message, headerLines, body):
392 pass
393 394
395 -class RTSPResource(resource.Resource, flog.Loggable):
396 """ 397 I am a base class for all RTSP Resource classes. 398 399 @type allowedMethods: tuple 400 @ivar allowedMethods: a tuple of allowed methods that can be invoked 401 on this resource. 402 """ 403 404 logCategory = 'resource' 405 allowedMethods = ['OPTIONS'] 406
407 - def getChild(self, path, request):
408 return NoResource() 409 # use WithDefault so static children have a chance too 410 self.log( 411 'RTSPResource.getChild(%r, %s, <request>), pre %r, post %r' % ( 412 self, path, request.prepath, request.postpath)) 413 res = resource.Resource.getChild(self, path, request) 414 self.log('RTSPResource.getChild(%r, %s, <request>) returns %r' % ( 415 self, path, res)) 416 return res
417
418 - def getChildWithDefault(self, path, request):
419 self.log( 420 'RTSPResource.getChildWithDefault(%r, %s, <request>), pre %r, ' 421 'post %r' % ( 422 self, path, request.prepath, request.postpath)) 423 self.log('children: %r' % self.children.keys()) 424 res = resource.Resource.getChildWithDefault(self, path, request) 425 self.log( 426 'RTSPResource.getChildWithDefault(%r, %s, <request>) ' 427 'returns %r' % ( 428 self, path, res)) 429 return res
430 431 # FIXME: remove 432
433 - def noputChild(self, path, r):
434 self.log('RTSPResource.putChild(%r, %s, %r)' % (self, path, r)) 435 return resource.Resource.putChild(self, path, r)
436 437 # needs to be done for ALL responses 438 # see 12.17 CSeq and H14.19 Date 439
440 - def render_startCSeqDate(self, request, method):
441 """ 442 Set CSeq and Date on response to given request. 443 This should be done even for errors. 444 """ 445 self.log('render_startCSeqDate, method %r' % method) 446 cseq = request.getHeader('CSeq') 447 # RFC says clients MUST have CSeq field, but we're lenient 448 # in what we accept and assume 0 if not specified 449 if cseq == None: 450 cseq = 0 451 request.setHeader('CSeq', cseq) 452 request.setHeader('Date', http.datetimeToString())
453
454 - def render_start(self, request, method):
455 ip = request.getClientIP() 456 self.log('RTSPResource.render_start(): client from %s requests %s' % ( 457 ip, method)) 458 self.log('RTSPResource.render_start(): uri %r' % request.path) 459 460 self.render_startCSeqDate(request, method) 461 request.setHeader('Server', SERVER_STRING) 462 request.delHeader('Content-Type') 463 464 # tests for 3gpp 465 request.setHeader('Last-Modified', http.datetimeToString()) 466 request.setHeader('Cache-Control', 'must-revalidate') 467 #request.setHeader('x-Accept-Retransmit', 'our-revalidate') 468 #request.setHeader('x-Accept-Dynamic-Rate', '1') 469 #request.setHeader('Content-Base', 'rtsp://core.fluendo.com/test.3gpp') 470 #request.setHeader('Via', 'RTSP/1.0 288f9c2a') 471 472 # hacks for Real 473 if 'Real' in request.received_headers.get('user-agent', ''): 474 self.debug('Detected Real client, sending specific headers') 475 # request.setHeader('Public', 'OPTIONS, DESCRIBE, ANNOUNCE, PLAY, 476 # SETUP, GET_PARAMETER, SET_PARAMETER, TEARDOWN') 477 # Public seems to be the same as allowed-methods, and real clients 478 # seem to respect SET_PARAMETER not listed here 479 request.setHeader( 480 'Public', 481 'OPTIONS, DESCRIBE, ANNOUNCE, PLAY, SETUP, TEARDOWN') 482 # without a RealChallenge1, clients don't even go past OPTIONS 483 request.setHeader('RealChallenge1', 484 '28d49444034696e1d523f2819b8dcf4c')
485 #request.setHeader('StatsMask', '3') 486
487 - def render_GET(self, request):
488 # the Resource.get_HEAD refers to this -- pacify pychecker 489 raise NotImplementedError
490 491
492 -class ErrorResource(RTSPResource):
493
494 - def __init__(self, code, *lines):
495 resource.Resource.__init__(self) 496 self.code = code 497 self.body = "" 498 if lines != (None, ): 499 self.body = "\n".join(lines) + "\n\n" 500 501 # HACK! 502 if not hasattr(self, 'method'): 503 self.method = 'GET'
504
505 - def render(self, request):
506 request.clientproto = SERVER_PROTOCOL 507 self.render_startCSeqDate(request, request.method) 508 request.setResponseCode(self.code) 509 if self.body: 510 request.setHeader('content-type', "text/plain") 511 return self.body
512
513 - def render_GET(self, request):
514 # the Resource.get_HEAD refers to this -- pacify pychecker 515 raise NotImplementedError
516
517 - def getChild(self, chname, request):
518 return self
519 520
521 -class NoResource(ErrorResource):
522
523 - def __init__(self, message=None):
525