Package flumotion :: Package worker :: Module worker
[hide private]

Source Code for Module flumotion.worker.worker

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """worker-side objects to handle worker clients 
 19  """ 
 20   
 21  import signal 
 22   
 23  from twisted.internet import defer, error, reactor 
 24  from zope.interface import implements 
 25   
 26  from flumotion.common import errors, interfaces, log 
 27  from flumotion.worker import medium, job, feedserver 
 28  from flumotion.twisted.defer import defer_call_later 
 29   
 30  __version__ = "$Rev$" 
 31   
 32   
33 -class ProxyBouncer(log.Loggable):
34 logCategory = "proxybouncer" 35 36 """ 37 I am a bouncer that proxies authenticate calls to a remote FPB root 38 object. 39 """ 40
41 - def __init__(self, remote):
42 """ 43 @param remote: an object that has .callRemote() 44 """ 45 self._remote = remote
46
47 - def getKeycardClasses(self):
48 """ 49 Call me before asking me to authenticate, so I know what I can 50 authenticate. 51 """ 52 return self._remote.callRemote('getKeycardClasses')
53
54 - def authenticate(self, keycard):
55 self.debug("Authenticating keycard %r against remote bouncer", 56 keycard) 57 return self._remote.callRemote('authenticate', None, keycard)
58 59 # Similar to Vishnu, but for worker related classes 60 61
62 -class WorkerBrain(log.Loggable):
63 """ 64 I am the main object in the worker process, managing jobs and everything 65 related. 66 I live in the main worker process. 67 68 @ivar authenticator: authenticator worker used to log in to manager 69 @type authenticator L{flumotion.twisted.pb.Authenticator} 70 @ivar medium: 71 @type medium: L{medium.WorkerMedium} 72 @ivar jobHeaven: 73 @type jobHeaven: L{job.ComponentJobHeaven} 74 @ivar checkHeaven: 75 @type checkHeaven: L{job.CheckJobHeaven} 76 @ivar workerClientFactory: 77 @type workerClientFactory: L{medium.WorkerClientFactory} 78 @ivar feedServerPort: TCP port the Feed Server is listening on 79 @type feedServerPort: int 80 """ 81 82 implements(interfaces.IFeedServerParent) 83 84 logCategory = 'workerbrain' 85
86 - def __init__(self, options):
87 """ 88 @param options: the optparsed dictionary of command-line options 89 @type options: an object with attributes 90 """ 91 self.options = options 92 self.workerName = options.name 93 94 # the last port is reserved for our FeedServer 95 if not self.options.randomFeederports: 96 self.ports = self.options.feederports[:-1] 97 else: 98 self.ports = [] 99 100 self.medium = medium.WorkerMedium(self) 101 102 # really should be componentJobHeaven, but this is shorter :) 103 self.jobHeaven = job.ComponentJobHeaven(self) 104 # for ephemeral checks 105 self.checkHeaven = job.CheckJobHeaven(self) 106 107 self.managerConnectionInfo = None 108 109 # it's possible we don't have a feed server, if we are 110 # configured to have 0 tcp ports; setup this in listen() 111 self.feedServer = None 112 113 self.stopping = False 114 reactor.addSystemEventTrigger('before', 'shutdown', 115 self.shutdownHandler) 116 self._installHUPHandler()
117
118 - def _installHUPHandler(self):
119 120 def sighup(signum, frame): 121 if self._oldHUPHandler: 122 self.log('got SIGHUP, calling previous handler %r', 123 self._oldHUPHandler) 124 self._oldHUPHandler(signum, frame) 125 self.debug('telling kids about new log file descriptors') 126 self.jobHeaven.rotateChildLogFDs()
127 128 handler = signal.signal(signal.SIGHUP, sighup) 129 if handler == signal.SIG_DFL or handler == signal.SIG_IGN: 130 self._oldHUPHandler = None 131 else: 132 self._oldHUPHandler = handler
133
134 - def listen(self):
135 """ 136 Start listening on FeedServer (incoming eater requests) and 137 JobServer (through which we communicate with our children) ports 138 139 @returns: True if we successfully listened on both ports 140 """ 141 # set up feed server if we have the feederports for it 142 try: 143 self.feedServer = self._makeFeedServer() 144 except error.CannotListenError, e: 145 self.warning("Failed to listen on feed server port: %r", e) 146 return False 147 148 try: 149 self.jobHeaven.listen() 150 except error.CannotListenError, e: 151 self.warning("Failed to listen on job server port: %r", e) 152 return False 153 154 try: 155 self.checkHeaven.listen() 156 except error.CannotListenError, e: 157 self.warning("Failed to listen on check server port: %r", e) 158 return False 159 160 return True
161
162 - def _makeFeedServer(self):
163 """ 164 @returns: L{flumotion.worker.feedserver.FeedServer} 165 """ 166 port = None 167 if self.options.randomFeederports: 168 port = 0 169 elif not self.options.feederports: 170 self.info('Not starting feed server because no port is ' 171 'configured') 172 return None 173 else: 174 port = self.options.feederports[-1] 175 176 return feedserver.FeedServer(self, ProxyBouncer(self), port)
177
178 - def login(self, managerConnectionInfo):
179 self.managerConnectionInfo = managerConnectionInfo 180 self.medium.startConnecting(managerConnectionInfo)
181
182 - def callRemote(self, methodName, *args, **kwargs):
183 return self.medium.callRemote(methodName, *args, **kwargs)
184
185 - def shutdownHandler(self):
186 if self.stopping: 187 self.warning("Already shutting down, ignoring shutdown request") 188 return 189 190 self.info("Reactor shutting down, stopping jobHeaven") 191 self.stopping = True 192 193 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()] 194 if self.feedServer: 195 l.append(self.feedServer.shutdown()) 196 # Don't fire this other than from a callLater 197 return defer_call_later(defer.DeferredList(l))
198 199 ### These methods called by feed server 200
201 - def feedToFD(self, componentId, feedName, fd, eaterId):
202 """ 203 Called from the FeedAvatar to pass a file descriptor on to 204 the job running the component for this feeder. 205 206 @returns: whether the fd was successfully handed off to the component. 207 """ 208 if componentId not in self.jobHeaven.avatars: 209 self.warning("No such component %s running", componentId) 210 return False 211 212 avatar = self.jobHeaven.avatars[componentId] 213 return avatar.sendFeed(feedName, fd, eaterId)
214
215 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
216 """ 217 Called from the FeedAvatar to pass a file descriptor on to 218 the job running the given component. 219 220 @returns: whether the fd was successfully handed off to the component. 221 """ 222 if componentId not in self.jobHeaven.avatars: 223 self.warning("No such component %s running", componentId) 224 return False 225 226 avatar = self.jobHeaven.avatars[componentId] 227 return avatar.receiveFeed(eaterAlias, fd, feedId)
228 229 ### these methods called by WorkerMedium 230
231 - def getPorts(self):
232 return self.ports, self.options.randomFeederports
233
234 - def getFeedServerPort(self):
235 if self.feedServer: 236 return self.feedServer.getPortNum() 237 else: 238 return None
239
240 - def create(self, avatarId, type, moduleName, methodName, nice, 241 conf):
242 243 def getBundles(): 244 # set up bundles as we need to have a pb connection to 245 # download the modules -- can't do that in the kid yet. 246 moduleNames = [moduleName] 247 for plugs in conf.get('plugs', {}).values(): 248 for plug in plugs: 249 for entry in plug.get('entries', {}).values(): 250 moduleNames.append(entry['module-name']) 251 self.debug('setting up bundles for %r', moduleNames) 252 return self.medium.bundleLoader.getBundles(moduleName=moduleNames)
253 254 def spawnJob(bundles): 255 return self.jobHeaven.spawn(avatarId, type, moduleName, 256 methodName, nice, bundles, conf) 257 258 def createError(failure): 259 failure.trap(errors.ComponentCreateError) 260 self.debug('create deferred for %s failed, forwarding error', 261 avatarId) 262 return failure 263 264 def success(res): 265 self.debug('create deferred for %s succeeded (%r)', 266 avatarId, res) 267 return res 268 269 self.info('Starting component "%s" of type "%s"', avatarId, 270 type) 271 d = getBundles() 272 d.addCallback(spawnJob) 273 d.addCallback(success) 274 d.addErrback(createError) 275 return d 276
277 - def runCheck(self, module, function, *args, **kwargs):
278 279 def getBundles(): 280 self.debug('setting up bundles for %s', module) 281 return self.medium.bundleLoader.getBundles(moduleName=module)
282 283 def runCheck(bundles): 284 return self.checkHeaven.runCheck(bundles, module, function, 285 *args, **kwargs) 286 287 d = getBundles() 288 d.addCallback(runCheck) 289 return d 290
291 - def getComponents(self):
292 return [job.avatarId for job in self.jobHeaven.getJobInfos()]
293
294 - def killJob(self, avatarId, signum):
295 self.jobHeaven.killJob(avatarId, signum)
296