Trees | Indices | Help |
---|
|
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4

# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
# Copyright (C) 2010,2011 Flumotion Services, S.A.
# All rights reserved.
#
# This file may be distributed and/or modified under the terms of
# the GNU Lesser General Public License version 2.1 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.LGPL" in the source distribution for more information.
#
# Headers in this file shall remain intact.

"""admin model used to connect to multiple managers"""

from twisted.internet import defer

from flumotion.common import log, planet, errors, startset, watched
from flumotion.admin import admin

__version__ = "$Rev$"


def get_admin_for_object(object):
    """Deprecated alias of getAdminForObject().

    Emits a DeprecationWarning attributed to the caller and then
    delegates to getAdminForObject().
    """
    import warnings
    warnings.warn('Use getAdminForObject', DeprecationWarning, stacklevel=2)
    return getAdminForObject(object)


def getAdminForObject(object):
    """Return the admin model that owns a state object.

    Walks up the chain of object.get('parent') until an object without
    a (truthy) parent is found, and returns that object's 'admin'
    attribute.
    """
    parent = object.get('parent')
    if parent:
        # Recurse through the supported name: going through the
        # deprecated get_admin_for_object() alias would emit a
        # DeprecationWarning for every level of nesting.
        return getAdminForObject(parent)
    return object.admin


