Package flumotion :: Package common :: Module medium
[hide private]

Source Code for Module flumotion.common.medium

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """base classes for PB client-side mediums. 
 19  """ 
 20   
 21  from twisted.spread import pb 
 22  from twisted.internet import defer 
 23  from zope.interface import implements 
 24   
 25  from flumotion.common import log, interfaces, bundleclient, errors, netutils 
 26  from flumotion.configure import configure 
 27  from flumotion.twisted import pb as fpb 
 28  from flumotion.twisted.compat import reactor 
 29   
 30  # register serializables 
 31  from flumotion.common import messages 
 32   
 33  __version__ = "$Rev$" 
 34   
 35   
class BaseMedium(fpb.Referenceable):
    """
    I am a base interface for PB clients interfacing with PB server-side
    avatars.
    Used by admin/worker/component to talk to manager's vishnu,
    and by job to talk to worker's brain.

    @ivar remote:       a remote reference to the server-side object on
                        which perspective_(methodName) methods can be
                        called; None until L{setRemoteReference} is called
                        and again after the reference disconnects
    @type remote:       L{twisted.spread.pb.RemoteReference}
    @ivar bundleLoader: loader created in L{setRemoteReference}; used by
                        L{getBundledFunction} to load modules
    @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
    """

    # subclasses will need to set this to the specific medium type
    # tho...
    implements(interfaces.IMedium)
    logCategory = "basemedium"
    remoteLogName = "baseavatar"

    remote = None
    bundleLoader = None

    def setRemoteReference(self, remoteReference):
        """
        Set the given remoteReference as the reference to the server-side
        avatar.

        Also registers a disconnect notification that resets L{remote} to
        None, and creates the L{bundleLoader}.

        @param remoteReference: L{twisted.spread.pb.RemoteReference}
        """
        self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
        self.remote = remoteReference

        def nullRemote(x):
            self.debug('%r: disconnected from %r' % (self, self.remote))
            self.remote = None
        self.remote.notifyOnDisconnect(nullRemote)

        self.bundleLoader = bundleclient.BundleLoader(self.callRemote)

        # figure out connection addresses if it's an internet address
        tarzan = None
        jane = None
        try:
            transport = remoteReference.broker.transport
            tarzan = transport.getHost()
            jane = transport.getPeer()
        except Exception as e:
            # best effort only: the addresses are used purely for the
            # debug message below
            self.debug("could not get connection info, reason %r" % (e, ))
        if tarzan and jane:
            self.debug("connection is from me on %s to remote on %s" % (
                netutils.addressGetHost(tarzan),
                netutils.addressGetHost(jane)))

    def hasRemoteReference(self):
        """
        Does the medium have a remote reference to a server-side avatar ?

        @rtype: bool
        """
        # identity test: do not go through the reference's comparison hooks
        return self.remote is not None

    def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
        """
        Call the given method with the given arguments remotely on the
        server-side avatar.

        Gets serialized to server-side perspective_ methods.

        @param level:      the level we should log at (log.DEBUG, log.INFO,
                           etc), or None to not log the call and its reply
        @type  level:      int
        @param stackDepth: the number of stack frames to go back to get
                           file and line information, negative or zero.
        @type  stackDepth: non-positive int
        @param name:       name of the remote method
        @type  name:       str

        @returns: a deferred firing with the result of the remote call; it
                  fails with L{flumotion.common.errors.NotConnectedError}
                  when there is no remote reference.
        @rtype:   L{twisted.internet.defer.Deferred}
        """
        shouldLog = level is not None

        if shouldLog:
            debugClass = str(self.__class__).split(".")[-1].upper()
            startArgs = [self.remoteLogName, debugClass, name]
            formatString, debugArgs = log.getFormatArgs(
                '%s --> %s: callRemote(%s, ', startArgs,
                ')', (), args, kwargs)
            # the value returned here is passed back as keyword arguments
            # when logging the reply
            logKwArgs = self.doLog(level, stackDepth - 1,
                                   formatString, *debugArgs)

        if not self.remote:
            self.warning('Tried to callRemote(%s), but we are disconnected'
                         % name)
            return defer.fail(errors.NotConnectedError())

        d = self.remote.callRemote(name, *args, **kwargs)

        if shouldLog:
            # these closures rely on startArgs and logKwArgs, which only
            # exist when logging was requested

            def callback(result):
                formatString, debugArgs = log.getFormatArgs(
                    '%s <-- %s: callRemote(%s, ', startArgs,
                    '): %s', (log.ellipsize(result), ), args, kwargs)
                self.doLog(level, -1, formatString, *debugArgs,
                           **logKwArgs)
                return result

            def errback(failure):
                formatString, debugArgs = log.getFormatArgs(
                    '%s <-- %s: callRemote(%s, ', startArgs,
                    '): %r', (failure, ), args, kwargs)
                self.doLog(level, -1, formatString, *debugArgs,
                           **logKwArgs)
                return failure

            d.addCallbacks(callback, errback)
        return d

    def callRemote(self, name, *args, **kwargs):
        """
        Call the given method with the given arguments remotely on the
        server-side avatar.

        Gets serialized to server-side perspective_ methods.
        The call is logged at log.DEBUG level through
        L{callRemoteLogging}.
        """
        return self.callRemoteLogging(log.DEBUG, -1, name, *args,
                                      **kwargs)

    def getBundledFunction(self, module, function):
        """
        Returns the given function in the given module, loading the
        module from a bundle.

        If we can't find the bundle for the given module, or if the
        given module does not contain the requested function, we will
        raise L{flumotion.common.errors.RemoteRunError} (perhaps a
        poorly chosen error). If importing the module raises an
        exception, that exception will be passed through unmodified.

        @param module:   module the function lives in
        @type  module:   str
        @param function: function to run
        @type  function: str

        @returns: a deferred firing with a callable, the given function
                  in the given module.
        """

        def gotModule(mod):
            if hasattr(mod, function):
                return getattr(mod, function)
            else:
                msg = 'No procedure named %s in module %s' % (function,
                                                              module)
                self.warning('%s', msg)
                raise errors.RemoteRunError(msg)

        def gotModuleError(failure):
            # anything other than a missing bundle is passed through
            failure.trap(errors.NoBundleError)
            msg = 'Failed to find bundle for module %s' % module
            self.warning('%s', msg)
            raise errors.RemoteRunError(msg)

        d = self.bundleLoader.loadModule(module)
        d.addCallbacks(gotModule, gotModuleError)
        return d

    def runBundledFunction(self, module, function, *args, **kwargs):
        """
        Runs the given function in the given module with the given
        arguments.

        This method calls getBundledFunction and then invokes the
        function. Any error raised by getBundledFunction or by invoking
        the function will be passed through unmodified.

        Callers that expect to return their result over a PB connection
        should catch nonserializable exceptions so as to prevent nasty
        backtraces in the logs.

        @param module:   module the function lives in
        @type  module:   str
        @param function: function to run
        @type  function: str

        @returns: a deferred firing with the return value of the given
                  function in the module.
        """
        self.debug('runBundledFunction(%r, %r)', module, function)

        def gotFunction(proc):

            def invocationError(failure):
                # log the failure, then hand it on unchanged
                self.warning('Exception raised while calling '
                             '%s.%s(*args=%r, **kwargs=%r): %s',
                             module, function, args, kwargs,
                             log.getFailureMessage(failure))
                return failure

            self.debug('calling %s.%s(%r, %r)', module, function, args,
                       kwargs)
            d = defer.maybeDeferred(proc, *args, **kwargs)
            d.addErrback(invocationError)
            return d

        d = self.getBundledFunction(module, function)
        d.addCallback(gotFunction)
        return d


