1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import datetime
19 import cgi
20
21 from twisted.internet import defer, protocol, reactor
22 from twisted.python.util import InsensitiveDict
23 from twisted.web import http
24
25 from flumotion.common import log
26 from flumotion.common import errors
27 from flumotion.component.misc.httpserver.httpcached import common
28 from flumotion.component.misc.httpserver.httpcached import http_utils
29
30
31 LOG_CATEGORY = "stream-provider"
32
33 USER_AGENT = "FlumotionClient/0.1"
34
35
37 if ts:
38 return datetime.datetime.fromtimestamp(ts).isoformat()
39 return "???"
40
41
43 """
44 Provides information about a stream in a standard way.
45 The information is retrieved by parsing HTTP headers.
46 """
47
# NOTE(review): the enclosing ``def`` line (which binds ``self`` and
# ``headers``) is not visible in this view; only the body is updated.
#
# Fills in the stream description from a mapping of HTTP response headers.
# Raises errors.FlumotionError for chunked transfers and when neither
# Content-Range nor Content-Length is present.

# Defaults used when the corresponding header is absent.
self.expires = None
self.mtime = None
self.length = 0
self.start = 0
self.size = 0
self.mimeType = None

# Wrap the headers so that lookups ignore the case of the header name.
headers = InsensitiveDict(headers)

encoding = headers.get("Transfer-Encoding", None)
if encoding == 'chunked':
    raise errors.FlumotionError("Chunked transfer not supported")

expires = headers.get("Expires", None)
if expires is not None:
    try:
        self.expires = http.stringToDatetime(expires)
    except Exception:
        # An unparsable Expires value is recorded as 0 instead of
        # failing the whole request.  Only real errors are caught, so
        # KeyboardInterrupt/SystemExit still propagate.
        self.expires = 0

lastmod = headers.get("Last-Modified", None)
if lastmod is not None:
    self.mtime = http.stringToDatetime(lastmod)

contentRange = headers.get("Content-Range", None)
contentLength = headers.get("Content-Length", None)
if contentRange is not None:
    start, end, total = http.parseContentRange(contentRange)
    self.start = start
    self.length = total
    if contentLength is not None:
        self.size = int(contentLength)
    else:
        # Content-Range positions are inclusive ("bytes 0-499/1234"
        # describes 500 bytes), hence the +1.
        self.size = end - start + 1
elif contentLength is not None:
    # Without a range the response carries the whole resource.
    self.length = int(contentLength)
    self.size = int(contentLength)
else:
    raise errors.FlumotionError("Can't get length/size from headers",
                                headers)

ctype = headers.get("Content-Type", None)
if ctype is not None:
    # Keep only the media type; parameters such as charset are dropped.
    self.mimeType, _pdict = cgi.parse_header(ctype)
93
94
96 """
97 Allows retrieval of data streams using HTTP 1.0.
98 """
99
100 logCategory = LOG_CATEGORY
101
def __init__(self, connTimeout=0, idleTimeout=0):
    """
    Remember the timeouts to apply to every request made through
    this requester.

    @param connTimeout: value later passed as the timeout of the
                        getter's connect() call
    @param idleTimeout: value later passed as the timeout of each
                        getter that gets created
    """
    self.connTimeout, self.idleTimeout = connTimeout, idleTimeout
