Package flumotion :: Package manager :: Module manager
[hide private]

Source Code for Module flumotion.manager.manager

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3   
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
   6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
   7  # All rights reserved. 
   8  # 
   9  # This file may be distributed and/or modified under the terms of 
  10  # the GNU Lesser General Public License version 2.1 as published by 
  11  # the Free Software Foundation. 
  12  # This file is distributed without any warranty; without even the implied 
  13  # warranty of merchantability or fitness for a particular purpose. 
  14  # See "LICENSE.LGPL" in the source distribution for more information. 
  15  # 
  16  # Headers in this file shall remain intact. 
  17   
  18  """ 
  19  manager implementation and related classes 
  20   
  21  API Stability: semi-stable 
  22   
  23  @var  LOCAL_IDENTITY: an identity for the manager itself; can be used 
  24                        to compare against to verify that the manager 
  25                        requested an action 
  26  @type LOCAL_IDENTITY: L{LocalIdentity} 
  27  """ 
  28   
  29  import os 
  30   
  31  from twisted.internet import reactor, defer 
  32  from twisted.spread import pb 
  33  from twisted.cred import portal 
  34  from zope.interface import implements 
  35   
  36  from flumotion.common import errors, interfaces, log, registry 
  37  from flumotion.common import planet, common, messages, reflectcall, server 
  38  from flumotion.common.i18n import N_, gettexter 
  39  from flumotion.common.identity import RemoteIdentity, LocalIdentity 
  40  from flumotion.common.netutils import addressGetHost 
  41  from flumotion.common.planet import moods 
  42  from flumotion.configure import configure 
  43  from flumotion.manager import admin, component, worker, base, config 
  44  from flumotion.twisted import portal as fportal 
  45  from flumotion.project import project 
  46   
  47  __all__ = ['ManagerServerFactory', 'Vishnu'] 
  48  __version__ = "$Rev$" 
  49  T_ = gettexter() 
  50  LOCAL_IDENTITY = LocalIdentity('manager') 
  51   
  52   
  53  # an internal class 
  54   
  55   
class Dispatcher(log.Loggable):
    """
    I am the L{twisted.cred.portal.IRealm} of the manager.

    Every L{pb.Avatar} requested through me is created by the heaven
    registered for the requested interface, and is handed the mind
    (the client) that asked for it.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, computeIdentity):
        """
        @param computeIdentity: see L{Vishnu.computeIdentity}
        @type  computeIdentity: callable
        """
        self._computeIdentity = computeIdentity
        self._bouncer = None
        # interface -> heaven creating avatars for that interface
        self._interfaceHeavens = {}
        # avatarId -> keycard the avatar was requested with
        self._avatarKeycards = {}

    def setBouncer(self, bouncer):
        """
        Remember the bouncer, so keycards can be removed from it when
        an avatar logs out.

        @param bouncer: the bouncer to authenticate with
        @type  bouncer: L{flumotion.component.bouncers.bouncer}
        """
        self._bouncer = bouncer

    def registerHeaven(self, heaven, interface):
        """
        Register a Heaven as managing components with the given interface.

        @param heaven:    the heaven to register
        @type  heaven:    L{base.ManagerHeaven}
        @param interface: a component interface to register the heaven with.
        @type  interface: L{twisted.python.components.Interface}
        """
        assert isinstance(heaven, base.ManagerHeaven)

        self._interfaceHeavens[interface] = heaven

    ### IRealm methods

    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Create an avatar for the client behind C{mind}.

        Exactly two interfaces must be requested: L{pb.IPerspective}
        and one interface for which a heaven was registered.

        @raises errors.NoPerspectiveError: if the requested interfaces
                                           do not match that contract.

        @returns: a deferred firing a (pb.IPerspective, avatar, logout
                  callable) tuple.
        """
        # IPerspective plus the specific avatar interface, nothing else.
        if pb.IPerspective not in ifaces or len(ifaces) != 2:
            raise errors.NoPerspectiveError(avatarId)

        iface = [i for i in ifaces if i != pb.IPerspective][0]
        if iface not in self._interfaceHeavens:
            self.warning('unknown interface %r', iface)
            raise errors.NoPerspectiveError(avatarId)

        heaven = self._interfaceHeavens[iface]
        avatarClass = heaven.avatarClass

        def makeAvatar(identity):
            return avatarClass.makeAvatar(heaven, avatarId, identity, mind)

        def registerAvatar(avatar):
            newId = avatar.avatarId
            if newId in heaven.avatars:
                raise errors.AlreadyConnectedError(newId)
            heaven.avatars[newId] = avatar
            self._avatarKeycards[newId] = keycard

            # test_manager_manager passes these keyword arguments to
            # set its own info, so neither the names nor the order of
            # the arguments below may change.

            def cleanup(avatarId=newId, avatar=avatar, mind=mind):
                self.info('lost connection to client %r', avatar)
                del heaven.avatars[avatar.avatarId]
                avatar.onShutdown()
                # forget the keycard here so it does not leak
                staleKeycard = self._avatarKeycards.pop(avatarId)
                if not self._bouncer:
                    return
                try:
                    self._bouncer.removeKeycard(staleKeycard)
                except KeyError:
                    self.warning("bouncer forgot about keycard %r",
                                 staleKeycard)

            return (pb.IPerspective, avatar, cleanup)

        def dropConnection(failure):
            # The client should still receive the failure, so the
            # connection is only dropped from a later reactor
            # iteration; loseConnection() then waits until the pending
            # data has been written out.
            reactor.callLater(0, mind.broker.transport.loseConnection)
            return failure

        peerHost = addressGetHost(mind.broker.transport.getPeer())
        d = self._computeIdentity(keycard, peerHost)
        d.addCallback(makeAvatar)
        d.addCallbacks(registerAvatar, dropConnection)
        return d
class ComponentMapper:
    """
    I tie together the different objects that belong to one component.
    The vishnu keeps me as the value in its component lookup hash.

    @ivar state:    ManagerComponentState; created first
    @ivar id:       avatarId of the eventual ComponentAvatar
    @ivar avatar:   ComponentAvatar
    @ivar jobState: ManagerJobState of a running component
    """

    def __init__(self):
        # Nothing is known yet; every slot is filled in later.
        self.state = None
        self.id = None
        self.avatar = None
        self.jobState = None
