1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import random
19
20 from twisted.internet import defer, reactor
21 from twisted.python import reflect
22
23
24 from flumotion.common import errors
25
# Module revision string. "$Rev$" looks like a version-control keyword
# placeholder -- presumably substituted by the VCS on checkout (TODO confirm).
26 __version__ = "$Rev$"
27
28
29
30
31
33
# Drive the generator returned by proc(*args, **kwargs) one step at a time
# and return a Deferred that fires with the outcome. `proc` is a free
# variable coming from the enclosing scope (not visible in this block).
# NOTE: Python 2 only syntax is used below (`print` statement,
# `except Exception, e`, `gen.next()`).
34 def wrapper(*args, **kwargs):
35 gen = proc(*args, **kwargs)
# `result` is the Deferred that is handed back to the caller of wrapper().
36 result = defer.Deferred()
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Keep an extra reference to the Deferred's callback list on the Deferred
# itself; with_saved_callbacks() swaps this list back in later.
55 result.__callbacks = result.callbacks
56
# Call proc(*_args, **_kwargs) while `result` temporarily uses the saved
# callback list and has `called` forced to False, then put the previous
# values back. Presumably `called` is cleared so that adding an errback to
# an already-firing Deferred only appends it instead of running it at once
# -- TODO confirm against Twisted's Deferred.addCallbacks.
# The `proc` parameter here shadows the outer `proc` inside this helper.
# NOTE(review): the previous state is not restored if the call raises
# (there is no try/finally).
57 def with_saved_callbacks(proc, *_args, **_kwargs):
58 saved_callbacks, saved_called = result.callbacks, result.called
59 result.callbacks, result.called = result.__callbacks, False
60 proc(*_args, **_kwargs)
61 result.callbacks, result.called = saved_callbacks, saved_called
62
63
64
# Errback registered on `result` right after creation (see the
# result.addErrback call below), with `result` itself passed in as `d`.
65 def default_errback(failure, d):
66
67
# A failure wrapping errors.HandledException is returned unchanged, without
# any reporting.
68 if failure.check(errors.HandledException):
69 return failure
70
# Reporting errback: prints a message naming proc and the failure type, then
# calls traceback.print_exc(). It returns None.
71 def print_traceback(f):
72 import traceback
73 print 'flumotion.twisted.defer: ' + \
74 'Unhandled error calling', proc.__name__, ':', f.type
75 traceback.print_exc()
# Append print_traceback as a further errback on `d`, using the saved
# callback list so the addition happens while `d` is being fired.
76 with_saved_callbacks(lambda: d.addErrback(print_traceback))
# NOTE(review): bare `raise` with no enclosing `except` in this function; it
# depends on an exception being active in the calling context. Presumably
# meant to keep the failure propagating -- confirm it behaves as intended.
77 raise
78 result.addErrback(default_errback, result)
79
# Advance the generator by one step and dispatch on what it yields:
#  - a Deferred: chain callback()/errback() below, passing the yielded
#    Deferred itself as the extra argument `d`;
#  - anything else: fire `result` with that value;
#  - StopIteration: fire `result` with None;
#  - any other exception: errback `result` with it.
80 def generator_next():
81 try:
82 x = gen.next()
83 if isinstance(x, defer.Deferred):
84 x.addCallback(callback, x).addErrback(errback, x)
85 else:
86 result.callback(x)
87 except StopIteration:
88 result.callback(None)
89 except Exception, e:
90 result.errback(e)
91
# Failure path for a yielded Deferred: store a callable on `d.value` that
# raises an exception rebuilt from the failure, then resume the generator.
# Presumably the generator body calls d.value() after the yield to obtain
# the result or have the error raised -- TODO confirm with callers.
92 def errback(failure, d):
93
# Rebuild an exception from the failure and raise it.
94 def raise_error():
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# k is taken from failure.parents[-1] and may be a class or a string; a
# string is resolved to a class with reflect.namedClass. v is failure.value
# and is either unpacked as the argument tuple or passed as a single
# argument. If resolving or constructing fails with any Exception, a plain
# Exception carrying the failure type and repr(v) is raised instead.
109 k, v = failure.parents[-1], failure.value
110 try:
111 if isinstance(k, str):
112 k = reflect.namedClass(k)
113 if isinstance(v, tuple):
114 e = k(*v)
115 else:
116 e = k(v)
117 except Exception:
118 e = Exception('%s: %r' % (failure.type, v))
119 raise e
120 d.value = raise_error
121 generator_next()
122
# Success path for a yielded Deferred: store a callable on `d.value` that
# returns the result, then resume the generator. The `result` parameter
# shadows the outer Deferred named `result` inside this function only.
123 def callback(result, d):
124 d.value = lambda: result
125 generator_next()
126
# Kick off the first step immediately, before returning to the caller.
127 generator_next()
128
129 return result
130
131 return wrapper
132
133
137
138
# Body of a helper taking a Deferred named `deferred` (its def line is not
# visible in this listing). The "d" mentioned in the docstring corresponds
# to that `deferred` argument.
140 """
141 Return a deferred which will fire from a callLater after d fires
142 """
143
# Callback added to `deferred`: schedules d.callback(result) through
# reactor.callLater with a delay of 0 instead of calling it directly. It
# returns None.
144 def fire(result, d):
145 reactor.callLater(0, d.callback, result)
# `res` is the new Deferred returned to the caller.
146 res = defer.Deferred()
# NOTE(review): only addCallback is used, so a failure of `deferred` is not
# forwarded to `res` -- confirm whether that is intended.
147 deferred.addCallback(fire, res)
148 return res
149
150
152 """
153 I am a helper class to make sure that the deferred is fired only once
154 with either a result or exception.
155
156 @ivar d: the deferred that gets fired as part of the resolution
157 @type d: L{twisted.internet.defer.Deferred}
158 """
159
# Presumably the initializer body (its def line is not visible here).
# d: the Deferred that gets fired by the resolution.
161 self.d = defer.Deferred()
# fired: guard flag checked before self.d is fired, so it fires only once.
162 self.fired = False
163
# Default implementation does nothing. self.cleanup() is invoked right
# before self.d is fired, on both the success and the failure path.
165 """
166 Clean up any resources related to the resolution.
167 Subclasses can implement me.
168 """
169 pass
170
# Success path (def line not visible here; takes `result`).
172 """
173 Make the result succeed, triggering the callbacks with
174 the given result. If a result was already reached, do nothing.
175 """
176 if not self.fired:
# The flag is set before cleanup() runs, so a nested call made from
# cleanup() sees fired == True and does nothing.
177 self.fired = True
178 self.cleanup()
179 self.d.callback(result)
180
# Failure path (def line not visible here; takes `exception`).
182 """
183 Make the result fail, triggering the errbacks with the given exception.
184 If a result was already reached, do nothing.
185 """
186 if not self.fired:
# The flag is set before cleanup() runs, so a nested call made from
# cleanup() sees fired == True and does nothing.
187 self.fired = True
188 self.cleanup()
189 self.d.errback(exception)
190
191
193 """
194 Provides a mechanism to attempt to run some deferred operation until it
195 succeeds. On failure, the operation is tried again later, exponentially
196 backing off.
197 """
# Back-off settings. None of them is read in the visible part of this
# listing; presumably the delay computation behind self._nextDelay() uses
# them -- TODO confirm.
# presumably the upper bound for the retry delay, in seconds
198 maxDelay = 1800
# presumably the delay before the first retry, in seconds
199 initialDelay = 5.0
200
# The value is e (2.718...); presumably the multiplier applied per retry.
201 factor = 2.7182818284590451
# presumably the amount of randomisation applied to each delay -- TODO confirm
202 jitter = 0.11962656492
# Class-level default; an instance attribute of the same name is set to
# None each time a new run is started.
203 delay = None
204
def __init__(self, deferredCreate, *args, **kwargs):
    """
    Set up a RetryingDeferred around a deferred-producing callable.

    Nothing is attempted here; this only records what to call and
    resets the bookkeeping attributes.

    @param deferredCreate: callable invoked as
                           deferredCreate(*args, **kwargs) whenever a
                           new deferred is needed
    @param args:           positional arguments passed to deferredCreate
    @param kwargs:         keyword arguments passed to deferredCreate
    """
    # Bookkeeping starts out empty: no master deferred, not running and
    # no delayed call recorded.
    self._callId = None
    self._running = False
    self._masterD = None

    # Remember the factory and the arguments to call it with.
    self._create, self._args, self._kwargs = deferredCreate, args, kwargs
