Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module http_client
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.http_client

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import datetime 
 19  import cgi 
 20   
 21  from twisted.internet import defer, protocol, reactor 
 22  from twisted.python.util import InsensitiveDict 
 23  from twisted.web import http 
 24   
 25  from flumotion.common import log 
 26  from flumotion.common import errors 
 27  from flumotion.component.misc.httpserver.httpcached import common 
 28  from flumotion.component.misc.httpserver.httpcached import http_utils 
 29   
 30   
 31  LOG_CATEGORY = "stream-provider" 
 32   
 33  USER_AGENT = "FlumotionClient/0.1" 
 34   
 35   
def ts2str(ts):
    """
    Render a POSIX timestamp as a local-time ISO 8601 string.

    Falsy values (None, 0) are treated as unknown and rendered as "???".
    """
    if not ts:
        return "???"
    return datetime.datetime.fromtimestamp(ts).isoformat()


class StreamInfo(object):
    """
    Provides information about a stream in a standard way.
    The information is retrieved by parsing HTTP response headers.

    Attributes:
      expires  -- Expires header as a timestamp; 0 if present but
                  unparsable, None if absent.
      mtime    -- Last-Modified header as a timestamp, None if absent.
      length   -- total length of the resource, taken from the
                  Content-Range total or from Content-Length.
      start    -- offset of the first byte carried by this response.
      size     -- number of body bytes carried by this response.
      mimeType -- media type from Content-Type (parameters dropped).

    Raises errors.FlumotionError for chunked responses and for responses
    that carry neither Content-Range nor Content-Length.
    """

    def __init__(self, headers):
        self.expires = None
        self.mtime = None
        self.length = 0
        self.start = 0
        self.size = 0
        self.mimeType = None

        headers = InsensitiveDict(headers)

        encoding = headers.get("Transfer-Encoding", None)
        if encoding == 'chunked':
            raise errors.FlumotionError("Chunked transfer not supported")

        expires = headers.get("Expires", None)
        if expires is not None:
            try:
                self.expires = http.stringToDatetime(expires)
            except Exception:
                # Invalid Expires values (e.g. "0" or "-1") are treated as
                # "already expired", as HTTP/1.1 prescribes.
                self.expires = 0

        lastmod = headers.get("Last-Modified", None)
        if lastmod is not None:
            self.mtime = http.stringToDatetime(lastmod)

        contentRange = headers.get("Content-Range", None)
        length = headers.get("Content-Length", None)
        if contentRange is not None:
            start, end, total = http.parseContentRange(contentRange)
            self.start = start
            self.length = total
            if length is not None:
                self.size = int(length)
            else:
                # The range end is inclusive: "bytes 0-499/1234" is 500 bytes.
                self.size = end - start + 1
        elif length is not None:
            self.length = int(length)
            self.size = int(length)
        else:
            raise errors.FlumotionError("Can't get length/size from headers",
                                        headers)

        ctype = headers.get("Content-Type", None)
        if ctype is not None:
            self.mimeType, _pdict = cgi.parse_header(ctype)


