1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 manager-side objects for components
20
21 API Stability: semi-stable
22 """
23
24 from twisted.internet import reactor, defer
25 from twisted.python.failure import Failure
26 from zope.interface import implements
27
28 from flumotion.manager import base, config
29 from flumotion.common import errors, interfaces, log, planet
30 from flumotion.common import messages, common
31
32
33 from flumotion.common import keycards
34
35 from flumotion.common.i18n import N_, gettexter
36 from flumotion.common.planet import moods
37
38 __version__ = "$Rev$"
39 T_ = gettexter()
40
41
43 """
44 I am a Manager-side avatar for a component.
45 I live in the L{ComponentHeaven}.
46
47 Each component that logs in to the manager gets an avatar created for it
48 in the manager.
49
50 @cvar avatarId: the L{componentId<common.componentId>}
51 @type avatarId: str
52 @cvar jobState: job state of this avatar's component
53 @type jobState: L{flumotion.common.planet.ManagerJobState}
54 @cvar componentState: component state of this avatar's component
55 @type componentState: L{flumotion.common.planet.ManagerComponentState}
56 """
57
58 logCategory = 'comp-avatar'
59
60 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf,
61 jobState, clocking):
77
78
79
88
89
90
93
94 def gotStates(result):
95 (_s1, conf), (_s2, jobState), (_s3, clocking) = result
96 assert _s1 and _s2 and _s3
97 log.debug('component-avatar', 'got state information')
98 return (heaven, avatarId, remoteIdentity, mind,
99 conf, jobState, clocking)
100 log.debug('component-avatar', 'calling mind for state information')
101 d = defer.DeferredList([mind.callRemote('getConfig'),
102 mind.callRemote('getState'),
103 mind.callRemote('getMasterClockInfo')],
104 fireOnOneErrback=True)
105 d.addCallback(gotStates)
106 return d
107 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
108
135
136
137
138 - def addMessage(self, level, mid, format, *args, **kwargs):
139 """
140 Convenience message to construct a message and add it to the
141 component state. `format' should be marked as translatable in
142 the source with N_, and *args will be stored as format
143 arguments. Keyword arguments are passed on to the message
144 constructor. See L{flumotion.common.messages.Message} for the
145 meanings of the rest of the arguments.
146
147 For example::
148
149 self.addMessage(messages.WARNING, 'foo-warning',
150 N_('The answer is %d'), 42, debug='not really')
151 """
152 self.addMessageObject(messages.Message(level,
153 T_(format, *args),
154 mid=mid, **kwargs))
155
157 """
158 Add a message to the planet state.
159
160 @type message: L{flumotion.common.messages.Message}
161 """
162 self.componentState.append('messages', message)
163
180
182
183
184
185
186
187
188
189
190
191 def verifyExistingComponentState(conf, state):
192
193 state.setJobState(self.jobState)
194 self.componentState = state
195
196 self.upgradeConfig(state, conf)
197 if state.get('config') != conf:
198 diff = config.dictDiff(state.get('config'), conf)
199 diffMsg = config.dictDiffMessageString(diff,
200 'internal conf',
201 'running conf')
202 self.addMessage(messages.WARNING, 'stale-config',
203 N_("Component logged in with stale "
204 "configuration. To fix this, stop "
205 "this component and then restart "
206 "the manager."),
207 debug=("Updating internal conf from "
208 "running conf:\n" + diffMsg))
209 self.warning('updating internal component state for %r',
210 state)
211 self.debug('changes to conf: %s',
212 config.dictDiffMessageString(diff))
213 state.set('config', conf)
214
215 def makeNewComponentState(conf):
216
217 state = planet.ManagerComponentState()
218 state.setJobState(self.jobState)
219 self.componentState = state
220
221 self.upgradeConfig(state, conf)
222
223 flowName, compName = conf['parent'], conf['name']
224
225 state.set('name', compName)
226 state.set('type', conf['type'])
227 state.set('workerRequested', self.jobState.get('workerName'))
228 state.set('config', conf)
229 self.vishnu.addComponentToFlow(state, flowName)
230 return state
231
232 mState = self.vishnu.getManagerComponentState(self.avatarId)
233 if mState:
234 verifyExistingComponentState(conf, mState)
235 else:
236 makeNewComponentState(conf)
237
239 """
240 Tell the component to provide a master clock.
241
242 @rtype: L{twisted.internet.defer.Deferred}
243 """
244
245 def success(clocking):
246 self.clocking = clocking
247 self.heaven.masterClockAvailable(self)
248
249 def error(failure):
250 self.addMessage(messages.WARNING, 'provide-master-clock',
251 N_('Failed to provide the master clock'),
252 debug=log.getFailureMessage(failure))
253 self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port])
254
255 if self.clocking:
256 self.heaven.masterClockAvailable(self)
257 else:
258 (port, ) = self.vishnu.reservePortsOnWorker(
259 self.getWorkerName(), 1)
260 self.debug('provideMasterClock on port %d', port)
261
262 d = self.mindCallRemote('provideMasterClock', port)
263 d.addCallbacks(success, error)
264
266 """
267 Returns the port on which a feed server for this component is
268 listening on.
269
270 @rtype: int
271 """
272 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
273
275 """
276 Get the IP address of the manager as seen by the component.
277
278 @rtype: str
279 """
280 return self.jobState.get('manager-ip')
281
283 """
284 Return the name of the worker.
285
286 @rtype: str
287 """
288 return self.jobState.get('workerName')
289
291 """
292 Return the PID of the component.
293
294 @rtype: int
295 """
296 return self.jobState.get('pid')
297
299 """
300 Get the name of the component.
301
302 @rtype: str
303 """
304 return self.componentState.get('name')
305
307 """
308 Get the name of the component's parent.
309
310 @rtype: str
311 """
312 return self.componentState.get('parent').get('name')
313
315 """
316 Get the component type name of the component.
317
318 @rtype: str
319 """
320 return self.componentState.get('type')
321
323 """
324 Get the set of eaters that this component eats from.
325
326 @rtype: dict of eaterName -> [(feedId, eaterAlias)]
327 """
328 return self.componentState.get('config').get('eater', {})
329
331 """
332 Get the list of feeders that this component provides.
333
334 @rtype: list of feederName
335 """
336 return self.componentState.get('config').get('feed', [])
337
339 """
340 Get the feedId of a feed provided or consumed by this component.
341
342 @param feedName: The name of the feed (i.e., eater alias or
343 feeder name)
344 @rtype: L{flumotion.common.common.feedId}
345 """
346 return common.feedId(self.getName(), feedName)
347
349 """
350 Get the full feedId of a feed provided or consumed by this
351 component.
352
353 @param feedName: The name of the feed (i.e., eater alias or
354 feeder name)
355 @rtype: L{flumotion.common.common.fullFeedId}
356 """
357 return common.fullFeedId(self.getParentName(),
358 self.getName(), feedName)
359
361 """
362 Get the set of virtual feeds provided by this component.
363
364 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName)
365 """
366 conf = self.componentState.get('config')
367 ret = {}
368 for feedId, feederName in conf.get('virtual-feeds', {}).items():
369 vComp, vFeed = common.parseFeedId(feedId)
370 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed)
371 ret[ffid] = (self, feederName)
372 return ret
373
375 """
376 Get the worker that this component should run on.
377
378 @rtype: str
379 """
380 return self.componentState.get('workerRequested')
381
383 """
384 Get this component's clock master, if any.
385
386 @rtype: avatarId or None
387 """
388 return self.componentState.get('config')['clock-master']
389
391 """
392 Tell the remote component to shut down.
393 """
394 self._shutdownDeferred = defer.Deferred()
395
396 self.mindCallRemote('stop')
397
398 return self._shutdownDeferred
399
403
404 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
408
409 - def feedTo(self, feederName, fullFeedId, host, port):
413
415 """
416 Tell the remote component to modify a property with a new value.
417
418 @param property_name: The name of the property to change
419 @param value: The new value of the property
420 @rtype: L{twisted.internet.defer.Deferred}
421 """
422 return self.mindCallRemote('modifyProperty', property_name, value)
423
424
425
427 """
428 Authenticate the given keycard.
429 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \
430 """BouncerMedium.remote_authenticate}
431 The component should be a subclass of
432 L{flumotion.component.bouncers.bouncer.Bouncer}
433
434 @type keycard: L{flumotion.common.keycards.Keycard}
435 """
436 return self.mindCallRemote('authenticate', keycard)
437
439 """
440 Remove a keycard managed by this bouncer because the requester
441 has gone.
442
443 @type keycardId: str
444 """
445 return self.mindCallRemote('removeKeycardId', keycardId)
446
448 """
449 Expire a keycard issued to this component because the bouncer decided
450 to.
451
452 @type keycardId: str
453 """
454 return self.mindCallRemote('expireKeycard', keycardId)
455
457 """
458 Expire keycards issued to this component because the bouncer
459 decided to.
460
461 @type keycardIds: sequence of str
462 """
463 return self.mindCallRemote('expireKeycards', keycardIds)
464
466 """
467 Resets the expiry timeout for keycards issued by issuerName.
468
469 @param issuerName: the issuer for which keycards should be kept
470 alive; that is to say, keycards with the
471 attribute 'issuerName' set to this value will
472 have their ttl values reset.
473 @type issuerName: str
474 @param ttl: the new expiry timeout
475 @type ttl: number
476 """
477 return self.mindCallRemote('keepAlive', issuerName, ttl)
478
479
480
482 """
483 Called by a component to tell the manager that it's shutting down
484 cleanly (and thus should go to sleeping, rather than lost or sad)
485 """
486 self.debug("shutdown is clean, shouldn't go to lost")
487 self._shutdown_requested = True
488
506
508 """
509 Expire a keycard (and thus the requester's connection)
510 issued to the given requester.
511
512 This is called by the bouncer component that authenticated the keycard.
513
514 @param requesterId: name (avatarId) of the component that originally
515 requested authentication for the given keycardId
516 @type requesterId: str
517 @param keycardId: id of keycard to expire
518 @type keycardId: str
519 """
520
521 if not self.heaven.hasAvatar(requesterId):
522 self.warning('asked to expire keycard %s for requester %s, '
523 'but no such component registered',
524 keycardId, requesterId)
525 raise errors.UnknownComponentError(requesterId)
526
527 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
528
530 """
531 Expire multiple keycards (and thus the requester's connections)
532 issued to the given requester.
533
534 This is called by the bouncer component that authenticated
535 the keycards.
536
537 @param requesterId: name (avatarId) of the component that originally
538 requested authentication for the given keycardId
539 @type requesterId: str
540 @param keycardIds: sequence of id of keycards to expire
541 @type keycardIds: sequence of str
542 """
543 if not self.heaven.hasAvatar(requesterId):
544 self.warning('asked to expire %d keycards for requester %s, '
545 'but no such component registered',
546 len(keycardIds), requesterId)
547 raise errors.UnknownComponentError(requesterId)
548
549 return self.heaven.getAvatar(requesterId).expireKeycards(keycardIds)
550
551
553
554 - def add(self, key, value):
555 if key not in self:
556 self[key] = []
557 self[key].append(value)
558
559 - def remove(self, key, value):
560 self[key].remove(value)
561 if not self[key]:
562 del self[key]
563
564
565 -class FeedMap(object, log.Loggable):
566 logName = 'feed-map'
567
569
570 self.avatars = {}
571 self._ordered_avatars = []
572 self._dirty = True
573 self._recalc()
574
576 assert avatar.avatarId not in self.avatars
577 self.avatars[avatar.avatarId] = avatar
578 self._ordered_avatars.append(avatar)
579 self._dirty = True
580
582
583
584 del self.avatars[avatar.avatarId]
585 self._ordered_avatars.remove(avatar)
586 self._dirty = True
587
588
589 return [(a, f) for a, f in self.feedDeps.pop(avatar, [])
590 if a.avatarId in self.avatars]
591
605
607 if not self._dirty:
608 return
609 self.feedersForEaters = ffe = {}
610 self.eatersForFeeders = eff = dictlist()
611 self.feeds = dictlist()
612 self.feedDeps = dictlist()
613
614 for comp in self._ordered_avatars:
615 for feederName in comp.getFeeders():
616 self.feeds.add(comp.getFullFeedId(feederName),
617 (comp, feederName))
618 for ffid, pair in comp.getVirtualFeeds().items():
619 self.feeds.add(ffid, pair)
620
621 for eater in self.avatars.values():
622 for pairs in eater.getEaters().values():
623 for feedId, eName in pairs:
624 feeder, fName = self._getFeederAvatar(eater, feedId)
625 if feeder:
626 ffe[eater.getFullFeedId(eName)] = (
627 eName, feeder, fName)
628 eff.add(feeder.getFullFeedId(fName),
629 (fName, eater, eName))
630 else:
631 self.debug('eater %s waiting for feed %s to log in',
632 eater.getFeedId(eName), feedId)
633 self._dirty = False
634
636 """Get the set of feeds that this component is eating from,
637 keyed by eater alias.
638
639 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
640 @rtype: list of (str, ComponentAvatar, str)
641 """
642 self._recalc()
643 ret = []
644 for tups in avatar.getEaters().values():
645 for feedId, alias in tups:
646 ffid = avatar.getFullFeedId(alias)
647 if ffid in self.feedersForEaters:
648 ret.append(self.feedersForEaters[ffid])
649 return ret
650
652 """Get the set of feeds that this component is eating from
653 for the given feedId.
654
655 @param avatar: the eater component
656 @type avatar: L{ComponentAvatar}
657 @param ffid: full feed id for which to return feeders
658 @type ffid: str
659 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
660 @rtype: list of (str, L{ComponentAvatar}, str)
661 """
662 self._recalc()
663 ret = []
664 for feeder, feedName in self.feeds.get(ffid, []):
665 rffid = feeder.getFullFeedId(feedName)
666 eff = self.eatersForFeeders.get(rffid, [])
667 for fName, eater, eaterName in eff:
668 if eater == avatar:
669 ret.append((eaterName, feeder, feedName))
670 return ret
671
673 """Get the set of eaters that this component feeds, keyed by
674 feeder name.
675
676 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples
677 @rtype: list of (str, ComponentAvatar, str)
678 """
679 self._recalc()
680 ret = []
681 for feedName in avatar.getFeeders():
682 ffid = avatar.getFullFeedId(feedName)
683 if ffid in self.eatersForFeeders:
684 ret.extend(self.eatersForFeeders[ffid])
685 return ret
686
687
689 """
690 I handle all registered components and provide L{ComponentAvatar}s
691 for them.
692 """
693
694 implements(interfaces.IHeaven)
695 avatarClass = ComponentAvatar
696
697 logCategory = 'comp-heaven'
698
703
704
705
714
725
727 master = avatar.getClockMaster()
728 if master:
729 if master == avatar.avatarId:
730 self.debug('Need for %r to provide a clock master',
731 master)
732 avatar.provideMasterClock()
733 else:
734 self.debug('Need to synchronize with clock master %r',
735 master)
736
737
738
739
740 m = self.vishnu.getComponentMapper(master)
741 if m and m.avatar:
742 clocking = m.avatar.clocking
743 if clocking:
744 host, port, base_time = clocking
745 avatar.setClocking(host, port, base_time)
746 else:
747 self.warning('%r should provide a clock master '
748 'but is not doing so', master)
749
750 else:
751 self.debug('clock master not logged in yet, will '
752 'set clocking later')
753
760
762 assert avatar.avatarId not in self.avatars
763 compsNeedingReconnect = self.feedMap.componentDetached(avatar)
764 if self.vishnu.running:
765 self.debug('will reconnect: %r', compsNeedingReconnect)
766
767
768 for comp, ffid in compsNeedingReconnect:
769 self._connectEaters(comp, ffid)
770
772 """
773 @param fromAvatar: the avatar to connect from
774 @type fromAvatar: L{ComponentAvatar}
775 @param fromAvatar: the avatar to connect to
776 @type toAvatar: L{ComponentAvatar}
777
778 @returns: the host and port on which to make the connection to
779 toAvatar from fromAvatar
780 @rtype: tuple of (str, int or None)
781 """
782 toHost = toAvatar.getClientAddress()
783 toPort = toAvatar.getFeedServerPort()
784
785
786
787
788
789
790
791 fromHost = fromAvatar.mind.broker.transport.getPeer().host
792 if fromHost == toHost:
793 toHost = '127.0.0.1'
794
795 self.debug('mapNetFeed from %r to %r: %s:%r', fromAvatar, toAvatar,
796 toHost, toPort)
797 return toHost, toPort
798
809
811
812
813 def always(otherComp):
814 return True
815
816 def never(otherComp):
817 return False
818 directions = [(self.feedMap.getFeedersForEaters,
819 always, 'eatFrom', 'feedTo'),
820 (self.feedMap.getEatersForFeeders,
821 never, 'feedTo', 'eatFrom')]
822
823 myComp = avatar
824 for getPeers, initiate, directMethod, reversedMethod in directions:
825 for myFeedName, otherComp, otherFeedName in getPeers(myComp):
826 if initiate(otherComp):
827
828 self._connectFeederToEater(myComp, myFeedName, otherComp,
829 otherFeedName, directMethod)
830 else:
831
832 self._connectFeederToEater(otherComp, otherFeedName,
833 myComp, myFeedName,
834 reversedMethod)
835
837
838 ffe = self.feedMap.getFeedersForEater(avatar, ffid)
839 for myFeedName, otherComp, otherFeedName in ffe:
840 self._connectFeederToEater(avatar, myFeedName, otherComp,
841 otherFeedName, 'eatFrom')
842