
Source Code for Module flumotion.component.misc.httpserver.httpfile

# -*- Mode: Python; test-case-name: flumotion.test.test_misc_httpserver -*-
# vi:si:et:sw=4:sts=4:ts=4

# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
# Copyright (C) 2010,2011 Flumotion Services, S.A.
# All rights reserved.
#
# This file may be distributed and/or modified under the terms of
# the GNU Lesser General Public License version 2.1 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.LGPL" in the source distribution for more information.
#
# Headers in this file shall remain intact.

import string
import time

# mp4seek is a library to split MP4 files, see the MP4File class docstring
HAS_MP4SEEK = False
try:
    import mp4seek.async
    HAS_MP4SEEK = True
except ImportError:
    pass

from twisted.web import resource, server, http
from twisted.web import error as weberror
from twisted.internet import defer, reactor, abstract
from twisted.python.failure import Failure

from flumotion.configure import configure
from flumotion.common import log
from flumotion.component.component import moods
from flumotion.component.misc.httpserver import fileprovider

# register serializables
from flumotion.common import messages

__version__ = "$Rev$"

LOG_CATEGORY = "httpserver"

# Newer Twisted provides ErrorPage, NoResource and ForbiddenResource in
# twisted.web.resource instead of twisted.web.error; use whichever module
# this version of Twisted has them in.
try:
    resource.ErrorPage
    errorpage = resource
except AttributeError:
    errorpage = weberror


class BadRequest(errorpage.ErrorPage):
    """
    Web error for invalid requests
    """

    def __init__(self, message="Invalid request format"):
        errorpage.ErrorPage.__init__(self, http.BAD_REQUEST,
                                     "Bad Request", message)


class InternalServerError(errorpage.ErrorPage):
    """
    Web error for internal failures
    """

    def __init__(self, message="The server failed to complete the request"):
        errorpage.ErrorPage.__init__(self, http.INTERNAL_SERVER_ERROR,
                                     "Internal Server Error", message)


class ServiceUnavailableError(errorpage.ErrorPage):
    """
    Web error for when the request cannot be served.
    """

    def __init__(self, message="The server is currently unable to handle "
                               "the request due to a temporary overloading "
                               "or maintenance of the server"):
        errorpage.ErrorPage.__init__(self, http.SERVICE_UNAVAILABLE,
                                     "Service Unavailable", message)


class File(resource.Resource, log.Loggable):
    """
    this file is inspired by/adapted from twisted.web.static
    """

    logCategory = LOG_CATEGORY

    defaultType = "application/octet-stream"

    childNotFound = errorpage.NoResource("File not found.")
    forbiddenResource = errorpage.ForbiddenResource("Access forbidden")
    badRequest = BadRequest()
    internalServerError = InternalServerError()
    serviceUnavailable = ServiceUnavailableError()

    def __init__(self, path, httpauth,
                 mimeToResource=None,
                 rateController=None,
                 requestModifiers=None,
                 metadataProvider=None):
        resource.Resource.__init__(self)

        self._path = path
        self._httpauth = httpauth
        # mapping of mime type -> File subclass
        self._mimeToResource = mimeToResource or {}
        self._rateController = rateController
        self._metadataProvider = metadataProvider
        self._requestModifiers = requestModifiers or []
        self._factory = MimedFileFactory(httpauth, self._mimeToResource,
                                         rateController=rateController,
                                         metadataProvider=metadataProvider,
                                         requestModifiers=requestModifiers)

    def getChild(self, path, request):
        self.log('getChild: self %r, path %r', self, path)
        # we handle a request ending in '/' as well; this is how those come in
        if path == '':
            return self

        try:
            child = self._path.child(path)
        except fileprovider.NotFoundError:
            return self.childNotFound
        except fileprovider.AccessError:
            return self.forbiddenResource
        except fileprovider.InsecureError:
            return self.badRequest

        return self._factory.create(child)

    def render(self, request):
        """
        The request gets rendered by asking the httpauth object for
        authentication, which returns a deferred.
        This deferred will callback when the request gets authenticated.
        """

        # PROBE: incoming request; see httpstreamer.resources
        self.debug('[fd %5d] (ts %f) incoming request %r',
                   request.transport.fileno(), time.time(), request)
        # Different headers not normally set in static.File...
        # Specify that we will close the connection after this request, and
        # that the client must not issue further requests.
        # We do this because future requests on this server might actually
        # need to go to a different process (because of the porter)
        request.setHeader('Server', 'Flumotion/%s' % configure.version)
        request.setHeader('Connection', 'close')

        d = self._httpauth.startAuthentication(request)
        d.addCallbacks(self._requestAuthenticated, self._authenticationFailed,
                       callbackArgs=(request, ), errbackArgs=(request, ))
        # return NOT_DONE_YET, as required by the twisted.web interfaces
        return server.NOT_DONE_YET

    def _authenticationFailed(self, failure, request):
        # Authentication failed; nothing more to do, just swallow the
        # failure. The object responsible for authentication has already
        # written a proper response to the client and closed the request.
        pass

    def _requestAuthenticated(self, result, request):
        # Authentication succeeded. Start rendering the request.
        # We always want to call _terminateRequest after rendering,
        # regardless of whether there's a failure while rendering it or not.
        d = defer.succeed(result)
        d.addCallback(self._renderRequest, request)
        d.addBoth(self._terminateRequest, request)
        return d

    def _terminateRequest(self, body, request):
        if body == server.NOT_DONE_YET:
            # _renderRequest will return NOT_DONE_YET if it started serving
            # the file. This means the callback chain started by
            # _renderRequest has finished and we're currently serving the
            # file.
            return
        if isinstance(body, Failure):
            # Something went wrong, log it
            self.warning("Failure during request rendering: %s",
                         log.getFailureMessage(body))
            if body.check(weberror.Error):
                err = body.value
                page = errorpage.ErrorPage(err.status, err.message,
                                           err.response)
            elif body.check(fileprovider.UnavailableError):
                page = self.serviceUnavailable
            elif body.check(fileprovider.AccessError):
                page = self.forbiddenResource
            elif body.check(fileprovider.NotFoundError):
                page = self.childNotFound
            else:
                page = self.internalServerError
            body = page.render(request)
        if body:
            # the callback chain from _renderRequest chose to return a string
            # body, write it out to the client
            request.write(body)
        self.debug('[fd %5d] Terminate request %r',
                   request.transport.fileno(), request)
        request.finish()

    def _renderRequest(self, _, request):

        # PROBE: authenticated request; see httpstreamer.resources
        self.debug('[fd %5d] (ts %f) authenticated request %r',
                   request.transport.fileno(), time.time(), request)

        # Now that we're authenticated (or authentication wasn't requested),
        # write the file (or appropriate other response) to the client.
        # We override static.File to implement Range requests, and to get
        # access to the transfer object to abort it later; the bulk of this
        # is a direct copy of static.File.render, though.

        self.debug("Opening file %s", self._path)

        # Use a deferred chain to unify error handling,
        # whether open returns a Deferred or directly a file.
        d = defer.Deferred()
        d.addCallback(lambda _: self._path.open())
        d.addCallbacks(self._gotProvider, self._fileOpenFailure,
                       callbackArgs=[request], errbackArgs=[request])
        d.callback(None)
        return d

    def _fileOpenFailure(self, failure, request):
        if failure.check(fileprovider.NotFoundError):
            self.debug("Could not find resource %s", self._path)
            return self.childNotFound.render(request)
        if failure.check(fileprovider.CannotOpenError):
            self.debug("%s is a directory, can't be GET", self._path)
            return self.childNotFound.render(request)
        if failure.check(fileprovider.AccessError):
            return self.forbiddenResource.render(request)
        return failure

    def _gotProvider(self, provider, request):
        self.debug("Rendering file %s", self._path)

        # Different headers not normally set in static.File...
        # Specify that we will close the connection after this request, and
        # that the client must not issue further requests.
        # We do this because future requests on this server might actually
        # need to go to a different process (because of the porter)
        request.setHeader('Server', 'Flumotion/%s' % configure.version)
        request.setHeader('Connection', 'close')
        # We can do range requests, in bytes.
        # UGLY HACK FIXME: if pdf, then do not accept range requests
        # because Adobe Reader plugin messes up
        if not self._path.mimeType == 'application/pdf':
            request.setHeader('Accept-Ranges', 'bytes')

        if request.setLastModified(provider.getmtime()) is http.CACHED:
            return ''

        contentType = provider.mimeType or self.defaultType

        if contentType:
            self.debug('File content type: %r' % contentType)
            request.setHeader('content-type', contentType)

        fileSize = provider.getsize()
        # first and last byte offset we will write
        first = 0
        last = fileSize - 1

        requestRange = request.getHeader('range')
        if requestRange is not None:
            # We have a partial data request.
            # for interpretation of range, see RFC 2068 14.36
            # examples: bytes=500-999; bytes=-500 (suffix mode; last 500)
            self.log('range request, %r', requestRange)
            rangeKeyValue = string.split(requestRange, '=')
            if len(rangeKeyValue) != 2:
                request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
                return ''

            if rangeKeyValue[0] != 'bytes':
                request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
                return ''

            # ignore a set of range requests for now, only take the first
            ranges = rangeKeyValue[1].split(',')[0]
            l = ranges.split('-')
            if len(l) != 2:
                request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
                return ''

            start, end = l

            if start:
                # byte-range-spec
                first = int(start)
                if end:
                    last = min(int(end), last)
            elif end:
                # suffix-byte-range-spec
                count = int(end)
                # we can't serve more than there are in the file
                if count > fileSize:
                    count = fileSize
                first = fileSize - count
            else:
                # need at least start or end
                request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
                return ''

            # Start sending from the requested position in the file
            if first:
                # TODO: logs suggest this is called with negative values,
                # figure out how
                self.debug("Request for range \"%s\" of file, seeking to "
                           "%d of total file size %d", ranges, first, fileSize)
                provider.seek(first)

            # FIXME: is it still partial if the request was for the complete
            # file ? Couldn't find a conclusive answer in the spec.
            request.setResponseCode(http.PARTIAL_CONTENT)
            request.setHeader('Content-Range', "bytes %d-%d/%d" %
                              (first, last, fileSize))

            request.setResponseRange(first, last, fileSize)

        d = defer.maybeDeferred(self.do_prepareBody,
                                request, provider, first, last)

        def dispatchMethod(header, request):
            if request.method == 'HEAD':
                # the _terminateRequest callback will be fired, and the
                # request will be finished
                return ''
            return self._startRequest(request, header, provider, first, last)

        d.addCallback(dispatchMethod, request)

        return d

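    # Worked example (illustrative comment, not part of the original module):
    # for a 1000-byte file, the parsing above maps a Range header to the
    # (first, last) offsets and Content-Range values as follows:
    #
    #   Range: bytes=0-499   ->  first=0,   last=499  ->  bytes 0-499/1000
    #   Range: bytes=500-    ->  first=500, last=999  ->  bytes 500-999/1000
    #   Range: bytes=-500    ->  first=500, last=999  ->  bytes 500-999/1000
    #
    # Only the first range of a multi-range request is honoured, and a
    # malformed Range header answers 416 (Requested Range Not Satisfiable).
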
    def _startRequest(self, request, header, provider, first, last):
        # Call request modifiers
        for modifier in self._requestModifiers:
            modifier.modify(request)

        # PROBE: started request; see httpstreamer.resources
        self.debug('[fd %5d] (ts %f) started request %r',
                   request.transport.fileno(), time.time(), request)

        if self._metadataProvider:
            self.log("Retrieving metadata using %r", self._metadataProvider)
            # The URL can't be rewritten. If we ever want to do so, the API
            # of the file provider will have to be changed.
            d = self._metadataProvider.getMetadata(request.path)
        else:
            d = defer.succeed(None)

        def metadataError(failure):
            self.warning('Error retrieving metadata for file %s'
                         ' using plug %r. %r',
                         request.path,
                         self._metadataProvider,
                         failure.value)

        d.addErrback(metadataError)
        d.addCallback(self._configureTransfer, request, header,
                      provider, first, last)

        return d

    def _configureTransfer(self, metadata, request, header,
                           provider, first, last):
        if self._rateController:
            self.debug("Creating RateControl object using plug %r and "
                       "metadata %r", self._rateController, metadata)

            # Pass the metadata dictionary as proxy settings,
            # so the rate control can use it if needed.
            d = defer.maybeDeferred(
                self._rateController.createProducerConsumerProxy,
                request, metadata)
        else:
            d = defer.succeed(request)

        def attachProxy(consumer, provider, header, first, last):
            # If we have a header, give it to the consumer first
            if header:
                consumer.write(header)

            # Set the provider first, because for very small files
            # the transfer could terminate right away.
            request._provider = provider
            transfer = FileTransfer(provider, last + 1, consumer)
            request._transfer = transfer

            # The important NOT_DONE_YET was already returned by the render()
            # method and the value returned here is just part of a convention
            # between _renderRequest and _terminateRequest. The latter assumes
            # that the deferred chain initiated by _renderRequest will fire
            # with NOT_DONE_YET if the transfer is in progress.
            return server.NOT_DONE_YET

        d.addCallback(attachProxy, provider, header, first, last)

        return d

    def do_prepareBody(self, request, provider, first, last):
        """
        I am called before the body of the response gets written,
        and after generic header setting has been done.

        I set Content-Length.

        Override me to send additional headers, or to prefix the body
        with data headers.

        I can return a Deferred, that should fire with a string header. That
        header will be written to the request.
        """
        request.setHeader("Content-Length", str(last - first + 1))
        return ''


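# Illustrative sketch (not part of the original module): the docstring of
# File.do_prepareBody invites subclasses to override it, e.g. to add headers
# before the body is written. The subclass name and header below are
# hypothetical, chosen only for the example:
#
#     class TaggedFile(File):
#
#         def do_prepareBody(self, request, provider, first, last):
#             request.setHeader('X-Served-By', 'flumotion-httpfile')
#             return File.do_prepareBody(self, request, provider,
#                                        first, last)

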
class MimedFileFactory(log.Loggable):
    """
    I create File subclasses based on the mime type of the given path.
    """

    logCategory = LOG_CATEGORY

    defaultType = "application/octet-stream"

    def __init__(self, httpauth,
                 mimeToResource=None,
                 rateController=None,
                 requestModifiers=None,
                 metadataProvider=None):
        self._httpauth = httpauth
        self._mimeToResource = mimeToResource or {}
        self._rateController = rateController
        self._requestModifiers = requestModifiers
        self._metadataProvider = metadataProvider

    def create(self, path):
        """
        Creates and returns an instance of a File subclass based
        on the mime type of the given path.
        """
        mimeType = path.mimeType or self.defaultType
        self.debug("Create %s file for %s", mimeType, path)
        klazz = self._mimeToResource.get(mimeType, File)
        return klazz(path, self._httpauth,
                     mimeToResource=self._mimeToResource,
                     rateController=self._rateController,
                     requestModifiers=self._requestModifiers,
                     metadataProvider=self._metadataProvider)


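# Illustrative sketch (not part of the original module): the mimeToResource
# mapping given to MimedFileFactory maps a mime type to the File subclass
# that should serve it. A hypothetical mapping using the subclasses defined
# below could look like:
#
#     mimeToResource = {
#         'video/x-flv': FLVFile,
#         'video/mp4': MP4File,
#     }
#
# Any mime type not present in the mapping falls back to the plain File
# resource.

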
class FLVFile(File):
    """
    I am a File resource for FLV files.
    I can handle requests with a 'start' GET parameter.
    This parameter represents the byte offset from where to start.
    If it is non-zero, I will output an FLV header so the result is
    playable.
    """
    header = 'FLV\x01\x01\000\000\000\x09\000\000\000\x09'

    def do_prepareBody(self, request, provider, first, last):
        self.log('do_prepareBody for FLV')
        length = last - first + 1
        ret = ''

        # if there is a non-zero start get parameter, prefix the body with
        # our FLV header
        # each value is a list
        try:
            start = int(request.args.get('start', ['0'])[0])
        except ValueError:
            start = 0
        # range request takes precedence over our start parsing
        if request.getHeader('range') is None and start:
            self.debug('Start %d passed, seeking', start)
            provider.seek(start)
            length = last - start + 1 + len(self.header)
            ret = self.header

        request.setHeader("Content-Length", str(length))

        return ret


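# Illustrative request (comment only, not part of the original module):
# a player seeking into the file typically asks for a byte offset it got
# from the FLV metadata, e.g.
#
#     GET /videos/clip.flv?start=1048576
#
# FLVFile then seeks the provider to byte 1048576 and prefixes the body with
# the 13-byte FLV header above so the truncated stream stays playable.
# The path and offset are made up for the example.

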
class MP4File(File):
    """
    I am a File resource for MP4 files.
    If I have a library for manipulating MP4 files available, I can handle
    requests with a 'start' GET parameter. Without the library, I ignore
    this parameter.
    The 'start' parameter represents the time offset from where to start, in
    seconds. If it is non-zero, I will seek inside the file to the sample
    with that time, and prepend the content with rebuilt MP4 tables, to make
    the output playable.
    """

    def do_prepareBody(self, request, provider, first, last):
        self.log('do_prepareBody for MP4')
        length = last - first + 1
        ret = ''

        # if there is a non-zero start get parameter, split the file, prefix
        # the body with the regenerated header and seek inside the provider
        try:
            start = float(request.args.get('start', ['0'])[0])
        except ValueError:
            start = 0
        # range request takes precedence over our start parsing
        if request.getHeader('range') is None and start and HAS_MP4SEEK:
            self.debug('Start %f passed, seeking', start)
            provider.seek(0)
            d = self._split_file(provider, start)

            def seekAndSetContentLength(header_and_offset):
                header, offset = header_and_offset
                # the header is a file-like object with the file pointer at
                # the end, the offset is a number
                length = last - offset + 1 + header.tell()
                provider.seek(offset)
                request.setHeader("Content-Length", str(length))
                header.seek(0)
                return header.read()

            def seekingFailed(failure):
                # swallow the failure and serve the file from the beginning
                self.warning("Seeking in MP4 file %s failed: %s", provider,
                             log.getFailureMessage(failure))
                provider.seek(0)
                request.setHeader('Content-Length', str(length))
                return ret

            d.addCallback(seekAndSetContentLength)
            d.addErrback(seekingFailed)
            return d
        else:
            request.setHeader('Content-Length', str(length))
            return defer.succeed(ret)

    def _split_file(self, provider, start):
        d = defer.Deferred()

        def read_some_data(how_much, from_where):
            if how_much:
                provider.seek(from_where)
                read_d = provider.read(how_much)
                read_d.addCallback(splitter.feed)
                read_d.addErrback(d.errback)
            else:
                d.callback(splitter.result())

        splitter = mp4seek.async.Splitter(start)
        splitter.start(read_some_data)

        return d


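# Illustrative request (comment only, not part of the original module):
# unlike FLVFile, the 'start' value here is a time in seconds, e.g.
#
#     GET /videos/clip.mp4?start=42.5
#
# With mp4seek available, the splitter rebuilds the MP4 tables for the
# sample at 42.5s and the rebuilt header is prepended to the body; without
# mp4seek the parameter is ignored and the whole file is served.
# The path and offset are made up for the example.

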
class FileTransfer(log.Loggable):
    """
    A class to represent the transfer of a file over the network.
    """

    logCategory = LOG_CATEGORY

    consumer = None

    def __init__(self, provider, size, consumer):
        """
        @param provider: an asynchronous file provider
        @type provider: L{fileprovider.File}
        @param size: file position to which file should be read
        @type size: int
        @param consumer: consumer to receive the data
        @type consumer: L{twisted.internet.interfaces.IFinishableConsumer}
        """
        self.provider = provider
        self.size = size
        self.consumer = consumer
        self.written = self.provider.tell()
        self.bytesWritten = 0
        self._pending = None
        self._again = False  # True if resume was called while waiting for data
        self._finished = False  # Set when we finish a transfer
        self.debug("Calling registerProducer on %r", consumer)
        consumer.registerProducer(self, 0)

    def resumeProducing(self):
        if not self.consumer:
            return
        self._produce()

    def pauseProducing(self):
        pass

    def stopProducing(self):
        self.debug('Stop producing from %s at %d/%d bytes',
                   self.provider, self.provider.tell(), self.size)
        # even though it's the consumer stopping us, from looking at
        # twisted code it looks like we still are required to
        # unregister and notify the request that we're done...
        self._terminate()

    def _produce(self):
        if self._pending:
            # We already are waiting for data, just remember more is needed
            self._again = True
            return
        self._again = False
        d = self.provider.read(min(abstract.FileDescriptor.bufferSize,
                                   self.size - self.written))
        self._pending = d
        d.addCallbacks(self._cbGotData, self._ebReadFailed)

    def _cbGotData(self, data):
        self._pending = None

        # We might have got a stopProducing before the _cbGotData callback
        # has been fired, so we might be in the _finished state. If so, just
        # return.
        if self._finished:
            return

        if data:
            # WARNING! This call goes back to the reactor! Read the comment
            # in _writeToConsumer!
            self._writeToConsumer(data)

            # We again might be in _finished state, because we might just
            # have got out of the reactor after writing some data to the
            # consumer.
            #
            # The story goes thusly:
            # 1) you write the last data chunk
            # 2) before you get out of _writeToConsumer(), the _cbGotData
            #    gets fired again
            # 3) because it's the last write (we've written the entire file)
            #    _terminate() gets called
            # 4) consumer and provider are set to None
            # 5) you return from the _writeToConsumer call
            #
            # If this happened, just exit (again)
            if self._finished:
                return

        if self.provider.tell() == self.size:
            self.debug('Written entire file of %d bytes from %s',
                       self.size, self.provider)
            self._terminate()
        elif self._again:
            # Continue producing
            self._produce()

    def _ebReadFailed(self, failure):
        self._pending = None

        if self._finished:
            return

        self.warning('Failure during file %s reading: %s',
                     self.provider, log.getFailureMessage(failure))
        self._terminate()

    def _writeToConsumer(self, data):
        self.written += len(data)
        self.bytesWritten += len(data)
        # this .write will spin the reactor, calling .doWrite and then
        # .resumeProducing again, so be prepared for a re-entrant call
        self.consumer.write(data)

    def _terminate(self):
        if self.size != self.bytesWritten:
            self.warning("Terminated before writing the full %s bytes, "
                         "only %s bytes written", self.size, self.bytesWritten)
        try:
            self.provider.close()
        finally:
            self.provider = None
        self.consumer.unregisterProducer()
        self.consumer.finish()
        self.consumer = None
        self._finished = True

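# Summary sketch (comment only, not part of the original module): judging
# from the calls made in this module, the asynchronous provider driven by
# File and FileTransfer above is expected to offer at least:
#
#     path.mimeType            # guessed mime type, or None
#     path.child(name)         # may raise NotFoundError/AccessError/
#                              # InsecureError
#     path.open()              # a file-like provider, or a Deferred of one
#     provider.getmtime()      # last modification time
#     provider.getsize()       # file size in bytes
#     provider.seek(offset)    # position the read pointer
#     provider.tell()          # current read position
#     provider.read(size)      # Deferred firing with up to `size` bytes
#     provider.close()         # release the resource
#
# See flumotion.component.misc.httpserver.fileprovider for the real
# interface definitions.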