Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module http_client
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.http_client

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import datetime 
 19  import cgi 
 20   
 21  from twisted.internet import defer, protocol, reactor 
 22  from twisted.python.util import InsensitiveDict 
 23  from twisted.web import http 
 24   
 25  from flumotion.common import log 
 26  from flumotion.common import errors 
 27  from flumotion.component.misc.httpserver.httpcached import common 
 28  from flumotion.component.misc.httpserver.httpcached import http_utils 
 29   
 30   
 31  LOG_CATEGORY = "stream-provider" 
 32   
 33  USER_AGENT = "FlumotionClient/0.1" 
 34   
 35   
def ts2str(ts):
    """
    Format a timestamp for log messages.

    @param ts: value accepted by datetime.datetime.fromtimestamp,
               or a false value (None, 0) when the time is unknown.

    @return: the local-time ISO 8601 representation of the timestamp,
             or the placeholder "???" for a false value.
    """
    if not ts:
        # Unknown time (None) and zero are both rendered as a placeholder.
        return "???"
    moment = datetime.datetime.fromtimestamp(ts)
    return moment.isoformat()
40 41
class StreamInfo(object):
    """
    Provides information about a stream in a standard way.
    The information is retrieved by parsing HTTP headers.

    Instance attributes:
     - expires: value parsed from the "Expires" header, 0 if the header
                is present but cannot be parsed, None if it is absent.
     - mtime: value parsed from the "Last-Modified" header, None if the
              header is absent or cannot be parsed.
     - length: total length taken from "Content-Range" when present
               (None when the server reports it as unknown),
               otherwise the "Content-Length" value.
     - start: offset of the first byte (from "Content-Range", else 0).
     - size: number of bytes carried by this response.
     - mimeType: main value of the "Content-Type" header, or None.
    """

    def __init__(self, headers):
        """
        @param headers: mapping of HTTP header names to values;
                        lookups are case-insensitive.

        @raise errors.FlumotionError: if the response uses chunked
               transfer encoding, or if neither "Content-Range" nor
               "Content-Length" is available.
        """
        self.expires = None
        self.mtime = None
        self.length = 0
        self.start = 0
        self.size = 0
        self.mimeType = None

        headers = InsensitiveDict(headers)

        # Transfer-coding names are case-insensitive.
        encoding = headers.get("Transfer-Encoding", None)
        if encoding is not None and encoding.strip().lower() == 'chunked':
            raise errors.FlumotionError("Chunked transfer not supported")

        expires = headers.get("Expires", None)
        if expires is not None:
            try:
                self.expires = http.stringToDatetime(expires)
            except Exception:
                # Best effort: an unparsable date is stored as 0.
                self.expires = 0

        lastmod = headers.get("Last-Modified", None)
        if lastmod is not None:
            try:
                self.mtime = http.stringToDatetime(lastmod)
            except Exception:
                # Same tolerance as for "Expires": a malformed date is
                # treated like a missing header instead of aborting.
                self.mtime = None

        contentRange = headers.get("Content-Range", None)
        length = headers.get("Content-Length", None)
        if contentRange is not None:
            start, end, total = http.parseContentRange(contentRange)
            self.start = start
            self.length = total
            if length is not None:
                self.size = int(length)
            else:
                # The last-byte position of a content range is
                # inclusive, so "bytes 0-499/1234" carries 500 bytes.
                self.size = end - start + 1
        elif length is not None:
            self.length = int(length)
            self.size = int(length)
        else:
            raise errors.FlumotionError("Can't get length/size from headers",
                                        headers)

        ctype = headers.get("Content-Type", None)
        if ctype is not None:
            # Only the main value is kept; parameters are discarded.
            self.mimeType, _pdict = cgi.parse_header(ctype)