105
def retrieve(self, consumer, url, proxyAddress=None, proxyPort=None,
             ifModifiedSince=None, ifUnmodifiedSince=None,
             start=None, size=None):
    """
    Start retrieving C{url} and return the getter in charge of it.

    A summary of the request is logged, then a L{StreamGetter} is
    created with this requester's idle timeout and asked to connect
    using this requester's connection timeout.

    @param consumer:          forwarded untouched to the getter
    @param url:               URL object, must provide toString()
    @param proxyAddress:      forwarded to the getter's connect()
    @param proxyPort:         forwarded to the getter's connect()
    @param ifModifiedSince:   optional timestamp for a conditional request
    @param ifUnmodifiedSince: optional timestamp for a conditional request
    @param start:             optional offset of the first wanted byte
    @param size:              optional number of wanted bytes

    @return: the L{StreamGetter} instance created for this request
    """
    # Build the optional pieces of the log message; a falsy value
    # (None or 0) simply leaves the corresponding piece empty.
    sizePart = (" %d bytes" % size) if size else ""
    startPart = (" starting at %d" % start) if start else ""
    fromPart = " from " if (size or start) else ""
    urlPart = url.toString()
    if ifModifiedSince:
        modifiedPart = " if modified since %s" % ts2str(ifModifiedSince)
    else:
        modifiedPart = ""
    if ifUnmodifiedSince:
        unmodifiedPart = (" if not modified since %s"
                          % ts2str(ifUnmodifiedSince))
    else:
        unmodifiedPart = ""

    self.log("Requesting %s%s%s%s%s%s",
             sizePart, startPart, fromPart, urlPart,
             modifiedPart, unmodifiedPart)

    streamGetter = StreamGetter(consumer, url,
                                ifModifiedSince, ifUnmodifiedSince,
                                start, size, self.idleTimeout)
    streamGetter.connect(proxyAddress, proxyPort, self.connTimeout)
    return streamGetter
124
125
126 -class StreamGetter(protocol.ClientFactory, http.HTTPClient, log.Loggable):
127 """
128 Retrieves a stream using HTTP 1.0.
129
This class is at the same time a Factory and a Protocol;
this can be done because it is a client, and in twisted
client factories only create one protocol.
133
The outcome, the stream info and the stream data are forwarded
to a common.StreamConsumer instance given at creation time.
136
137 It supports range requests and some conditional request types
138 (ifModified and ifUnmodified).
139 """
140
141 logCategory = LOG_CATEGORY
142
143 HTTP_METHOD = 'GET'
144
145 host = None
146 port = None
147
def __init__(self, consumer, url,
             ifModifiedSince=None, ifUnmodifiedSince=None,
             start=None, size=None, timeout=0):
    """
    Store the description of the request and reset the response state.

    @param consumer:          object kept as self.consumer
    @param url:               URL object kept as self.url
    @param ifModifiedSince:   optional timestamp for a conditional request
    @param ifUnmodifiedSince: optional timestamp for a conditional request
    @param start:             optional offset of the first wanted byte
    @param size:              optional number of wanted bytes
    @param timeout:           kept as self.timeout
    """
    # Private bookkeeping, nothing is in progress yet.
    self._idlecheck = None
    self._remaining = None
    self._canceled = False
    self._connected = False

    # Response state, filled in while the request is processed.
    self.info = None
    self.status = None
    self.peer = None
    self.headers = {}

    # What has been asked for.
    self.timeout = timeout
    self.size = size
    self.start = start
    self.ifUnmodifiedSince = ifUnmodifiedSince
    self.ifModifiedSince = ifModifiedSince
    self.url = url
    self.consumer = consumer

    # Computed last, once the instance is fully populated.
    self.logName = common.log_id(self)
