1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Flumotion Twisted-like flavors
20
21 Inspired by L{twisted.spread.flavors}
22 """
23
24 from twisted.internet import defer
25 from twisted.spread import pb
26 from zope.interface import Interface
27 from flumotion.common import log
28
29 __version__ = "$Rev$"
30
31
32
33
34
36 """
37 I am an interface for objects that want to listen to changes on
38 cached states.
39 """
40
42 """
43 @type object: L{StateRemoteCache}
44 @param object: the state object having changed
45 @type key: string
46 @param key: the key being set
47 @param value: the value the key is being set to
48
49 The given key on the given object has been set to the given value.
50 """
51
53 """
54 @type object: L{StateRemoteCache}
55 @param object: the state object having changed
56 @type key: string
57 @param key: the key being appended to
58 @param value: the value being appended to the list given by key
59
60 The given value has been added to the list given by the key.
61 """
62
64 """
65 @type object: L{StateRemoteCache}
66 @param object: the state object having changed
67 @type key: string
68 @param key: the key being removed from
69 @param value: the value being removed from the list given by key
70
71 The given value has been removed from the list given by the key.
72 """
73
74
76 """
77 I am an interface for objects that want to listen to changes on
78 cacheable states
79 """
80
82 """
83 @type observer: L{twisted.spread.flavors.RemoteCacheObserver}
84 @param observer: reference to the peer's L{RemoteCache}
85 that was added
86 @type num: int
87 @param num: number of observers present
88 """
89
91 """
92 @type observer: L{twisted.spread.flavors.RemoteCacheObserver}
93 @param observer: reference to the peer's L{RemoteCache}
94 that was removed
95 @type num: int
96 @param num: number of observers remaining
97 """
98
99
101 """
102 I am a cacheable state object.
103
104 I cache key-value pairs, where values can be either single objects
105 or list of objects.
106 """
107
109 self._observers = []
110 self._hooks = []
111 self._dict = {}
112
115
116
117
118 - def addKey(self, key, value=None):
119 """
120 Add a key to the state cache so it can be used with set.
121 """
122 self._dict[key] = value
123
124
125
126
128 """
129 Add a key for a list of objects to the state cache.
130 """
131 if value is None:
132 value = []
133 self._dict[key] = value
134
135
136
137
139 """
140 Add a key for a dict value to the state cache.
141 """
142 if value is None:
143 value = {}
144 self._dict[key] = value
145
147 return key in self._dict.keys()
148
150 return self._dict.keys()
151
152 - def get(self, key, otherwise=None):
153 """
154 Get the state cache value for the given key.
155
156 Return otherwise in case where key is present but value None.
157 """
158 if not key in self._dict.keys():
159 raise KeyError('%s in %r' % (key, self))
160
161 v = self._dict[key]
162
163 if v == None:
164 return otherwise
165
166 return v
167
168 - def set(self, key, value):
169 """
170 Set a given state key to the given value.
171 Notifies observers of this Cacheable through observe_set.
172 """
173 if not key in self._dict.keys():
174 raise KeyError('%s in %r' % (key, self))
175
176 self._dict[key] = value
177 dList = [o.callRemote('set', key, value) for o in self._observers]
178 return defer.DeferredList(dList)
179
180 - def append(self, key, value):
181 """
182 Append the given object to the given list.
183 Notifies observers of this Cacheable through observe_append.
184 """
185 if not key in self._dict.keys():
186 raise KeyError('%s in %r' % (key, self))
187
188 self._dict[key].append(value)
189 dList = [o.callRemote('append', key, value) for o in self._observers]
190 return defer.DeferredList(dList)
191
192 - def remove(self, key, value):
193 """
194 Remove the given object from the given list.
195 Notifies observers of this Cacheable through observe_remove.
196 """
197 if not key in self._dict.keys():
198 raise KeyError('%s in %r' % (key, self))
199
200 try:
201 self._dict[key].remove(value)
202 except ValueError:
203 raise ValueError('value %r not in list %r for key %r' % (
204 value, self._dict[key], key))
205 dList = [o.callRemote('remove', key, value) for o in self._observers]
206 dl = defer.DeferredList(dList)
207 return dl
208
209 - def setitem(self, key, subkey, value):
210 """
211 Set a value in the given dict.
212 Notifies observers of this Cacheable through observe_setitem.
213 """
214 if not key in self._dict.keys():
215 raise KeyError('%s in %r' % (key, self))
216
217 self._dict[key][subkey] = value
218 dList = [o.callRemote('setitem', key, subkey, value)
219 for o in self._observers]
220 return defer.DeferredList(dList)
221
223 """
224 Removes an element from the given dict. Note that the key refers
225 to the dict; it is the subkey (and its value) that will be removed.
226 Notifies observers of this Cacheable through observe_delitem.
227 """
228 if not key in self._dict.keys():
229 raise KeyError('%s in %r' % (key, self))
230
231 try:
232 value = self._dict[key].pop(subkey)
233 except KeyError:
234 raise KeyError('key %r not in dict %r for key %r' % (
235 subkey, self._dict[key], key))
236 dList = [o.callRemote('delitem', key, subkey, value) for o in
237 self._observers]
238 dl = defer.DeferredList(dList)
239 return dl
240
241
242
244 self._observers.append(observer)
245 for hook in self._hooks:
246 hook.observerAppend(observer, len(self._observers))
247 return self._dict
248
250 self._observers.remove(observer)
251 for hook in self._hooks:
252 hook.observerRemove(observer, len(self._observers))
253
255 """
256 A helper function that adds an object that would like to get
257 informed by StateCacheable when observers has been added or
258 removed.
259
260 @param hook: an object who would like to receive state events
261 @type hook: object that implements
262 L{flumotion.twisted.flavors.IStateCacheableListener}
263 """
264 if hook in self._hooks:
265 raise ValueError(
266 "%r is already a hook of %r" % (hook, self))
267 self._hooks.append(hook)
268
270 """
271 Remove the object that listens to StateCacheable observer events
272
273 @param hook: the object who would like to unsubscribe to state
274 events
275 @type hook: object that implements
276 L{flumotion.twisted.flavors.IStateCacheableListener}
277 """
278 self._hooks.remove(hook)
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
297 """
298 I am a remote cache of a state object.
299 """
300
303
304
305
308
309
310
312 return key in self._dict.keys()
313
315 return self._dict.keys()
316
317 - def get(self, key, otherwise=None):
318 """
319 Get the state cache value for the given key.
320
321 Return otherwise in case where key is present but value None.
322 """
323 if not key in self._dict.keys():
324 raise KeyError('%s in %r' % (key, self))
325
326 v = self._dict[key]
327
328 if v == None:
329 return otherwise
330
331 return v
332
334
335
336 if not hasattr(self, '_listeners'):
337
338
339
340 self._listeners = {}
341
342
343
344 - def addListener(self, listener, set=None, append=None, remove=None,
345 setitem=None, delitem=None, invalidate=None, set_=None):
346 """
347 Adds a listener to the remote cache.
348
349 The caller will be notified of state events via the functions
350 given as the 'set_', 'append', and 'remove', 'setitem', and
351 'delitem' keyword arguments.
352
353 Always call this method using keyword arguments for the functions;
354 calling them with positional arguments is not supported.
355
356 Setting one of the event handlers to None will ignore that
357 event. It is an error for all event handlers to be None.
358
359 @param listener: new listener object that wants to receive
360 cache state change notifications.
361 @type listener: object implementing
362 L{flumotion.twisted.flavors.IStateListener}
363 @param set_: procedure to call when a value is set
364 @type set_: procedure(object, key, value) -> None
365 @param append: procedure to call when a value is appended to a list
366 @type append: procedure(object, key, value) -> None
367 @param remove: procedure to call when a value is removed from
368 a list
369 @type remove: procedure(object, key, value) -> None
370 @param setitem: procedure to call when a value is set in a dict
371 @type setitem: procedure(object, key, subkey, value) -> None
372 @param delitem: procedure to call when a value is removed
373 from a dict.
374 @type delitem: procedure(object, key, subkey, value) -> None
375 @param invalidate: procedure to call when this cache has been
376 invalidated.
377 @type invalidate: procedure(object) -> None
378 """
379
380 if set:
381 import warnings
382 warnings.warn('Please use the set_ kwarg instead',
383 DeprecationWarning, stacklevel=2)
384 set_ = set
385
386 if not (set_ or append or remove or setitem or delitem or invalidate):
387 raise ValueError("At least one event handler has to be specified")
388
389 self._ensureListeners()
390 if listener in self._listeners:
391 raise KeyError(
392 "%r is already a listener of %r" % (listener, self))
393 self._listeners[listener] = [set_, append, remove, setitem,
394 delitem, invalidate]
395 if invalidate and hasattr(self, '_cache_invalid'):
396 invalidate(self)
397
399 self._ensureListeners()
400 if listener not in self._listeners:
401 raise KeyError(listener)
402 del self._listeners[listener]
403
404
405
408
410
411
412 self._ensureListeners()
413 for proc in [tup[index] for tup in self._listeners.values()]:
414 if proc:
415 try:
416 proc(self, *args)
417 except Exception, e:
418
419 log.warning("stateremotecache",
420 'Exception in StateCache handler: %s',
421 log.getExceptionMessage(e))
422
430
439
441
442 if hasattr(self, 'remove'):
443 StateCacheable.remove(self, key, value)
444 else:
445 try:
446 self._dict[key].remove(value)
447 except ValueError:
448 raise ValueError("value %r not under key %r with values %r" %
449 (value, key, self._dict[key]))
450
451 self._notifyListeners(2, key, value)
452
454
455 if hasattr(self, 'setitem'):
456 StateCacheable.setitem(self, key, subkey, value)
457 else:
458 self._dict[key][subkey] = value
459
460 self._notifyListeners(3, key, subkey, value)
461
463
464 if hasattr(self, 'delitem'):
465 StateCacheable.delitem(self, key, subkey)
466 else:
467 try:
468 del self._dict[key][subkey]
469 except KeyError:
470 raise KeyError("key %r not in dict %r for state dict %r" %
471 (subkey, self._dict[key], self._dict))
472
473 self._notifyListeners(4, key, subkey, value)
474
476 """Invalidate this StateRemoteCache.
477
478 Calling this method will result in the invalidate callback being
479 called for all listeners that passed an invalidate handler to
480 addListener. This method is not called automatically; it is
481 provided as a convenience to applications.
482 """
483 assert not hasattr(self, '_cache_invalid'), \
484 'object has already been invalidated'
485
486
487
488
489 self._cache_invalid = True
490
491 self._notifyListeners(5)
492