Trees | Indices | Help |
---|
|
1 # -*- Mode: Python; test-case-name: -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 6 # Copyright (C) 2010,2011 Flumotion Services, S.A. 7 # All rights reserved. 8 # 9 # This file may be distributed and/or modified under the terms of 10 # the GNU Lesser General Public License version 2.1 as published by 11 # the Free Software Foundation. 12 # This file is distributed without any warranty; without even the implied 13 # warranty of merchantability or fitness for a particular purpose. 14 # See "LICENSE.LGPL" in the source distribution for more information. 15 # 16 # Headers in this file shall remain intact. 17 18 __version__ = "$Rev$" 19 20 import time 21 22 from flumotion.common import log 23 24 from twisted.internet import reactor 25 26 from flumotion.component.plugs import base as plugbase 27 2830 31 # Create a producer-consumer proxy that sits between a FileTransfer object 32 # and a request object. 33 # You may return a Deferred here. 3437 384053 5442 props = args['properties'] 43 self._rateBytesPerSec = int(props.get('rate', 128000) / 8) 44 # Peak level is 10 seconds of data; this is chosen 45 # entirely arbitrarily. 46 self._maxLevel = int(props.get('max-level', 47 self._rateBytesPerSec * 8 * 10) / 8) 48 self._initialLevel = int(props.get('initial-level', 0) / 8)4951 return TokenBucketConsumer(consumer, self._maxLevel, 52 self._rateBytesPerSec, self._initialLevel)56 """ 57 Use a token bucket to proxy between a producer (e.g. FileTransfer) and a 58 consumer (TCP protocol, etc.), doing rate control. 59 60 The bucket has a rate and a maximum level, so a small burst can be 61 permitted. The initial level can be set to a non-zero value, this is 62 useful to implement burst-on-connect behaviour. 63 64 TODO: This almost certainly only works with producers that work like 65 FileTransfer - i.e. they produce data directly in resumeProducing, and 66 ignore pauseProducing. 
This is sufficient for our needs right now. 67 """ 68 69 logCategory = 'token-bucket' 70 71 # NOTE: Performance is strongly correlated with this value. 72 # Low values (e.g. 0.2) give a 'smooth' transfer, but very high cpu usage 73 # if you have several hundred clients. 74 # Higher values (e.g. 1.0 or more) give bursty transfer, but nicely lower 75 # cpu usage. 76 _dripInterval = 1.0 # If we need to wait for more bits in our bucket, wait 77 # at least this long, to avoid overly frequent small 78 # writes 7927981 self.maxLevel = maxLevel # in bytes 82 self.fillRate = fillRate # in bytes per second 83 self.fillLevel = fillLevel # in bytes 84 85 self._buffers = [] # List of (offset, buffer) tuples 86 self._buffersSize = 0 87 88 self._finishing = False # If true, we'll stop once the current buffer 89 # has been sent. 90 91 self._unregister = False # If true, we'll unregister from the consumer 92 # once the data has been sent. 93 94 self._lastDrip = time.time() 95 self._dripDC = None 96 self._paused = True 97 98 self.producer = None # we get this in registerProducer. 99 self.consumer = consumer 100 101 # We are implemented as a push producer. We forcibly push some 102 # data every couple of seconds to maintain the requested 103 # rate. If the consumer cannot keep up with that rate we want 104 # to get a pauseProducing() call, so we will stop 105 # writing. Otherwise the data would have been buffered on the 106 # server side, leading to excessive memory consumption. 107 self.consumer.registerProducer(self, 1) 108 109 self.info("Created TokenBucketConsumer with rate %d, " 110 "initial level %d, maximum level %d", 111 fillRate, fillLevel, maxLevel)112114 """ 115 Re-fill our token bucket based on how long it has been since we last 116 refilled it. 117 Then attempt to write some data. 
118 """ 119 self._dripDC = None 120 121 now = time.time() 122 elapsed = now - self._lastDrip 123 self._lastDrip = now 124 125 newBytes = self.fillRate * elapsed 126 # Note that this does introduce rounding errors - not particularly 127 # important if the drip interval is reasonably high, though. These will 128 # cause the actual rate to be lower than the nominal rate. 129 self.fillLevel = int(min(self.fillLevel + newBytes, self.maxLevel)) 130 131 self._tryWrite()132134 if not self.consumer: 135 return 136 137 while self.fillLevel > 0 and self._buffersSize > 0: 138 # If we're permitted to write at the moment, do so. 139 offset, buf = self._buffers[0] 140 sendbuf = buf[offset:offset+self.fillLevel] 141 sendBytes = len(sendbuf) 142 143 if sendBytes + offset == len(buf): 144 self._buffers.pop(0) 145 else: 146 self._buffers[0] = (offset + sendBytes, buf) 147 self._buffersSize -= sendBytes 148 149 self.consumer.write(sendbuf) 150 self.fillLevel -= sendBytes 151 152 if self._buffersSize > 0: 153 # If we have data (and we're not already waiting for our next drip 154 # interval), wait... this is what actually performs the data 155 # throttling. 156 if not (self._dripDC or self._paused): 157 self._dripDC = reactor.callLater(self._dripInterval, 158 self._dripAndTryWrite) 159 else: 160 # No buffer remaining; ask for more data or finish 161 if self._finishing: 162 if self._unregister: 163 self._doUnregister() 164 self._doFinish() 165 elif self.producer: 166 self.producer.resumeProducing() 167 elif self._unregister: 168 self._doUnregister()169 173 178180 self.debug('stopProducing; buffered data: %d', self._buffersSize) 181 if self.producer is not None: 182 self.producer.stopProducing() 183 184 if self._dripDC: 185 # don't produce after stopProducing()! 
186 self._dripDC.cancel() 187 self._dripDC = None 188 189 # ...and then, we still may have pending things to do 190 if self._unregister: 191 self._doUnregister() 192 193 if self._finishing: 194 self._finishing = False 195 self.consumer.finish() 196 197 if self._buffersSize > 0: 198 # make sure we release all the buffers, just in case 199 self._buffers = [] 200 self._buffersSize = 0 201 202 self.consumer = None203205 self._paused = True 206 207 # In case our producer is also 'push', we want it to stop. 208 # FIXME: Pull producers don't even need to implement that 209 # method, so we probably should remember what kind of producer 210 # we are dealing with and not call pauseProducing when it's 211 # 'pull'. 212 # However, all our producers (e.g. FileProducer) just 213 # ignore pauseProducing, so for now it works. 214 # 215 # FIXME: convert the following scenario into a unit test and remove it 216 # from here. It's rather lengthy for a comment. 217 # 218 # The producer might be None at this point if the following happened: 219 # 1) we resumeProducing() 220 # 2) we find out we're not permitted to write more, so we set up the 221 # callLater to write after self._dripInterval 222 # 3) the producer goes away, unregisterProducer() gets called 223 # 4) the callLater fires and we _dripAndTryWrite() 224 # 5) we try to push some data to the consumer 225 # 6) but the consumer is not reading fast enough, Twisted calls 226 # pauseProducing() on us 227 # 7) at this point if self.producer is None we simply don't proxy the 228 # pauseProducing() call to it 229 if self.producer: 230 self.producer.pauseProducing() 231 232 # We have to stop dripping, otherwise we will keep on filling 233 # the buffers and eventually run out of memory. 
234 if self._dripDC: 235 self._dripDC.cancel() 236 self._dripDC = None237239 self._paused = False 240 self._tryWrite() 241 242 if not self._buffers and self.producer: 243 self.producer.resumeProducing()244246 self._buffers.append((0, data)) 247 self._buffersSize += len(data) 248 249 self._tryWrite() 250 251 if self._buffers and not self.fillLevel and self.producer: 252 # FIXME: That's not completely correct. See the comment in 253 # self.pauseProducing() about not calling pauseProducing 254 # on 'pull' producers. 255 self.producer.pauseProducing()256 262264 self.debug("Producer registered: %r", producer) 265 self.producer = producer 266 267 self.resumeProducing()268270 self.debug('unregisterProducer; buffered data: %d', self._buffersSize) 271 if self.producer is not None: 272 self.producer = None 273 274 if not self._dripDC: 275 self._doUnregister() 276 else: 277 # we need to wait until we've written the data 278 self._unregister = True
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Aug 13 06:17:15 2013 | http://epydoc.sourceforge.net |