Trees | Indices | Help |
---|
|
1 # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 6 # Copyright (C) 2010,2011 Flumotion Services, S.A. 7 # All rights reserved. 8 # 9 # This file may be distributed and/or modified under the terms of 10 # the GNU Lesser General Public License version 2.1 as published by 11 # the Free Software Foundation. 12 # This file is distributed without any warranty; without even the implied 13 # warranty of merchantability or fitness for a particular purpose. 14 # See "LICENSE.LGPL" in the source distribution for more information. 15 # 16 # Headers in this file shall remain intact. 17 18 """ 19 worker-side objects to handle worker clients 20 """ 21 22 import os 23 import sys 24 import signal 25 26 from twisted.cred import portal 27 from twisted.internet import defer, reactor 28 from twisted.spread import pb 29 from zope.interface import implements 30 31 from flumotion.common import errors, log 32 from flumotion.common import worker, startset 33 from flumotion.common.process import signalPid 34 from flumotion.twisted import checkers, fdserver 35 from flumotion.twisted import pb as fpb 36 37 __version__ = "$Rev$" 38 39 JOB_SHUTDOWN_TIMEOUT = 5 40 4143 # FIXME: there is mkstemp for sockets, so we have a small window 44 # here in which the socket could be created by something else 45 # I didn't succeed in preparing a socket file with that name either 46 47 # caller needs to delete name before using 48 import tempfile 49 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.') 50 os.close(fd) 51 52 return name53 5456 """ 57 I hold information about a job. 58 59 @cvar pid: PID of the child process 60 @type pid: int 61 @cvar avatarId: avatar identification string 62 @type avatarId: str 63 @cvar type: type of the component to create 64 @type type: str 65 @cvar moduleName: name of the module to create the component from 66 @type moduleName: str 67 @cvar methodName: the factory method to use to create the component 68 @type methodName: str 69 @cvar nice: the nice level to run the job as 70 @type nice: int 71 @cvar bundles: ordered list of (bundleName, bundlePath) needed to 72 create the component 73 @type bundles: list of (str, str) 74 """ 75 __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName', 76 'nice', 'bundles') 7787 8880 self.pid = pid 81 self.avatarId = avatarId 82 self.type = type 83 self.moduleName = moduleName 84 self.methodName = methodName 85 self.nice = nice 86 self.bundles = bundles90131 13292 self._startSet = startSet 93 self._deferredStart = startSet.createRegistered(avatarId) 94 worker.ProcessProtocol.__init__(self, heaven, avatarId, 95 'component', 96 heaven.getWorkerName())9799 heaven = self.loggable 100 heaven.brain.callRemote('componentAddMessage', self.avatarId, 101 message)102104 heaven = self.loggable 105 dstarts = self._startSet 106 signum = status.value.signal 107 108 # we need to trigger a failure on the create deferred 109 # if the job failed before logging in to the worker; 110 # otherwise the manager still thinks it's starting up when it's 111 # dead. If the job already attached to the worker however, 112 # the create deferred will already have callbacked. 113 deferred = dstarts.createRegistered(self.avatarId) 114 if deferred is self._deferredStart: 115 if signum: 116 reason = "received signal %d" % signum 117 else: 118 reason = "unknown reason" 119 text = ("Component '%s' has exited early (%s)." % 120 (self.avatarId, reason)) 121 dstarts.createFailed(self.avatarId, 122 errors.ComponentCreateError(text)) 123 124 if dstarts.shutdownRegistered(self.avatarId): 125 dstarts.shutdownSuccess(self.avatarId) 126 127 heaven.jobStopped(self.pid) 128 129 # chain up 130 worker.ProcessProtocol.processEnded(self, status)134 """ 135 I am similar to but not quite the same as a manager-side Heaven. 136 I manage avatars inside the worker for job processes spawned by the worker. 137 138 @ivar avatars: dict of avatarId -> avatar 139 @type avatars: dict of str -> L{base.BaseJobAvatar} 140 @ivar brain: the worker brain 141 @type brain: L{worker.WorkerBrain} 142 """ 143 144 logCategory = "job-heaven" 145 implements(portal.IRealm) 146 147 avatarClass = None 148275 ret.addCallback(stopListening) 276 return ret 277150 """ 151 @param brain: a reference to the worker brain 152 @type brain: L{worker.WorkerBrain} 153 """ 154 self.avatars = {} # componentId -> avatar 155 self.brain = brain 156 self._socketPath = _getSocketPath() 157 self._port = None 158 self._onShutdown = None # If set, a deferred to fire when 159 # our last child process exits 160 161 self._jobInfos = {} # processid -> JobInfo 162 163 self._startSet = startset.StartSet( 164 lambda x: x in self.avatars, 165 errors.ComponentAlreadyStartingError, 166 errors.ComponentAlreadyRunningError)167169 assert self._port is None 170 assert self.avatarClass is not None 171 # FIXME: we should hand a username and password to log in with to 172 # the job process instead of allowing anonymous 173 checker = checkers.FlexibleCredentialsChecker() 174 checker.allowPasswordless(True) 175 p = portal.Portal(self, [checker]) 176 f = pb.PBServerFactory(p) 177 try: 178 os.unlink(self._socketPath) 179 except OSError: 180 pass 181 182 # Rather than a listenUNIX(), we use listenWith so that we can specify 183 # our particular Port, which creates Transports that we know how to 184 # pass FDs over. 185 self.debug("Listening for FD's on unix socket %s", self._socketPath) 186 port = reactor.listenWith(fdserver.FDPort, self._socketPath, f) 187 self._port = port188 189 ### portal.IRealm method 190192 if pb.IPerspective in interfaces: 193 avatar = self.avatarClass(self, avatarId, mind) 194 assert avatarId not in self.avatars 195 self.avatars[avatarId] = avatar 196 return pb.IPerspective, avatar, avatar.logout 197 else: 198 raise NotImplementedError("no interface")199201 if avatarId in self.avatars: 202 del self.avatars[avatarId] 203 else: 204 self.warning("some programmer is telling me about an avatar " 205 "I have no idea about: %s", avatarId)206208 """ 209 Gets the name of the worker that spawns the process. 210 211 @rtype: str 212 """ 213 return self.brain.workerName214 217 220 223225 return self._jobInfos.keys()226228 self.debug('telling kids about new log file descriptors') 229 for avatar in self.avatars.values(): 230 avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())231233 if pid in self._jobInfos: 234 self.debug('Removing job info for %d', pid) 235 del self._jobInfos[pid] 236 237 if not self._jobInfos and self._onShutdown: 238 self.debug("Last child exited") 239 self._onShutdown.callback(None) 240 else: 241 self.warning("some programmer is telling me about a pid " 242 "I have no idea about: %d", pid)243245 self.debug('Shutting down JobHeaven') 246 self.debug('Stopping all jobs') 247 for avatar in self.avatars.values(): 248 avatar.stop() 249 250 if self.avatars: 251 # If our jobs fail to shut down nicely within some period of 252 # time, shut them down less nicely 253 dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill) 254 255 def cancelDelayedCall(res, dc): 256 # be nice to unit tests 257 if dc.active(): 258 dc.cancel() 259 return res260 261 self._onShutdown = defer.Deferred() 262 self._onShutdown.addCallback(cancelDelayedCall, dc) 263 ret = self._onShutdown 264 else: 265 # everything's gone already, return success 266 ret = defer.succeed(None) 267 268 def stopListening(_): 269 # possible for it to be None, if we haven't been told to 270 # listen yet, as in some test cases 271 if self._port: 272 port = self._port 273 self._port = None 274 return port.stopListening()279 self.warning("Killing all children immediately") 280 for pid in self.getJobPids(): 281 self.killJobByPid(pid, signum)282284 if pid not in self._jobInfos: 285 raise errors.UnknownComponentError(pid) 286 287 jobInfo = self._jobInfos[pid] 288 self.debug("Sending signal %d to job %s at pid %d", signum, 289 jobInfo.avatarId, jobInfo.pid) 290 signalPid(jobInfo.pid, signum)291293 for job in self._jobInfos.values(): 294 if job.avatarId == avatarId: 295 self.killJobByPid(job.pid, signum)296 297299 """ 300 I am an avatar for the job living in the worker. 301 """ 302 logCategory = 'job-avatar' 303359305 """ 306 @type heaven: L{flumotion.worker.base.BaseJobHeaven} 307 @type avatarId: str 308 """ 309 fpb.Avatar.__init__(self, avatarId) 310 self._heaven = heaven 311 self.setMind(mind) 312 self.pid = None313315 """ 316 @param mind: reference to the job's JobMedium on which we can call 317 @type mind: L{twisted.spread.pb.RemoteReference} 318 """ 319 fpb.Avatar.setMind(self, mind) 320 self.haveMind()321 325327 self.log('logout called, %s disconnected', self.avatarId) 328 329 self._heaven.removeAvatar(self.avatarId)330 336338 try: 339 # FIXME: pay attention to the return value of 340 # sendFileDescriptor; is the same as the return value of 341 # sendmsg(2) 342 self.mind.broker.transport.sendFileDescriptor(fd, message) 343 return True 344 except OSError, e: 345 # OSError is what is thrown by the C code doing this 346 # when there are issues 347 self.warning("Error %s sending file descriptors", 348 log.getExceptionMessage(e)) 349 return False350352 """ 353 Tell the job to log to the given file descriptors. 354 """ 355 self.debug('Giving job new stdout and stderr') 356 if self.mind: 357 self._sendFileDescriptor(stdout, "redirectStdout") 358 self._sendFileDescriptor(stdout, "redirectStderr")
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Aug 13 06:17:11 2013 | http://epydoc.sourceforge.net |