166 167
168 -class Vishnu(log.Loggable):
169 """ 170 I am the toplevel manager object that knows about all 171 heavens and factories. 172 173 @cvar dispatcher: dispatcher to create avatars 174 @type dispatcher: L{Dispatcher} 175 @cvar workerHeaven: the worker heaven 176 @type workerHeaven: L{worker.WorkerHeaven} 177 @cvar componentHeaven: the component heaven 178 @type componentHeaven: L{component.ComponentHeaven} 179 @cvar adminHeaven: the admin heaven 180 @type adminHeaven: L{admin.AdminHeaven} 181 @cvar configDir: the configuration directory for 182 this Vishnu's manager 183 @type configDir: str 184 """ 185 186 implements(server.IServable) 187 188 logCategory = "vishnu" 189
def __init__(self, name, unsafeTracebacks=0, configDir=None):
    """
    @param name:             name of the manager; stored in the planet
                             state and used for the default config dir
    @param unsafeTracebacks: passed on to the L{pb.PBServerFactory}
    @param configDir:        configuration directory; when None,
                             "managers/<name>" below the configured
                             configdir is used
    """
    # the dispatcher hands out avatars to clients connecting to me
    self.dispatcher = Dispatcher(self.computeIdentity)

    self.workerHeaven = self._createHeaven(
        interfaces.IWorkerMedium, worker.WorkerHeaven)
    self.componentHeaven = self._createHeaven(
        interfaces.IComponentMedium, component.ComponentHeaven)
    self.adminHeaven = self._createHeaven(
        interfaces.IAdminMedium, admin.AdminHeaven)

    self.running = True

    def setStopped():
        self.running = False

    reactor.addSystemEventTrigger('before', 'shutdown', setStopped)

    if configDir is None:
        configDir = os.path.join(configure.configdir, "managers", name)
    self.configDir = configDir

    # used by manager to authenticate worker/component
    self.bouncer = None

    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()

    # any object -> ComponentMapper
    self._componentMappers = {}

    planetState = planet.ManagerPlanetState()
    planetState.set('name', name)
    planetState.set('version', configure.version)
    self.state = planetState

    # socket -> list of plugs
    self.plugs = {}

    # The portal is what clients connect through: it combines the
    # dispatcher (our IRealm) with a bouncer, which is set later.
    self.portal = fportal.BouncerPortal(self.dispatcher, None)
    # pass unsafeTracebacks=1 to debug tracebacks sent to clients
    self.factory = pb.PBServerFactory(
        self.portal, unsafeTracebacks=unsafeTracebacks)

    self.connectionInfo = {}
    self.setConnectionInfo(None, None, None)
234
def shutdown(self):
    """Cancel any pending operations in preparation for shutdown.

    This method is mostly useful for unit tests; currently, it is
    not called during normal operation. Note that the caller is
    responsible for stopping listening on the port, as the manager
    does not have a handle on the twisted port object.

    @returns: A deferred that will fire when the manager has shut
              down.
    """
    bouncer = self.bouncer
    if not bouncer:
        # nothing to stop
        return defer.succeed(None)
    return bouncer.stop()
250
def setConnectionInfo(self, host, port, use_ssl):
    """
    Store the given host, port and use_ssl values in my
    connectionInfo dict, overwriting earlier values for these keys.
    """
    self.connectionInfo.update({
        'host': host,
        'port': port,
        'use_ssl': use_ssl,
    })
254
def getConfiguration(self):
    """Export my planet state as XML.

    @returns: the manager's configuration as a string suitable for
              importing via loadConfiguration().
    """
    planetState = self.state
    return config.exportPlanetXml(planetState)
260
def getBundlerBasket(self):
    """
    Return a bundler basket to unbundle from.

    The basket is rebuilt first when the registry reports that it
    needs a rebuild, or when the basket is not up to date with the
    registry's mtime.

    @since: 0.2.2
    @rtype: L{flumotion.common.bundle.BundlerBasket}
    """
    getRegistry = registry.getRegistry

    if getRegistry().rebuildNeeded():
        self.info("Registry changed, rebuilding")
        getRegistry().verify(force=True)
        stale = True
    else:
        stale = not self.bundlerBasket.isUptodate(getRegistry().mtime)
        if stale:
            self.info("BundlerBasket is older than the Registry, rebuilding")

    if stale:
        self.bundlerBasket = getRegistry().makeBundlerBasket()
    return self.bundlerBasket
