Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module request_manager
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.request_manager

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18   
 19  from flumotion.common import log 
 20  from flumotion.component.misc.httpserver.httpcached import common 
 21  from flumotion.component.misc.httpserver.httpcached import http_utils 
 22  from flumotion.component.misc.httpserver.httpcached import server_selection 
 23   
 24   
 25  LOG_CATEGORY = "request-manager" 
26 27 28 -class RequestManager(log.Loggable):
29 30 logCategory = LOG_CATEGORY 31
32 - def __init__(self, selector, client):
33 """ 34 Selector: a ServerSelector 35 Client: HttpClient (StreamRequester) 36 """ 37 self.selector = selector 38 self.client = client
39
40 - def retrieve(self, consumer, url, 41 ifModifiedSince=None, ifUnmodifiedSince=None, 42 start=None, size=None):
43 """ 44 Consumer: a StreamConsumer 45 Url: 46 Start: Position from which to start the download 47 Size: Number of bytes to download 48 IfModifiedSince: 49 IfUnmodifiedSince: 50 """ 51 servers = self.selector.getServers() 52 consumer_manager = ConsumerManager(consumer, url, start, size, 53 ifModifiedSince, ifUnmodifiedSince, 54 servers, self.client) 55 return consumer_manager.retrieve()
56
57 - def setup(self):
58 return self.selector.setup()
59
60 - def cleanup(self):
61 return self.selector.cleanup()
62
63 64 -class ConsumerManager(common.StreamConsumer, log.Loggable):
65 66 logCategory = LOG_CATEGORY 67
68 - def __init__(self, consumer, url, start, size, ifModifiedSince, 69 ifUnmodifiedSince, servers, client):
70 self.consumer = consumer 71 self.url = url 72 self.start = start 73 self.size = size 74 self.ifModifiedSince = ifModifiedSince 75 self.ifUnmodifiedSince = ifUnmodifiedSince 76 self.servers = servers 77 self.client = client 78 self.current_server = None 79 self.current_request = None 80 self.last_error = None 81 self.last_message = None 82 83 self.logName = common.log_id(self) # To be able to track the instance
84 85 @property
86 - def host(self):
87 if self.current_request: 88 return self.current_request.host 89 return None
90 91 @property
92 - def port(self):
93 if self.current_request: 94 return self.current_request.port 95 return None
96
97 - def retrieve(self):
98 try: 99 s = self.servers.next() 100 self.current_server = s 101 if self.size is None or self.start is None: 102 self.debug("Retrieving %s from %s:%s", self.url, 103 self.current_server.ip, self.current_server.port) 104 else: 105 self.debug("Retrieving range %s-%s (%s B) of %s from %s:%s", 106 self.start, self.start + self.size, self.size, 107 self.url, self.current_server.ip, 108 self.current_server.port) 109 proxy_address = s.ip 110 proxy_port = s.port 111 self.current_request =\ 112 self.client.retrieve(self, self.url, 113 proxyAddress=proxy_address, 114 proxyPort=proxy_port, 115 ifModifiedSince=self.ifModifiedSince, 116 ifUnmodifiedSince=self.ifUnmodifiedSince, 117 start=self.start, size=self.size) 118 self.log("Retrieving data using %s", self.current_request.logName) 119 return self 120 except StopIteration: 121 code = self.last_error or common.SERVER_UNAVAILABLE 122 message = self.last_message or "" 123 self.consumer.serverError(self, code, message) 124 return self
125
126 - def pause(self):
127 self.log("Pausing request %s", self.url) 128 self.current_request.pause()
129
130 - def resume(self):
131 self.log("Resuming request %s", self.url) 132 self.current_request.resume()
133
134 - def cancel(self):
135 self.debug("Canceling request %s", self.url) 136 self.current_request.cancel() 137 self.current_request = None
138
139 - def serverError(self, getter, code, message):
140 self.debug("Server Error %s (%s) for %s using %s:%s", 141 message, code, self.url, getter.host, getter.port) 142 self.last_error = code 143 self.last_message = message 144 if code in (common.SERVER_DISCONNECTED, 145 common.SERVER_TIMEOUT): 146 # The connection was established 147 # and data may have already been received. 148 self.consumer.serverError(self, code, message) 149 return 150 self.current_server.reportError(code) 151 self.retrieve()
152
153 - def conditionFail(self, getter, code, message):
154 if self.current_request is None: 155 return 156 self.log("Condition Error %s (%s) for %s", 157 message, code, self.url) 158 self.consumer.conditionFail(self, code, message)
159
160 - def streamNotAvailable(self, getter, code, message):
161 if self.current_request is None: 162 return 163 self.log("Stream not available \"%s\" for %s", message, self.url) 164 self.consumer.streamNotAvailable(self, code, message)
165
166 - def onInfo(self, getter, info):
167 if self.current_request is None: 168 return 169 self.consumer.onInfo(self, info)
170
171 - def onData(self, getter, data):
172 if self.current_request is None: 173 return 174 self.consumer.onData(self, data)
175
176 - def streamDone(self, getter):
177 if self.current_request is None: 178 return 179 self.consumer.streamDone(self)
180