Package flumotion :: Package twisted :: Module defer
[hide private]

Source Code for Module flumotion.twisted.defer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_defer -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import random 
 19   
 20  from twisted.internet import defer, reactor 
 21  from twisted.python import reflect 
 22   
 23  # FIXME: this is for HandledException - maybe it should move here instead ? 
 24  from flumotion.common import errors 
 25   
 26  __version__ = "$Rev$" 
 27   
 28   
 29  # See flumotion.test.test_defer for examples 
 30   
 31   
32 -def defer_generator(proc):
33 34 def wrapper(*args, **kwargs): 35 gen = proc(*args, **kwargs) 36 result = defer.Deferred() 37 38 # To support having the errback of last resort, we need to have 39 # an errback which runs after all the other errbacks, *at the 40 # point at which the deferred is fired*. So users of this code 41 # have from between the time the deferred is created and the 42 # time that the deferred is fired to attach their errbacks. 43 # 44 # Unfortunately we only control the time that the deferred is 45 # created. So we attach a first errback that then adds an 46 # errback to the end of the list. Unfortunately we can't add to 47 # the list while the deferred is firing. In a decision between 48 # having decent error reporting and being nice to a small part 49 # of twisted I chose the former. This code takes a reference to 50 # the callback list, so that we can add an errback to the list 51 # while the deferred is being fired. It temporarily sets the 52 # state of the deferred to not having been fired, so that adding 53 # the errbacks doesn't automatically call the newly added 54 # methods. 55 result.__callbacks = result.callbacks 56 57 def with_saved_callbacks(proc, *_args, **_kwargs): 58 saved_callbacks, saved_called = result.callbacks, result.called 59 result.callbacks, result.called = result.__callbacks, False 60 proc(*_args, **_kwargs) 61 result.callbacks, result.called = saved_callbacks, saved_called
62 63 # Add errback-of-last-resort 64 65 def default_errback(failure, d): 66 # an already handled exception just gets propagated up without 67 # doing a traceback 68 if failure.check(errors.HandledException): 69 return failure 70 71 def print_traceback(f): 72 import traceback 73 print 'flumotion.twisted.defer: ' + \ 74 'Unhandled error calling', proc.__name__, ':', f.type 75 traceback.print_exc() 76 with_saved_callbacks(lambda: d.addErrback(print_traceback)) 77 raise 78 result.addErrback(default_errback, result) 79 80 def generator_next(): 81 try: 82 x = gen.next() 83 if isinstance(x, defer.Deferred): 84 x.addCallback(callback, x).addErrback(errback, x) 85 else: 86 result.callback(x) 87 except StopIteration: 88 result.callback(None) 89 except Exception, e: 90 result.errback(e) 91 92 def errback(failure, d): 93 94 def raise_error(): 95 # failure.parents[-1] will be the exception class for local 96 # failures and the string name of the exception class 97 # for remote failures (which might not exist in our 98 # namespace) 99 # 100 # failure.value will be the tuple of arguments to the 101 # exception in the local case, or a string 102 # representation of that in the remote case (see 103 # pb.CopyableFailure.getStateToCopy()). 104 # 105 # we can only reproduce a remote exception if the 106 # exception class is in our namespace, and it only takes 107 # one string argument. if either condition is not true, 108 # we wrap the strings in a default Exception. 109 k, v = failure.parents[-1], failure.value 110 try: 111 if isinstance(k, str): 112 k = reflect.namedClass(k) 113 if isinstance(v, tuple): 114 e = k(*v) 115 else: 116 e = k(v) 117 except Exception: 118 e = Exception('%s: %r' % (failure.type, v)) 119 raise e 120 d.value = raise_error 121 generator_next() 122 123 def callback(result, d): 124 d.value = lambda: result 125 generator_next() 126 127 generator_next() 128 129 return result 130 131 return wrapper 132 133
134 -def defer_generator_method(proc):
135 return lambda self, *args, **kwargs: \ 136 defer_generator(proc)(self, *args, **kwargs)
137 138
139 -def defer_call_later(deferred):
140 """ 141 Return a deferred which will fire from a callLater after d fires 142 """ 143 144 def fire(result, d): 145 reactor.callLater(0, d.callback, result)
146 res = defer.Deferred() 147 deferred.addCallback(fire, res) 148 return res 149 150
151 -class Resolution:
152 """ 153 I am a helper class to make sure that the deferred is fired only once 154 with either a result or exception. 155 156 @ivar d: the deferred that gets fired as part of the resolution 157 @type d: L{twisted.internet.defer.Deferred} 158 """ 159
160 - def __init__(self):
161 self.d = defer.Deferred() 162 self.fired = False
163
164 - def cleanup(self):
165 """ 166 Clean up any resources related to the resolution. 167 Subclasses can implement me. 168 """ 169 pass
170
171 - def callback(self, result):
172 """ 173 Make the result succeed, triggering the callbacks with 174 the given result. If a result was already reached, do nothing. 175 """ 176 if not self.fired: 177 self.fired = True 178 self.cleanup() 179 self.d.callback(result)
180
181 - def errback(self, exception):
182 """ 183 Make the result fail, triggering the errbacks with the given exception. 184 If a result was already reached, do nothing. 185 """ 186 if not self.fired: 187 self.fired = True 188 self.cleanup() 189 self.d.errback(exception)
190 191
192 -class RetryingDeferred(object):
193 """ 194 Provides a mechanism to attempt to run some deferred operation until it 195 succeeds. On failure, the operation is tried again later, exponentially 196 backing off. 197 """ 198 maxDelay = 1800 # Default to 30 minutes 199 initialDelay = 5.0 200 # Arbitrarily take these constants from twisted's ReconnectingClientFactory 201 factor = 2.7182818284590451 202 jitter = 0.11962656492 203 delay = None 204
205 - def __init__(self, deferredCreate, *args, **kwargs):
206 """ 207 Create a new RetryingDeferred. Will call 208 deferredCreate(*args, **kwargs) each time a new deferred is needed. 209 """ 210 self._create = deferredCreate 211 self._args = args 212 self._kwargs = kwargs 213 214 self._masterD = None 215 self._running = False 216 self._callId = None
217
218 - def start(self):
219 """ 220 Start trying. Returns a deferred that will fire when this operation 221 eventually succeeds. That deferred will only errback if this 222 RetryingDeferred is cancelled (it will then errback with the result of 223 the next attempt if one is in progress, or a CancelledError. 224 # TODO: yeah? 225 """ 226 self._masterD = defer.Deferred() 227 self._running = True 228 self.delay = None 229 230 self._retry() 231 232 return self._masterD
233
234 - def cancel(self):
235 if self._callId: 236 self._callId.cancel() 237 self._masterD.errback(errors.CancelledError()) 238 self._masterD = None 239 240 self._callId = None 241 self._running = False
242
243 - def _retry(self):
244 self._callId = None 245 d = self._create(*self._args, **self._kwargs) 246 d.addCallbacks(self._success, self._failed)
247
248 - def _success(self, val):
249 # TODO: what if we were cancelled and then get here? 250 self._masterD.callback(val) 251 self._masterD = None
252
253 - def _failed(self, failure):
254 if self._running: 255 nextDelay = self._nextDelay() 256 self._callId = reactor.callLater(nextDelay, self._retry) 257 else: 258 self._masterD.errback(failure) 259 self._masterD = None
260
261 - def _nextDelay(self):
262 if self.delay is None: 263 self.delay = self.initialDelay 264 else: 265 self.delay = self.delay * self.factor 266 267 if self.jitter: 268 self.delay = random.normalvariate(self.delay, 269 self.delay * self.jitter) 270 self.delay = min(self.delay, self.maxDelay) 271 272 return self.delay
273