# NOTE(review): the class statement was lost when this listing was
# extracted; the name and the log.Loggable base are reconstructed from the
# module docstring, the logCategory attribute and the use of
# self.debug/info/warning -- confirm against the real source.
class MultiAdminModel(log.Loggable):
    """Model keeping track of admin connections to several managers.

    Connected admin models are stored in self.admins, keyed by manager
    id.  Registered listeners are notified through emit() with the
    'addPlanet' and 'removePlanet' signals.
    """

    logCategory = 'multiadmin'

    def __init__(self):
        self.admins = watched.WatchedDict() # {managerId: AdminModel}

        self._listeners = []
        self._reconnectHandlerIds = {} # managerId => [disconnect, id..]
        # The first argument tells the start set whether a manager id is
        # already connected; use the same 'in' test as the rest of this
        # class instead of the deprecated dict.has_key.
        self._startSet = startset.StartSet(
            lambda managerId: managerId in self.admins,
            errors.AlreadyConnectingError,
            errors.AlreadyConnectedError)

    # Listener implementation

    def emit(self, signal_name, *args, **kwargs):
        """Deliver a signal to every registered listener.

        A listener with a truthy 'model_handler' attribute gets
        model_handler(listener, signal_name, *args, **kwargs); otherwise
        its 'model_<signal_name>' attribute is called with
        (*args, **kwargs).

        @raises NotImplementedError: if a listener provides neither.
        """
        self.debug('emit %r %r %r' % (signal_name, args, kwargs))
        assert signal_name != 'handler'
        for c in self._listeners:
            if getattr(c, 'model_handler', None):
                c.model_handler(c, signal_name, *args, **kwargs)
                continue
            # A default is required here: without it a listener lacking
            # the method raised AttributeError and the explicit
            # NotImplementedError below could never be reached.
            handler = getattr(c, 'model_%s' % signal_name, None)
            if handler:
                handler(*args, **kwargs)
            else:
                s = 'No model_%s in %r and no model_handler' % (signal_name, c)
                raise NotImplementedError(s)

    # NOTE(review): this method (listing lines 67-69) is absent from the
    # extracted listing.  It is reconstructed as the minimal counterpart
    # of removeListener(); confirm name and body against the real source.
    def addListener(self, obj):
        """Register obj to receive the signals sent by emit()."""
        self._listeners.append(obj)

    def removeListener(self, obj):
        """Stop sending signals to obj; it must be a registered listener."""
        self._listeners.remove(obj)

    def _managerConnected(self, admin):
        """Record a newly (re)connected admin model and emit 'addPlanet'.

        Used both as the success callback of addManager() and as the
        handler of the admin model's 'connected' signal.
        """
        if admin.managerId not in self._reconnectHandlerIds:
            # the first time a manager is connected to, start listening
            # for reconnections; intertwingled with removeManager()
            ids = []
            ids.append(admin.connect('connected',
                                     self._managerConnected))
            ids.append(admin.connect('disconnected',
                                     self._managerDisconnected))
            self._reconnectHandlerIds[admin.managerId] = admin, ids

        adminplanet = admin.planet
        self.info('Connected to manager %s (planet %s)',
                  admin.managerId, adminplanet.get('name'))
        assert admin.managerId not in self.admins
        self.admins[admin.managerId] = admin
        self.emit('addPlanet', admin, adminplanet)

    def _managerDisconnected(self, admin):
        """Emit 'removePlanet' for admin and forget it.

        Only logs a warning when the admin model is not in self.admins.
        """
        if admin.managerId in self.admins:
            self.emit('removePlanet', admin, admin.planet)
            del self.admins[admin.managerId]
        else:
            self.warning('Could not find admin model %r', admin)

    # NOTE(review): the def line (listing lines 99-100) was lost in
    # extraction.  The parameter names come from the body; the method name
    # and the default values are reconstructed -- confirm.
    def addManager(self, connectionInfo, tenacious=False,
                   writeConnection=True):
        """Start connecting to the manager described by connectionInfo.

        The manager id is str(connectionInfo).

        @returns: the deferred created by the start set for this manager
                  id; _managerConnected() runs when it fires.  If the
                  start cannot be created (errors.AlreadyConnectingError
                  or errors.AlreadyConnectedError), a deferred that has
                  already failed with that exception.
        """
        i = connectionInfo
        managerId = str(i)

        # This dance of deferreds is here so as to make sure that
        # removeManager can cancel a pending connection.

        # can raise errors.AlreadyConnectingError or
        # errors.AlreadyConnectedError
        try:
            startD = self._startSet.createStart(managerId)
        except Exception as e:
            return defer.fail(e)

        a = admin.AdminModel()
        connectD = a.connectToManager(i, tenacious,
                                      writeConnection=writeConnection)
        assert a.managerId == managerId

        def connect_callback(_):
            self._startSet.avatarStarted(managerId)

        def connect_errback(failure):
            self._startSet.avatarStopped(managerId, lambda _: failure)

        connectD.addCallbacks(connect_callback, connect_errback)

        def start_callback(_):
            self._managerConnected(a)

        def start_errback(failure):
            a.shutdown()
            return failure

        startD.addCallbacks(start_callback, start_errback)

        return startD

    def removeManager(self, managerId):
        """Disconnect from a manager and stop trying to reconnect to it.

        @returns: a deferred that has already fired with managerId; this
                  never fails, even for an unknown manager id.
        """
        self.info('disconnecting from %s', managerId)

        # Four cases:
        # (1) We have no idea about this managerId, the caller is
        # confused -- do nothing
        # (2) We started connecting to this managerId, but never
        # succeeded -- cancel pending connections
        # (3) We connected at least once, and are connected now -- we
        # have entries in the _reconnectHandlerIds and in self.admins --
        # disconnect from the signals, disconnect from the remote
        # manager, and don't try to reconnect
        # (4) We connected at least once, but are disconnected now -- we
        # have an entry in _reconnectHandlerIds but not self.admins --
        # disconnect from the signals, and stop trying to reconnect

        # stop listening to admin's signals, if the manager had actually
        # connected at some point
        if managerId in self._reconnectHandlerIds:
            adminModel, handlerIds = self._reconnectHandlerIds.pop(managerId)
            # An explicit loop, not map(): map() is lazy on Python 3 and
            # would silently skip the disconnects.
            for handlerId in handlerIds: # (3) and (4)
                adminModel.disconnect(handlerId)
            if managerId not in self.admins:
                adminModel.shutdown() # (4)

        if managerId in self.admins: # (3)
            adminModel = self.admins[managerId]
            adminModel.shutdown()
            self._managerDisconnected(adminModel)

        # Firing this has the side effect of errbacking on any pending
        # start, calling start_errback above if appropriate. (2)
        self._startSet.avatarStopped(
            managerId, lambda _: errors.ConnectionCancelledError())

        # always succeed, see (1)
        return defer.succeed(managerId)

    def for_each_component(self, object, proc):
        '''Call a procedure on each component that is a child of OBJECT'''
        # ah, for multimethods...
        if isinstance(object, planet.AdminPlanetState):
            self.for_each_component(object.get('atmosphere'), proc)
            for f in object.get('flows'):
                self.for_each_component(f, proc)
        elif (isinstance(object, planet.AdminAtmosphereState) or
              isinstance(object, planet.AdminFlowState)):
            for c in object.get('components'):
                self.for_each_component(c, proc)
        elif isinstance(object, planet.AdminComponentState):
            proc(object)

    # NOTE(review): the def line (listing line 189) was lost in extraction.
    # The parameter names come from the body; the method name is
    # reconstructed -- confirm against the real source.
    def do_component_op(self, object, op):
        '''Call a method on the remote component object associated with
        a component state'''
        # Use the supported name; the deprecated get_admin_for_object()
        # alias would emit a DeprecationWarning on every call.
        adminModel = getAdminForObject(object)

        def do_op(object):
            adminModel.callRemote('component'+op, object)
        self.for_each_component(object, do_op)
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Aug 13 06:17:30 2013 | http://epydoc.sourceforge.net |