
Source Code for Module flumotion.worker.job

# -*- Mode: Python; test-case-name: flumotion.test.test_worker_worker -*-
# vi:si:et:sw=4:sts=4:ts=4

# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
# Copyright (C) 2010,2011 Flumotion Services, S.A.
# All rights reserved.
#
# This file may be distributed and/or modified under the terms of
# the GNU Lesser General Public License version 2.1 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.LGPL" in the source distribution for more information.
#
# Headers in this file shall remain intact.

"""
worker-side objects to handle worker clients
"""

import os
import signal
import sys

from twisted.internet import defer, reactor

from flumotion.common import errors, log
from flumotion.common import messages
from flumotion.common.i18n import N_, gettexter
from flumotion.configure import configure
from flumotion.worker import base

__version__ = "$Rev$"
T_ = gettexter()
 37   
class ComponentJobAvatar(base.BaseJobAvatar):

    def haveMind(self):
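        # Handshake with the freshly logged-in job process: ask it for its
        # pid, tell it how to log into the manager ('bootstrap'), then ask
        # it to create the component ('create').  The outcome is reported
        # to the heaven's _startSet as createSuccess/createFailed.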

        def bootstrap(*args):
            return self.mindCallRemote('bootstrap', *args)

        def create(_, job):
            self.debug("asking job to create component with avatarId %s,"
                       " type %s", job.avatarId, job.type)
            return self.mindCallRemote('create', job.avatarId, job.type,
                                       job.moduleName, job.methodName,
                                       job.nice, job.conf)

        def success(_, avatarId):
            self.debug('job started component with avatarId %s',
                       avatarId)
            # FIXME: drills down too much?
            self._heaven._startSet.createSuccess(avatarId)

        def error(failure, job):
            msg = log.getFailureMessage(failure)
            if failure.check(errors.ComponentCreateError):
                self.warning('could not create component %s of type %s:'
                             ' %s', job.avatarId, job.type, msg)
            else:
                self.warning('unhandled error creating component %s: %s',
                             job.avatarId, msg)
            # FIXME: drills down too much?
            self._heaven._startSet.createFailed(job.avatarId, failure)

        def gotPid(pid):
            self.pid = pid
            info = self._heaven.getManagerConnectionInfo()
            if info.use_ssl:
                transport = 'ssl'
            else:
                transport = 'tcp'
            job = self._heaven.getJobInfo(pid)
            workerName = self._heaven.getWorkerName()

            d = bootstrap(workerName, info.host, info.port, transport,
                          info.authenticator, job.bundles)
            d.addCallback(create, job)
            d.addCallback(success, job.avatarId)
            d.addErrback(error, job)
            return d

        d = self.mindCallRemote("getPid")
        d.addCallback(gotPid)
        return d

    def stop(self):
        """
        returns: a deferred marking completed stop.
        """
        if not self.mind:
            self.debug('already logged out')
            return defer.succeed(None)
        else:
            self.debug('stopping')
            return self.mindCallRemote('stop')

    def sendFeed(self, feedName, fd, eaterId):
        """
        Tell the feeder to send the given feed to the given fd.

        @returns: whether the fd was successfully handed off to the component.
        """
        self.debug('Sending FD %d to component job to feed %s to fd',
                   fd, feedName)

        # it is possible that the component has logged out, in which
        # case we don't have a mind. Trying to check for this earlier
        # only introduces a race, so we handle it here by triggering a
        # disconnect on the fd.
        if self.mind:
            message = "sendFeed %s %s" % (feedName, eaterId)
            return self._sendFileDescriptor(fd, message)
        else:
            self.debug('my mind is gone, trigger disconnect')
            return False
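        # Note: self._sendFileDescriptor (inherited from the base avatar
        # class) hands the fd to the job process together with the
        # space-separated message built above, e.g. (hypothetical values)
        # "sendFeed default /default/video-encoder".  receiveFeed below
        # uses the same mechanism with a "receiveFeed" message.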

    def receiveFeed(self, eaterAlias, fd, feedId):
        """
        Tell the feeder to receive the given feed from the given fd.

        @returns: whether the fd was successfully handed off to the component.
        """
        self.debug('Sending FD %d to component job to eat %s from fd',
                   fd, eaterAlias)

        # same note as in sendFeed
        if self.mind:
            message = "receiveFeed %s %s" % (eaterAlias, feedId)
            return self._sendFileDescriptor(fd, message)
        else:
            self.debug('my mind is gone, trigger disconnect')
            return False

    def perspective_cleanShutdown(self):
        """
        This notification from the job process will be fired when it is
        shutting down, so that although the process might still be
        around, we know it's OK to accept new start requests for this
        avatar ID.
        """
        self.info("component %s shutting down cleanly", self.avatarId)
        # FIXME: drills down too much?
        self._heaven._startSet.shutdownStart(self.avatarId)


class ComponentJobInfo(base.JobInfo):
    __slots__ = ('conf', )

    def __init__(self, pid, avatarId, type, moduleName, methodName,
                 nice, bundles, conf):
        base.JobInfo.__init__(self, pid, avatarId, type, moduleName,
                              methodName, nice, bundles)
        self.conf = conf


class ComponentJobHeaven(base.BaseJobHeaven):
    avatarClass = ComponentJobAvatar
    logCategory = 'component-job-heaven'

    def getManagerConnectionInfo(self):
        """
        Gets the L{flumotion.common.connection.PBConnectionInfo}
        describing how to connect to the manager.

        @rtype: L{flumotion.common.connection.PBConnectionInfo}
        """
        return self.brain.managerConnectionInfo

    def spawn(self, avatarId, type, moduleName, methodName, nice,
              bundles, conf):
        """
        Spawn a new job.

        This will spawn a new flumotion-job process, running under the
        requested nice level. When the job logs in, it will be told to
        load bundles and run a function, which is expected to return a
        component.

        @param avatarId: avatarId the component should use to log in
        @type avatarId: str
        @param type: type of component to start
        @type type: str
        @param moduleName: name of the module to create the component from
        @type moduleName: str
        @param methodName: the factory method to use to create the component
        @type methodName: str
        @param nice: nice level
        @type nice: int
        @param bundles: ordered list of (bundleName, bundlePath) for this
                        component
        @type bundles: list of (str, str)
        @param conf: component configuration
        @type conf: dict
        """
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(configure.bindir, 'flumotion-job')
        if not os.path.exists(executable):
            self.error("Trying to spawn job process, but '%s' does not "
                       "exist", executable)
        argv = [executable, avatarId, self._socketPath]

        realexecutable = executable

        # Optionally run some jobs under valgrind. It would be nice to have
        # the valgrind arguments configurable, but this will do for now.
        # FLU_VALGRIND_JOB takes a comma-separated list of full component
        # avatar IDs.
        if 'FLU_VALGRIND_JOB' in os.environ:
            jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
            if avatarId in jobnames:
                realexecutable = 'valgrind'
                # We can't just valgrind flumotion-job; we have to valgrind
                # python running flumotion-job, otherwise we'd need
                # --trace-children (not quite sure why), which we don't want.
                argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                        '--leak-resolution=high', '--show-reachable=yes',
                        'python'] + argv

        childFDs = {0: 0, 1: 1, 2: 2}
        env = {}
        env.update(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)

        self.addJobInfo(process.pid,
                        ComponentJobInfo(process.pid, avatarId, type,
                                         moduleName, methodName, nice,
                                         bundles, conf))
        return d
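
        # For illustration only (hypothetical component type and ids): the
        # worker brain would start a component roughly like
        #
        #   d = heaven.spawn('/default/audio-encoder', 'pipeline-producer',
        #                    'flumotion.component.producers.pipeline.pipeline',
        #                    'createComponent', 0, bundles, conf)
        #
        # The returned deferred fires once the job has logged in and created
        # the component (see ComponentJobAvatar.haveMind above).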


class CheckJobAvatar(base.BaseJobAvatar):

    def haveMind(self):
        # FIXME: drills down too much?

        def gotPid(pid):
            self.pid = pid
            job = self._heaven.getJobInfo(pid)
            self._heaven._startSet.createSuccess(job.avatarId)

        d = self.mindCallRemote("getPid")
        d.addCallback(gotPid)
        return d

    def stop(self):
        """
        returns: a deferred marking completed stop.
        """
        self._heaven._startSet.shutdownStart(self.avatarId)
        self._heaven.killJob(self.avatarId, signal.SIGTERM)
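        # Note: unlike ComponentJobAvatar.stop above, this does not ask the
        # job to stop over PB; it marks the shutdown in the heaven's
        # _startSet and terminates the job process with SIGTERM.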

    def perspective_cleanShutdown(self):
        self.debug("job is stopping")


class CheckJobHeaven(base.BaseJobHeaven):
    avatarClass = CheckJobAvatar
    logCategory = 'check-job-heaven'

    _checkCount = 0
    _timeout = 45

    def __init__(self, brain):
        base.BaseJobHeaven.__init__(self, brain)

        # job processes that are available to do work (i.e. not actively
        # running checks)
        self.jobPool = []
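        # Each pool entry is a (job avatar, expire DelayedCall) tuple; see
        # getCheckJobFromPool and runCheck below.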

    def getCheckJobFromPool(self):
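        # Reuse an idle check job from the pool if one is available
        # (cancelling its expiry timer), otherwise spawn a new flumotion-job
        # process named 'check-N' and wait for it to log in.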
        if self.jobPool:
            job, expireDC = self.jobPool.pop(0)
            expireDC.cancel()
            self.debug('running check in already-running job %s',
                       job.avatarId)
            return defer.succeed(job)

        avatarId = 'check-%d' % (self._checkCount, )
        self._checkCount += 1

        self.debug('spawning new job %s to run a check', avatarId)
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(configure.bindir, 'flumotion-job')
        argv = [executable, avatarId, self._socketPath]

        childFDs = {0: 0, 1: 1, 2: 2}
        env = {}
        env.update(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, executable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)
        jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
                               None, [])
        self._jobInfos[process.pid] = jobInfo

        def haveMind(_):
            # we have a mind, in theory; return the job avatar
            return self.avatars[avatarId]

        d.addCallback(haveMind)
        return d

    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
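        # Run methodName from moduleName in a check job process, after
        # telling the job which bundles to load.  A minimal sketch of a
        # call site (hypothetical check module and arguments):
        #
        #   d = checkHeaven.runCheck(bundles,
        #                            'flumotion.worker.checks.check',
        #                            'checkDirectory', '/tmp')
        #   d.addCallback(handleResult)
        #
        # If the job does not answer within _timeout seconds it is killed
        # (SIGTERM, then SIGKILL) and a messages.Result carrying a timeout
        # Error is returned instead; otherwise the idle job is kept in
        # jobPool for reuse until _timeout seconds pass.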

        def haveJob(job):

            def callProc(_):
                return job.mindCallRemote('runFunction', moduleName,
                                          methodName, *args, **kwargs)

            def timeout(sig):
                self.killJobByPid(job.pid, sig)

            def haveResult(res):
                if not termtimeout.active():
                    self.info("Discarding error %s", res)
                    res = messages.Result()
                    res.add(messages.Error(
                        T_(N_("Check timed out.")),
                        debug=("Timed out running %s." % methodName)))
                else:

                    def expire():
                        if (job, expireDC) in self.jobPool:
                            self.debug('stopping idle check job process %s',
                                       job.avatarId)
                            self.jobPool.remove((job, expireDC))
                            job.mindCallRemote('stop')
                    expireDC = reactor.callLater(self._timeout, expire)
                    self.jobPool.append((job, expireDC))

                if termtimeout.active():
                    termtimeout.cancel()
                if killtimeout.active():
                    killtimeout.cancel()
                return res

            # add callbacks and errbacks that kill the job

            termtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGTERM)
            killtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGKILL)

            d = job.mindCallRemote('bootstrap', self.getWorkerName(),
                                   None, None, None, None, bundles)
            d.addCallback(callProc)
            d.addCallbacks(haveResult, haveResult)
            return d

        d = self.getCheckJobFromPool()
        d.addCallback(haveJob)

        return d