Package flumotion :: Package twisted :: Module flavors
[hide private]

Source Code for Module flumotion.twisted.flavors

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_flavors -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  Flumotion Twisted-like flavors 
 20   
 21  Inspired by L{twisted.spread.flavors} 
 22  """ 
 23   
 24  from twisted.internet import defer 
 25  from twisted.spread import pb 
 26  from zope.interface import Interface 
 27  from flumotion.common import log 
 28   
 29  __version__ = "$Rev$" 
 30   
 31   
 32  ### Generic Cacheable/RemoteCache for state objects
 33   
 34   
class IStateListener(Interface):
    """
    Interface for objects that subscribe to change notifications on
    cached states.
    """

    def stateSet(self, object, key, value):
        """
        Notification that the given key on the given state object has
        been set to the given value.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key that was set
        @type  key:    string
        @param value:  the value the key was set to
        """

    def stateAppend(self, object, key, value):
        """
        Notification that the given value has been added to the list
        stored under the given key.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key of the list that was appended to
        @type  key:    string
        @param value:  the value that was appended to the list
        """

    def stateRemove(self, object, key, value):
        """
        Notification that the given value has been removed from the list
        stored under the given key.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key of the list that was removed from
        @type  key:    string
        @param value:  the value that was removed from the list
        """


class IStateCacheableListener(Interface):
    """
    Interface for objects that subscribe to observer changes on
    cacheable states.
    """

    def observerAppend(self, observer, num):
        """
        Notification that an observer has been added.

        @param observer: reference to the peer's L{RemoteCache} that was
                         added
        @type  observer: L{twisted.spread.flavors.RemoteCacheObserver}
        @param num:      number of observers now present
        @type  num:      int
        """

    def observerRemove(self, observer, num):
        """
        Notification that an observer has been removed.

        @param observer: reference to the peer's L{RemoteCache} that was
                         removed
        @type  observer: L{twisted.spread.flavors.RemoteCacheObserver}
        @param num:      number of observers remaining
        @type  num:      int
        """


class StateCacheable(pb.Cacheable):
    """
    I am a cacheable state object.

    I cache key-value pairs, where values can be either single objects,
    lists of objects, or dicts.

    Keys must be registered with L{addKey}, L{addListKey} or
    L{addDictKey} before they can be read or changed.  Every change made
    through L{set}, L{append}, L{remove}, L{setitem} or L{delitem} is
    forwarded to all current observers with C{callRemote}.
    """

    def __init__(self):
        # remote observers, added in getStateToCacheAndObserveFor
        self._observers = []
        # local objects told about observers coming and going
        self._hooks = []
        # the actual state: key -> value / list / dict
        self._dict = {}

    def __getitem__(self, key):
        return self.get(key)

    # our methods

    def addKey(self, key, value=None):
        """
        Add a key to the state cache so it can be used with set.

        @param key:   the key to register
        @param value: initial value for the key
        """
        self._dict[key] = value

    # don't use [] as the default value, it creates only one reference and
    # reuses it

    def addListKey(self, key, value=None):
        """
        Add a key for a list of objects to the state cache.

        @param key:   the key to register
        @param value: initial list for the key; a fresh empty list is
                      created when None
        """
        if value is None:
            value = []
        self._dict[key] = value

    # don't use {} as the default value, it creates only one reference and
    # reuses it

    def addDictKey(self, key, value=None):
        """
        Add a key for a dict value to the state cache.

        @param key:   the key to register
        @param value: initial dict for the key; a fresh empty dict is
                      created when None
        """
        if value is None:
            value = {}
        self._dict[key] = value

    def hasKey(self, key):
        """
        Check whether the given key has been added to the state cache.

        @rtype: bool
        """
        # membership on the dict itself; no need to build a key list
        return key in self._dict

    def keys(self):
        """
        Return the keys currently present in the state cache.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @param key:       the key to look up
        @param otherwise: value to return when the stored value is None

        @raises KeyError: if the key is not present in the state cache
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # compare by identity with None: "not v" would also trigger on
        # empty lists, and "==" could be fooled by a custom __eq__
        if v is None:
            return otherwise

        return v

    def set(self, key, value):
        """
        Set a given state key to the given value.
        Notifies observers of this Cacheable through observe_set.

        @returns: a L{twisted.internet.defer.DeferredList} over the
                  callRemote result of every observer
        @raises KeyError: if the key is not present in the state cache
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        self._dict[key] = value
        dList = [o.callRemote('set', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def append(self, key, value):
        """
        Append the given object to the given list.
        Notifies observers of this Cacheable through observe_append.

        @returns: a L{twisted.internet.defer.DeferredList} over the
                  callRemote result of every observer
        @raises KeyError: if the key is not present in the state cache
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        self._dict[key].append(value)
        dList = [o.callRemote('append', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def remove(self, key, value):
        """
        Remove the given object from the given list.
        Notifies observers of this Cacheable through observe_remove.

        @returns: a L{twisted.internet.defer.DeferredList} over the
                  callRemote result of every observer
        @raises KeyError:   if the key is not present in the state cache
        @raises ValueError: if the value is not in the list for the key
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        try:
            self._dict[key].remove(value)
        except ValueError:
            # re-raise with a message that says which list and key
            raise ValueError('value %r not in list %r for key %r' % (
                value, self._dict[key], key))
        dList = [o.callRemote('remove', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def setitem(self, key, subkey, value):
        """
        Set a value in the given dict.
        Notifies observers of this Cacheable through observe_setitem.

        @param key:    the key of the dict in the state cache
        @param subkey: the key inside that dict
        @param value:  the value to store under subkey

        @returns: a L{twisted.internet.defer.DeferredList} over the
                  callRemote result of every observer
        @raises KeyError: if the key is not present in the state cache
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        self._dict[key][subkey] = value
        dList = [o.callRemote('setitem', key, subkey, value)
                 for o in self._observers]
        return defer.DeferredList(dList)

    def delitem(self, key, subkey):
        """
        Removes an element from the given dict. Note that the key refers
        to the dict; it is the subkey (and its value) that will be removed.
        Notifies observers of this Cacheable through observe_delitem.

        @returns: a L{twisted.internet.defer.DeferredList} over the
                  callRemote result of every observer
        @raises KeyError: if the key is not present in the state cache,
                          or the subkey is not present in the dict
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        try:
            # the removed value is sent along to the observers
            value = self._dict[key].pop(subkey)
        except KeyError:
            raise KeyError('key %r not in dict %r for key %r' % (
                subkey, self._dict[key], key))
        dList = [o.callRemote('delitem', key, subkey, value)
                 for o in self._observers]
        return defer.DeferredList(dList)

    # pb.Cacheable methods

    def getStateToCacheAndObserveFor(self, perspective, observer):
        # register the new observer, tell the hooks about it, and hand
        # the state dict over to be cached
        self._observers.append(observer)
        for hook in self._hooks:
            hook.observerAppend(observer, len(self._observers))
        return self._dict

    def stoppedObserving(self, perspective, observer):
        # unregister the observer and tell the hooks how many remain
        self._observers.remove(observer)
        for hook in self._hooks:
            hook.observerRemove(observer, len(self._observers))

    def addHook(self, hook):
        """
        A helper function that adds an object that would like to get
        informed by StateCacheable when observers have been added or
        removed.

        @param hook: an object who would like to receive state events
        @type  hook: object that implements
                     L{flumotion.twisted.flavors.IStateCacheableListener}

        @raises ValueError: if the hook has already been added
        """
        if hook in self._hooks:
            raise ValueError(
                "%r is already a hook of %r" % (hook, self))
        self._hooks.append(hook)

    def removeHook(self, hook):
        """
        Remove the object that listens to StateCacheable observer events

        @param hook: the object who would like to unsubscribe to state
                     events
        @type  hook: object that implements
                     L{flumotion.twisted.flavors.IStateCacheableListener}
        """
        self._hooks.remove(hook)


# At some point, a StateRemoteCache will become invalid. The normal way
# would be losing the connection to the RemoteCacheable, although
# particular kinds of RemoteCache objects might have other ways
# (e.g. component removed from flow).
#
# We support listening for invalidation events. However, in order to
# ensure predictable program behavior, we can't do a notifyOnDisconnect
# directly on the broker. If we did that, program semantics would be
# dependent on the call order of the notifyOnDisconnect methods, which
# would likely lead to heisenbugs.
#
# Instead, invalidation will only be performed by the application, if at
# all, via an explicit call to invalidate().


class StateRemoteCache(pb.RemoteCache):
    """
    I am a remote cache of a state object.

    Local listeners can register handlers with L{addListener}; they are
    called whenever one of the C{observe_*} methods updates the cached
    state, and when L{invalidate} is called.
    """

    def __init__(self):
        # listener -> [set_, append, remove, setitem, delitem, invalidate]
        self._listeners = {}
        # no constructor
        # pb.RemoteCache.__init__(self)

    def __getitem__(self, key):
        return self.get(key)

    # our methods

    def hasKey(self, key):
        """
        Check whether the given key is present in the cached state.

        @rtype: bool
        """
        # membership on the dict itself; no need to build a key list
        return key in self._dict

    def keys(self):
        """
        Return the keys currently present in the cached state.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @param key:       the key to look up
        @param otherwise: value to return when the stored value is None

        @raises KeyError: if the key is not present in the cached state
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # compare to actual None, otherwise we also get zero-like values
        if v is None:
            return otherwise

        return v

    def _ensureListeners(self):
        # when this is created through serialization from a JobCS,
        # __init__ does not seem to get called, so create self._listeners
        if not hasattr(self, '_listeners'):
            # FIXME: this means that callbacks will be fired in
            # arbitrary order; should be fired in order of connecting.
            # Use twisted.python.util.OrderedDict instead
            self._listeners = {}

    #F0.10: remove set=None and move set_=None there

    def addListener(self, listener, set=None, append=None, remove=None,
                    setitem=None, delitem=None, invalidate=None, set_=None):
        """
        Adds a listener to the remote cache.

        The caller will be notified of state events via the functions
        given as the 'set_', 'append', 'remove', 'setitem', 'delitem'
        and 'invalidate' keyword arguments.

        Always call this method using keyword arguments for the functions;
        calling them with positional arguments is not supported.

        Setting one of the event handlers to None will ignore that
        event. It is an error for all event handlers to be None.

        If an invalidate handler is given and this cache has already
        been invalidated, the handler is called immediately.

        @param listener:   new listener object that wants to receive
                           cache state change notifications.
        @type  listener:   object implementing
                           L{flumotion.twisted.flavors.IStateListener}
        @param set:        deprecated alias for set_; triggers a
                           DeprecationWarning when used
        @param set_:       procedure to call when a value is set
        @type  set_:       procedure(object, key, value) -> None
        @param append:     procedure to call when a value is appended to
                           a list
        @type  append:     procedure(object, key, value) -> None
        @param remove:     procedure to call when a value is removed from
                           a list
        @type  remove:     procedure(object, key, value) -> None
        @param setitem:    procedure to call when a value is set in a dict
        @type  setitem:    procedure(object, key, subkey, value) -> None
        @param delitem:    procedure to call when a value is removed
                           from a dict.
        @type  delitem:    procedure(object, key, subkey, value) -> None
        @param invalidate: procedure to call when this cache has been
                           invalidated.
        @type  invalidate: procedure(object) -> None

        @raises ValueError: if no event handler at all was given
        @raises KeyError:   if the listener was already added
        """
        # F0.10: remove set
        if set:
            import warnings
            warnings.warn('Please use the set_ kwarg instead',
                DeprecationWarning, stacklevel=2)
            set_ = set

        if not (set_ or append or remove or setitem or delitem or invalidate):
            raise ValueError("At least one event handler has to be specified")

        self._ensureListeners()
        if listener in self._listeners:
            raise KeyError(
                "%r is already a listener of %r" % (listener, self))
        # the position in this list is the index used by _notifyListeners
        self._listeners[listener] = [set_, append, remove, setitem,
                                     delitem, invalidate]
        if invalidate and hasattr(self, '_cache_invalid'):
            invalidate(self)

    def removeListener(self, listener):
        """
        Remove a listener previously added with L{addListener}.

        @raises KeyError: if the listener is not registered
        """
        self._ensureListeners()
        if listener not in self._listeners:
            raise KeyError(listener)
        del self._listeners[listener]

    # pb.RemoteCache methods

    def setCopyableState(self, dict):
        # the state received from the remote side becomes our cache
        self._dict = dict

    def _notifyListeners(self, index, *args):
        """
        Call the handler stored at the given index for every listener.

        @param index: position in the handler list stored by addListener
                      (0 set, 1 append, 2 remove, 3 setitem, 4 delitem,
                      5 invalidate)
        @param args:  arguments passed to the handler after this object
        """
        # notify our local listeners; compute set of procs first, so as
        # to allow the listeners set to change during the calls
        self._ensureListeners()
        procs = [tup[index] for tup in self._listeners.values()]
        for proc in procs:
            if not proc:
                continue
            try:
                proc(self, *args)
            except Exception as e:
                # These are all programming errors; log them and carry
                # on so the remaining listeners still get notified
                log.warning("stateremotecache",
                            'Exception in StateCache handler: %s',
                            log.getExceptionMessage(e))

    def observe_set(self, key, value):
        """
        Store the new value for the key and notify the 'set' handlers.
        """
        # the value is stored unconditionally before any proxying
        self._dict[key] = value
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'set'):
            StateCacheable.set(self, key, value)

        self._notifyListeners(0, key, value)

    def observe_append(self, key, value):
        """
        Append the value to the list under key and notify the 'append'
        handlers.
        """
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'append'):
            StateCacheable.append(self, key, value)
        else:
            self._dict[key].append(value)

        self._notifyListeners(1, key, value)

    def observe_remove(self, key, value):
        """
        Remove the value from the list under key and notify the 'remove'
        handlers.

        @raises ValueError: if the value is not in the list for the key
        """
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'remove'):
            StateCacheable.remove(self, key, value)
        else:
            try:
                self._dict[key].remove(value)
            except ValueError:
                raise ValueError("value %r not under key %r with values %r" %
                    (value, key, self._dict[key]))

        self._notifyListeners(2, key, value)

    def observe_setitem(self, key, subkey, value):
        """
        Set subkey to value in the dict under key and notify the
        'setitem' handlers.
        """
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'setitem'):
            StateCacheable.setitem(self, key, subkey, value)
        else:
            self._dict[key][subkey] = value

        self._notifyListeners(3, key, subkey, value)

    def observe_delitem(self, key, subkey, value):
        """
        Delete subkey from the dict under key and notify the 'delitem'
        handlers, passing them the value that was removed remotely.

        @raises KeyError: if the subkey is not present in the dict
        """
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'delitem'):
            StateCacheable.delitem(self, key, subkey)
        else:
            try:
                del self._dict[key][subkey]
            except KeyError:
                raise KeyError("key %r not in dict %r for state dict %r" %
                    (subkey, self._dict[key], self._dict))

        self._notifyListeners(4, key, subkey, value)

    def invalidate(self):
        """Invalidate this StateRemoteCache.

        Calling this method will result in the invalidate callback being
        called for all listeners that passed an invalidate handler to
        addListener. This method is not called automatically; it is
        provided as a convenience to applications.
        """
        assert not hasattr(self, '_cache_invalid'), \
            'object has already been invalidated'
        # if we also subclass from Cacheable, there is currently no way
        # to remotely invalidate the cache. that's ok though, because
        # double-caches are currently only used by the manager, which
        # does not call invalidate() on its caches.
        self._cache_invalid = True

        self._notifyListeners(5)
