Package flumotion :: Package manager :: Module component
[hide private]

Source Code for Module flumotion.manager.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  manager-side objects for components 
 20   
 21  API Stability: semi-stable 
 22  """ 
 23   
 24  from twisted.internet import reactor, defer 
 25  from twisted.python.failure import Failure 
 26  from zope.interface import implements 
 27   
 28  from flumotion.manager import base, config 
 29  from flumotion.common import errors, interfaces, log, planet 
 30  from flumotion.common import messages, common 
 31   
 32  # registers serializable 
 33  from flumotion.common import keycards 
 34   
 35  from flumotion.common.i18n import N_, gettexter 
 36  from flumotion.common.planet import moods 
 37   
 38  __version__ = "$Rev$" 
 39  T_ = gettexter() 
 40   
 41   
42 -class ComponentAvatar(base.ManagerAvatar):
43 """ 44 I am a Manager-side avatar for a component. 45 I live in the L{ComponentHeaven}. 46 47 Each component that logs in to the manager gets an avatar created for it 48 in the manager. 49 50 @cvar avatarId: the L{componentId<common.componentId>} 51 @type avatarId: str 52 @cvar jobState: job state of this avatar's component 53 @type jobState: L{flumotion.common.planet.ManagerJobState} 54 @cvar componentState: component state of this avatar's component 55 @type componentState: L{flumotion.common.planet.ManagerComponentState} 56 """ 57 58 logCategory = 'comp-avatar' 59
60 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf, 61 jobState, clocking):
62 # doc in base class 63 base.ManagerAvatar.__init__(self, heaven, avatarId, 64 remoteIdentity, mind) 65 66 self.jobState = jobState 67 self.makeComponentState(conf) 68 self.clocking = clocking 69 70 self._shutdown_requested = False 71 self._shutdownDeferred = None 72 73 self.vishnu.registerComponent(self) 74 # calllater to allow the component a chance to receive its 75 # avatar, so that it has set medium.remote 76 reactor.callLater(0, self.heaven.componentAttached, self)
77 78 ### python methods 79
80 - def __repr__(self):
81 mood = '(unknown)' 82 if self.componentState: 83 moodValue = self.componentState.get('mood') 84 if moodValue is not None: 85 mood = moods.get(moodValue).name 86 return '<%s %s (mood %s)>' % (self.__class__.__name__, 87 self.avatarId, mood)
88 89 ### ComponentAvatar methods 90
91 - def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity, 92 mind):
93 94 def gotStates(result): 95 (_s1, conf), (_s2, jobState), (_s3, clocking) = result 96 assert _s1 and _s2 and _s3 # fireOnErrback=1 97 log.debug('component-avatar', 'got state information') 98 return (heaven, avatarId, remoteIdentity, mind, 99 conf, jobState, clocking)
100 log.debug('component-avatar', 'calling mind for state information') 101 d = defer.DeferredList([mind.callRemote('getConfig'), 102 mind.callRemote('getState'), 103 mind.callRemote('getMasterClockInfo')], 104 fireOnOneErrback=True) 105 d.addCallback(gotStates) 106 return d
107 makeAvatarInitArgs = classmethod(makeAvatarInitArgs) 108
109 - def onShutdown(self):
110 # doc in base class 111 self.info('component "%s" logged out', self.avatarId) 112 113 self.vishnu.unregisterComponent(self) 114 115 if self.clocking: 116 ip, port, base_time = self.clocking 117 self.vishnu.releasePortsOnWorker(self.getWorkerName(), 118 [port]) 119 120 self.componentState.clearJobState(self._shutdown_requested) 121 122 # FIXME: why? 123 self.componentState.set('moodPending', None) 124 125 self.componentState = None 126 self.jobState = None 127 128 self.heaven.componentDetached(self) 129 130 if self._shutdownDeferred: 131 reactor.callLater(0, self._shutdownDeferred.callback, True) 132 self._shutdownDeferred = None 133 134 base.ManagerAvatar.onShutdown(self)
135 136 # my methods 137
138 - def addMessage(self, level, mid, format, *args, **kwargs):
139 """ 140 Convenience message to construct a message and add it to the 141 component state. `format' should be marked as translatable in 142 the source with N_, and *args will be stored as format 143 arguments. Keyword arguments are passed on to the message 144 constructor. See L{flumotion.common.messages.Message} for the 145 meanings of the rest of the arguments. 146 147 For example:: 148 149 self.addMessage(messages.WARNING, 'foo-warning', 150 N_('The answer is %d'), 42, debug='not really') 151 """ 152 self.addMessageObject(messages.Message(level, 153 T_(format, *args), 154 mid=mid, **kwargs))
155
156 - def addMessageObject(self, message):
157 """ 158 Add a message to the planet state. 159 160 @type message: L{flumotion.common.messages.Message} 161 """ 162 self.componentState.append('messages', message)
163
164 - def upgradeConfig(self, state, conf):
165 # different from conf['version'], eh... 166 version = conf.get('config-version', 0) 167 while version < config.CURRENT_VERSION: 168 try: 169 config.UPGRADERS[version](conf) 170 version += 1 171 conf['config-version'] = version 172 except Exception, e: 173 self.addMessage(messages.WARNING, 174 'upgrade-%d' % version, 175 N_("Failed to upgrade config %r " 176 "from version %d. Please file " 177 "a bug."), conf, version, 178 debug=log.getExceptionMessage(e)) 179 return
180
181 - def makeComponentState(self, conf):
182 # the component just logged in with good credentials. we fetched 183 # its config and job state. now there are two possibilities: 184 # (1) we were waiting for such a component to start. There was 185 # a ManagerComponentState and an avatarId in the 186 # componentMappers waiting for us. 187 # (2) we don't know anything about this component, but it has a 188 # state and config. We deal with it, creating all the 189 # neccesary internal state. 190 191 def verifyExistingComponentState(conf, state): 192 # condition (1) 193 state.setJobState(self.jobState) 194 self.componentState = state 195 196 self.upgradeConfig(state, conf) 197 if state.get('config') != conf: 198 diff = config.dictDiff(state.get('config'), conf) 199 diffMsg = config.dictDiffMessageString(diff, 200 'internal conf', 201 'running conf') 202 self.addMessage(messages.WARNING, 'stale-config', 203 N_("Component logged in with stale " 204 "configuration. To fix this, stop " 205 "this component and then restart " 206 "the manager."), 207 debug=("Updating internal conf from " 208 "running conf:\n" + diffMsg)) 209 self.warning('updating internal component state for %r', 210 state) 211 self.debug('changes to conf: %s', 212 config.dictDiffMessageString(diff)) 213 state.set('config', conf)
214 215 def makeNewComponentState(conf): 216 # condition (2) 217 state = planet.ManagerComponentState() 218 state.setJobState(self.jobState) 219 self.componentState = state 220 221 self.upgradeConfig(state, conf) 222 223 flowName, compName = conf['parent'], conf['name'] 224 225 state.set('name', compName) 226 state.set('type', conf['type']) 227 state.set('workerRequested', self.jobState.get('workerName')) 228 state.set('config', conf) 229 self.vishnu.addComponentToFlow(state, flowName) 230 return state 231 232 mState = self.vishnu.getManagerComponentState(self.avatarId) 233 if mState: 234 verifyExistingComponentState(conf, mState) 235 else: 236 makeNewComponentState(conf) 237
238 - def provideMasterClock(self):
239 """ 240 Tell the component to provide a master clock. 241 242 @rtype: L{twisted.internet.defer.Deferred} 243 """ 244 245 def success(clocking): 246 self.clocking = clocking 247 self.heaven.masterClockAvailable(self)
248 249 def error(failure): 250 self.addMessage(messages.WARNING, 'provide-master-clock', 251 N_('Failed to provide the master clock'), 252 debug=log.getFailureMessage(failure)) 253 self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port]) 254 255 if self.clocking: 256 self.heaven.masterClockAvailable(self) 257 else: 258 (port, ) = self.vishnu.reservePortsOnWorker( 259 self.getWorkerName(), 1) 260 self.debug('provideMasterClock on port %d', port) 261 262 d = self.mindCallRemote('provideMasterClock', port) 263 d.addCallbacks(success, error) 264
265 - def getFeedServerPort(self):
266 """ 267 Returns the port on which a feed server for this component is 268 listening on. 269 270 @rtype: int 271 """ 272 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
273
274 - def getRemoteManagerIP(self):
275 """ 276 Get the IP address of the manager as seen by the component. 277 278 @rtype: str 279 """ 280 return self.jobState.get('manager-ip')
281
282 - def getWorkerName(self):
283 """ 284 Return the name of the worker. 285 286 @rtype: str 287 """ 288 return self.jobState.get('workerName')
289
290 - def getPid(self):
291 """ 292 Return the PID of the component. 293 294 @rtype: int 295 """ 296 return self.jobState.get('pid')
297
298 - def getName(self):
299 """ 300 Get the name of the component. 301 302 @rtype: str 303 """ 304 return self.componentState.get('name')
305
306 - def getParentName(self):
307 """ 308 Get the name of the component's parent. 309 310 @rtype: str 311 """ 312 return self.componentState.get('parent').get('name')
313
314 - def getType(self):
315 """ 316 Get the component type name of the component. 317 318 @rtype: str 319 """ 320 return self.componentState.get('type')
321
322 - def getEaters(self):
323 """ 324 Get the set of eaters that this component eats from. 325 326 @rtype: dict of eaterName -> [(feedId, eaterAlias)] 327 """ 328 return self.componentState.get('config').get('eater', {})
329
330 - def getFeeders(self):
331 """ 332 Get the list of feeders that this component provides. 333 334 @rtype: list of feederName 335 """ 336 return self.componentState.get('config').get('feed', [])
337
338 - def getFeedId(self, feedName):
339 """ 340 Get the feedId of a feed provided or consumed by this component. 341 342 @param feedName: The name of the feed (i.e., eater alias or 343 feeder name) 344 @rtype: L{flumotion.common.common.feedId} 345 """ 346 return common.feedId(self.getName(), feedName)
347
348 - def getFullFeedId(self, feedName):
349 """ 350 Get the full feedId of a feed provided or consumed by this 351 component. 352 353 @param feedName: The name of the feed (i.e., eater alias or 354 feeder name) 355 @rtype: L{flumotion.common.common.fullFeedId} 356 """ 357 return common.fullFeedId(self.getParentName(), 358 self.getName(), feedName)
359
360 - def getVirtualFeeds(self):
361 """ 362 Get the set of virtual feeds provided by this component. 363 364 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName) 365 """ 366 conf = self.componentState.get('config') 367 ret = {} 368 for feedId, feederName in conf.get('virtual-feeds', {}).items(): 369 vComp, vFeed = common.parseFeedId(feedId) 370 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed) 371 ret[ffid] = (self, feederName) 372 return ret
373
374 - def getWorker(self):
375 """ 376 Get the worker that this component should run on. 377 378 @rtype: str 379 """ 380 return self.componentState.get('workerRequested')
381
382 - def getClockMaster(self):
383 """ 384 Get this component's clock master, if any. 385 386 @rtype: avatarId or None 387 """ 388 return self.componentState.get('config')['clock-master']
389
390 - def stop(self):
391 """ 392 Tell the remote component to shut down. 393 """ 394 self._shutdownDeferred = defer.Deferred() 395 396 self.mindCallRemote('stop') 397 398 return self._shutdownDeferred
399
400 - def setClocking(self, host, port, base_time):
401 # setMood on error? 402 return self.mindCallRemote('setMasterClock', host, port, base_time)
403
404 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
405 self.debug('connecting eater %s to feed %s', eaterAlias, fullFeedId) 406 return self.mindCallRemote('eatFrom', eaterAlias, fullFeedId, 407 host, port)
408
409 - def feedTo(self, feederName, fullFeedId, host, port):
410 self.debug('connecting feeder %s to feed %s', feederName, fullFeedId) 411 return self.mindCallRemote('feedTo', feederName, fullFeedId, 412 host, port)
413
414 - def modifyProperty(self, property_name, value):
415 """ 416 Tell the remote component to modify a property with a new value. 417 418 @param property_name: The name of the property to change 419 @param value: The new value of the property 420 @rtype: L{twisted.internet.defer.Deferred} 421 """ 422 return self.mindCallRemote('modifyProperty', property_name, value)
423 424 # FIXME: maybe make a BouncerComponentAvatar subclass ? 425
426 - def authenticate(self, keycard):
427 """ 428 Authenticate the given keycard. 429 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \ 430 """BouncerMedium.remote_authenticate} 431 The component should be a subclass of 432 L{flumotion.component.bouncers.bouncer.Bouncer} 433 434 @type keycard: L{flumotion.common.keycards.Keycard} 435 """ 436 return self.mindCallRemote('authenticate', keycard)
437
438 - def removeKeycardId(self, keycardId):
439 """ 440 Remove a keycard managed by this bouncer because the requester 441 has gone. 442 443 @type keycardId: str 444 """ 445 return self.mindCallRemote('removeKeycardId', keycardId)
446
447 - def expireKeycard(self, keycardId):
448 """ 449 Expire a keycard issued to this component because the bouncer decided 450 to. 451 452 @type keycardId: str 453 """ 454 return self.mindCallRemote('expireKeycard', keycardId)
455
456 - def expireKeycards(self, keycardIds):
457 """ 458 Expire keycards issued to this component because the bouncer 459 decided to. 460 461 @type keycardIds: sequence of str 462 """ 463 return self.mindCallRemote('expireKeycards', keycardIds)
464
465 - def keepAlive(self, issuerName, ttl):
466 """ 467 Resets the expiry timeout for keycards issued by issuerName. 468 469 @param issuerName: the issuer for which keycards should be kept 470 alive; that is to say, keycards with the 471 attribute 'issuerName' set to this value will 472 have their ttl values reset. 473 @type issuerName: str 474 @param ttl: the new expiry timeout 475 @type ttl: number 476 """ 477 return self.mindCallRemote('keepAlive', issuerName, ttl)
478 479 ### IPerspective methods, called by the worker's component 480
481 - def perspective_cleanShutdown(self):
482 """ 483 Called by a component to tell the manager that it's shutting down 484 cleanly (and thus should go to sleeping, rather than lost or sad) 485 """ 486 self.debug("shutdown is clean, shouldn't go to lost") 487 self._shutdown_requested = True
488
489 - def perspective_removeKeycardId(self, bouncerName, keycardId):
490 """ 491 Remove a keycard on the given bouncer on behalf of a 492 component's medium. 493 494 This is requested by a component that created the keycard. 495 496 @type bouncerName: str 497 @param keycardId: id of keycard to remove 498 @type keycardId: str 499 """ 500 avatarId = common.componentId('atmosphere', bouncerName) 501 if not self.heaven.hasAvatar(avatarId): 502 self.warning('No bouncer with id %s registered', avatarId) 503 raise errors.UnknownComponentError(avatarId) 504 505 return self.heaven.getAvatar(avatarId).removeKeycardId(keycardId)
506
507 - def perspective_expireKeycard(self, requesterId, keycardId):
508 """ 509 Expire a keycard (and thus the requester's connection) 510 issued to the given requester. 511 512 This is called by the bouncer component that authenticated the keycard. 513 514 @param requesterId: name (avatarId) of the component that originally 515 requested authentication for the given keycardId 516 @type requesterId: str 517 @param keycardId: id of keycard to expire 518 @type keycardId: str 519 """ 520 # FIXME: we should also be able to expire manager bouncer keycards 521 if not self.heaven.hasAvatar(requesterId): 522 self.warning('asked to expire keycard %s for requester %s, ' 523 'but no such component registered', 524 keycardId, requesterId) 525 raise errors.UnknownComponentError(requesterId) 526 527 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
528
529 - def perspective_expireKeycards(self, requesterId, keycardIds):
530 """ 531 Expire multiple keycards (and thus the requester's connections) 532 issued to the given requester. 533 534 This is called by the bouncer component that authenticated 535 the keycards. 536 537 @param requesterId: name (avatarId) of the component that originally 538 requested authentication for the given keycardId 539 @type requesterId: str 540 @param keycardIds: sequence of id of keycards to expire 541 @type keycardIds: sequence of str 542 """ 543 if not self.heaven.hasAvatar(requesterId): 544 self.warning('asked to expire %d keycards for requester %s, ' 545 'but no such component registered', 546 len(keycardIds), requesterId) 547 raise errors.UnknownComponentError(requesterId) 548 549 return self.heaven.getAvatar(requesterId).expireKeycards(keycardIds)
550 551
552 -class dictlist(dict):
553
554 - def add(self, key, value):
555 if key not in self: 556 self[key] = [] 557 self[key].append(value)
558
559 - def remove(self, key, value):
560 self[key].remove(value) 561 if not self[key]: 562 del self[key]
563 564
565 -class FeedMap(object, log.Loggable):
566 logName = 'feed-map' 567
568 - def __init__(self):
569 #FIXME: Use twisted.python.util.OrderedDict instead 570 self.avatars = {} 571 self._ordered_avatars = [] 572 self._dirty = True 573 self._recalc()
574
575 - def componentAttached(self, avatar):
576 assert avatar.avatarId not in self.avatars 577 self.avatars[avatar.avatarId] = avatar 578 self._ordered_avatars.append(avatar) 579 self._dirty = True
580
581 - def componentDetached(self, avatar):
582 # returns the a list of other components that will need to be 583 # reconnected 584 del self.avatars[avatar.avatarId] 585 self._ordered_avatars.remove(avatar) 586 self._dirty = True 587 # NB, feedDeps is dirty. Scrub it of avatars that have logged 588 # out 589 return [(a, f) for a, f in self.feedDeps.pop(avatar, []) 590 if a.avatarId in self.avatars]
591
592 - def _getFeederAvatar(self, eater, feedId):
593 # FIXME: 'get' part is confusing - this methods _modifies_ structures! 594 flowName = eater.getParentName() 595 compName, feedName = common.parseFeedId(feedId) 596 ffid = common.fullFeedId(flowName, compName, feedName) 597 feeder = None 598 if ffid in self.feeds: 599 feeder, feedName = self.feeds[ffid][0] 600 self.feedDeps.add(feeder, (eater, ffid)) 601 if feeder.getFeedId(feedName) != feedId: 602 self.debug('chose %s for feed %s', 603 feeder.getFeedId(feedName), feedId) 604 return feeder, feedName
605
606 - def _recalc(self):
607 if not self._dirty: 608 return 609 self.feedersForEaters = ffe = {} 610 self.eatersForFeeders = eff = dictlist() 611 self.feeds = dictlist() 612 self.feedDeps = dictlist() 613 614 for comp in self._ordered_avatars: 615 for feederName in comp.getFeeders(): 616 self.feeds.add(comp.getFullFeedId(feederName), 617 (comp, feederName)) 618 for ffid, pair in comp.getVirtualFeeds().items(): 619 self.feeds.add(ffid, pair) 620 621 for eater in self.avatars.values(): 622 for pairs in eater.getEaters().values(): 623 for feedId, eName in pairs: 624 feeder, fName = self._getFeederAvatar(eater, feedId) 625 if feeder: 626 ffe[eater.getFullFeedId(eName)] = ( 627 eName, feeder, fName) 628 eff.add(feeder.getFullFeedId(fName), 629 (fName, eater, eName)) 630 else: 631 self.debug('eater %s waiting for feed %s to log in', 632 eater.getFeedId(eName), feedId) 633 self._dirty = False
634
635 - def getFeedersForEaters(self, avatar):
636 """Get the set of feeds that this component is eating from, 637 keyed by eater alias. 638 639 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 640 @rtype: list of (str, ComponentAvatar, str) 641 """ 642 self._recalc() 643 ret = [] 644 for tups in avatar.getEaters().values(): 645 for feedId, alias in tups: 646 ffid = avatar.getFullFeedId(alias) 647 if ffid in self.feedersForEaters: 648 ret.append(self.feedersForEaters[ffid]) 649 return ret
650
651 - def getFeedersForEater(self, avatar, ffid):
652 """Get the set of feeds that this component is eating from 653 for the given feedId. 654 655 @param avatar: the eater component 656 @type avatar: L{ComponentAvatar} 657 @param ffid: full feed id for which to return feeders 658 @type ffid: str 659 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 660 @rtype: list of (str, L{ComponentAvatar}, str) 661 """ 662 self._recalc() 663 ret = [] 664 for feeder, feedName in self.feeds.get(ffid, []): 665 rffid = feeder.getFullFeedId(feedName) 666 eff = self.eatersForFeeders.get(rffid, []) 667 for fName, eater, eaterName in eff: 668 if eater == avatar: 669 ret.append((eaterName, feeder, feedName)) 670 return ret
671
672 - def getEatersForFeeders(self, avatar):
673 """Get the set of eaters that this component feeds, keyed by 674 feeder name. 675 676 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples 677 @rtype: list of (str, ComponentAvatar, str) 678 """ 679 self._recalc() 680 ret = [] 681 for feedName in avatar.getFeeders(): 682 ffid = avatar.getFullFeedId(feedName) 683 if ffid in self.eatersForFeeders: 684 ret.extend(self.eatersForFeeders[ffid]) 685 return ret
686 687
688 -class ComponentHeaven(base.ManagerHeaven):
689 """ 690 I handle all registered components and provide L{ComponentAvatar}s 691 for them. 692 """ 693 694 implements(interfaces.IHeaven) 695 avatarClass = ComponentAvatar 696 697 logCategory = 'comp-heaven' 698
699 - def __init__(self, vishnu):
700 # doc in base class 701 base.ManagerHeaven.__init__(self, vishnu) 702 self.feedMap = FeedMap()
703 704 ### our methods 705
706 - def feedServerAvailable(self, workerName):
707 self.debug('feed server %s logged in, we can connect to its port', 708 workerName) 709 # can be made more efficient 710 for avatar in self.avatars.values(): 711 if avatar.getWorkerName() == workerName: 712 self._setupClocking(avatar) 713 self._connectEatersAndFeeders(avatar)
714
715 - def masterClockAvailable(self, component):
716 self.debug('master clock for %r provided on %r', component.avatarId, 717 component.clocking) 718 component_flow = component.getParentName() 719 # can be made more efficient 720 for avatar in self.avatars.values(): 721 if avatar.avatarId != component.avatarId: 722 flow = avatar.getParentName() 723 if flow == component_flow: 724 self._setupClocking(avatar)
725
726 - def _setupClocking(self, avatar):
727 master = avatar.getClockMaster() 728 if master: 729 if master == avatar.avatarId: 730 self.debug('Need for %r to provide a clock master', 731 master) 732 avatar.provideMasterClock() 733 else: 734 self.debug('Need to synchronize with clock master %r', 735 master) 736 # if master in self.avatars would be natural, but it seems 737 # that for now due to the getClocking() calls etc we need to 738 # check against the componentMapper set. could (and probably 739 # should) be fixed in the future. 740 m = self.vishnu.getComponentMapper(master) 741 if m and m.avatar: 742 clocking = m.avatar.clocking 743 if clocking: 744 host, port, base_time = clocking 745 avatar.setClocking(host, port, base_time) 746 else: 747 self.warning('%r should provide a clock master ' 748 'but is not doing so', master) 749 # should we componentAvatar.provideMasterClock() ? 750 else: 751 self.debug('clock master not logged in yet, will ' 752 'set clocking later')
753
754 - def componentAttached(self, avatar):
755 # No need to wait for any of this, they are not interdependent 756 assert avatar.avatarId in self.avatars 757 self.feedMap.componentAttached(avatar) 758 self._setupClocking(avatar) 759 self._connectEatersAndFeeders(avatar)
760
761 - def componentDetached(self, avatar):
762 assert avatar.avatarId not in self.avatars 763 compsNeedingReconnect = self.feedMap.componentDetached(avatar) 764 if self.vishnu.running: 765 self.debug('will reconnect: %r', compsNeedingReconnect) 766 # FIXME: this will need revision when we have the 'feedTo' 767 # direction working 768 for comp, ffid in compsNeedingReconnect: 769 self._connectEaters(comp, ffid)
770
771 - def mapNetFeed(self, fromAvatar, toAvatar):
772 """ 773 @param fromAvatar: the avatar to connect from 774 @type fromAvatar: L{ComponentAvatar} 775 @param fromAvatar: the avatar to connect to 776 @type toAvatar: L{ComponentAvatar} 777 778 @returns: the host and port on which to make the connection to 779 toAvatar from fromAvatar 780 @rtype: tuple of (str, int or None) 781 """ 782 toHost = toAvatar.getClientAddress() 783 toPort = toAvatar.getFeedServerPort() # can be None 784 785 # FIXME: until network map is implemented, hack to assume that 786 # connections from what appears to us to be the same IP go 787 # through localhost instead. Allows connections between 788 # components on a worker behind a firewall, but not between 789 # components running on different workers, both behind a 790 # firewall 791 fromHost = fromAvatar.mind.broker.transport.getPeer().host 792 if fromHost == toHost: 793 toHost = '127.0.0.1' 794 795 self.debug('mapNetFeed from %r to %r: %s:%r', fromAvatar, toAvatar, 796 toHost, toPort) 797 return toHost, toPort
798
799 - def _connectFeederToEater(self, fromComp, fromFeed, 800 toComp, toFeed, method):
801 host, port = self.mapNetFeed(fromComp, toComp) 802 if port: 803 fullFeedId = toComp.getFullFeedId(toFeed) 804 proc = getattr(fromComp, method) 805 proc(fromFeed, fullFeedId, host, port) 806 else: 807 self.debug('postponing connection to %s: feed server ' 808 'unavailable', toComp.getFeedId(toFeed))
809
810 - def _connectEatersAndFeeders(self, avatar):
811 # FIXME: all connections are upstream for now 812 813 def always(otherComp): 814 return True
815 816 def never(otherComp): 817 return False
818 directions = [(self.feedMap.getFeedersForEaters, 819 always, 'eatFrom', 'feedTo'), 820 (self.feedMap.getEatersForFeeders, 821 never, 'feedTo', 'eatFrom')] 822 823 myComp = avatar 824 for getPeers, initiate, directMethod, reversedMethod in directions: 825 for myFeedName, otherComp, otherFeedName in getPeers(myComp): 826 if initiate(otherComp): 827 # we initiate the connection 828 self._connectFeederToEater(myComp, myFeedName, otherComp, 829 otherFeedName, directMethod) 830 else: 831 # make the other component initiate connection 832 self._connectFeederToEater(otherComp, otherFeedName, 833 myComp, myFeedName, 834 reversedMethod) 835
836 - def _connectEaters(self, avatar, ffid):
837 # FIXME: all connections are upstream for now 838 ffe = self.feedMap.getFeedersForEater(avatar, ffid) 839 for myFeedName, otherComp, otherFeedName in ffe: 840 self._connectFeederToEater(avatar, myFeedName, otherComp, 841 otherFeedName, 'eatFrom')
842