217
# NOTE(review): the parenthesis opened in the docstring below ("(it will
# then errback ...") is never closed, and its trailing TODO suggests the
# described cancel behaviour was still unconfirmed by the author.
219 """
220 Start trying. Returns a deferred that will fire when this operation
221 eventually succeeds. That deferred will only errback if this
222 RetryingDeferred is cancelled (it will then errback with the result of
223 the next attempt if one is in progress, or a CancelledError.
224 # TODO: yeah?
225 """
# New master deferred for this run; this is what gets returned below.
226 self._masterD = defer.Deferred()
227 self._running = True
# Reset the delay attribute for this run.
228 self.delay = None
229
# The first attempt is made right away, before returning.
230 self._retry()
231
# NOTE(review): the success handler sets self._masterD to None after firing
# it. If the first attempt's deferred has already fired when the handlers
# are attached, this line would presumably return None instead of the
# master deferred -- TODO confirm and consider returning a local reference.
232 return self._masterD
233
242
# One attempt (def line not visible here): clear the recorded delayed call,
# create a new deferred from the stored factory and arguments, and route its
# outcome to self._success / self._failed.
244 self._callId = None
245 d = self._create(*self._args, **self._kwargs)
246 d.addCallbacks(self._success, self._failed)
247
249
# Success handler for an attempt (def line not visible here; receives
# `val`): fire the master deferred with the value, then drop the reference.
250 self._masterD.callback(val)
251 self._masterD = None
252
# Failure handler for an attempt (def line not visible here; receives
# `failure`). While running, schedule self._retry after the delay returned
# by self._nextDelay() and remember the delayed call in _callId. Otherwise
# pass the failure on to the master deferred.
254 if self._running:
255 nextDelay = self._nextDelay()
256 self._callId = reactor.callLater(nextDelay, self._retry)
257 else:
258 self._masterD.errback(failure)
# presumably part of the else branch (indentation is lost in this listing):
# the master deferred reference is dropped once it has been errbacked.
259 self._masterD = None
260
273