Package flumotion :: Package worker :: Module base
[hide private]

Source Code for Module flumotion.worker.base

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  worker-side objects to handle worker clients 
 20  """ 
 21   
 22  import os 
 23  import sys 
 24  import signal 
 25   
 26  from twisted.cred import portal 
 27  from twisted.internet import defer, reactor 
 28  from twisted.spread import pb 
 29  from zope.interface import implements 
 30   
 31  from flumotion.common import errors, log 
 32  from flumotion.common import worker, startset 
 33  from flumotion.common.process import signalPid 
 34  from flumotion.twisted import checkers, fdserver 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev$" 
 38   
 39  JOB_SHUTDOWN_TIMEOUT = 5 
 40   
 41   
def _getSocketPath():
    """
    Reserve a unique filesystem path to be used for the worker's unix socket.

    A temporary file named C{flumotion.worker.XXXXXX.<pid>} is created with
    mkstemp and its descriptor is closed right away; only the name is kept.
    The file itself is left on disk, so the caller needs to delete the name
    before using it for a socket.

    @returns: the path of the (still existing) temporary file
    @rtype:   str
    """
    # FIXME: there is no mkstemp for sockets, so we have a small window
    # here in which the socket could be created by something else.
    # I didn't succeed in preparing a socket file with that name either.
    import tempfile

    pidSuffix = '.%d' % os.getpid()
    handle, path = tempfile.mkstemp(suffix=pidSuffix,
                                    prefix='flumotion.worker.')
    # we only want the name, not the open file
    os.close(handle)
    return path
53 54
class JobInfo(object):
    """
    I hold information about a job.

    I am a plain record: all my attributes are set once from the
    constructor arguments and, because of C{__slots__}, no other
    attributes can be added to me.

    @ivar pid:        PID of the child process
    @type pid:        int
    @ivar avatarId:   avatar identification string
    @type avatarId:   str
    @ivar type:       type of the component to create
    @type type:       str
    @ivar moduleName: name of the module to create the component from
    @type moduleName: str
    @ivar methodName: the factory method to use to create the component
    @type methodName: str
    @ivar nice:       the nice level to run the job as
    @type nice:       int
    @ivar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        """
        Store the given values unchanged on the attributes of the same name.
        """
        # identification of the job
        self.pid, self.avatarId = pid, avatarId
        # what to create and how
        self.type, self.moduleName, self.methodName = (
            type, moduleName, methodName)
        # runtime environment of the job
        self.nice, self.bundles = nice, bundles