class PingingMedium(BaseMedium):
    """
    I am a medium that periodically pings the server-side avatar and
    calls a disconnect method when no sign of life was seen for too long.

    A received remote message and a successful remote call both count as
    a sign of life, just like a ping reply.

    @cvar _pingInterval:      seconds between two pings
    @cvar _pingCheckInterval: seconds between two checks; also the
                              longest time allowed without a sign of life
    @ivar _pingDC:            pending delayed call for the next ping
    @ivar _pingCheckDC:       pending delayed call for the next check
    @ivar _clock:             provider of seconds() and callLater()
    """
    _pingInterval = configure.heartbeatInterval
    _pingCheckInterval = (configure.heartbeatInterval *
                          configure.pingTimeoutMultiplier)
    _pingDC = None
    # class-level default so stopPinging() is safe before startPinging()
    _pingCheckDC = None
    _clock = reactor

    def startPinging(self, disconnect):
        """
        Start pinging the remote side and checking for ping replies.
        Does nothing besides resetting the last pingback time if pinging
        was already started.

        @param disconnect: a method to call when we do not get ping replies
        @type  disconnect: callable
        """
        self.debug('startPinging')
        self._lastPingback = self._clock.seconds()
        if self._pingDC:
            self.debug("Cannot start pinging, already pinging")
            return
        self._pingDisconnect = disconnect
        self._ping()
        self._pingCheck()

    def _ping(self):
        """
        Ping the remote side if we are connected, and schedule the next
        ping in L{_pingInterval} seconds.
        """

        def pingback(result):
            self._lastPingback = self._clock.seconds()
            self.log('pinged, pingback at %r' % self._lastPingback)

        def pingFailed(failure):
            # ignoring the connection failures so they don't end up in
            # the logs - we'll notice the lack of pingback eventually
            failure.trap(pb.PBConnectionLost)
            self.log('ping failed: %s' % log.getFailureMessage(failure))

        if self.remote:
            self.log('pinging')
            d = self.callRemoteLogging(log.LOG, 0, 'ping')
            d.addCallbacks(pingback, pingFailed)
        else:
            self.info('tried to ping, but disconnected yo')

        # keep pinging whether or not we are connected right now
        self._pingDC = self._clock.callLater(self._pingInterval,
                                             self._ping)

    def remoteMessageReceived(self, broker, message, args, kw):
        """
        Record the incoming message as a sign of life, then let
        L{BaseMedium} handle it.
        """
        self._lastPingback = self._clock.seconds()
        return BaseMedium.remoteMessageReceived(
            self, broker, message, args, kw)

    def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
        """
        Same as L{BaseMedium.callRemoteLogging}, but a successful result
        is also recorded as a sign of life from the remote side.
        """
        d = BaseMedium.callRemoteLogging(
            self, level, stackDepth, name, *args, **kwargs)

        def cb(result):
            self._lastPingback = self._clock.seconds()
            return result
        d.addCallback(cb)
        return d

    def _pingCheck(self):
        """
        Call the disconnect method given to L{startPinging} if we are
        connected and the last sign of life is more than
        L{_pingCheckInterval} seconds old; otherwise schedule the next
        check.
        """
        self._pingCheckDC = None
        if (self.remote and
            ((self._clock.seconds() - self._lastPingback) >
             self._pingCheckInterval)):
            self.info('no pingback in %f seconds, closing connection',
                      self._pingCheckInterval)
            self._pingDisconnect()
        else:
            self._pingCheckDC = self._clock.callLater(
                self._pingCheckInterval, self._pingCheck)

    def stopPinging(self):
        """
        Cancel the pending ping and ping check, if any.
        """
        if self._pingCheckDC:
            self._pingCheckDC.cancel()
        self._pingCheckDC = None

        if self._pingDC:
            self._pingDC.cancel()
        self._pingDC = None

    def _disconnect(self):
        """
        Close the transport of the remote reference, if we have one.
        """
        if self.remote:
            self.remote.broker.transport.loseConnection()

    def setRemoteReference(self, remote, clock=reactor):
        """
        Set the remote reference and start pinging it; pinging is stopped
        again when the reference disconnects.

        @param remote: L{twisted.spread.pb.RemoteReference}
        @param clock:  object whose seconds() and callLater() are used
                       for all ping timing; defaults to the reactor
        """
        self._clock = clock

        BaseMedium.setRemoteReference(self, remote)

        def stopPingingCb(x):
            self.debug('stop pinging')
            self.stopPinging()
        self.remote.notifyOnDisconnect(stopPingingCb)

        self.startPinging(self._disconnect)

    def remote_writeFluDebugMarker(self, level, marker):
        """
        Sets a marker that will be prefixed to the log strings. Setting this
        marker to multiple elements at a time helps debugging.

        @param level:  passed on as the second argument of writeMarker;
                       presumably the log level to write the marker at
        @param marker: A string to prefix all the log strings.
        @type  marker: str
        """
        self.writeMarker(marker, level)