93 94
class StreamRequester(log.Loggable):
    """
    Entry point used to retrieve data streams over HTTP.

    Each call to retrieve() creates and connects a new StreamGetter
    configured with the timeouts given at construction time.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, connTimeout=0, idleTimeout=0):
        """
        @param connTimeout: timeout handed to StreamGetter.connect().
        @param idleTimeout: timeout handed to the StreamGetter constructor.
        """
        self.connTimeout = connTimeout
        self.idleTimeout = idleTimeout

    def retrieve(self, consumer, url, proxyAddress=None, proxyPort=None,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None):
        """
        Start retrieving a stream and return the getter handling it.

        @param consumer: object that will receive the outcome, the stream
                         information and the stream data.
        @param url: URL of the stream; must provide toString().
        @param proxyAddress: address to connect to instead of the URL host.
        @param proxyPort: port to connect to instead of the URL port.
        @param ifModifiedSince: timestamp for a conditional request.
        @param ifUnmodifiedSince: timestamp for a conditional request.
        @param start: offset of the first requested byte.
        @param size: number of requested bytes.

        @return: the connected StreamGetter instance.
        """
        # Build the optional pieces of the log message one by one.
        sizeDesc = ""
        if size:
            sizeDesc = " %d bytes" % size

        startDesc = ""
        if start:
            startDesc = " starting at %d" % start

        fromDesc = ""
        if size or start:
            fromDesc = " from "

        location = url.toString()

        modifiedDesc = ""
        if ifModifiedSince:
            modifiedDesc = " if modified since %s" % ts2str(ifModifiedSince)

        unmodifiedDesc = ""
        if ifUnmodifiedSince:
            unmodifiedDesc = (" if not modified since %s"
                              % ts2str(ifUnmodifiedSince))

        self.log("Requesting %s%s%s%s%s%s",
                 sizeDesc, startDesc, fromDesc, location,
                 modifiedDesc, unmodifiedDesc)

        streamGetter = StreamGetter(consumer, url,
                                    ifModifiedSince, ifUnmodifiedSince,
                                    start, size, self.idleTimeout)
        streamGetter.connect(proxyAddress, proxyPort, self.connTimeout)
        return streamGetter
124 125
class StreamGetter(protocol.ClientFactory, http.HTTPClient, log.Loggable):
    """
    Retrieves a stream using a single HTTP GET request.

    The request line announces HTTP/1.1 (see sendCommand) and the
    request carries a "Connection: close" header, so the connection
    is used for one request only.

    This class is at the same time a Factory and a Protocol,
    this can be done because it's a client and in twisted
    client factories only create one protocol.

    The outcome, the stream info and the stream data are forwarded
    to the consumer instance given at creation time.

    It supports range requests and some conditional request types
    (ifModified and ifUnmodified).
    """

    logCategory = LOG_CATEGORY

    HTTP_METHOD = 'GET'

    host = None
    port = None

    def __init__(self, consumer, url,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None, timeout=0):
        """
        @param consumer: object notified through serverError,
                         conditionFail, streamNotAvailable, onInfo,
                         onData and streamDone.
        @param url: URL of the stream to retrieve.
        @param ifModifiedSince: timestamp for "If-Modified-Since".
        @param ifUnmodifiedSince: timestamp for "If-Unmodified-Since".
        @param start: offset of the first requested byte.
        @param size: number of requested bytes.
        @param timeout: delay, in seconds, between two idle checks.
        """
        self.consumer = consumer
        self.url = url

        self.ifModifiedSince = ifModifiedSince
        self.ifUnmodifiedSince = ifUnmodifiedSince

        self.start = start
        self.size = size
        self.timeout = timeout

        self.headers = {}
        self.peer = None
        self.status = None
        self.info = None

        self._connected = False
        self._canceled = False
        self._remaining = None
        self._idlecheck = None
        # Activity counter read by the idle check; initialised here so
        # that _keepActive() never depends on the call order.
        self._updateCount = 0

        self.logName = common.log_id(self) # To be able to track the instance

    def __repr__(self):
        return "<%s: %s>" % (type(self).__name__, self.url)

    ### Public Methods ###

    def connect(self, proxyAddress=None, proxyPort=None, timeout=0):
        """
        Open the TCP connection used to perform the request.

        @param proxyAddress: address to connect to instead of the URL host.
        @param proxyPort: port to connect to instead of the URL port.
        @param timeout: connection timeout handed to reactor.connectTCP.

        Only the 'http' scheme is supported; any other scheme is
        reported to the consumer as a NOT_IMPLEMENTED server error.
        """
        assert not self._connected, "Already connected"
        self._connected = True
        url = self.url
        self.host = proxyAddress or url.hostname
        self.port = proxyPort or url.port
        if url.scheme != 'http':
            msg = "URL scheme %s not implemented" % url.scheme
            self._serverError(common.NOT_IMPLEMENTED, msg)
        else:
            self.log("Connecting to %s:%s for %s",
                     self.host, self.port, self.url)
            reactor.connectTCP(self.host, self.port, self, timeout)

    def pause(self):
        """Pause the reception of data if it is not already paused."""
        # NOTE(review): 'paused' and 'transport' are not set by this
        # class; presumably they come from the twisted protocol base.
        if not self.paused and self.transport is not None:
            self.pauseProducing()
            self.log("Request paused for %s", self.url)

    def resume(self):
        """Resume the reception of data if it was paused."""
        if self.paused and self.transport is not None:
            self.resumeProducing()
            self.log("Request resumed for %s", self.url)

    def cancel(self):
        """
        Cancel the request, closing the connection if there is one.

        Once canceled, losing the connection is not reported to the
        consumer as a server error anymore.
        """
        if self._connected and self.transport is not None:
            self.transport.loseConnection()
        self._cancelIdleCheck()
        self.log("Request canceled for %s", self.url)
        self._canceled = True

    ### Overridden Methods ###

    def buildProtocol(self, addr):
        """Return self as the protocol; it can only be built once."""
        assert self.peer is None, "Protocol already built"
        self.peer = addr
        return self

    def clientConnectionFailed(self, connector, reason):
        self._serverError(common.SERVER_UNAVAILABLE, reason.getErrorMessage())

    def connectionMade(self):
        """Send the request line and headers, then start the idle check."""
        self.log("Connection made for %s", self.url)
        self.sendCommand(self.HTTP_METHOD, self.url.location)
        self.sendHeader('Host', self.url.host)
        self.sendHeader('User-Agent', USER_AGENT)
        self.sendHeader('Connection', "close") # Pipeline not yet supported

        if self.ifModifiedSince:
            datestr = http.datetimeToString(self.ifModifiedSince)
            self.sendHeader('If-Modified-Since', datestr)

        if self.ifUnmodifiedSince:
            datestr = http.datetimeToString(self.ifUnmodifiedSince)
            self.sendHeader('If-Unmodified-Since', datestr)

        if self.start or self.size:
            start = self.start or 0
            if self.size:
                # The last-byte position is inclusive. It is legitimately
                # 0 when a single byte is requested from offset 0, so it
                # must not be tested for truthiness.
                rangeSpecs = "bytes=%s-%s" % (start, start + self.size - 1)
            else:
                # Open-ended range: everything from the start offset.
                rangeSpecs = "bytes=%s-" % (start, )
            self.sendHeader('Range', rangeSpecs)

        self.endHeaders()

        self._resetIdleCheck()

    def connectionLost(self, reason):
        """
        Finish the response and, unless the request was canceled,
        report the disconnection to the consumer.

        When the stream completed, the consumer was already released
        by _streamDone() and the error notification is a no-op.
        """
        self.log("Connection lost for %s", self.url)
        self.handleResponseEnd()
        if not self._canceled:
            self._serverError(common.SERVER_DISCONNECTED,
                              reason.getErrorMessage())

    def handleStatus(self, version, status_str, message):
        """
        Map the HTTP status code to a consumer notification.

        OK, NO_CONTENT and PARTIAL_CONTENT let the response proceed;
        every other status is reported exactly once to the consumer.
        """
        self._keepActive()
        status = int(status_str)
        self.status = status

        if status in (http.OK, http.NO_CONTENT, http.PARTIAL_CONTENT):
            return

        # Single chain so that each status selects exactly one outcome.
        if status == http.REQUESTED_RANGE_NOT_SATISFIABLE:
            self._serverError(common.RANGE_NOT_SATISFIABLE,
                              "HTTP range not satisfiable")
        elif status == http.NOT_MODIFIED:
            self._conditionFail(common.STREAM_NOT_MODIFIED,
                                "Stream not modified")
        elif status == http.PRECONDITION_FAILED:
            self._conditionFail(common.STREAM_MODIFIED, "Stream Modified")
        elif status == http.NOT_FOUND:
            self._streamNotAvailable(common.STREAM_NOTFOUND,
                                     "Resource Not Found")
        elif status == http.FORBIDDEN:
            self._streamNotAvailable(common.STREAM_FORBIDDEN,
                                     "Resource Forbidden")
        elif status in (http.MOVED_PERMANENTLY, http.FOUND):
            self._serverError(common.NOT_IMPLEMENTED,
                              "HTTP redirection not supported")
        else:
            self._serverError(common.NOT_IMPLEMENTED,
                              "Unsupported HTTP response: %s (%s)"
                              % (message, status))

    def handleHeader(self, key, val):
        """Record a response header for later parsing."""
        self._keepActive()
        self.headers[key] = val

    def handleEndHeaders(self):
        """
        Build the stream information from the received headers and
        forward it to the consumer.
        """
        self._keepActive()
        # NOTE(review): StreamInfo raises errors.FlumotionError for
        # chunked responses or missing length headers; it is not
        # caught here.
        self.info = StreamInfo(self.headers)
        if self.size and self.size < self.info.size:
            self.warning("Response size bigger than the requested size, "
                         "expecting %s bytes and response length is %s",
                         self.size, self.info.size)
        self._remaining = self.info.size
        self._onInfo(self.info)

    def handleResponsePart(self, data):
        """Account for a chunk of body data and forward it."""
        self._keepActive()
        size = len(data)
        if self._remaining > 0 and self._remaining < size:
            self.warning("More than %s bytes have been received",
                         self.info.size)
        self._remaining -= size
        self._onData(data)

    def handleResponseEnd(self):
        """
        Notify the consumer that the stream is done if all the
        expected bytes were received, otherwise only log the
        incomplete request.
        """
        if self.info is not None:
            if self._remaining == 0:
                self.log("Request done, got %d bytes starting at %d from %s, "
                         "last modified on %s", self.info.size,
                         self.info.start, self.url.toString(),
                         ts2str(self.info.mtime))
                self._streamDone()
                return
        if self.info:
            self.log("Incomplete request, missing %d bytes from the expected "
                     "%d bytes starting at %d from %s", self._remaining,
                     self.info.size, self.info.start, self.url.toString())
        else:
            self.log("Incomplete request %s", self.url.toString())

    def sendCommand(self, command, path):
        """Write the request line, announcing HTTP/1.1."""
        # We want HTTP/1.1 for conditional GET and range requests
        self.transport.write('%s %s HTTP/1.1\r\n' % (command, path))

    ### Private Methods ###

    def _keepActive(self):
        """Record activity so that the next idle check does not time out."""
        self._updateCount += 1

    def _resetIdleCheck(self):
        """Schedule a new idle check, replacing any pending one."""
        self._cancelIdleCheck()
        # NOTE(review): with the default timeout of 0 the check is
        # scheduled with no delay - confirm callers always provide
        # a positive idle timeout.
        self._idlecheck = reactor.callLater(self.timeout, self._onIdleCheck)

    def _cancelIdleCheck(self):
        """Cancel the pending idle check and reset the activity counter."""
        if self._idlecheck:
            self._idlecheck.cancel()
        self._idlecheck = None
        self._updateCount = 0

    def _onIdleCheck(self):
        """
        Time out if nothing was received since the last check,
        otherwise schedule the next check.
        """
        self._idlecheck = None
        if not self._updateCount:
            self._onTimeout()
        else:
            self._resetIdleCheck()

    def _onTimeout(self):
        self._idlecheck = None
        self._serverError(common.SERVER_TIMEOUT, "Server timeout")

    def _cancel(self):
        """
        Stop the idle check, close the connection and release the
        consumer so that it is not notified anymore.
        """
        self._cancelIdleCheck()
        if self.consumer:
            if self.transport:
                self.transport.loseConnection()
            self.consumer = None

    def _serverError(self, code, message):
        if self.consumer:
            self.consumer.serverError(self, code, message)
            self._cancel()

    def _conditionFail(self, code, message):
        if self.consumer:
            self.consumer.conditionFail(self, code, message)
            self._cancel()

    def _streamNotAvailable(self, code, message):
        if self.consumer:
            self.consumer.streamNotAvailable(self, code, message)
            self._cancel()

    def _onInfo(self, info):
        if self.consumer:
            self.consumer.onInfo(self, info)

    def _onData(self, data):
        if self.consumer:
            self.consumer.onData(self, data)

    def _streamDone(self):
        if self.consumer:
            self.consumer.streamDone(self)
            self._cancel()


if __name__ == "__main__":
    # Manual test driver. Arguments are given as key=value pairs:
    # 'url' is mandatory, the others are passed as keyword arguments
    # to StreamRequester.retrieve().
    import sys

    def addarg(d, a):
        """
        Parse a 'key=value' argument and store it in the dictionary.

        The literal value 'None' is stored as None, integer values
        are stored as int and anything else is stored as a string.
        """
        k, v = a.split('=', 1)
        if v == 'None':
            d[k] = None
            # Return here, otherwise the conversion below would
            # overwrite the value with the string 'None'.
            return
        try:
            d[k] = int(v)
        except ValueError:
            d[k] = v

    kwargs = {}
    for a in sys.argv[1:]:
        addarg(kwargs, a)

    if 'url' not in kwargs:
        sys.exit("Usage: %s url=URL [key=value ...]" % sys.argv[0])

    url = kwargs.pop('url')

    class DummyConsumer(object):
        """Consumer printing the outcome of the request to stdout."""

        def serverError(self, getter, code, message):
            print("Failure: %s (%d)" % (message, code))
            reactor.stop()

        def conditionFail(self, getter, code, message):
            print("Condition: %s (%d)" % (message, code))
            reactor.stop()

        def streamNotAvailable(self, getter, code, message):
            print(message)
            reactor.stop()

        def streamDone(self, getter):
            print("Finished")
            reactor.stop()

        def onInfo(self, getter, info):
            exp = info.expires and http.datetimeToString(info.expires)
            mod = info.mtime and http.datetimeToString(info.mtime)
            # Single formatted strings give the same output with the
            # print statement and with the print function.
            print("Found, Exp: %s Mod: %s" % (exp, mod))
            print("Len: %s Start: %s Size: %s"
                  % (info.length, info.start, info.size))

        def onData(self, getter, data):
            # The received data is deliberately ignored.
            pass

    consumer = DummyConsumer()
    requester = StreamRequester(5000, 5000)
    requester.retrieve(consumer, http_utils.Url.fromString(url), **kwargs)
    reactor.run()