172
175
176
177
178 - def connect(self, proxyAddress=None, proxyPort=None, timeout=0):
191
193 if not self.paused and self.transport is not None:
194 self.pauseProducing()
195 self.log("Request paused for %s", self.url)
196
198 if self.paused and self.transport is not None:
199 self.resumeProducing()
200 self.log("Request resumed for %s", self.url)
201
203 if self._connected and self.transport is not None:
204 self.transport.loseConnection()
205 self._cancelIdleCheck()
206 self.log("Request canceled for %s", self.url)
207 self._canceled = True
208
209
210
212 assert self.peer is None, "Protocol already built"
213 self.peer = addr
214 return self
215
218
220 self.log("Connection made for %s", self.url)
221 self.sendCommand(self.HTTP_METHOD, self.url.location)
222 self.sendHeader('Host', self.url.host)
223 self.sendHeader('User-Agent', USER_AGENT)
224 self.sendHeader('Connection', "close")
225
226 if self.ifModifiedSince:
227 datestr = http.datetimeToString(self.ifModifiedSince)
228 self.sendHeader('If-Modified-Since', datestr)
229
230 if self.ifUnmodifiedSince:
231 datestr = http.datetimeToString(self.ifUnmodifiedSince)
232 self.sendHeader('If-Unmodified-Since', datestr)
233
234 if self.start or self.size:
235 start = self.start or 0
236 end = (self.size and (start + self.size - 1)) or None
237 rangeSpecs = "bytes=%s-%s" % (start, end or "")
238 self.sendHeader('Range', rangeSpecs)
239
240 self.endHeaders()
241
242 self._resetIdleCheck()
243
250
252 self._keepActive()
253 status = int(status_str)
254 self.status = status
255
256 if status in (http.OK, http.NO_CONTENT, http.PARTIAL_CONTENT):
257 return
258
259 if status == http.REQUESTED_RANGE_NOT_SATISFIABLE:
260 self._serverError(common.RANGE_NOT_SATISFIABLE,
261 "HTTP range not satisfiable")
262 if status == http.NOT_MODIFIED:
263 self._conditionFail(common.STREAM_NOT_MODIFIED,
264 "Stream not modified")
265 elif status == http.PRECONDITION_FAILED:
266 self._conditionFail(common.STREAM_MODIFIED, "Stream Modified")
267 elif status == http.NOT_FOUND:
268 self._streamNotAvailable(common.STREAM_NOTFOUND,
269 "Resource Not Found")
270 elif status == http.FORBIDDEN:
271 self._streamNotAvailable(common.STREAM_FORBIDDEN,
272 "Resource Forbidden")
273 if status in (http.MOVED_PERMANENTLY, http.FOUND):
274 self._serverError(common.NOT_IMPLEMENTED,
275 "HTTP redirection not supported")
276 else:
277 self._serverError(common.NOT_IMPLEMENTED,
278 "Unsupported HTTP response: %s (%s)"
279 % (message, status))
280
284
294
303
305 if self.info is not None:
306 if self._remaining == 0:
307 self.log("Request done, got %d bytes starting at %d from %s, "
308 "last modified on %s", self.info.size,
309 self.info.start, self.url.toString(),
310 ts2str(self.info.mtime))
311 self._streamDone()
312 return
313 if self.info:
314 self.log("Incomplete request, missing %d bytes from the expected "
315 "%d bytes starting at %d from %s", self._remaining,
316 self.info.size, self.info.start, self.url.toString())
317 else:
318 self.log("Incomplete request %s", self.url.toString())
319
323
324
325
327 self._updateCount += 1
328
332
334 if self._idlecheck:
335 self._idlecheck.cancel()
336 self._idlecheck = None
337 self._updateCount = 0
338
345
349
356
361
366
371
375
379
384
385
386 if __name__ == "__main__":
387 import sys
388
# NOTE(review): the enclosing ``def`` line (which binds ``d`` and ``a``)
# is not visible in this view; only the body is updated.
#
# Parses one "key=value" command line argument into the dictionary d.
# The literal value "None" maps to None, integers are converted, and
# anything else is kept as a string.
k, v = a.split('=', 1)
if v == 'None':
    # Previously this fell through to the int() attempt below, which
    # failed and replaced the None just stored with the string 'None'.
    d[k] = None
else:
    try:
        d[k] = int(v)
    except ValueError:
        # Not an integer, keep the raw string.
        d[k] = v
397
398
399 kwargs = {}
400 for a in sys.argv[1:]:
401 addarg(kwargs, a)
402
403 url = kwargs.pop('url')
404
406
410
414
418
420 print "Finished"
421 reactor.stop()
422
423 - def onInfo(self, getter, info):
429
430 - def onData(self, getter, data):
433
434
435 consumer = DummyConsumer()
436 requester = StreamRequester(5000, 5000)
437 requester.retrieve(consumer, http_utils.Url.fromString(url), **kwargs)
438 reactor.run()
439