Package flumotion :: Package common :: Module startset
[hide private]

Source Code for Module flumotion.common.startset

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """a data structure to manage asynchronous avatar starts and shutdowns 
 19  """ 
 20   
 21  from twisted.internet import defer 
 22   
 23  from flumotion.common import log 
 24   
 25  __version__ = "$Rev$" 
 26   
 27   
 28  # This class was factored out of the worker's jobheaven, so sometimes 
 29  # the comments talk about jobs, but they refer to any asynchronous 
 30  # process. For example the multiadmin uses this to manage its 
 31  # connections to remote managers. 
 32   
 33   
class StartSet(log.Loggable):
    """
    Bookkeeping for asynchronous starts and shutdowns of remote processes.

    Two dictionaries are maintained, both keyed by avatarId: one holding
    the deferreds of pending creates and one holding the deferreds of
    pending shutdowns. Whether an avatar is currently "ready" is not
    stored here; it is asked of the avatarLoggedIn procedure handed to
    the constructor.
    """

    def __init__(self, avatarLoggedIn, alreadyStartingError,
                 alreadyRunningError):
        """Create a StartSet, a data structure for managing starts and
        stops of remote processes, for example jobs in a jobheaven.

        @param avatarLoggedIn: a procedure of type avatarId->boolean;
        should return True if the avatarId is logged in and "ready", and
        False otherwise. An avatarId is ready if avatarStarted() could
        have been called on it. This interface is made this way because
        it is assumed that whatever code instantiates a StartSet keeps
        track of "ready" remote processes, and this way we prevent data
        duplication.
        @param alreadyStartingError: An exception class to raise if
        createStart() is called, but there is already a create deferred
        registered for that avatarId.
        @param alreadyRunningError: An exception class to raise if
        createStart() is called, but there is already a "ready" process
        with that avatarId.
        """
        self._avatarLoggedIn = avatarLoggedIn
        self._alreadyStartingError = alreadyStartingError
        self._alreadyRunningError = alreadyRunningError

        self._createDeferreds = {}  # avatarId => deferred
        self._shutdownDeferreds = {}  # avatarId => deferred

    def createStart(self, avatarId):
        """
        Create and register a deferred for starting a given process.
        The deferred will be fired when the process is ready, as
        triggered by a call to createSuccess().

        If a shutdown is still pending for the same avatarId, the
        returned deferred additionally waits for that shutdown deferred
        to fire before delivering its result.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @raise Exception: an instance of the alreadyStartingError class
        given to the constructor, if a create deferred is already
        registered for avatarId; an instance of the alreadyRunningError
        class, if avatarLoggedIn(avatarId) is true and no shutdown is
        pending.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making create deferred for %s', avatarId)

        # the question of "what jobs do we know about" is answered in
        # three places: the create deferreds hash, the set of logged in
        # avatars, and the shutdown deferreds hash. there are four
        # possible answers; the two that raise are handled before any
        # deferred is allocated.
        if avatarId in self._createDeferreds:
            # (1) a job is already starting: it is in the
            # createdeferreds hash
            self.info('already have a create deferred for %s', avatarId)
            raise self._alreadyStartingError(avatarId)

        d = defer.Deferred()

        if avatarId in self._shutdownDeferreds:
            # (2) a job is shutting down; note it is also in
            # heaven.avatars
            self.debug('waiting for previous %s to shut down like it '
                       'said it would', avatarId)
            # Bind the shutdown deferred that is pending right now; by
            # the time the create fires it may already have been removed
            # from the hash.
            shutdown = self._shutdownDeferreds[avatarId]

            def ensureShutdown(res):
                # Returning a deferred from a callback suspends the
                # create's callback chain until that deferred has fired.
                # The extra callback swaps the shutdown's result for the
                # create's own result, so later callbacks on the create
                # deferred still receive res.
                shutdown.addCallback(lambda _: res)
                return shutdown

            d.addCallback(ensureShutdown)
        elif self._avatarLoggedIn(avatarId):
            # (3) a job is running fine
            self.info('avatar named %s already running', avatarId)
            raise self._alreadyRunningError(avatarId)
        # else (4): it's new; we know of nothing with this avatarId

        self.debug('registering deferredCreate for %s', avatarId)
        self._createDeferreds[avatarId] = d
        return d

    def createSuccess(self, avatarId):
        """
        Trigger a deferred start previously registered via createStart().
        For example, a JobHeaven might call this method when a job has
        logged in and been told to start a component.

        If no create deferred is registered for avatarId, a warning is
        logged and nothing else happens.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        self.debug('triggering create deferred for %s', avatarId)
        if avatarId not in self._createDeferreds:
            self.warning('No create deferred registered for %s', avatarId)
            return

        # unregister before firing, so callbacks see the create as done
        d = self._createDeferreds.pop(avatarId)
        # return the avatarId the component will use to the original caller
        d.callback(avatarId)

    def createFailed(self, avatarId, exception):
        """
        Notify the caller that a create has failed, and remove the create
        from the list of pending creates.

        If no create deferred is registered for avatarId, a warning is
        logged and nothing else happens.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        @param exception: either an exception or a failure describing
        why the create failed.
        """
        self.debug('create deferred failed for %s', avatarId)
        if avatarId not in self._createDeferreds:
            self.warning('No create deferred registered for %s', avatarId)
            return

        # unregister before firing, so errbacks see the create as done
        d = self._createDeferreds.pop(avatarId)
        d.errback(exception)

    def createRegistered(self, avatarId):
        """
        Check if a deferred create has been registered for the given avatarId.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @returns: The deferred create, if one has been registered.
        Otherwise None.
        """
        return self._createDeferreds.get(avatarId)

    def shutdownStart(self, avatarId):
        """
        Create and register a deferred that will be fired when a process
        has shut down cleanly.

        If a shutdown deferred is already registered for avatarId, a
        warning is logged and that existing deferred is returned instead
        of a new one.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making shutdown deferred for %s', avatarId)

        if avatarId in self._shutdownDeferreds:
            self.warning('already have a shutdown deferred for %s',
                         avatarId)
            return self._shutdownDeferreds[avatarId]

        self.debug('registering shutdown for %s', avatarId)
        d = defer.Deferred()
        self._shutdownDeferreds[avatarId] = d
        return d

    def shutdownSuccess(self, avatarId):
        """
        Trigger a callback on a deferred previously registered via
        shutdownStart(). For example, a JobHeaven would call this when a
        job for which shutdownStart() was called is reaped.

        If no shutdown deferred is registered for avatarId, a warning is
        logged and nothing else happens.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        self.debug('triggering shutdown deferred for %s', avatarId)
        if avatarId not in self._shutdownDeferreds:
            self.warning('No shutdown deferred registered for %s', avatarId)
            return

        d = self._shutdownDeferreds.pop(avatarId)
        d.callback(avatarId)

    def shutdownRegistered(self, avatarId):
        """
        Check if a deferred shutdown has been registered for the given
        avatarId.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @returns: True if a deferred shutdown has been registered for
        this object, False otherwise
        """
        return avatarId in self._shutdownDeferreds

    def avatarStarted(self, avatarId):
        """
        Notify the startset that an avatar has started. If there was a
        create deferred registered for this avatar, this will cause
        createSuccess() to be called.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        if avatarId in self._createDeferreds:
            self.createSuccess(avatarId)
        else:
            self.log('avatar %s started, but we were not waiting for'
                     ' it', avatarId)

    def avatarStopped(self, avatarId, getFailure):
        """
        Notify the startset that an avatar has stopped. If there was a
        shutdown deferred registered for this avatar, this will cause
        shutdownSuccess() to be called.

        On the other hand, if there was a create deferred still pending,
        we will call createFailed with the result of calling getFailure.

        If no shutdown or create was registered, we do nothing.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        @param getFailure: procedure of type avatarId -> Failure. The
        returned failure should describe the reason that the job failed.
        It is only called when a create is pending.
        """
        # NOTE(review): a pending create takes precedence. If both a
        # create and a shutdown are registered for this avatarId, only
        # the create is failed here; the shutdown deferred stays
        # registered until shutdownSuccess() is called for it.
        if avatarId in self._createDeferreds:
            self.createFailed(avatarId, getFailure(avatarId))
        elif avatarId in self._shutdownDeferreds:
            self.shutdownSuccess(avatarId)
        else:
            self.debug('unknown avatar %s logged out', avatarId)
249