87 88
class JobProcessProtocol(worker.ProcessProtocol):
    """
    I am the process protocol for a job process spawned by the worker.

    When my process ends I fail its create in the start set if that was
    still pending, acknowledge a registered shutdown, and tell the heaven
    that the job has stopped.
    """

    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven that spawned the process; it is
                         handed to the base class together with its worker
                         name
        @param avatarId: avatarId of the job this protocol watches
        @param startSet: the start set tracking creates and shutdowns;
                         presumably a L{flumotion.common.startset.StartSet}
        """
        self._startSet = startSet
        # remember what the start set reports at spawn time; processEnded
        # compares against this to see whether the create is still pending
        self._deferredStart = startSet.createRegistered(avatarId)
        workerName = heaven.getWorkerName()
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component', workerName)

    def sendMessage(self, message):
        """
        Forward a message about this job to the worker brain, through its
        'componentAddMessage' remote method.
        """
        brain = self.loggable.brain
        brain.callRemote('componentAddMessage', self.avatarId, message)

    def processEnded(self, status):
        """
        Clean up the start set and the heaven after the process exited,
        then chain up to the base class.

        @param status: the reason the process ended; C{status.value.signal}
                       is read to describe an early exit
        """
        heaven = self.loggable
        startSet = self._startSet
        signum = status.value.signal
        avatarId = self.avatarId

        # we need to trigger a failure on the create deferred
        # if the job failed before logging in to the worker;
        # otherwise the manager still thinks it's starting up when it's
        # dead.  If the job already attached to the worker however,
        # the create deferred will already have callbacked.
        if startSet.createRegistered(avatarId) is self._deferredStart:
            reason = "unknown reason"
            if signum:
                reason = "received signal %d" % signum
            text = ("Component '%s' has exited early (%s)." %
                    (avatarId, reason))
            startSet.createFailed(avatarId,
                                  errors.ComponentCreateError(text))

        if startSet.shutdownRegistered(avatarId):
            startSet.shutdownSuccess(avatarId)

        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)
131 132
class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the worker.

    Besides the avatars I keep one L{JobInfo} per spawned process, keyed on
    its pid, and a start set that tracks jobs being created and shut down.

    @cvar avatarClass: the class to instantiate for each job that logs in;
                       must be set (by a subclass) before L{listen} is called
    @ivar avatars:     dict of avatarId -> avatar
    @type avatars:     dict of str -> L{base.BaseJobAvatar}
    @ivar brain:       the worker brain
    @type brain:       L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type  brain: L{worker.WorkerBrain}
        """
        self.avatars = {}  # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        # If set, a deferred to fire when our last child process exits
        self._onShutdown = None

        self._jobInfos = {}  # processid -> JobInfo

        def hasAvatar(avatarId):
            # predicate for the start set: is this avatar logged in here ?
            return avatarId in self.avatars

        self._startSet = startset.StartSet(
            hasAvatar,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start listening for job logins on my unix socket.

        May only be called while not listening yet, and only when
        L{avatarClass} is set.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        factory = pb.PBServerFactory(portal.Portal(self, [checker]))

        # the path was reserved as a regular file (or is left over from an
        # earlier listen); it has to be gone before we can bind to it
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        self.debug("Listening for FD's on unix socket %s", self._socketPath)
        self._port = reactor.listenWith(fdserver.FDPort, self._socketPath,
                                        factory)

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create and register an avatar of L{avatarClass} for a job logging in.

        @returns: (pb.IPerspective, avatar, avatar.logout)
        @raise NotImplementedError: if pb.IPerspective is not among the
                                    requested interfaces
        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        avatar = self.avatarClass(self, avatarId, mind)
        assert avatarId not in self.avatars
        self.avatars[avatarId] = avatar
        return pb.IPerspective, avatar, avatar.logout

    def removeAvatar(self, avatarId):
        """
        Forget the avatar with the given id; warn if I do not know it.
        """
        if avatarId not in self.avatars:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)
            return

        del self.avatars[avatarId]

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processId, jobInfo):
        """
        Register jobInfo under the given process id, replacing any
        previous entry for that id.
        """
        self._jobInfos[processId] = jobInfo

    def getJobInfo(self, processId):
        """
        @returns: the job info registered for the given process id
        @raise KeyError: if nothing is registered for that process id
        """
        return self._jobInfos[processId]

    def getJobInfos(self):
        """
        @returns: the job infos of all registered jobs
        """
        return self._jobInfos.values()

    def getJobPids(self):
        """
        @returns: the process ids of all registered jobs
        """
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        """
        Hand my current stdout and stderr file descriptors to every
        logged-in job, through each avatar's logTo().
        """
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            # looked up per avatar on purpose: nothing is touched when
            # there are no avatars
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Drop the job info for an exited process.  If a shutdown is waiting
        and this was the last job, fire the shutdown deferred.
        """
        if pid not in self._jobInfos:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)
            return

        self.debug('Removing job info for %d', pid)
        del self._jobInfos[pid]

        if not self._jobInfos and self._onShutdown:
            self.debug("Last child exited")
            self._onShutdown.callback(None)

    def shutdown(self):
        """
        Ask all logged-in jobs to stop and stop listening afterwards.

        If there are avatars, the returned deferred fires once the last
        job info has been removed by L{jobStopped}; jobs still around after
        JOB_SHUTDOWN_TIMEOUT seconds get killed.  Without avatars it fires
        right away.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if not self.avatars:
            # everything's gone already, return success
            d = defer.succeed(None)
        else:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            killCall = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, killCall)
            d = self._onShutdown

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if not self._port:
                return None
            port, self._port = self._port, None
            return port.stopListening()

        d.addCallback(stopListening)
        return d

    def kill(self, signum=signal.SIGKILL):
        """
        Send the given signal (SIGKILL by default) to every registered job.
        """
        self.warning("Killing all children immediately")
        for jobPid in self.getJobPids():
            self.killJobByPid(jobPid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send a signal to the job registered under the given pid.

        @raise errors.UnknownComponentError: if no job is registered
                                             under that pid
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        info = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   info.avatarId, info.pid)
        signalPid(info.pid, signum)

    def killJob(self, avatarId, signum):
        """
        Send a signal to every registered job with the given avatarId.
        Nothing happens if there is no such job.
        """
        for info in self._jobInfos.values():
            if info.avatarId != avatarId:
                continue
            self.killJobByPid(info.pid, signum)
296 297
class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.

    Subclasses implement L{stop} and can override L{haveMind} to act as
    soon as the job's remote reference is available.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @param heaven:   the heaven I live in; I remove myself from it
                         on logout
        @type  heaven:   L{flumotion.worker.base.BaseJobHeaven}
        @type  avatarId: str
        @param mind:     reference to the job, handed to L{setMind}
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        self.setMind(mind)
        # note: set after setMind(), so after haveMind() has run
        self.pid = None

    def setMind(self, mind):
        """
        @param mind: reference to the job's JobMedium on which we can call
        @type  mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        """
        Hook called from L{setMind} once the mind has been set.
        Does nothing by default.
        """
        # implement me in subclasses
        pass

    def logout(self):
        """
        Called when the job disconnects; removes me from my heaven.
        """
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        Stop the job.  Must be implemented by subclasses.

        returns: a deferred marking completed stop.
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        """
        Send a file descriptor with an accompanying message to the job,
        over the transport of my mind's broker.

        @param fd:      the file descriptor to send
        @param message: the message to send along with the descriptor
        @returns: True if sending did not raise, False if it raised OSError
                  (which is logged as a warning)
        @rtype:   bool
        """
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except OSError:
            # OSError is what is thrown by the C code doing this
            # when there are issues.
            # Fetched through sys.exc_info() so that this parses the same
            # way on every Python version.
            e = sys.exc_info()[1]
            self.warning("Error %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.

        Nothing is sent when I have no mind.

        @param stdout: file descriptor the job should use as its stdout
        @param stderr: file descriptor the job should use as its stderr
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            # each stream gets its own descriptor; stderr must not be
            # redirected to the stdout file
            self._sendFileDescriptor(stderr, "redirectStderr")
359