class StreamRequester(log.Loggable):
    """
    Front end that starts HTTP stream retrievals.

    Each call to retrieve() builds and connects a fresh StreamGetter that
    uses this requester's connection and idle timeouts.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, connTimeout=0, idleTimeout=0):
        # connTimeout is forwarded to StreamGetter.connect();
        # idleTimeout becomes each StreamGetter's inactivity timeout.
        self.connTimeout = connTimeout
        self.idleTimeout = idleTimeout

    def retrieve(self, consumer, url, proxyAddress=None, proxyPort=None,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None):
        """
        Start retrieving url, reporting to consumer.

        The optional proxy address/port replace the URL's host/port for the
        TCP connection. start/size request a byte range; ifModifiedSince and
        ifUnmodifiedSince make the request conditional.

        Returns the StreamGetter handling the request.
        """
        sizeDesc = " %d bytes" % size if size else ""
        startDesc = " starting at %d" % start if start else ""
        fromDesc = " from " if (size or start) else ""
        if ifModifiedSince:
            modDesc = " if modified since %s" % ts2str(ifModifiedSince)
        else:
            modDesc = ""
        if ifUnmodifiedSince:
            unmodDesc = (" if not modified since %s"
                         % ts2str(ifUnmodifiedSince))
        else:
            unmodDesc = ""
        self.log("Requesting %s%s%s%s%s%s",
                 sizeDesc, startDesc, fromDesc, url.toString(),
                 modDesc, unmodDesc)

        getter = StreamGetter(consumer, url,
                              ifModifiedSince, ifUnmodifiedSince,
                              start, size, self.idleTimeout)
        getter.connect(proxyAddress, proxyPort, self.connTimeout)
        return getter


class StreamGetter(protocol.ClientFactory, http.HTTPClient, log.Loggable):
    """
    Retrieves a stream over HTTP, one request per connection.

    This class is at the same time a Factory and a Protocol;
    this can be done because it's a client, and in Twisted
    client factories only create one protocol.

    The outcome, the stream info and the stream data are forwarded
    to a common.StreamConsumer instance given at creation time.
    Exactly one outcome callback (serverError, conditionFail,
    streamNotAvailable or streamDone) reaches the consumer; after it
    the consumer reference is dropped.

    It supports range requests and some conditional request types
    (ifModified and ifUnmodified).
    """

    logCategory = LOG_CATEGORY

    HTTP_METHOD = 'GET'

    host = None
    port = None

    def __init__(self, consumer, url,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None, timeout=0):
        self.consumer = consumer
        self.url = url

        self.ifModifiedSince = ifModifiedSince
        self.ifUnmodifiedSince = ifUnmodifiedSince

        self.start = start
        self.size = size
        # Idle timeout in seconds, passed to reactor.callLater.
        self.timeout = timeout

        self.headers = {}
        self.peer = None
        self.status = None
        self.info = None

        self._connected = False
        self._canceled = False
        self._remaining = None
        self._idlecheck = None
        # Activity counter read by the idle check; initialized here so
        # _keepActive() never depends on _cancelIdleCheck() running first.
        self._updateCount = 0

        self.logName = common.log_id(self)  # To be able to track the instance

    def __repr__(self):
        return "<%s: %s>" % (type(self).__name__, self.url)

    ### Public Methods ###

    def connect(self, proxyAddress=None, proxyPort=None, timeout=0):
        """
        Open the TCP connection (to the proxy if given, else the URL host).

        Only the 'http' scheme is supported; any other scheme is reported
        to the consumer as NOT_IMPLEMENTED without connecting.
        """
        assert not self._connected, "Already connected"
        self._connected = True
        url = self.url
        self.host = proxyAddress or url.hostname
        self.port = proxyPort or url.port
        if url.scheme != 'http':
            msg = "URL scheme %s not implemented" % url.scheme
            self._serverError(common.NOT_IMPLEMENTED, msg)
        else:
            self.log("Connecting to %s:%s for %s",
                     self.host, self.port, self.url)
            reactor.connectTCP(self.host, self.port, self, timeout)

    def pause(self):
        if not self.paused and self.transport is not None:
            self.pauseProducing()
            self.log("Request paused for %s", self.url)

    def resume(self):
        if self.paused and self.transport is not None:
            self.resumeProducing()
            self.log("Request resumed for %s", self.url)

    def cancel(self):
        """Drop the connection; no disconnection error is reported after."""
        if self._connected and self.transport is not None:
            self.transport.loseConnection()
            self._cancelIdleCheck()
            self.log("Request canceled for %s", self.url)
        self._canceled = True

    ### Overridden Methods ###

    def buildProtocol(self, addr):
        assert self.peer is None, "Protocol already built"
        self.peer = addr
        return self

    def clientConnectionFailed(self, connector, reason):
        self._serverError(common.SERVER_UNAVAILABLE, reason.getErrorMessage())

    def connectionMade(self):
        self.log("Connection made for %s", self.url)
        self.sendCommand(self.HTTP_METHOD, self.url.location)
        self.sendHeader('Host', self.url.host)
        self.sendHeader('User-Agent', USER_AGENT)
        self.sendHeader('Connection', "close")  # Pipeline not yet supported

        if self.ifModifiedSince:
            datestr = http.datetimeToString(self.ifModifiedSince)
            self.sendHeader('If-Modified-Since', datestr)

        if self.ifUnmodifiedSince:
            datestr = http.datetimeToString(self.ifUnmodifiedSince)
            self.sendHeader('If-Unmodified-Since', datestr)

        if self.start or self.size:
            start = self.start or 0
            if self.size:
                # Byte-range end is inclusive. Compare explicitly rather than
                # relying on truthiness: an end of 0 (start=0, size=1) is a
                # valid range and must not become an open-ended one.
                rangeSpecs = "bytes=%s-%s" % (start, start + self.size - 1)
            else:
                rangeSpecs = "bytes=%s-" % (start, )
            self.sendHeader('Range', rangeSpecs)

        self.endHeaders()

        self._resetIdleCheck()

    def connectionLost(self, reason):
        self.log("Connection lost for %s", self.url)
        self.handleResponseEnd()
        if not self._canceled:
            self._serverError(common.SERVER_DISCONNECTED,
                              reason.getErrorMessage())

    def handleStatus(self, version, status_str, message):
        self._keepActive()
        status = int(status_str)
        self.status = status

        if status in (http.OK, http.NO_CONTENT, http.PARTIAL_CONTENT):
            return

        # A single if/elif chain: each non-success status maps to exactly
        # one outcome reported to the consumer.
        if status == http.REQUESTED_RANGE_NOT_SATISFIABLE:
            self._serverError(common.RANGE_NOT_SATISFIABLE,
                              "HTTP range not satisfiable")
        elif status == http.NOT_MODIFIED:
            self._conditionFail(common.STREAM_NOT_MODIFIED,
                                "Stream not modified")
        elif status == http.PRECONDITION_FAILED:
            self._conditionFail(common.STREAM_MODIFIED, "Stream Modified")
        elif status == http.NOT_FOUND:
            self._streamNotAvailable(common.STREAM_NOTFOUND,
                                     "Resource Not Found")
        elif status == http.FORBIDDEN:
            self._streamNotAvailable(common.STREAM_FORBIDDEN,
                                     "Resource Forbidden")
        elif status in (http.MOVED_PERMANENTLY, http.FOUND):
            self._serverError(common.NOT_IMPLEMENTED,
                              "HTTP redirection not supported")
        else:
            self._serverError(common.NOT_IMPLEMENTED,
                              "Unsupported HTTP response: %s (%s)"
                              % (message, status))

    def handleHeader(self, key, val):
        self._keepActive()
        self.headers[key] = val

    def handleEndHeaders(self):
        self._keepActive()
        if self.consumer is None:
            # An outcome was already reported (e.g. from handleStatus).
            # Such responses (like 304) may carry no length, so parsing
            # their headers would only raise a spurious error.
            return
        self.info = StreamInfo(self.headers)
        self._remaining = self.info.size

        if self.size and self.size < self.info.size:
            self.warning("Response size bigger than the requested size, "
                         "expecting %s bytes and response length is %s",
                         self.size, self.info.size)
            # We asked for a range but the proxy answered with the whole
            # file. We're only interested on the first self.size bytes.
            self._remaining = self.size

        self._onInfo(self.info)

    def handleResponsePart(self, data):
        self._keepActive()
        if self._remaining is None:
            # Headers were not parsed because an outcome was already
            # reported; there is nobody to deliver the data to.
            return
        size = len(data)

        if self._remaining >= size:
            self._remaining -= size
            self._onData(data)
            return

        if self._remaining > 0:
            self.warning("More than %s bytes have been received",
                         self.info.size)

        # Keep just the bytes needed to fulfill the original range request,
        # discard the rest because they will be in the next request after
        # this one is cancelled.
        data = data[:self._remaining]
        self._remaining = 0
        if data:
            self._onData(data)
        self.cancel()

    def handleResponseEnd(self):
        info = self.info
        if info is None:
            self.log("Incomplete request %s", self.url.toString())
            return
        if self._remaining == 0:
            self.log("Request done, got %d bytes starting at %d from %s, "
                     "last modified on %s", info.size,
                     info.start, self.url.toString(),
                     ts2str(info.mtime))
            self._streamDone()
            return
        self.log("Incomplete request, missing %d bytes from the expected "
                 "%d bytes starting at %d from %s", self._remaining,
                 info.size, info.start, self.url.toString())

    def sendCommand(self, command, path):
        # We want HTTP/1.1 for conditional GET and range requests
        self.transport.write('%s %s HTTP/1.1\r\n' % (command, path))

    ### Private Methods ###

    def _keepActive(self):
        # Record activity so the next idle check does not time out.
        self._updateCount += 1

    def _resetIdleCheck(self):
        self._cancelIdleCheck()
        self._idlecheck = reactor.callLater(self.timeout, self._onIdleCheck)

    def _cancelIdleCheck(self):
        if self._idlecheck:
            self._idlecheck.cancel()
            self._idlecheck = None
        self._updateCount = 0

    def _onIdleCheck(self):
        self._idlecheck = None
        if not self._updateCount:
            self._onTimeout()
        else:
            self._resetIdleCheck()

    def _onTimeout(self):
        self._idlecheck = None
        self._serverError(common.SERVER_TIMEOUT, "Server timeout")

    def _cancel(self):
        # Stop the idle check, drop the connection and forget the consumer
        # so no further callbacks are delivered.
        self._cancelIdleCheck()
        if self.consumer:
            if self.transport:
                self.transport.loseConnection()
            self.consumer = None

    def _serverError(self, code, message):
        if self.consumer:
            self.consumer.serverError(self, code, message)
            self._cancel()

    def _conditionFail(self, code, message):
        if self.consumer:
            self.consumer.conditionFail(self, code, message)
            self._cancel()

    def _streamNotAvailable(self, code, message):
        if self.consumer:
            self.consumer.streamNotAvailable(self, code, message)
            self._cancel()

    def _onInfo(self, info):
        if self.consumer:
            self.consumer.onInfo(self, info)

    def _onData(self, data):
        if self.consumer:
            self.consumer.onData(self, data)

    def _streamDone(self):
        if self.consumer:
            self.consumer.streamDone(self)
            self._cancel()


400 401 402 if __name__ == "__main__": 403 import sys 404
405 - def addarg(d, a):
406 k, v = a.split('=', 1) 407 if v == 'None': 408 d[k] = None 409 try: 410 d[k] = int(v) 411 except: 412 d[k] = v
413 414 415 kwargs = {} 416 for a in sys.argv[1:]: 417 addarg(kwargs, a) 418 419 url = kwargs.pop('url') 420
421 - class DummyConsumer(object):
422
423 - def serverError(self, getter, code, message):
424 print "Failure: %s (%d)" % (message, code) 425 reactor.stop()
426
427 - def conditionFail(self, getter, code, message):
428 print "Condition: %s (%d)" % (message, code) 429 reactor.stop()
430
431 - def streamNotAvailable(self, getter, code, message):
432 print message 433 reactor.stop()
434
435 - def streamDone(self, getter):
436 print "Finished" 437 reactor.stop()
438
439 - def onInfo(self, getter, info):
440 exp = info.expires and http.datetimeToString(info.expires) 441 mod = info.mtime and http.datetimeToString(info.mtime) 442 print "Found, Exp:", exp, "Mod:", mod 443 print "Len:", info.length, "Start:", \ 444 info.start, "Size:", info.size
445
446 - def onData(self, getter, data):
447 #print "Data (%d)" % len(data) 448 pass
449 450 451 consumer = DummyConsumer() 452 requester = StreamRequester(5000, 5000) 453 requester.retrieve(consumer, http_utils.Url.fromString(url), **kwargs) 454 reactor.run() 455