Package flumotion :: Package admin :: Module multi
[hide private]

Source Code for Module flumotion.admin.multi

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """admin model used to connect to multiple managers""" 
 19   
 20  from twisted.internet import defer 
 21   
 22  from flumotion.common import log, planet, errors, startset, watched 
 23  from flumotion.admin import admin 
 24   
 25  __version__ = "$Rev$" 
 26   
 27   
28 -def get_admin_for_object(object):
29 import warnings 30 warnings.warn('Use getAdminForObject', DeprecationWarning, stacklevel=2) 31 return getAdminForObject(object)
32 33
34 -def getAdminForObject(object):
35 if object.get('parent'): 36 return get_admin_for_object(object.get('parent')) 37 else: 38 return object.admin
39 40
41 -class MultiAdminModel(log.Loggable):
42 logCategory = 'multiadmin' 43
44 - def __init__(self):
45 self.admins = watched.WatchedDict() # {managerId: AdminModel} 46 47 self._listeners = [] 48 self._reconnectHandlerIds = {} # managerId => [disconnect, id..] 49 self._startSet = startset.StartSet(self.admins.has_key, 50 errors.AlreadyConnectingError, 51 errors.AlreadyConnectedError)
52 53 # Listener implementation 54
55 - def emit(self, signal_name, *args, **kwargs):
56 self.debug('emit %r %r %r' % (signal_name, args, kwargs)) 57 assert signal_name != 'handler' 58 for c in self._listeners: 59 if getattr(c, 'model_handler', None): 60 c.model_handler(c, signal_name, *args, **kwargs) 61 elif getattr(c, 'model_%s' % signal_name): 62 getattr(c, 'model_%s' % signal_name)(*args, **kwargs) 63 else: 64 s = 'No model_%s in %r and no model_handler' % (signal_name, c) 65 raise NotImplementedError(s)
66
67 - def addListener(self, obj):
68 assert not obj in self._listeners 69 self._listeners.append(obj)
70
71 - def removeListener(self, obj):
72 self._listeners.remove(obj)
73
74 - def _managerConnected(self, admin):
75 if admin.managerId not in self._reconnectHandlerIds: 76 # the first time a manager is connected to, start listening 77 # for reconnections; intertwingled with removeManager() 78 ids = [] 79 ids.append(admin.connect('connected', 80 self._managerConnected)) 81 ids.append(admin.connect('disconnected', 82 self._managerDisconnected)) 83 self._reconnectHandlerIds[admin.managerId] = admin, ids 84 85 adminplanet = admin.planet 86 self.info('Connected to manager %s (planet %s)', 87 admin.managerId, adminplanet.get('name')) 88 assert admin.managerId not in self.admins 89 self.admins[admin.managerId] = admin 90 self.emit('addPlanet', admin, adminplanet)
91
92 - def _managerDisconnected(self, admin):
93 if admin.managerId in self.admins: 94 self.emit('removePlanet', admin, admin.planet) 95 del self.admins[admin.managerId] 96 else: 97 self.warning('Could not find admin model %r', admin)
98
99 - def addManager(self, connectionInfo, tenacious=False, 100 writeConnection=True):
101 i = connectionInfo 102 managerId = str(i) 103 104 # This dance of deferreds is here so as to make sure that 105 # removeManager can cancel a pending connection. 106 107 # can raise errors.AlreadyConnectingError or 108 # errors.AlreadyConnectedError 109 try: 110 startD = self._startSet.createStart(managerId) 111 except Exception, e: 112 return defer.fail(e) 113 114 a = admin.AdminModel() 115 connectD = a.connectToManager(i, tenacious, 116 writeConnection=writeConnection) 117 assert a.managerId == managerId 118 119 def connect_callback(_): 120 self._startSet.avatarStarted(managerId)
121 122 def connect_errback(failure): 123 self._startSet.avatarStopped(managerId, lambda _: failure)
124 125 connectD.addCallbacks(connect_callback, connect_errback) 126 127 def start_callback(_): 128 self._managerConnected(a) 129 130 def start_errback(failure): 131 a.shutdown() 132 return failure 133 134 startD.addCallbacks(start_callback, start_errback) 135 136 return startD 137
138 - def removeManager(self, managerId):
139 self.info('disconnecting from %s', managerId) 140 141 # Four cases: 142 # (1) We have no idea about this managerId, the caller is 143 # confused -- do nothing 144 # (2) We started connecting to this managerId, but never 145 # succeeded -- cancel pending connections 146 # (3) We connected at least once, and are connected now -- we 147 # have entries in the _reconnectHandlerIds and in self.admins -- 148 # disconnect from the signals, disconnect from the remote 149 # manager, and don't try to reconnect 150 # (4) We connected at least once, but are disconnected now -- we 151 # have an entry in _reconnectHandlerIds but not self.admins -- 152 # disconnect from the signals, and stop trying to reconnect 153 154 # stop listening to admin's signals, if the manager had actually 155 # connected at some point 156 if managerId in self._reconnectHandlerIds: 157 admin, handlerIds = self._reconnectHandlerIds.pop(managerId) 158 map(admin.disconnect, handlerIds) # (3) and (4) 159 if managerId not in self.admins: 160 admin.shutdown() # (4) 161 162 if managerId in self.admins: # (3) 163 admin = self.admins[managerId] 164 admin.shutdown() 165 self._managerDisconnected(admin) 166 167 # Firing this has the side effect of errbacking on any pending 168 # start, calling start_errback above if appropriate. (2) 169 self._startSet.avatarStopped( 170 managerId, lambda _: errors.ConnectionCancelledError()) 171 172 # always succeed, see (1) 173 return defer.succeed(managerId)
174
175 - def for_each_component(self, object, proc):
176 '''Call a procedure on each component that is a child of OBJECT''' 177 # ah, for multimethods... 178 if isinstance(object, planet.AdminPlanetState): 179 self.for_each_component(object.get('atmosphere'), proc) 180 for f in object.get('flows'): 181 self.for_each_component(f, proc) 182 elif (isinstance(object, planet.AdminAtmosphereState) or 183 isinstance(object, planet.AdminFlowState)): 184 for c in object.get('components'): 185 self.for_each_component(c, proc) 186 elif isinstance(object, planet.AdminComponentState): 187 proc(object)
188
189 - def do_component_op(self, object, op):
190 '''Call a method on the remote component object associated with 191 a component state''' 192 admin = get_admin_for_object(object) 193 194 def do_op(object): 195 admin.callRemote('component'+op, object)
196 self.for_each_component(object, do_op) 197