1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 manager implementation and related classes
20
21 API Stability: semi-stable
22
23 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
24 to compare against to verify that the manager
25 requested an action
26 @type LOCAL_IDENTITY: L{LocalIdentity}
27 """
28
29 import os
30
31 from twisted.internet import reactor, defer
32 from twisted.spread import pb
33 from twisted.cred import portal
34 from zope.interface import implements
35
36 from flumotion.common import errors, interfaces, log, registry
37 from flumotion.common import planet, common, messages, reflectcall, server
38 from flumotion.common.i18n import N_, gettexter
39 from flumotion.common.identity import RemoteIdentity, LocalIdentity
40 from flumotion.common.netutils import addressGetHost
41 from flumotion.common.planet import moods
42 from flumotion.configure import configure
43 from flumotion.manager import admin, component, worker, base, config
44 from flumotion.twisted import portal as fportal
45 from flumotion.project import project
46
47 __all__ = ['ManagerServerFactory', 'Vishnu']
48 __version__ = "$Rev$"
49 T_ = gettexter()
50 LOCAL_IDENTITY = LocalIdentity('manager')
51
52
53
54
55
57 """
58 I implement L{twisted.cred.portal.IRealm}.
59 I make sure that when a L{pb.Avatar} is requested through me, the
60 Avatar being returned knows about the mind (client) requesting
61 the Avatar.
62 """
63
64 implements(portal.IRealm)
65
66 logCategory = 'dispatcher'
67
69 """
70 @param computeIdentity: see L{Vishnu.computeIdentity}
71 @type computeIdentity: callable
72 """
73 self._interfaceHeavens = {}
74 self._computeIdentity = computeIdentity
75 self._bouncer = None
76 self._avatarKeycards = {}
77
79 """
80 @param bouncer: the bouncer to authenticate with
81 @type bouncer: L{flumotion.component.bouncers.bouncer}
82 """
83 self._bouncer = bouncer
84
86 """
87 Register a Heaven as managing components with the given interface.
88
89 @type interface: L{twisted.python.components.Interface}
90 @param interface: a component interface to register the heaven with.
91 """
92 assert isinstance(heaven, base.ManagerHeaven)
93
94 self._interfaceHeavens[interface] = heaven
95
96
97
123
124 return (pb.IPerspective, avatar, cleanup)
125
def got_error(failure):
    """
    Errback for avatar creation: schedule the client connection to be
    dropped and pass the failure on unchanged.

    @param failure: the failure raised while creating the avatar
    @returns: the same failure, so that it keeps propagating down the
              errback chain
    """
    # Drop the connection from a later reactor iteration rather than
    # synchronously from inside this errback.
    transport = mind.broker.transport
    reactor.callLater(0, transport.loseConnection)
    return failure
134
135 if pb.IPerspective not in ifaces:
136 raise errors.NoPerspectiveError(avatarId)
137 if len(ifaces) != 2:
138
139 raise errors.NoPerspectiveError(avatarId)
140 iface = [x for x in ifaces if x != pb.IPerspective][0]
141 if iface not in self._interfaceHeavens:
142 self.warning('unknown interface %r', iface)
143 raise errors.NoPerspectiveError(avatarId)
144
145 heaven = self._interfaceHeavens[iface]
146 klass = heaven.avatarClass
147 host = addressGetHost(mind.broker.transport.getPeer())
148 d = self._computeIdentity(keycard, host)
149 d.addCallback(lambda identity: \
150 klass.makeAvatar(heaven, avatarId, identity, mind))
151 d.addCallbacks(got_avatar, got_error)
152 return d
153
154
156 """
157 I am an object that ties together different objects related to a
158 component. I am used as values in a lookup hash in the vishnu.
159 """
160
162 self.state = None
163 self.id = None
164 self.avatar = None
165 self.jobState = None
166
167
169 """
170 I am the toplevel manager object that knows about all
171 heavens and factories.
172
173 @cvar dispatcher: dispatcher to create avatars
174 @type dispatcher: L{Dispatcher}
175 @cvar workerHeaven: the worker heaven
176 @type workerHeaven: L{worker.WorkerHeaven}
177 @cvar componentHeaven: the component heaven
178 @type componentHeaven: L{component.ComponentHeaven}
179 @cvar adminHeaven: the admin heaven
180 @type adminHeaven: L{admin.AdminHeaven}
181 @cvar configDir: the configuration directory for
182 this Vishnu's manager
183 @type configDir: str
184 """
185
186 implements(server.IServable)
187
188 logCategory = "vishnu"
189
190 - def __init__(self, name, unsafeTracebacks=0, configDir=None):
206 reactor.addSystemEventTrigger('before', 'shutdown', setStopped)
207
208 if configDir is not None:
209 self.configDir = configDir
210 else:
211 self.configDir = os.path.join(configure.configdir,
212 "managers", name)
213
214 self.bouncer = None
215
216 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
217
218 self._componentMappers = {}
219
220 self.state = planet.ManagerPlanetState()
221 self.state.set('name', name)
222 self.state.set('version', configure.version)
223
224 self.plugs = {}
225
226
227
228 self.portal = fportal.BouncerPortal(self.dispatcher, None)
229
230 self.factory = pb.PBServerFactory(self.portal,
231 unsafeTracebacks=unsafeTracebacks)
232 self.connectionInfo = {}
233 self.setConnectionInfo(None, None, None)
234
236 """Cancel any pending operations in preparation for shutdown.
237
238 This method is mostly useful for unit tests; currently, it is
239 not called during normal operation. Note that the caller is
240 responsible for stopping listening on the port, as the
241 manager does not have a handle on the twisted port object.
242
243 @returns: A deferred that will fire when the manager has shut
244 down.
245 """
246 if self.bouncer:
247 return self.bouncer.stop()
248 else:
249 return defer.succeed(None)
250
254
256 """Returns the manager's configuration as a string suitable for
257 importing via loadConfiguration().
258 """
259 return config.exportPlanetXml(self.state)
260
278
def addMessage(self, level, mid, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    planet state.

    `format' should be marked as translatable in the source with N_;
    *args are stored as its format arguments. Keyword arguments are
    passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meanings of the
    rest of the arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')

    @param level:  the message level, e.g. messages.WARNING
    @param mid:    the message ID under which the message is stored
    @param format: the translatable format string
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, mid=mid, **kwargs)
    self.addMessageObject(message)
296
298 """
299 Add a message to the planet state.
300
301 @type message: L{flumotion.common.messages.Message}
302 """
303 self.state.setitem('messages', message.id, message)
304
306 """
307 Clear any messages with the given message ID from the planet
308 state.
309
310 @type mid: message ID, normally a str
311 """
312 if mid in self.state.get('messages'):
313 self.state.delitem('messages', mid)
314
316 """
317 @param identity: L{flumotion.common.identity.Identity}
318 """
319 socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
320 if socket in self.plugs:
321 for plug in self.plugs[socket]:
322 plug.action(identity, message, args, kw)
323
325 """
326 Compute a suitable identity for a remote host. First looks to
327 see if there is a
328 L{flumotion.component.plugs.identity.IdentityProviderPlug} plug
329 installed on the manager, falling back to user@host.
330
331 The identity is only used in the adminaction interface. An
332 example of its use is when you have an adminaction plug that
333 checks an admin's privileges before actually doing an action;
334 the identity object you use here might store the privileges that
335 the admin has.
336
337 @param keycard: the keycard that the remote host used to log in.
338 @type keycard: L{flumotion.common.keycards.Keycard}
339 @param remoteHost: the ip of the remote host
340 @type remoteHost: str
341
342 @rtype: a deferred that will fire a
343 L{flumotion.common.identity.RemoteIdentity}
344 """
345
346 socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
347 if socket in self.plugs:
348 for plug in self.plugs[socket]:
349 identity = plug.computeIdentity(keycard, remoteHost)
350 if identity:
351 return identity
352 username = getattr(keycard, 'username', None)
353 return defer.succeed(RemoteIdentity(username, remoteHost))
354
356 """
357 Add a component state for the given component config entry.
358
359 @rtype: L{flumotion.common.planet.ManagerComponentState}
360 """
361
362 self.debug('adding component %s to %s'
363 % (conf.name, parent.get('name')))
364
365 if identity != LOCAL_IDENTITY:
366 self.adminAction(identity, '_addComponent', (conf, parent), {})
367
368 state = planet.ManagerComponentState()
369 state.set('name', conf.name)
370 state.set('type', conf.getType())
371 state.set('workerRequested', conf.worker)
372 state.setMood(moods.sleeping.value)
373 state.set('config', conf.getConfigDict())
374
375 state.set('parent', parent)
376 parent.append('components', state)
377
378 avatarId = conf.getConfigDict()['avatarId']
379
380 self.clearMessage('loadComponent-%s' % avatarId)
381
382 configDict = conf.getConfigDict()
383 projectName = configDict['project']
384 versionTuple = configDict['version']
385
386 projectVersion = None
387 try:
388 projectVersion = project.get(projectName, 'version')
389 except errors.NoProjectError:
390 m = messages.Warning(T_(N_(
391 "This component is configured for Flumotion project '%s', "
392 "but that project is not installed.\n"),
393 projectName))
394 state.append('messages', m)
395
396 if projectVersion:
397 self.debug('project %s, version %r, project version %r' % (
398 projectName, versionTuple, projectVersion))
399 if not common.checkVersionsCompat(
400 versionTuple,
401 common.versionStringToTuple(projectVersion)):
402 m = messages.Warning(T_(N_(
403 "This component is configured for "
404 "Flumotion '%s' version %s, "
405 "but you are running version %s.\n"
406 "Please update the configuration of the component.\n"),
407 projectName, common.versionTupleToString(versionTuple),
408 projectVersion))
409 state.append('messages', m)
410
411
412 m = ComponentMapper()
413 m.state = state
414 m.id = avatarId
415 self._componentMappers[state] = m
416 self._componentMappers[avatarId] = m
417
418 return state
419
421 """
422 Add a new config object into the planet state.
423
424 @returns: a list of all components added
425 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
426 """
427
428 self.debug('syncing up planet state with config')
429 added = []
430
def checkNotRunning(comp, parentState):
    """
    Decide whether the component config entry can be added to the
    given parent state.

    @param comp:        the component config entry to check
    @param parentState: the atmosphere or flow state the component
                        would be added to
    @returns: True if the component should be added: either no
              component with that name exists in the parent, or one
              exists but is sleeping (in which case the old one is
              deleted first). False if a non-sleeping component with
              that name exists; if its configuration differs from the
              new one, a warning message is added to the planet
              state.
    """
    name = comp.getName()

    comps = dict([(x.get('name'), x)
                  for x in parentState.get('components')])
    runningComps = dict([(x.get('name'), x)
                         for x in parentState.get('components')
                         if x.get('mood') != moods.sleeping.value])
    if name not in comps:
        # Nothing with this name yet; free to add.
        return True
    elif name not in runningComps:
        # A component with this name exists but is sleeping: replace it
        # with the new configuration.
        oldComp = comps[name]
        self.deleteComponent(oldComp)
        return True

    # A component with this name exists and is not sleeping; it is never
    # replaced, only compared against the new configuration.
    parent = comps[name].get('parent').get('name')
    # Use the entry that was passed in, not whatever variable the
    # enclosing loop happens to be bound to at call time.
    newConf = comp.getConfigDict()
    oldConf = comps[name].get('config')

    if newConf == oldConf:
        self.debug('%s already has component %s running with '
                   'same configuration', parent, name)
        self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
        return False

    self.info('%s already has component %s, but configuration '
              'not the same -- notifying admin', parent, name)

    diff = config.dictDiff(oldConf, newConf)
    diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

    self.addMessage(messages.WARNING,
                    'loadComponent-%s' % oldConf['avatarId'],
                    N_('Could not load component %r into %r: '
                       'a component is already running with '
                       'this name, but has a different '
                       'configuration.'), name, parent,
                    debug=diffMsg)
    return False
476
477 state = self.state
478 atmosphere = state.get('atmosphere')
479 for c in conf.atmosphere.components.values():
480 if checkNotRunning(c, atmosphere):
481 added.append(self._addComponent(c, atmosphere, identity))
482
483 flows = dict([(x.get('name'), x) for x in state.get('flows')])
484 for f in conf.flows:
485 if f.name in flows:
486 flow = flows[f.name]
487 else:
488 self.info('creating flow %r', f.name)
489 flow = planet.ManagerFlowState(name=f.name, parent=state)
490 state.append('flows', flow)
491
492 for c in f.components.values():
493 if checkNotRunning(c, flow):
494 added.append(self._addComponent(c, flow, identity))
495
496 return added
497
499
500
501 componentsToStart = {}
502 for c in components:
503 workerId = c.get('workerRequested')
504 if not workerId in componentsToStart:
505 componentsToStart[workerId] = []
506 componentsToStart[workerId].append(c)
507 self.debug('_startComponents: componentsToStart %r' %
508 (componentsToStart, ))
509
510 for workerId, componentStates in componentsToStart.items():
511 self._workerCreateComponents(workerId, componentStates)
512
519
521 """
522 Load the configuration from the given XML, merging it on top of
523 the currently running configuration.
524
525 @param file: file to parse, either as an open file object,
526 or as the name of a file to open
527 @type file: str or file
528 @param identity: The identity making this request. This is used by the
529 adminaction logging mechanism in order to say who is
530 performing the action.
531 @type identity: L{flumotion.common.identity.Identity}
532 """
533 self.debug('loading configuration')
534 mid = 'loadComponent-parse-error'
535 if isinstance(file, str):
536 mid += '-%s' % file
537 try:
538 self.clearMessage(mid)
539 conf = config.PlanetConfigParser(file)
540 conf.parse()
541 return self._loadComponentConfiguration(conf, identity)
542 except errors.ConfigError, e:
543 self.addMessage(messages.WARNING, mid,
544 N_('Invalid component configuration.'),
545 debug=e.args[0])
546 return defer.fail(e)
547 except errors.UnknownComponentError, e:
548 if isinstance(file, str):
549 debug = 'Configuration loaded from file %r' % file
550 else:
551 debug = 'Configuration loaded remotely'
552 self.addMessage(messages.WARNING, mid,
553 N_('Unknown component in configuration: %s.'),
554 e.args[0], debug=debug)
555 return defer.fail(e)
556 except Exception, e:
557 self.addMessage(messages.WARNING, mid,
558 N_('Unknown error while loading configuration.'),
559 debug=log.getExceptionMessage(e))
560 return defer.fail(e)
561
578
584
606
def setupErrback(failure):
    # Errback for the bouncer setup deferred: only logs a warning.
    # Nothing is returned, so the failure is not passed on to later
    # errbacks in the chain.
    # NOTE(review): the failure details are not logged -- confirm that
    # dropping them is intentional.
    self.warning('Error starting manager bouncer')
609 d.addCallbacks(setupCallback, setupErrback)
610 return d
611
628
629 __pychecker__ = 'maxargs=11'
630
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    The component is added to the atmosphere, or to the flow named in
    componentId (which is created if it does not exist yet), and is
    then handed to _startComponents.

    @returns: the state of the newly added component
    @rtype:   L{flumotion.common.planet.ManagerComponentState}

    @raises NotImplementedError: if isClockMaster is set, or if the
            component type needs synchronization
    @raises errors.ConfigError: if no worker name was given
    @raises errors.ComponentAlreadyExistsError: if the parent already
            has a component with this name
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # Check the argument, not the imported `worker' module: the module
    # is never None, so the original test could never fire.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    components = [x.get('name') for x in parentState.get('components')]
    if compName in components:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
690
692 """
693 Create a heaven of the given klass that will send avatars to clients
694 implementing the given medium interface.
695
696 @param interface: the medium interface to create a heaven for
697 @type interface: L{flumotion.common.interfaces.IMedium}
698 @param klass: the type of heaven to create
699 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
700 """
701 assert issubclass(interface, interfaces.IMedium)
702 heaven = klass(self)
703 self.dispatcher.registerHeaven(heaven, interface)
704 return heaven
705
707 """
708 @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
709 """
710 if self.bouncer:
711 self.warning("manager already had a bouncer, setting anyway")
712
713 self.bouncer = bouncer
714 self.portal.bouncer = bouncer
715 self.dispatcher.setBouncer(bouncer)
716
719
748
763
def stopLost():
    """
    Stop a component that is in the lost mood and has no avatar.

    If the requested worker is logged in, it is asked for its list of
    components; the component is only set to sleeping when it is not in
    that list. Without a logged-in worker the component is set to
    sleeping directly.

    @returns: a deferred firing None once the component has been set to
              sleeping, or failing with L{errors.ComponentMoodError}
              if the worker still reports the component.
    """

    def gotComponents(comps):
        # True if the worker still lists this component.
        return avatarId in comps

    def gotJobRunning(running):
        if running:
            self.warning('asked to stop lost component %r, but '
                         'it is still running', avatarId)
            msg = "Cannot stop lost component which is still running."
            raise errors.ComponentMoodError(msg)
        else:
            # Pass avatarId so the %r placeholder has a value.
            self.debug('component %r seems to be really lost, '
                       'setting to sleeping', avatarId)
            componentState.setMood(moods.sleeping.value)
            componentState.set('moodPending', None)
            return None

    self.debug('asked to stop a lost component without avatar')
    workerName = componentState.get('workerRequested')
    if workerName and self.workerHeaven.hasAvatar(workerName):
        self.debug('checking if component has job process running')
        d = self.workerHeaven.getAvatar(workerName).getComponents()
        d.addCallback(gotComponents)
        d.addCallback(gotJobRunning)
        return d
    else:
        self.debug('component lacks a worker, setting to sleeping')
        # maybeDeferred keeps the return type a deferred on this path too.
        d = defer.maybeDeferred(gotJobRunning, False)
        return d
796
def stopUnknown():
    """
    Fallback stopper for an avatar-less component whose mood has no
    dedicated stopper.

    @returns: a deferred that has already failed with
              L{errors.ComponentMoodError}
    """
    moodName = moods.get(mood)
    msg = 'asked to stop a component without avatar in mood %s' % moodName
    self.warning(msg)
    error = errors.ComponentMoodError(msg)
    return defer.fail(error)
802
803 mood = componentState.get('mood')
804 stoppers = {moods.sad.value: stopSad,
805 moods.lost.value: stopLost}
806 return stoppers.get(mood, stopUnknown)()
807
809
810
811 d = componentAvatar.stop()
812
813 return d
814
845
847 """
848 Set the given message on the given component's state.
849 Can be called e.g. by a worker to report on a crashed component.
850 Sets the mood to sad if it is an error message.
851 """
852 if not avatarId in self._componentMappers:
853 self.warning('asked to set a message on non-mapped component %s' %
854 avatarId)
855 return
856
857 m = self._componentMappers[avatarId]
858 m.state.append('messages', message)
859 if message.level == messages.ERROR:
860 self.debug('Error message makes component sad')
861 m.state.setMood(moods.sad.value)
862
863
864
866
867 workerId = workerAvatar.avatarId
868 self.debug('vishnu.workerAttached(): id %s' % workerId)
869
870
871
872
873 components = [c for c in self._getComponentsToCreate()
874 if c.get('workerRequested') in (workerId, None)]
875
876
877
878
879 d = workerAvatar.getComponents()
880
def workerAvatarComponentListReceived(workerComponents):
    """
    Callback for the worker's component list: work out which
    components still have to be created on the worker and create
    them.

    Components the worker already reports are dropped both from the
    list of components to create and from the list of lost
    components. The remaining lost components are reset to sleeping
    and created again together with the rest.

    @param workerComponents: the component avatarIds reported by the
                             worker
    """
    lostStates = [s for s in self.getComponentStates()
                  if s.get('workerRequested') == workerId
                  and s.get('mood') == moods.lost.value]

    for runningId in workerComponents:
        if runningId not in self._componentMappers:
            continue
        runningState = self._componentMappers[runningId].state
        # The worker already has this one, so it must not be created
        # again and is not really lost.
        if runningState in components:
            components.remove(runningState)
        if runningState in lostStates:
            lostStates.remove(runningState)

    for lostState in lostStates:
        self.info(
            "Restarting previously lost component %s on worker %s",
            self._componentMappers[lostState].id, workerId)
        # Reset to sleeping before the component is created again.
        lostState.set('moodPending', None)
        lostState.setMood(moods.sleeping.value)

    toCreate = components + lostStates

    if not toCreate:
        self.debug(
            "vishnu.workerAttached(): no components for this worker")
        return

    self._workerCreateComponents(workerId, toCreate)
914 d.addCallback(workerAvatarComponentListReceived)
915
916 reactor.callLater(0, self.componentHeaven.feedServerAvailable,
917 workerId)
918
920 """
921 Create the list of components on the given worker, sequentially, but
922 in no specific order.
923
924 @param workerId: avatarId of the worker
925 @type workerId: string
926 @param components: components to start
927 @type components: list of
928 L{flumotion.common.planet.ManagerComponentState}
929 """
930 self.debug("_workerCreateComponents: workerId %r, components %r" % (
931 workerId, components))
932
933 if not workerId in self.workerHeaven.avatars:
934 self.debug('worker %s not logged in yet, delaying '
935 'component start' % workerId)
936 return defer.succeed(None)
937
938 workerAvatar = self.workerHeaven.avatars[workerId]
939
940 d = defer.Deferred()
941
942 for c in components:
943 componentType = c.get('type')
944 conf = c.get('config')
945 self.debug('scheduling create of %s on %s'
946 % (conf['avatarId'], workerId))
947 d.addCallback(self._workerCreateComponentDelayed,
948 workerAvatar, c, componentType, conf)
949
950 d.addCallback(lambda result: self.debug(
951 '_workerCreateComponents(): completed setting up create chain'))
952
953
954 self.debug('_workerCreateComponents(): triggering create chain')
955 d.callback(None)
956
957 return d
958
975
976
977
978
980 self.debug('got avatarId %s for state %s' % (result, componentState))
981 m = self._componentMappers[componentState]
982 assert result == m.id, "received id %s is not the expected id %s" % (
983 result, m.id)
984
1008
1010
1011 workerId = workerAvatar.avatarId
1012 self.debug('vishnu.workerDetached(): id %s' % workerId)
1013
1014
1015 sadComponents = list([c for c in self.getComponentStates()
1016 if c.get('workerRequested') == workerId and \
1017 c.get('mood') == moods.sad.value])
1018 map(lambda c: c.setMood(moods.sleeping.value), sadComponents)
1019
1036
1038
1039 m = (self.getComponentMapper(componentAvatar.avatarId)
1040 or ComponentMapper())
1041
1042 m.state = componentAvatar.componentState
1043 m.jobState = componentAvatar.jobState
1044 m.id = componentAvatar.avatarId
1045 m.avatar = componentAvatar
1046
1047 self._componentMappers[m.state] = m
1048 self._componentMappers[m.jobState] = m
1049 self._componentMappers[m.id] = m
1050 self._componentMappers[m.avatar] = m
1051
1053
1054
1055 self.debug('unregisterComponent(%r): cleaning up state' %
1056 componentAvatar)
1057
1058 m = self._componentMappers[componentAvatar]
1059
1060
1061 try:
1062 del self._componentMappers[m.jobState]
1063 except KeyError:
1064 self.warning('Could not remove jobState for %r' % componentAvatar)
1065 m.jobState = None
1066
1067 m.state.set('pid', None)
1068 m.state.set('workerName', None)
1069 m.state.set('moodPending', None)
1070
1071
1072 del self._componentMappers[m.avatar]
1073 m.avatar = None
1074
1076 cList = self.state.getComponents()
1077 self.debug('getComponentStates(): %d components' % len(cList))
1078 for c in cList:
1079 self.log(repr(c))
1080 mood = c.get('mood')
1081 if mood == None:
1082 self.warning('%s has mood None' % c.get('name'))
1083
1084 return cList
1085
1087 """
1088 Empty the planet of the given component.
1089
1090 @returns: a deferred that will fire when all listeners have been
1091 notified of the removal of the component.
1092 """
1093 self.debug('deleting component %r from state', componentState)
1094 c = componentState
1095 if c not in self._componentMappers:
1096 raise errors.UnknownComponentError(c)
1097
1098 flow = componentState.get('parent')
1099 if (c.get('moodPending') != None
1100 or c.get('mood') is not moods.sleeping.value):
1101 raise errors.BusyComponentError(c)
1102
1103 del self._componentMappers[self._componentMappers[c].id]
1104 del self._componentMappers[c]
1105 return flow.remove('components', c)
1106
1108 for flow in self.state.get('flows'):
1109 if flow.get('name') == flowName:
1110 return flow
1111
1113 """
1114 Empty the planet of a flow.
1115
1116 @returns: a deferred that will fire when the flow is removed.
1117 """
1118
1119 flow = self._getFlowByName(flowName)
1120 if flow is None:
1121 raise ValueError("No flow called %s found" % (flowName, ))
1122
1123 components = flow.get('components')
1124 for c in components:
1125
1126 if (c.get('moodPending') != None or
1127 c.get('mood') is not moods.sleeping.value):
1128 raise errors.BusyComponentError(c)
1129 for c in components:
1130 del self._componentMappers[self._componentMappers[c].id]
1131 del self._componentMappers[c]
1132 d = flow.empty()
1133 d.addCallback(lambda _: self.state.remove('flows', flow))
1134 return d
1135
1137 """
1138 Empty the planet of all components, and flows. Also clears all
1139 messages.
1140
1141 @returns: a deferred that will fire when the planet is empty.
1142 """
1143 for mid in self.state.get('messages').keys():
1144 self.clearMessage(mid)
1145
1146
1147 components = self.getComponentStates()
1148
1149
1150 components = [c for c in components
1151 if c.get('moodPending') != None]
1152 if components:
1153 state = components[0]
1154 raise errors.BusyComponentError(
1155 state,
1156 "moodPending is %s" % moods.get(state.get('moodPending')))
1157
1158
1159 components = [c for c in self.getComponentStates()
1160 if c.get('mood') is not moods.sleeping.value]
1161
1162
1163 d = defer.Deferred()
1164
1165 self.debug('need to stop %d components: %r' % (
1166 len(components), components))
1167
1168 for c in components:
1169 avatar = self._componentMappers[c].avatar
1170
1171
1172 if avatar:
1173 d.addCallback(lambda result, a: a.stop(), avatar)
1174 else:
1175 assert (c.get('mood') is moods.sad.value or
1176 c.get('mood') is moods.lost.value)
1177
1178 d.addCallback(self._emptyPlanetCallback)
1179
1180
1181 reactor.callLater(0, d.callback, None)
1182
1183 return d
1184
1186
1187
1188 components = self.getComponentStates()
1189 self.debug('_emptyPlanetCallback: need to delete %d components' %
1190 len(components))
1191
1192 for c in components:
1193 if c.get('mood') is not moods.sleeping.value:
1194 self.warning('Component %s is not sleeping', c.get('name'))
1195
1196 m = self._componentMappers[c]
1197 del self._componentMappers[m.id]
1198 del self._componentMappers[c]
1199
1200
1201 l = self._componentMappers.keys()
1202 if len(l) > 0:
1203 self.warning('mappers still has keys %r' % (repr(l)))
1204
1205 dList = []
1206
1207 dList.append(self.state.get('atmosphere').empty())
1208
1209 for f in self.state.get('flows'):
1210 self.debug('appending deferred for emptying flow %r' % f)
1211 dList.append(f.empty())
1212 self.debug('appending deferred for removing flow %r' % f)
1213 dList.append(self.state.remove('flows', f))
1214 self.debug('appended deferreds')
1215
1216 dl = defer.DeferredList(dList)
1217 return dl
1218
1220 """
1221 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1222 """
1223
1224 components = self.state.getComponents()
1225
1226
1227
1228
1229
1230 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1231 components = filter(isSleeping, components)
1232 return components
1233
1235
1236 if not workerName in self.workerHeaven.avatars:
1237 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1238 % workerName)
1239
1240 return self.workerHeaven.avatars[workerName]
1241
1243 if workerName in self.workerHeaven.avatars:
1244 return self._getWorker(workerName).feedServerPort
1245 return None
1246
1248 """
1249 Requests a number of ports on the worker named workerName. The
1250 ports will be reserved for the use of the caller until
1251 releasePortsOnWorker is called.
1252
1253 @returns: a list of ports as integers
1254 """
1255 return self._getWorker(workerName).reservePorts(numPorts)
1256
1258 """
1259 Tells the manager that the given ports are no longer being used,
1260 and may be returned to the allocation pool.
1261 """
1262 try:
1263 return self._getWorker(workerName).releasePorts(ports)
1264 except errors.ComponentNoWorkerError, e:
1265 self.warning('could not release ports: %r' % e.args)
1266
1268 """
1269 Look up an object mapper given the object.
1270
1271 @rtype: L{ComponentMapper} or None
1272 """
1273 if object in self._componentMappers.keys():
1274 return self._componentMappers[object]
1275
1276 return None
1277
1279 """
1280 Look up a component's state given an object that maps to it.
1281
1282 @rtype: L{flumotion.common.planet.ManagerComponentState} or None
1283 """
1284 if object in self._componentMappers.keys():
1285 return self._componentMappers[object].state
1286
1287 return None
1288
1290 """
1291 Invokes method on all components of a certain type
1292 """
1293
1294 def invokeOnOneComponent(component, methodName, *args, **kwargs):
1295 m = self.getComponentMapper(component)
1296 if not m:
1297 self.warning('Component %s not mapped. Maybe deleted.',
1298 component.get('name'))
1299 raise errors.UnknownComponentError(component)
1300
1301 avatar = m.avatar
1302 if not avatar:
1303 self.warning('No avatar for %s, cannot call remote',
1304 component.get('name'))
1305 raise errors.SleepingComponentError(component)
1306
1307 try:
1308 return avatar.mindCallRemote(methodName, *args, **kwargs)
1309 except Exception, e:
1310 log_message = log.getExceptionMessage(e)
1311 msg = "exception on remote call %s: %s" % (methodName,
1312 log_message)
1313 self.warning(msg)
1314 raise errors.RemoteMethodError(methodName,
1315 log_message)
1316
1317
1318 dl_array = []
1319 for c in self.getComponentStates():
1320 if c.get('type') == componentType and \
1321 (c.get('mood') is moods.happy.value or
1322 c.get('mood') is moods.hungry.value):
1323 self.info("component %r to have %s run", c, methodName)
1324 d = invokeOnOneComponent(c, methodName, *args, **kwargs)
1325 dl_array.append(d)
1326 dl = defer.DeferredList(dl_array)
1327 return dl
1328