Package flumotion :: Package job :: Module job
[hide private]

Source Code for Module flumotion.job.job

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  the job-side half of the worker-job connection 
 20  """ 
 21   
 22  import os 
 23  import resource 
 24  import sys 
 25   
 26  # I've read somewhere that importing the traceback module messes up the 
 27  # exception state, so it's better to import it globally instead of in the 
 28  # exception handler 
 29  # import traceback 
 30   
 31  from twisted.cred import credentials 
 32  from twisted.internet import reactor, defer 
 33  from twisted.python import failure 
 34  from twisted.spread import pb 
 35  from zope.interface import implements 
 36   
 37  from flumotion.common import errors, interfaces, log 
 38   
 39  from flumotion.common import medium, package 
 40  from flumotion.common.reflectcall import createComponent, reflectCallCatching 
 41  from flumotion.component import component 
 42   
 43  from flumotion.twisted import fdserver 
 44  from flumotion.twisted import pb as fpb 
 45  from flumotion.twisted import defer as fdefer 
 46   
 47  # register serializables 
 48  from flumotion.common import keycards 
 49   
 50  __version__ = "$Rev$" 
 51   
 52   
class JobMedium(medium.BaseMedium):
    """
    I am a medium between the job and the worker's job avatar.
    I live in the job process.

    @ivar component: the component this is a medium for; created as part of
                     L{remote_create}
    @type component: L{flumotion.component.component.BaseComponent}
    """
    logCategory = 'jobmedium'
    remoteLogName = 'jobavatar'

    implements(interfaces.IJobMedium)

    def __init__(self):
        self.avatarId = None
        self.logName = None
        self.component = None

        self._workerName = None
        self._managerHost = None
        self._managerPort = None
        self._managerTransport = None
        self._managerKeycard = None
        # filled in by remote_bootstrap; initialised here like every other
        # attribute so it exists even before bootstrapping
        self._authenticator = None
        self._componentClientFactory = None # from component to manager

        self._hasStoppedReactor = False

    ### pb.Referenceable remote methods called on by the WorkerBrain

    def remote_bootstrap(self, workerName, host, port,
                         transport, authenticator, packagePaths):
        """
        I receive the information on how to connect to the manager. I also set
        up package paths to be able to run the component.

        Called by the worker's JobAvatar.

        @param workerName:    the name of the worker running this job
        @type  workerName:    str
        @param host:          the host that is running the manager
        @type  host:          str
        @param port:          port on which the manager is listening
        @type  port:          int
        @param transport:     'tcp' or 'ssl'
        @type  transport:     str
        @param authenticator: remote reference to the worker-side authenticator
        @type  authenticator: L{twisted.spread.pb.RemoteReference} to a
                              L{flumotion.twisted.pb.Authenticator}
        @param packagePaths:  ordered list of
                              (package name, package path) tuples
        @type  packagePaths:  list of (str, str)
        """
        self._workerName = workerName
        self._managerHost = host
        self._managerPort = port
        self._managerTransport = transport
        if authenticator:
            self._authenticator = fpb.RemoteAuthenticator(authenticator)
        else:
            self.debug('no authenticator, will not be able to log '
                       'into manager')
            self._authenticator = None

        packager = package.getPackager()
        for name, path in packagePaths:
            self.debug('registering package path for %s' % name)
            self.log('... from path %s' % path)
            packager.registerPackagePath(path, name)

    def remote_getPid(self):
        """
        @returns: the process id of this job process
        @rtype:   int
        """
        return os.getpid()

    def remote_runFunction(self, moduleName, methodName, *args, **kwargs):
        """
        I am called on by the worker's JobAvatar to run a function,
        normally on behalf of the flumotion.admin.gtk.

        @param moduleName: name of the module containing the function
        @type  moduleName: str
        @param methodName: the method to run
        @type  methodName: str
        @param args:       args to pass to the method
        @type  args:       tuple
        @param kwargs:     kwargs to pass to the method
        @type  kwargs:     dict

        @returns: the result of invoking the method; errors are reported
                  via L{errors.RemoteRunError} by reflectCallCatching
        """
        self.info('Running %s.%s(*%r, **%r)' % (moduleName, methodName,
                                               args, kwargs))
        # FIXME: do we want to do this?
        self._enableCoreDumps()

        return reflectCallCatching(errors.RemoteRunError, moduleName,
                                   methodName, *args, **kwargs)

    def remote_create(self, avatarId, type, moduleName, methodName,
                      nice, conf):
        """
        I am called on by the worker's JobAvatar to create a component.

        @param avatarId:   avatarId for component to log in to manager
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       the nice level
        @type  nice:       int
        @param conf:       the component configuration
        @type  conf:       dict
        """
        self.avatarId = avatarId
        self.logName = avatarId

        self.component = self._createComponent(avatarId, type, moduleName,
                                               methodName, nice, conf)
        self.component.setShutdownHook(self._componentStopped)

    def _componentStopped(self):
        """
        Shutdown hook installed on the component by L{remote_create}.
        """
        # stop reactor from a callLater so remote methods finish nicely
        reactor.callLater(0, self.shutdown)

    def remote_stop(self):
        """
        Stop the component if there is one (its shutdown hook then shuts the
        job down); otherwise shut the job down directly.
        """
        if self.component:
            self.debug('stopping component and shutting down')
            self.component.stop()
        else:
            reactor.callLater(0, self.shutdown)

    def shutdownHandler(self):
        """
        Tell the worker, and the component's own medium if it has a remote
        reference, that we are shutting down cleanly.

        @returns: a deferred, fired from a callLater, with the
                  L{defer.DeferredList} result of the cleanShutdown calls
        """
        dlist = []
        if self.hasRemoteReference():
            # tell the worker we are shutting down
            dlist.append(self.callRemote("cleanShutdown"))
        if self.component:
            # not named 'medium': that would shadow the imported module
            compMedium = self.component.medium
            if compMedium.hasRemoteReference():
                dlist.append(compMedium.callRemote("cleanShutdown"))

        # consumeErrors: a failing cleanShutdown call is still reported in
        # the DeferredList result, but is no longer additionally logged as an
        # "Unhandled error in Deferred" from the individual deferreds.
        # We mustn't fire the deferred returned from here except from a
        # callLater.
        dl = defer.DeferredList(dlist, fireOnOneErrback=False,
                                consumeErrors=True)
        return fdefer.defer_call_later(dl)

    ### our methods

    def shutdown(self):
        """
        Shut down the job process completely, cleaning up the component
        so the reactor can be left from.

        Calling this more than once only stops the reactor the first time.
        """
        if self._hasStoppedReactor:
            self.debug("Not stopping reactor again, already shutting down")
        else:
            self._hasStoppedReactor = True
            self.info("Stopping reactor in job process")
            reactor.stop()

    def _setNice(self, nice):
        """
        Apply the given nice increment to this process; a false value
        (0 or None) does nothing. Failures are logged, not raised.
        """
        if not nice:
            return

        try:
            os.nice(nice)
        except OSError as e:
            self.warning('Failed to set nice level: %s' % str(e))
        else:
            self.debug('Nice level set to %d' % nice)

    def _enableCoreDumps(self):
        """
        Raise the soft core-dump size limit to the hard limit, warning when
        the hard limit is not unlimited.
        """
        soft, hard = resource.getrlimit(resource.RLIMIT_CORE)
        if hard != resource.RLIM_INFINITY:
            self.warning('Could not set unlimited core dump sizes, '
                         'setting to %d instead' % hard)
        else:
            self.debug('Enabling core dumps of unlimited size')

        resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))

    def _createComponent(self, avatarId, type, moduleName, methodName,
                         nice, conf):
        """
        Create a component of the given type.
        Log in to the manager with the given avatarId.

        @param avatarId:   avatarId component will use to log in to manager
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module that contains the entry point
        @type  moduleName: str
        @param methodName: name of the factory method to create the component
        @type  methodName: str
        @param nice:       the nice level to run with
        @type  nice:       int
        @param conf:       the component configuration
        @type  conf:       dict

        @returns: the created component
        @raises errors.ComponentCreateError: if creating the component fails;
                the job is then shut down shortly afterwards
        """
        self.info('Creating component "%s" of type "%s"', avatarId, type)

        self._setNice(nice)
        self._enableCoreDumps()

        try:
            comp = createComponent(moduleName, methodName, conf)
        except Exception as e:
            if isinstance(e, errors.ComponentCreateError) and e.args:
                # a ComponentCreateError is already formatted
                msg = e.args[0]
            else:
                # Exception args are not always strings (e.g. errno ints);
                # format each one with %s instead of joining them directly,
                # which raised TypeError and hid the real error.
                msg = "Exception %s during createComponent: %s" % (
                    e.__class__.__name__,
                    " ".join(["%s" % (arg, ) for arg in e.args]))
            self.warning(
                "raising ComponentCreateError(%s) and stopping job" % msg)
            # This is a Nasty Hack. We raise ComponentCreateError, which can be
            # caught on the other side and marshalled as a reasonably
            # comprehensible error message. However, if we shutdown
            # immediately, the PB connection won't be available, so
            # the worker will just get an error about that! So, instead,
            # we shut down in a tenth of a second, usually allowing
            # the worker to get scheduled and read the exception over PB.
            # Ick!
            reactor.callLater(0.1, self.shutdown)
            raise errors.ComponentCreateError(msg)

        comp.setWorkerName(self._workerName)

        # make component log in to manager
        self.debug('creating ComponentClientFactory')
        managerClientFactory = component.ComponentClientFactory(comp)
        self._componentClientFactory = managerClientFactory
        self.debug('created ComponentClientFactory %r' % managerClientFactory)
        # NOTE(review): if remote_bootstrap got no authenticator,
        # self._authenticator is None and this raises AttributeError --
        # confirm whether that is the intended failure mode.
        self._authenticator.avatarId = avatarId
        managerClientFactory.startLogin(self._authenticator)

        host = self._managerHost
        port = self._managerPort
        transport = self._managerTransport
        self.debug('logging in with authenticator %r' % self._authenticator)
        if transport == "ssl":
            from flumotion.common import common
            common.assertSSLAvailable()
            from twisted.internet import ssl
            self.info('Connecting to manager %s:%d with SSL' % (host, port))
            reactor.connectSSL(host, port, managerClientFactory,
                               ssl.ClientContextFactory())
        elif transport == "tcp":
            self.info('Connecting to manager %s:%d with TCP' % (host, port))
            reactor.connectTCP(host, port, managerClientFactory)
        else:
            self.warning(
                'Unknown transport protocol %s' % self._managerTransport)

        return comp
class JobClientBroker(pb.Broker, log.Loggable):
    """
    A pb.Broker subclass that handles FDs being passed (with associated data)
    over the same connection as the normal PB data stream.
    When an FD is seen, the FD should be added to a given eater or feeder
    element.
    """

    def __init__(self, connectionClass, **kwargs):
        """
        @param connectionClass: subclass of L{twisted.internet.tcp.Connection}
        """
        pb.Broker.__init__(self, **kwargs)

        self._connectionClass = connectionClass

    def _closeFds(self, fds):
        """
        Close file descriptors that nobody takes ownership of, so they do
        not leak; a failure to close one is logged and the rest still closed.
        """
        for fd in fds:
            self.debug('closing unused fd %d', fd)
            try:
                os.close(fd)
            except OSError as e:
                self.warning('Failed to close fd %d: %s', fd, e)

    def fileDescriptorsReceived(self, fds, message):
        """
        Dispatch file descriptors received along with a message.

        Handled messages:
          - 'sendFeed <feedName> [<eaterId>]': fds[0] goes to the component's
            feedToFD, with os.close passed as its cleanup callable
          - 'receiveFeed <eaterAlias> [<feedId>]': fds[0] goes to the
            component's eatFromFD
          - 'redirectStdout' / 'redirectStderr': fds[0] is dup'ed onto our
            stdout / stderr and then closed

        Only fds[0] is ever used. Any further descriptors, and every
        descriptor that arrives with an unknown message, are closed.

        @param fds:     the received file descriptors
        @type  fds:     sequence of int
        @param message: the data sent along with the descriptors
        @type  message: str
        """
        # file descriptors get delivered to the component
        self.debug('received fds %r, message %r' % (fds, message))
        if not fds:
            self.warning('No file descriptors received with message %r'
                         % message)
            return

        fd, extraFds = fds[0], fds[1:]
        if message.startswith('sendFeed '):

            def parseargs(_, feedName, eaterId=None):
                return feedName, eaterId
            feedName, eaterId = parseargs(*message.split(' '))
            self.factory.medium.component.feedToFD(feedName, fd,
                                                   os.close, eaterId)
        elif message.startswith('receiveFeed '):

            def parseargs2(_, eaterAlias, feedId=None):
                return eaterAlias, feedId
            eaterAlias, feedId = parseargs2(*message.split(' '))
            self.factory.medium.component.eatFromFD(eaterAlias, feedId,
                                                    fd)
        elif message == 'redirectStdout':
            self.debug('told to rotate stdout to fd %d', fd)
            os.dup2(fd, sys.stdout.fileno())
            os.close(fd)
            self.debug('rotated stdout')
        elif message == 'redirectStderr':
            self.debug('told to rotate stderr to fd %d', fd)
            os.dup2(fd, sys.stderr.fileno())
            os.close(fd)
            self.info('rotated stderr')
        else:
            self.warning('Unknown message received: %r' % message)
            # nobody takes ownership of these fds; close them all
            self._closeFds(fds)
            return

        # only the first fd is ever handed on; don't leak the others
        self._closeFds(extraFds)
class JobClientFactory(pb.PBClientFactory, log.Loggable):
    """
    I am a client factory that logs in to the WorkerBrain.
    I live in the flumotion-job process spawned by the worker.

    @ivar medium: the medium for the JobHeaven to access us through
    @type medium: L{JobMedium}
    """
    logCategory = "job"
    perspectiveInterface = interfaces.IJobMedium

    def __init__(self, id):
        """
        @param id: the avatar id used for logging into the workerbrain
        @type  id: str
        """
        pb.PBClientFactory.__init__(self)

        self.medium = JobMedium()
        self.logName = id

        # use an FD-passing broker instead
        self.protocol = JobClientBroker

        d = self.login(id)
        # Nobody else holds this deferred; without an errback a failed login
        # would only show up as an "Unhandled error in Deferred" message.
        d.addErrback(self._loginFailed)

    def _loginFailed(self, fail):
        """
        Log a failed login to the worker.
        """
        self.warning('Failed to log in to worker: %s',
                     fail.getErrorMessage())

    ### pb.PBClientFactory methods

    def buildProtocol(self, addr):
        """
        Build our FD-passing broker, bound to this factory.
        """
        p = self.protocol(fdserver.FDServer)
        p.factory = self
        return p

    # FIXME: might be nice if jobs got a password to use to log in to brain

    def login(self, username):
        """
        Log in to the worker with the given username and an empty password,
        using our medium as the client-side object.

        @param username: the avatar id to log in as
        @type  username: str

        @returns: a deferred that fires once the medium has been given the
                  remote reference to the worker
        """

        def haveReference(remoteReference):
            self.info('Logged in to worker')
            self.debug('perspective %r connected', remoteReference)
            self.medium.setRemoteReference(remoteReference)

        self.info('Logging in to worker')
        d = pb.PBClientFactory.login(self,
            credentials.UsernamePassword(username, ''),
            self.medium)
        d.addCallback(haveReference)
        return d

    # the only way stopFactory can be called is if the WorkerBrain closes
    # the pb server.  Ideally though we would have gotten a notice before.
    # This ensures we shut down the component/job in ALL cases where the worker
    # goes away.

    def stopFactory(self):
        """
        Shut down the job (via the medium) when the factory stops.
        """
        self.debug('shutting down medium')
        self.medium.shutdown()
        self.debug('shut down medium')