278
def addMessage(self, level, mid, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    planet state. `format' should be marked as translatable in the
    source with N_, and *args will be stored as format arguments.
    Keyword arguments are passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meanings of the
    rest of the arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, mid=mid, **kwargs)
    self.addMessageObject(message)
296
def addMessageObject(self, message):
    """
    Add a message to the planet state, keyed on the message's id.

    @type message: L{flumotion.common.messages.Message}
    """
    messageId = message.id
    self.state.setitem('messages', messageId, message)
304
def clearMessage(self, mid):
    """
    Clear the message with the given message ID from the planet
    state. Does nothing when no such message is set.

    @type mid: message ID, normally a str
    """
    planetState = self.state
    if mid not in planetState.get('messages'):
        return
    planetState.delitem('messages', mid)
314
def adminAction(self, identity, message, args, kw):
    """
    Report an action to every plug loaded for the AdminActionPlug
    socket, in the order the plugs were loaded.

    @param identity: L{flumotion.common.identity.Identity}
    """
    socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
    # no plugs for this socket means there is nobody to notify
    for plug in self.plugs.get(socket, ()):
        plug.action(identity, message, args, kw)
323
def computeIdentity(self, keycard, remoteHost):
    """
    Compute a suitable identity for a remote host.

    Every plug loaded for the
    L{flumotion.component.plugs.identity.IdentityProviderPlug} socket
    is asked in turn; the first true result is returned as is. When
    no plug provides one, a user@host identity is used instead.

    The identity is only used in the adminaction interface. An
    example of its use is when you have an adminaction plug that
    checks an admin's privileges before actually doing an action;
    the identity object you use here might store the privileges that
    the admin has.

    @param keycard:    the keycard that the remote host used to log in.
    @type  keycard:    L{flumotion.common.keycards.Keycard}
    @param remoteHost: the ip of the remote host
    @type  remoteHost: str

    @rtype: a deferred that will fire a
            L{flumotion.common.identity.RemoteIdentity}
    """
    socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
    for plug in self.plugs.get(socket, ()):
        provided = plug.computeIdentity(keycard, remoteHost)
        if provided:
            return provided

    # fall back to user@host; not every keycard carries a username
    username = getattr(keycard, 'username', None)
    fallback = RemoteIdentity(username, remoteHost)
    return defer.succeed(fallback)
354
def _addComponent(self, conf, parent, identity):
    """
    Add a component state for the given component config entry.

    The new state starts out sleeping, is appended to the parent's
    components, and is registered in the component mappers under
    both the state object and its avatarId. Warning messages are put
    on the state when the component's project is not installed or
    its version is not compatible with the installed one.

    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('adding component %s to %s'
               % (conf.name, parent.get('name')))

    # actions the manager performs itself are not reported to plugs
    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addComponent', (conf, parent), {})

    compState = planet.ManagerComponentState()
    compState.set('name', conf.name)
    compState.set('type', conf.getType())
    compState.set('workerRequested', conf.worker)
    compState.setMood(moods.sleeping.value)
    compState.set('config', conf.getConfigDict())

    compState.set('parent', parent)
    parent.append('components', compState)

    avatarId = conf.getConfigDict()['avatarId']

    self.clearMessage('loadComponent-%s' % avatarId)

    configDict = conf.getConfigDict()
    projectName = configDict['project']
    versionTuple = configDict['version']

    try:
        projectVersion = project.get(projectName, 'version')
    except errors.NoProjectError:
        projectVersion = None
        compState.append('messages', messages.Warning(T_(N_(
            "This component is configured for Flumotion project '%s', "
            "but that project is not installed.\n"),
                projectName)))

    if projectVersion:
        self.debug('project %s, version %r, project version %r' % (
            projectName, versionTuple, projectVersion))
        installedTuple = common.versionStringToTuple(projectVersion)
        if not common.checkVersionsCompat(versionTuple, installedTuple):
            compState.append('messages', messages.Warning(T_(N_(
                "This component is configured for "
                "Flumotion '%s' version %s, "
                "but you are running version %s.\n"
                "Please update the configuration of the component.\n"),
                    projectName,
                    common.versionTupleToString(versionTuple),
                    projectVersion)))

    # make the component findable by state and by avatarId
    mapper = ComponentMapper()
    mapper.state = compState
    mapper.id = avatarId
    self._componentMappers[compState] = mapper
    self._componentMappers[avatarId] = mapper

    return compState
419
def _updateStateFromConf(self, _, conf, identity):
    """
    Add a new config object into the planet state.

    Components that are not known yet are added. Components that are
    known but sleeping are deleted and added again. Components that
    are already running are left alone; when their running
    configuration differs from the new one, a warning message is
    added to the planet state.

    @param _:        ignored; lets this method be used as a deferred
                     callback
    @param conf:     the parsed planet configuration
    @param identity: the identity on whose behalf components are added

    @returns: a list of all components added
    @rtype:   list of L{flumotion.common.planet.ManagerComponentState}
    """

    self.debug('syncing up planet state with config')
    added = [] # added components while parsing

    def checkNotRunning(comp, parentState):
        # Return True when comp may be (re-)added to parentState.
        name = comp.getName()

        comps = dict([(x.get('name'), x)
                      for x in parentState.get('components')])
        runningComps = dict([(x.get('name'), x)
                             for x in parentState.get('components')
                             if x.get('mood') != moods.sleeping.value])
        if name not in comps:
            # We don't have it at all; allow it
            return True
        elif name not in runningComps:
            # We have it, but it's not running. Allow it after deleting
            # the old one.
            oldComp = comps[name]
            self.deleteComponent(oldComp)
            return True

        # if we get here, the component is already running; warn if
        # the running configuration is different. Return False in
        # all cases.
        parent = comps[name].get('parent').get('name')
        # Use the component passed in, not a loop variable of the
        # enclosing function that merely happens to be the same object.
        newConf = comp.getConfigDict()
        oldConf = comps[name].get('config')

        if newConf == oldConf:
            self.debug('%s already has component %s running with '
                       'same configuration', parent, name)
            self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
            return False

        self.info('%s already has component %s, but configuration '
                  'not the same -- notifying admin', parent, name)

        diff = config.dictDiff(oldConf, newConf)
        diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

        self.addMessage(messages.WARNING,
                        'loadComponent-%s' % oldConf['avatarId'],
                        N_('Could not load component %r into %r: '
                           'a component is already running with '
                           'this name, but has a different '
                           'configuration.'), name, parent,
                        debug=diffMsg)
        return False

    state = self.state
    atmosphere = state.get('atmosphere')
    for c in conf.atmosphere.components.values():
        if checkNotRunning(c, atmosphere):
            added.append(self._addComponent(c, atmosphere, identity))

    flows = dict([(x.get('name'), x) for x in state.get('flows')])
    for f in conf.flows:
        if f.name in flows:
            flow = flows[f.name]
        else:
            self.info('creating flow %r', f.name)
            flow = planet.ManagerFlowState(name=f.name, parent=state)
            state.append('flows', flow)

        for c in f.components.values():
            if checkNotRunning(c, flow):
                added.append(self._addComponent(c, flow, identity))

    return added
498 - def _startComponents(self, components, identity):
499 # now start all components that need starting -- collecting into 500 # an temporary dict of the form {workerId => [components]} 501 componentsToStart = {} 502 for c in components: 503 workerId = c.get('workerRequested') 504 if not workerId in componentsToStart: 505 componentsToStart[workerId] = [] 506 componentsToStart[workerId].append(c) 507 self.debug('_startComponents: componentsToStart %r' % 508 (componentsToStart, )) 509 510 for workerId, componentStates in componentsToStart.items(): 511 self._workerCreateComponents(workerId, componentStates)
512
def _loadComponentConfiguration(self, conf, identity):
    """
    Merge the parsed configuration into the planet state, then start
    the components that were added.

    @returns: the deferred running those two steps
    """
    chain = defer.succeed(None)
    # first step yields the list of added components for the second
    chain.addCallback(self._updateStateFromConf, conf, identity)
    chain.addCallback(self._startComponents, identity)
    return chain
519
520 - def loadComponentConfigurationXML(self, file, identity):
521 """ 522 Load the configuration from the given XML, merging it on top of 523 the currently running configuration. 524 525 @param file: file to parse, either as an open file object, 526 or as the name of a file to open 527 @type file: str or file 528 @param identity: The identity making this request.. This is used by the 529 adminaction logging mechanism in order to say who is 530 performing the action. 531 @type identity: L{flumotion.common.identity.Identity} 532 """ 533 self.debug('loading configuration') 534 mid = 'loadComponent-parse-error' 535 if isinstance(file, str): 536 mid += '-%s' % file 537 try: 538 self.clearMessage(mid) 539 conf = config.PlanetConfigParser(file) 540 conf.parse() 541 return self._loadComponentConfiguration(conf, identity) 542 except errors.ConfigError, e: 543 self.addMessage(messages.WARNING, mid, 544 N_('Invalid component configuration.'), 545 debug=e.args[0]) 546 return defer.fail(e) 547 except errors.UnknownComponentError, e: 548 if isinstance(file, str): 549 debug = 'Configuration loaded from file %r' % file 550 else: 551 debug = 'Configuration loaded remotely' 552 self.addMessage(messages.WARNING, mid, 553 N_('Unknown component in configuration: %s.'), 554 e.args[0], debug=debug) 555 return defer.fail(e) 556 except Exception, e: 557 self.addMessage(messages.WARNING, mid, 558 N_('Unknown error while loading configuration.'), 559 debug=log.getExceptionMessage(e)) 560 return defer.fail(e)
561
def _loadManagerPlugs(self, conf):
    """
    Instantiate every plug listed in the configuration and append it
    to the list of plugs for its socket.
    """
    for socket, plugConfs in conf.plugs.items():
        loaded = self.plugs.setdefault(socket, [])

        for args in plugConfs:
            plugType = args['type']
            self.debug('loading plug type %s for socket %s'
                       % (plugType, socket))
            entry = registry.getRegistry().getPlug(plugType).getEntry()

            # problems creating the plug are reported as ConfigError
            plug = reflectcall.reflectCallCatching(
                errors.ConfigError,
                entry.getModuleName(), entry.getFunction(), args)
            loaded.append(plug)
578
def startManagerPlugs(self):
    """
    Start every loaded plug, passing myself to its start() method.
    """
    for socket in self.plugs:
        socketPlugs = self.plugs[socket]
        for plug in socketPlugs:
            self.debug('starting plug %r for socket %s', plug, socket)
            plug.start(self)
584
def _loadManagerBouncer(self, conf):
    """
    Create the bouncer described in the configuration and install it
    as the manager's bouncer once it is happy.

    A bouncer that fails to start is logged, including the reason,
    and otherwise ignored: the returned deferred still fires
    successfully.

    @returns: a deferred firing None when the attempt is over
    """
    if not conf.bouncer:
        self.warning('no bouncer defined, nothing can access the '
                     'manager')
        return defer.succeed(None)

    self.debug('going to start manager bouncer %s of type %s',
               conf.bouncer.name, conf.bouncer.type)

    defs = registry.getRegistry().getComponent(conf.bouncer.type)
    entry = defs.getEntryByType('component')
    # FIXME: use entry.getModuleName() (doesn't work atm?)
    moduleName = defs.getSource()
    methodName = entry.getFunction()
    bouncer = reflectcall.createComponent(moduleName, methodName,
                                          conf.bouncer.getConfigDict())
    d = bouncer.waitForHappy()

    def setupCallback(result):
        bouncer.debug('started')
        self.setBouncer(bouncer)

    def setupErrback(failure):
        # Keep going without a bouncer, but say why it did not start.
        self.warning('Error starting manager bouncer: %s',
                     failure.getErrorMessage())

    d.addCallbacks(setupCallback, setupErrback)
    return d
def loadManagerConfigurationXML(self, file):
    """
    Load manager configuration from the given XML. The manager
    configuration is currently used to load the manager's bouncer
    and plugs, and is only run once at startup.

    @param file: file to parse, either as an open file object,
                 or as the name of a file to open
    @type  file: str or file
    """
    self.debug('loading configuration')
    parser = config.ManagerConfigParser(file)
    parser.parseBouncerAndPlugs()
    # plugs first, then the bouncer
    for load in (self._loadManagerPlugs, self._loadManagerBouncer):
        load(parser)
    parser.unlink()
628 629 __pychecker__ = 'maxargs=11' # hahaha 630
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    The flow named in the component id is created when it does not
    exist yet. The new component is added to its parent and asked to
    start.

    @raises NotImplementedError: for clock master components and for
                                 components that need synchronization
    @raises errors.ConfigError:  if no worker name is given
    @raises errors.ComponentAlreadyExistsError: if the parent already
                                 has a component with this name

    @returns: the state of the newly added component
    @rtype:   L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # Check the argument: the name `worker' is the imported
    # flumotion.manager.worker module and is never None.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    components = [x.get('name') for x in parentState.get('components')]
    if compName in components:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
690
def _createHeaven(self, interface, klass):
    """
    Create a heaven of the given klass that will send avatars to clients
    implementing the given medium interface.

    The heaven is instantiated with myself as its only argument and
    registered with my dispatcher for the interface.

    @param interface: the medium interface to create a heaven for
    @type  interface: L{flumotion.common.interfaces.IMedium}
    @param klass:     the type of heaven to create
    @type  klass:     an implementor of
                      L{flumotion.common.interfaces.IHeaven}

    @returns: the new heaven
    """
    assert issubclass(interface, interfaces.IMedium)
    newHeaven = klass(self)
    self.dispatcher.registerHeaven(newHeaven, interface)
    return newHeaven
705
def setBouncer(self, bouncer):
    """
    Install the bouncer on myself, my portal and my dispatcher.
    A bouncer that was set before is replaced, with a warning.

    @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
    """
    replacing = self.bouncer
    if replacing:
        self.warning("manager already had a bouncer, setting anyway")

    self.bouncer = self.portal.bouncer = bouncer
    self.dispatcher.setBouncer(bouncer)
716
def getFactory(self):
    """
    @returns: the PB server factory created in my constructor
    """
    factory = self.factory
    return factory
719
def componentCreate(self, componentState):
    """
    Create the given component. This will currently also trigger
    a start eventually when the component avatar attaches.

    The component should be sleeping.
    The worker it should be started on should be present.

    @raises errors.ComponentMoodError:     if the component is not
                                           sleeping, or already has a
                                           pending mood
    @raises errors.ComponentNoWorkerError: if the worker is not
                                           logged in

    @returns: the result of asking the worker to create the component
    """
    mood = componentState.get('mood')
    if mood != moods.sleeping.value:
        raise errors.ComponentMoodError("%r not sleeping but %s" % (
            componentState, moods.get(mood).name))

    pending = componentState.get('moodPending')
    if pending is not None:
        raise errors.ComponentMoodError(
            "%r already has a pending mood %s" % (
                componentState, moods.get(pending).name))

    # find a worker this component can start on
    workerId = (componentState.get('workerName')
                or componentState.get('workerRequested'))

    if workerId not in self.workerHeaven.avatars:
        raise errors.ComponentNoWorkerError(
            "worker %s is not logged in" % workerId)

    return self._workerCreateComponents(workerId, [componentState])
748
def _componentStopNoAvatar(self, componentState, avatarId):
    """
    Stop a component that has no avatar attached.

    Only sad and lost components can be stopped this way; they are
    set back to sleeping. A lost component is first checked against
    the components its requested worker reports, and is refused when
    it still shows up there.

    @returns: a deferred; it fails with errors.ComponentMoodError
              when the component is in any other mood, or is lost
              but still running.
    """
    # NB: reset moodPending if asked to stop without an avatar
    # because componentStop allows stopping even if moodPending
    # is happy

    def stopSad():
        self.debug('asked to stop a sad component without avatar')
        # iterate over a copy, the list shrinks while removing
        for mid in componentState.get('messages')[:]:
            self.debug("Deleting message %r", mid)
            componentState.remove('messages', mid)

        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return defer.succeed(None)

    def stopLost():

        def gotComponents(comps):
            return avatarId in comps

        def gotJobRunning(running):
            if running:
                self.warning('asked to stop lost component %r, but '
                             'it is still running', avatarId)
                # FIXME: put a message on the state to suggest a
                # kill?
                msg = "Cannot stop lost component which is still running."
                raise errors.ComponentMoodError(msg)
            else:
                # the format needs the component id as its argument
                self.debug('component %r seems to be really lost, '
                           'setting to sleeping', avatarId)
                componentState.setMood(moods.sleeping.value)
                componentState.set('moodPending', None)
                return None

        self.debug('asked to stop a lost component without avatar')
        workerName = componentState.get('workerRequested')
        if workerName and self.workerHeaven.hasAvatar(workerName):
            self.debug('checking if component has job process running')
            d = self.workerHeaven.getAvatar(workerName).getComponents()
            d.addCallback(gotComponents)
            d.addCallback(gotJobRunning)
            return d
        else:
            self.debug('component lacks a worker, setting to sleeping')
            d = defer.maybeDeferred(gotJobRunning, False)
            return d

    def stopUnknown():
        msg = ('asked to stop a component without avatar in mood %s'
               % moods.get(mood))
        self.warning(msg)
        return defer.fail(errors.ComponentMoodError(msg))

    mood = componentState.get('mood')
    stoppers = {moods.sad.value: stopSad,
                moods.lost.value: stopLost}
    return stoppers.get(mood, stopUnknown)()
808 - def _componentStopWithAvatar(self, componentState, componentAvatar):
809 # FIXME: This deferred is just the remote call; there's no actual 810 # deferred for completion of shutdown. 811 d = componentAvatar.stop() 812 813 return d
814
def componentStop(self, componentState):
    """
    Stop the given component.
    If the component was sad, we clear its sad state as well,
    since the stop was explicitly requested by the admin.

    @type componentState: L{planet.ManagerComponentState}

    @raises errors.BusyComponentError:    if the component has a
                                          pending mood other than
                                          happy
    @raises errors.UnknownComponentError: if no component mapper
                                          exists for the state

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('componentStop(%r)', componentState)
    # We permit stopping a component even if it has a pending mood of
    # happy, so that if it never gets to happy, we can still stop it.
    pending = componentState.get('moodPending')
    if pending is not None and pending != moods.happy.value:
        self.debug("Pending mood is %r", pending)

        raise errors.BusyComponentError(componentState)

    m = self.getComponentMapper(componentState)
    if not m:
        # We have a stale componentState for an already-deleted
        # component
        self.warning("Component mapper for component state %r doesn't "
                     "exist", componentState)
        raise errors.UnknownComponentError(componentState)

    if not m.avatar:
        return self._componentStopNoAvatar(componentState, m.id)
    return self._componentStopWithAvatar(componentState, m.avatar)
845
def componentAddMessage(self, avatarId, message):
    """
    Set the given message on the given component's state.
    Can be called e.g. by a worker to report on a crashed component.
    Sets the mood to sad if it is an error message.

    A component that is not mapped is only warned about.
    """
    mapper = self._componentMappers.get(avatarId)
    if avatarId not in self._componentMappers:
        self.warning('asked to set a message on non-mapped component %s' %
                     avatarId)
        return

    compState = mapper.state
    compState.append('messages', message)
    if message.level == messages.ERROR:
        self.debug('Error message makes component sad')
        compState.setMood(moods.sad.value)
862 863 # FIXME: unify naming of stuff like this 864
def workerAttached(self, workerAvatar):
    """
    Called when a worker logs in.

    Asks the worker which components it is running, then creates on
    it the components that still need creating for this worker plus
    the ones we consider lost on it, leaving out everything the
    worker reports as running.
    """
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerAttached(): id %s' % workerId)

    # Candidates: everything that asked for this worker, or for no
    # worker in particular. The order of creation is unimportant,
    # only the order of starting matters (and that's different code).
    toCreate = [c for c in self._getComponentsToCreate()
                if c.get('workerRequested') in (workerId, None)]

    def workerAvatarComponentListReceived(workerComponents):
        lost = [c for c in self.getComponentStates()
                if c.get('workerRequested') == workerId
                and c.get('mood') == moods.lost.value]

        # workerComponents holds avatarId strings; whatever the
        # worker already runs needs no (re)creation.
        for runningId in workerComponents:
            if runningId not in self._componentMappers:
                continue
            runningState = self._componentMappers[runningId].state
            if runningState in toCreate:
                toCreate.remove(runningState)
            if runningState in lost:
                lost.remove(runningState)

        for lostState in lost:
            self.info(
                "Restarting previously lost component %s on worker %s",
                self._componentMappers[lostState].id, workerId)
            # We set mood to sleeping first. This allows things to
            # distinguish between a newly-started component and a lost
            # component logging back in.
            lostState.set('moodPending', None)
            lostState.setMood(moods.sleeping.value)

        allComponents = toCreate + lost

        if not allComponents:
            self.debug(
                "vishnu.workerAttached(): no components for this worker")
            return

        self._workerCreateComponents(workerId, allComponents)

    d = workerAvatar.getComponents()
    d.addCallback(workerAvatarComponentListReceived)

    reactor.callLater(0, self.componentHeaven.feedServerAvailable,
                      workerId)
def _workerCreateComponents(self, workerId, components):
    """
    Queue creation of the given components on the given worker.

    One create request per component is chained, in list order, onto a
    single deferred, which is fired before this method returns.  Nothing
    is scheduled when the worker is not logged in.

    @param workerId:   avatarId of the worker
    @type  workerId:   string
    @param components: components to start
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}

    @returns: the deferred carrying the create chain, or the result of
              defer.succeed(None) when the worker is not logged in
    """
    self.debug("_workerCreateComponents: workerId %r, components %r" % (
        workerId, components))

    avatars = self.workerHeaven.avatars
    if workerId not in avatars:
        self.debug('worker %s not logged in yet, delaying '
                   'component start' % workerId)
        return defer.succeed(None)
    workerAvatar = avatars[workerId]

    chain = defer.Deferred()
    for state in components:
        componentType = state.get('type')
        conf = state.get('config')
        self.debug('scheduling create of %s on %s'
                   % (conf['avatarId'], workerId))
        chain.addCallback(self._workerCreateComponentDelayed,
                          workerAvatar, state, componentType, conf)

    def chainCompleted(result):
        return self.debug(
            '_workerCreateComponents(): completed setting up create chain')
    chain.addCallback(chainCompleted)

    # fire the chain right away rather than from the reactor
    self.debug('_workerCreateComponents(): triggering create chain')
    chain.callback(None)
    return chain
958
def _workerCreateComponentDelayed(self, result, workerAvatar,
                                  componentState, componentType, conf):
    """
    Ask a worker to create a single component.

    Meant to be used as a deferred callback; the incoming result is
    ignored.

    @param result:         result of the previous callback; unused
    @param workerAvatar:   avatar of the worker to create the component on
    @param componentState: state of the component to create
    @param componentType:  type of the component to create
    @param conf:           configuration of the component; must contain
                           'avatarId' and may contain 'nice'
    """
    # read the configuration before touching the state, so that a broken
    # configuration leaves moodPending alone
    componentId = conf['avatarId']
    niceLevel = conf.get('nice', 0)

    # moodPending goes to happy so that this component only gets asked
    # to start once
    componentState.set('moodPending', moods.happy.value)

    created = workerAvatar.createComponent(componentId, componentType,
                                           niceLevel, conf)
    # FIXME: here we get the avatar Id of the component we wanted
    # started, so now attach it to the planetState's component state
    created.addCallback(self._createCallback, componentState)
    created.addErrback(self._createErrback, componentState)

    # FIXME: shouldn't we return the deferred here to make sure
    # components wait on each other to be started ?
979 - def _createCallback(self, result, componentState):
980 self.debug('got avatarId %s for state %s' % (result, componentState)) 981 m = self._componentMappers[componentState] 982 assert result == m.id, "received id %s is not the expected id %s" % ( 983 result, m.id)
984
def _createErrback(self, failure, state):
    """
    Handle a failed create request for a component.

    A component that is already running is either left alone (when its
    mapper has a jobState) or marked lost; for any other failure the
    component is marked sad and gets an error message appended.

    @param failure: failure returned by the create request
    @param state:   state of the component that failed to be created

    @returns: None, so the failure is not passed on to later errbacks
    """
    # FIXME: make ConfigError copyable so we can .check() it here
    # and print a nicer warning
    self.warning('failed to create component %s: %s',
                 state.get('name'), log.getFailureMessage(failure))

    if not failure.check(errors.ComponentAlreadyRunningError):
        text = T_(N_("The component could not be started."))
        message = messages.Error(text,
                                 debug=log.getFailureMessage(failure))
        state.setMood(moods.sad.value)
        state.append('messages', message)
        return None

    if self._componentMappers[state].jobState:
        self.info('component appears to have logged in in the '
                  'meantime')
        return None

    self.info('component appears to be running already; '
              'treating it as lost until it logs in')
    state.setMood(moods.lost.value)
    return None
1008
def workerDetached(self, workerAvatar):
    """
    Handle a worker logging out.

    Every sad component that requested the detached worker has its mood
    set to sleeping.

    @param workerAvatar: avatar of the worker that logged out
    """
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerDetached(): id %s' % workerId)

    # collect first, change moods afterwards, so that the selection is
    # made on the moods as they were when the worker detached
    sadComponents = [c for c in self.getComponentStates()
                     if c.get('workerRequested') == workerId
                     and c.get('mood') == moods.sad.value]

    # an explicit loop instead of map(): the calls are made purely for
    # their side effect, and no throwaway result list is built
    for c in sadComponents:
        c.setMood(moods.sleeping.value)
1019
def addComponentToFlow(self, componentState, flowName):
    """
    Attach a component state to the flow with the given name.

    The name 'atmosphere' selects the planet's atmosphere, which is
    treated like a flow although it is not one.  When no flow is found a
    new one is created and appended to the planet's flows.

    @param componentState: state of the component to add
    @param flowName:       name of the flow, or 'atmosphere'
    @type  flowName:       string
    """
    isAtmosphere = (flowName == 'atmosphere')
    if isAtmosphere:
        parent = self.state.get('atmosphere')
    else:
        parent = self._getFlowByName(flowName)

    if not parent:
        # we do not have this flow yet, so add it
        self.info('Creating flow "%s"' % flowName)
        parent = planet.ManagerFlowState()
        parent.set('name', flowName)
        parent.set('parent', self.state)
        self.state.append('flows', parent)

    componentState.set('parent', parent)
    parent.append('components', componentState)
1036
def registerComponent(self, componentAvatar):
    """
    Map a component avatar that logged in.

    The mapper already known for the avatar's id is reused, or a new one
    is created.  It is then filled in from the avatar and made reachable
    through the component state, the job state, the avatarId and the
    avatar itself.

    @param componentAvatar: avatar of the component to register
    """
    mapper = self.getComponentMapper(componentAvatar.avatarId)
    if not mapper:
        mapper = ComponentMapper()

    mapper.state = componentAvatar.componentState
    mapper.jobState = componentAvatar.jobState
    mapper.id = componentAvatar.avatarId
    mapper.avatar = componentAvatar

    # the same mapper is registered under each of its four handles
    for handle in (mapper.state, mapper.jobState, mapper.id, mapper.avatar):
        self._componentMappers[handle] = mapper
1051
def unregisterComponent(self, componentAvatar):
    """
    Unmap a component avatar that is logging out.

    The job state and avatar mappings are dropped and cleared on the
    mapper, and 'pid', 'workerName' and 'moodPending' are reset to None
    on the component state.  The mappings by component state and by
    avatarId are left in place.

    @param componentAvatar: avatar of the component that is logging out
    """
    self.debug('unregisterComponent(%r): cleaning up state' %
               componentAvatar)

    mapper = self._componentMappers[componentAvatar]

    # forget the job state; a missing mapping is only worth a warning
    try:
        del self._componentMappers[mapper.jobState]
    except KeyError:
        self.warning('Could not remove jobState for %r' % componentAvatar)
    mapper.jobState = None

    # nothing is running for this component any more
    for key in ('pid', 'workerName', 'moodPending'):
        mapper.state.set(key, None)

    # forget the avatar
    del self._componentMappers[mapper.avatar]
    mapper.avatar = None
1074
def getComponentStates(self):
    """
    Get the states of all components known to the planet state.

    Every state is logged, and a warning is emitted for each component
    whose mood is None.

    @returns: the list returned by the planet state's getComponents()
    """
    cList = self.state.getComponents()
    self.debug('getComponentStates(): %d components' % len(cList))
    for c in cList:
        self.log(repr(c))
        # identity test: None is a singleton, and == could be answered
        # by the value's own __eq__
        if c.get('mood') is None:
            self.warning('%s has mood None' % c.get('name'))

    return cList
1085
def deleteComponent(self, componentState):
    """
    Empty the planet of the given component.

    @param componentState: state of the component to delete

    @raises errors.UnknownComponentError: if the component is not mapped
    @raises errors.BusyComponentError:    if the component has a pending
                                          mood or is not sleeping

    @returns: a deferred that will fire when all listeners have been
              notified of the removal of the component.
    """
    self.debug('deleting component %r from state', componentState)
    c = componentState
    if c not in self._componentMappers:
        raise errors.UnknownComponentError(c)

    flow = componentState.get('parent')
    # moods are compared by value; identity of equal numbers is an
    # interpreter detail and must not decide whether a component is busy
    if (c.get('moodPending') is not None
        or c.get('mood') != moods.sleeping.value):
        raise errors.BusyComponentError(c)

    # drop both the avatarId mapping and the state mapping
    del self._componentMappers[self._componentMappers[c].id]
    del self._componentMappers[c]
    return flow.remove('components', c)
1106
1107 - def _getFlowByName(self, flowName):
1108 for flow in self.state.get('flows'): 1109 if flow.get('name') == flowName: 1110 return flow
1111
def deleteFlow(self, flowName):
    """
    Empty the planet of a flow.

    Nothing is unmapped unless every component of the flow is sleeping
    and has no pending mood.

    @param flowName: name of the flow to delete

    @raises ValueError:                if there is no flow with that name
    @raises errors.BusyComponentError: if a component of the flow has a
                                       pending mood or is not sleeping

    @returns: a deferred that will fire when the flow is removed.
    """
    flow = self._getFlowByName(flowName)
    if flow is None:
        raise ValueError("No flow called %s found" % (flowName, ))

    components = flow.get('components')
    for c in components:
        # if any component is already in a mood change/command, fail;
        # moods are compared by value, not by object identity
        if (c.get('moodPending') is not None or
            c.get('mood') != moods.sleeping.value):
            raise errors.BusyComponentError(c)

    # only unmap once we know that none of the components is busy
    for c in components:
        del self._componentMappers[self._componentMappers[c].id]
        del self._componentMappers[c]

    d = flow.empty()
    d.addCallback(lambda _: self.state.remove('flows', flow))
    return d
1135
def emptyPlanet(self):
    """
    Empty the planet of all components, and flows. Also clears all
    messages.

    Components that are not sleeping and still have an avatar are asked
    to stop, one after the other; the rest of the planet state is cleaned
    up once they have all been asked.

    @raises errors.BusyComponentError: if any component has a pending mood

    @returns: a deferred that will fire when the planet is empty.
    """
    # work on a snapshot of the ids; clearMessage() presumably changes
    # the message mapping while we walk it - TODO confirm
    for mid in list(self.state.get('messages').keys()):
        self.clearMessage(mid)

    # if any component is already in a mood change/command, fail
    busy = [c for c in self.getComponentStates()
            if c.get('moodPending') is not None]
    if busy:
        state = busy[0]
        raise errors.BusyComponentError(
            state,
            "moodPending is %s" % moods.get(state.get('moodPending')))

    # keep the ones that aren't sleeping; those need to be stopped.
    # Moods are compared by value, not by object identity.
    components = [c for c in self.getComponentStates()
                  if c.get('mood') != moods.sleeping.value]

    # create a big deferred for stopping everything
    d = defer.Deferred()

    self.debug('need to stop %d components: %r' % (
        len(components), components))

    for c in components:
        avatar = self._componentMappers[c].avatar
        # If this has logged out, but isn't sleeping (so is sad or lost),
        # we won't have an avatar. So, stop it if we can.
        if avatar:
            # the avatar is passed as an extra argument so that each
            # callback stops its own avatar
            d.addCallback(lambda result, a: a.stop(), avatar)
        else:
            assert c.get('mood') in (moods.sad.value, moods.lost.value)

    d.addCallback(self._emptyPlanetCallback)

    # trigger the deferred after returning
    reactor.callLater(0, d.callback, None)

    return d
1184
def _emptyPlanetCallback(self, result):
    """
    Clean up the planet state once all components have been stopped.

    Unmaps every component, empties the atmosphere, and empties and
    removes every flow.

    @param result: result of the previous callback; unused

    @returns: a L{twisted.internet.defer.DeferredList} over the empty and
              remove operations
    """
    components = self.getComponentStates()
    self.debug('_emptyPlanetCallback: need to delete %d components' %
               len(components))

    for c in components:
        # moods are compared by value, not by object identity
        if c.get('mood') != moods.sleeping.value:
            self.warning('Component %s is not sleeping', c.get('name'))
        # clear mapper; remove componentstate and id
        m = self._componentMappers[c]
        del self._componentMappers[m.id]
        del self._componentMappers[c]

    # if anything's left, we have a mistake somewhere
    if self._componentMappers:
        self.warning('mappers still has keys %r' % (
            repr(self._componentMappers.keys())))

    dList = [self.state.get('atmosphere').empty()]

    # Walk a copy of the flow list: removing a flow presumably shrinks
    # the list handed out by the state (TODO confirm), and removing from
    # a list while iterating over it skips entries.
    for f in list(self.state.get('flows')):
        self.debug('appending deferred for emptying flow %r' % f)
        dList.append(f.empty())
        self.debug('appending deferred for removing flow %r' % f)
        dList.append(self.state.remove('flows', f))
        self.debug('appended deferreds')

    return defer.DeferredList(dList)
1218
def _getComponentsToCreate(self):
    """
    Get the components that are sleeping.

    Sleeping indicates that there is no existing job: when a job is
    created the mood becomes waking, so there is no need to look at
    moodPending as well.

    @rtype: list of L{flumotion.common.planet.ManagerComponentState}
    """
    sleeping = moods.sleeping.value
    return [state for state in self.state.getComponents()
            if state.get('mood') == sleeping]
1233
1234 - def _getWorker(self, workerName):
1235 # returns the WorkerAvatar with the given name 1236 if not workerName in self.workerHeaven.avatars: 1237 raise errors.ComponentNoWorkerError("Worker %s not logged in?" 1238 % workerName) 1239 1240 return self.workerHeaven.avatars[workerName]
1241
def getWorkerFeedServerPort(self, workerName):
    """
    Get the feed server port of a worker.

    @param workerName: name of the worker

    @returns: the feedServerPort of the worker's avatar, or None if the
              worker is not logged in
    """
    if workerName not in self.workerHeaven.avatars:
        return None

    workerAvatar = self._getWorker(workerName)
    return workerAvatar.feedServerPort
1246
def reservePortsOnWorker(self, workerName, numPorts):
    """
    Reserve a number of ports on the worker named workerName.

    The ports stay reserved for the caller until releasePortsOnWorker
    is called.

    @param workerName: name of the worker to reserve ports on
    @param numPorts:   number of ports to reserve

    @returns: a list of ports as integers
    """
    workerAvatar = self._getWorker(workerName)
    reserved = workerAvatar.reservePorts(numPorts)
    return reserved
1256
1257 - def releasePortsOnWorker(self, workerName, ports):
1258 """ 1259 Tells the manager that the given ports are no longer being used, 1260 and may be returned to the allocation pool. 1261 """ 1262 try: 1263 return self._getWorker(workerName).releasePorts(ports) 1264 except errors.ComponentNoWorkerError, e: 1265 self.warning('could not release ports: %r' % e.args)
1266
def getComponentMapper(self, object):
    """
    Look up an object mapper given the object.

    @param object: a key the mapper is registered under.  The parameter
                   name shadows the builtin; it is kept because it is
                   part of the method's signature.

    @rtype: L{ComponentMapper} or None
    """
    # test membership on the mapping itself, as the other methods do,
    # instead of scanning the list of its keys
    if object in self._componentMappers:
        return self._componentMappers[object]

    return None
1277
def getManagerComponentState(self, object):
    """
    Look up the component state of the mapper registered for an object.

    @param object: a key the mapper is registered under.  The parameter
                   name shadows the builtin; it is kept because it is
                   part of the method's signature.

    @returns: the state attribute of the mapper found for the object, or
              None if the object is not mapped
    @rtype:   L{flumotion.common.planet.ManagerComponentState} or None
    """
    # test membership on the mapping itself, as the other methods do,
    # instead of scanning the list of its keys
    if object in self._componentMappers:
        return self._componentMappers[object].state

    return None
1288
1289 - def invokeOnComponents(self, componentType, methodName, *args, **kwargs):
1290 """ 1291 Invokes method on all components of a certain type 1292 """ 1293 1294 def invokeOnOneComponent(component, methodName, *args, **kwargs): 1295 m = self.getComponentMapper(component) 1296 if not m: 1297 self.warning('Component %s not mapped. Maybe deleted.', 1298 component.get('name')) 1299 raise errors.UnknownComponentError(component) 1300 1301 avatar = m.avatar 1302 if not avatar: 1303 self.warning('No avatar for %s, cannot call remote', 1304 component.get('name')) 1305 raise errors.SleepingComponentError(component) 1306 1307 try: 1308 return avatar.mindCallRemote(methodName, *args, **kwargs) 1309 except Exception, e: 1310 log_message = log.getExceptionMessage(e) 1311 msg = "exception on remote call %s: %s" % (methodName, 1312 log_message) 1313 self.warning(msg) 1314 raise errors.RemoteMethodError(methodName, 1315 log_message)
1316 1317 # only do this on happy or hungry components of type componentType 1318 dl_array = [] 1319 for c in self.getComponentStates(): 1320 if c.get('type') == componentType and \ 1321 (c.get('mood') is moods.happy.value or 1322 c.get('mood') is moods.hungry.value): 1323 self.info("component %r to have %s run", c, methodName) 1324 d = invokeOnOneComponent(c, methodName, *args, **kwargs) 1325 dl_array.append(d) 1326 dl = defer.DeferredList(dl_array) 1327 return dl 1328