Package flumotion :: Package twisted :: Module pb
[hide private]

Source Code for Module flumotion.twisted.pb

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_pb -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  Flumotion Perspective Broker using keycards 
 20   
 21  Inspired by L{twisted.spread.pb} 
 22  """ 
 23   
 24  from twisted.cred.portal import IRealm, Portal 
 25  from twisted.internet import protocol, defer 
 26  from twisted.internet import error as terror 
 27  from twisted.python import log, reflect 
 28  from twisted.spread import pb, flavors 
 29  from twisted.spread.pb import PBClientFactory 
 30  from zope.interface import implements 
 31   
 32  from flumotion.configure import configure 
 33  from flumotion.common import keycards, errors 
 34  from flumotion.common import log as flog 
 35  from flumotion.common.netutils import addressGetHost 
 36  from flumotion.twisted import reflect as freflect 
 37  from flumotion.twisted.compat import reactor 
 38   
 39  __version__ = "$Rev$" 
 40   
 41   
 42  # TODO: 
 43  #   merge FMCF back into twisted 
 44   
 45  ### Keycard-based FPB objects 
 46   
 47  # we made three changes to the standard PBClientFactory: 
 48  # 1) the root object has a getKeycardClasses() call that the server 
 49  #    uses to tell clients about the interfaces it supports 
 50  # 2) you can request a specific interface for the avatar to 
 51  #    implement, instead of only IPerspective 
 52  # 3) you send in a keycard, on which you can set a preference for an avatarId 
 53  # this way you can request a different avatarId than the user you authenticate 
 54  # with, or you can login without a username 
 55   
 56   
class FPBClientFactory(pb.PBClientFactory, flog.Loggable):
    """
    I am an extended Perspective Broker client factory using generic
    keycards for login.

    @ivar keycard:              the keycard used last for logging in; set
                                as soon as the authenticator has issued it
                                during L{login}
    @type keycard:              L{keycards.Keycard}
    @ivar medium:               the client-side referenceable for the PB server
                                to call on, and for the client to call to the
                                PB server
    @type medium:               L{flumotion.common.medium.BaseMedium}
    @ivar perspectiveInterface: the interface we want to request a perspective
                                for
    @type perspectiveInterface: subclass of
                                L{flumotion.common.interfaces.IMedium}
    """
    logCategory = "FPBClientFactory"
    keycard = None
    medium = None
    perspectiveInterface = None # override in subclass
    _fpbconnector = None

    ## from protocol.ClientFactory

    def startedConnecting(self, connector):
        """
        Remember the connector, so that L{disconnect} can stop a connection
        attempt that is still in progress, then chain up.
        """
        self._fpbconnector = connector
        return pb.PBClientFactory.startedConnecting(self, connector)

    ## from twisted.spread.pb.ClientFactory

    def disconnect(self):
        """
        Stop a pending connection attempt, if any, then chain up.
        """
        if self._fpbconnector:
            try:
                self._fpbconnector.stopConnecting()
            except terror.NotConnectingError:
                # the connector is not connecting; nothing to stop
                pass
        return pb.PBClientFactory.disconnect(self)

    def getKeycardClasses(self):
        """
        Ask the remote PB server for all the keycard interfaces it supports.

        @rtype: L{twisted.internet.defer.Deferred} returning list of str
        """

        def getRootObjectCb(root):
            return root.callRemote('getKeycardClasses')

        d = self.getRootObject()
        d.addCallback(getRootObjectCb)
        return d

    def login(self, authenticator):
        """
        Login, respond to challenges, and eventually get perspective
        from remote PB server.

        Currently only credentials implementing IUsernamePassword are
        supported.

        @param authenticator: the object that issues the keycard to log in
                              with and responds to challenges on it; it
                              must not be a keycard itself

        @return: Deferred of RemoteReference to the perspective.
        """
        assert authenticator, "I really do need an authenticator"
        assert not isinstance(authenticator, keycards.Keycard)
        interfaces = []
        if self.perspectiveInterface:
            self.debug('perspectiveInterface is %r' %
                       self.perspectiveInterface)
            interfaces.append(self.perspectiveInterface)
        else:
            self.warning('No perspectiveInterface set on %r' % self)
        if pb.IPerspective not in interfaces:
            interfaces.append(pb.IPerspective)
        # interfaces travel over the wire as fully qualified names
        interfaces = [reflect.qual(interface)
                      for interface in interfaces]

        def getKeycardClassesCb(keycardClasses):
            self.log('supported keycard classes: %r' % keycardClasses)
            d = authenticator.issue(keycardClasses)
            return d

        def issueCb(keycard):
            self.keycard = keycard
            self.debug('using keycard: %r' % self.keycard)
            return self.keycard

        d = self.getKeycardClasses()
        d.addCallback(getKeycardClassesCb)
        d.addCallback(issueCb)
        d.addCallback(lambda r: self.getRootObject())
        d.addCallback(self._cbSendKeycard, authenticator, self.medium,
                      interfaces)
        return d

    # we are a different kind of PB client, so warn

    def _cbSendUsername(self, root, username, password,
                        avatarId, client, interfaces):
        """
        Username/password login is not what this factory does; only warn.
        """
        self.warning("you really want to use cbSendKeycard")

    def _cbSendKeycard(self, root, authenticator, client, interfaces, count=0):
        """
        Send self.keycard to the root object's remote login method and
        hand the answer to L{_cbLoginCallback}.

        @param count: number of login round trips done so far
        @type  count: int
        """
        self.log("_cbSendKeycard(root=%r, authenticator=%r, client=%r, "
                 "interfaces=%r, count=%d)", root, authenticator, client,
                 interfaces, count)
        count = count + 1
        d = root.callRemote("login", self.keycard, client, *interfaces)
        return d.addCallback(self._cbLoginCallback, root,
                             authenticator, client, interfaces, count)

    # we can get either a keycard, None (?) or a remote reference

    def _cbLoginCallback(self, result, root, authenticator, client, interfaces,
                         count):
        """
        Handle the answer to a remote login call.

        A remote reference means login is done.  A keycard that is not yet
        authenticated is handed to the authenticator to respond to, and
        then sent to the server again.

        @param count: number of login round trips done so far
        @type  count: int
        """
        if count > 5:
            # too many recursions, server is h0rked
            # NOTE: this only warns; the exchange is not aborted
            self.warning('Too many recursions, internal error.')
        self.log("FPBClientFactory(): result %r" % result)

        if isinstance(result, pb.RemoteReference):
            # everything done, return reference
            self.debug('login successful, returning %r', result)
            return result

        # must be a keycard
        keycard = result
        if not keycard.state == keycards.AUTHENTICATED:
            self.log("FPBClientFactory(): requester needs to resend %r",
                     keycard)
            d = authenticator.respond(keycard)

            def _loginAgainCb(keycard):
                d = root.callRemote("login", keycard, client, *interfaces)
                # count this round trip too, so that the check above can
                # actually trigger
                return d.addCallback(self._cbLoginCallback, root,
                                     authenticator, client,
                                     interfaces, count + 1)
            d.addCallback(_loginAgainCb)
            return d

        self.debug("FPBClientFactory(): authenticated %r" % keycard)
        return keycard
class ReconnectingPBClientFactory(pb.PBClientFactory, flog.Loggable,
                                  protocol.ReconnectingClientFactory):
    """
    Reconnecting client factory for normal PB brokers.

    Users of this factory call startLogin to register what to log in with,
    and should override gotDeferredLogin to receive the deferred returned
    by login for each connection that is made.
    """

    def __init__(self):
        pb.PBClientFactory.__init__(self)
        # becomes True once startLogin has stored the login parameters
        self._doingLogin = False

    def clientConnectionFailed(self, connector, reason):
        """
        Log the failure and chain up to both parent factories.
        """
        destination = connector.getDestination()
        log.msg("connection failed to %s, reason %r" % (destination, reason))
        pb.PBClientFactory.clientConnectionFailed(self, connector, reason)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason)

    def clientConnectionLost(self, connector, reason):
        """
        Log the loss and chain up to both parent factories, telling the PB
        factory that we are reconnecting.
        """
        destination = connector.getDestination()
        log.msg("connection lost to %s, reason %r" % (destination, reason))
        pb.PBClientFactory.clientConnectionLost(self, connector, reason,
                                                reconnecting=True)
        protocol.ReconnectingClientFactory.clientConnectionLost(
            self, connector, reason)

    def clientConnectionMade(self, broker):
        """
        Reset the reconnection delay, chain up, and log in if startLogin
        was called before.
        """
        log.msg("connection made")
        self.resetDelay()
        pb.PBClientFactory.clientConnectionMade(self, broker)
        if not self._doingLogin:
            return
        loginDeferred = self.login(self._credentials, self._client)
        self.gotDeferredLogin(loginDeferred)

    def startLogin(self, credentials, client=None):
        """
        Store the credentials and client to log in with on every connection
        made from now on.
        """
        self._doingLogin = True
        self._client = client
        self._credentials = credentials

    # methods to override

    def gotDeferredLogin(self, deferred):
        """
        The deferred from login is now available.
        """
        raise NotImplementedError
251 252
class ReconnectingFPBClientFactory(FPBClientFactory,
                                   protocol.ReconnectingClientFactory):
    """
    Reconnecting client factory for FPB brokers (using keycards for login).

    Users of this factory call startLogin to register the authenticator to
    log in with.
    Override gotDeferredLogin to get a handle to the deferred returned
    by login for each connection that is made.
    """

    def __init__(self):
        FPBClientFactory.__init__(self)
        # becomes True once startLogin has stored an authenticator
        self._doingLogin = False
        self._doingGetPerspective = False

    def clientConnectionFailed(self, connector, reason):
        """
        Log the failure, chain up to both parent factories, and log whether
        a reconnect will be attempted.
        """
        destination = connector.getDestination()
        log.msg("connection failed to %s, reason %r" % (destination, reason))
        FPBClientFactory.clientConnectionFailed(self, connector, reason)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason)
        if not self.continueTrying:
            self.debug("not trying to reconnect")
        else:
            self.debug("will try reconnect in %f seconds", self.delay)

    def clientConnectionLost(self, connector, reason):
        """
        Log the loss and chain up to both parent factories, telling the FPB
        factory that we are reconnecting.
        """
        destination = connector.getDestination()
        log.msg("connection lost to %s, reason %r" % (destination, reason))
        FPBClientFactory.clientConnectionLost(self, connector, reason,
                                              reconnecting=True)
        protocol.ReconnectingClientFactory.clientConnectionLost(
            self, connector, reason)

    def clientConnectionMade(self, broker):
        """
        Reset the reconnection delay, chain up, and log in if startLogin
        was called before.
        """
        log.msg("connection made")
        self.resetDelay()
        FPBClientFactory.clientConnectionMade(self, broker)
        if not self._doingLogin:
            return
        loginDeferred = self.login(self._authenticator)
        self.gotDeferredLogin(loginDeferred)

    # TODO: This is a poorly named method; it just provides the appropriate
    # authentication information, and doesn't actually _start_ login at all.

    def startLogin(self, authenticator):
        """
        Store the authenticator to log in with on every connection made
        from now on.  It must not be a keycard.
        """
        assert not isinstance(authenticator, keycards.Keycard)
        self._doingLogin = True
        self._authenticator = authenticator

    # methods to override

    def gotDeferredLogin(self, deferred):
        """
        The deferred from login is now available.
        """
        raise NotImplementedError

### FIXME: this code is an adaptation of twisted/spread/pb.py
# it allows you to login to a FPB server requesting interfaces other than
# IPerspective.
# in other terms, you can request different "kinds" of avatars from the same
# PB server.
# this code needs to be sent upstream to Twisted
class _FPortalRoot:
    """
    Root object, used to login to bouncer.

    For every broker I hand out a L{_BouncerWrapper} around my bouncer
    portal.
    """

    implements(flavors.IPBRoot)

    def __init__(self, bouncerPortal):
        """
        @type bouncerPortal: L{flumotion.twisted.portal.BouncerPortal}
        """
        self.bouncerPortal = bouncerPortal

    def rootObject(self, broker):
        """
        Create the root object for the given broker.

        @rtype: L{_BouncerWrapper}
        """
        wrapper = _BouncerWrapper(self.bouncerPortal, broker)
        return wrapper
334 335
class _BouncerWrapper(pb.Referenceable, flog.Loggable):
    """
    I am the referenceable handed out as root object; I forward keycard
    class queries and login requests to the bouncer portal.
    """

    logCategory = "_BouncerWrapper"

    def __init__(self, bouncerPortal, broker):
        self.broker = broker
        self.bouncerPortal = bouncerPortal

    def remote_getKeycardClasses(self):
        """
        @returns: the fully-qualified class names of supported keycard
                  interfaces
        @rtype:   L{twisted.internet.defer.Deferred} firing list of str
        """
        return self.bouncerPortal.getKeycardClasses()

    def remote_login(self, keycard, mind, *interfaces):
        """
        Start of keycard login.

        @param interfaces: list of fully qualified names of interface objects

        @returns: one of
            - a L{flumotion.common.keycards.Keycard} when more steps
              need to be performed
            - a L{twisted.spread.pb.AsReferenceable} when authentication
              has succeeded, which will turn into a
              L{twisted.spread.pb.RemoteReference} on the client side
            - a L{flumotion.common.errors.NotAuthenticatedError} when
              authentication is denied
        """

        def loginResponse(result):
            self.log("loginResponse: result=%r", result)
            if isinstance(result, keycards.Keycard):
                # a keycard coming back means we're not yet ready
                return result
            # authenticated, so the result is the tuple
            interface, perspective, logout = result
            self.broker.notifyOnDisconnect(logout)
            return pb.AsReferenceable(perspective, "perspective")

        # corresponds with FPBClientFactory._cbSendKeycard
        self.log("remote_login(keycard=%s, *interfaces=%r" % (
            keycard, interfaces))
        # turn the fully qualified names back into interface objects
        resolved = [freflect.namedAny(name) for name in interfaces]
        d = self.bouncerPortal.login(keycard, mind, *resolved)
        d.addCallback(loginResponse)
        return d
386 387
388 -class Authenticator(flog.Loggable, pb.Referenceable):
389 """ 390 I am an object used by FPB clients to create keycards for me 391 and respond to challenges. 392 393 I encapsulate keycard-related data, plus secrets which are used locally 394 and not put on the keycard. 395 396 I can be serialized over PB connections to a RemoteReference and then 397 adapted with RemoteAuthenticator to present the same interface. 398 399 @cvar username: a username to log in with 400 @type username: str 401 @cvar password: a password to log in with 402 @type password: str 403 @cvar address: an address to log in from 404 @type address: str 405 @cvar avatarId: the avatarId we want to request from the PB server 406 @type avatarId: str 407 """ 408 logCategory = "authenticator" 409 410 avatarId = None 411 412 username = None 413 password = None 414 address = None 415 ttl = 30 416 # FIXME: we can add ssh keys and similar here later on 417
def __init__(self, **kwargs):
    """
    Set every keyword argument as an attribute of the same name on me.
    """
    for name, value in kwargs.items():
        setattr(self, name, value)
421
def issue(self, keycardClasses):
    """
    Issue a keycard that implements one of the given interfaces.

    The first entry of keycardClasses that I support wins.  The keycard
    gets my avatarId and, when my ttl is not None, my ttl.

    @param keycardClasses: list of fully qualified keycard classes
    @type  keycardClasses: list of str

    @rtype: L{twisted.internet.defer.Deferred} firing L{keycards.Keycard},
            or firing None when no keycard could be issued
    """
    # this method returns a deferred so we present the same interface
    # as the RemoteAuthenticator adapter

    # collect the keycard classes we can support right now;
    # address is allowed to be None
    candidates = []
    if self.username is not None and self.password is not None:
        # We only want to support challenge-based keycards, for
        # security. Maybe later we want this to be configurable
        # (keycards.KeycardUACPP is deliberately left out)
        candidates = [keycards.KeycardUACPCC, keycards.KeycardUASPCC]

    # expand to fully qualified names
    supported = [reflect.qual(klass) for klass in candidates]

    for qualified in keycardClasses:
        if qualified not in supported:
            continue
        self.log('Keycard interface %s supported, looking up', qualified)
        name = qualified.split(".")[-1]
        issuer = getattr(self, "issue_%s" % name)
        keycard = issuer()
        self.debug('Issuing keycard %r of class %s', keycard,
                   name)
        keycard.avatarId = self.avatarId
        if self.ttl is not None:
            keycard.ttl = self.ttl
        return defer.succeed(keycard)

    self.debug('Could not issue a keycard')
    return defer.succeed(None)
463 464 # non-challenge types 465
def issue_KeycardUACPP(self):
    """
    Create a KeycardUACPP from my username, password and address.
    """
    keycard = keycards.KeycardUACPP(self.username, self.password,
                                    self.address)
    return keycard
469
470 - def issue_KeycardGeneric(self):
475 # challenge types 476
def issue_KeycardUACPCC(self):
    """
    Create a KeycardUACPCC from my username and address.
    """
    keycard = keycards.KeycardUACPCC(self.username, self.address)
    return keycard
479
def issue_KeycardUASPCC(self):
    """
    Create a KeycardUASPCC from my username and address.
    """
    keycard = keycards.KeycardUASPCC(self.username, self.address)
    return keycard
482
def respond(self, keycard):
    """
    Respond to a challenge on the given keycard, based on the secrets
    we have.

    The work is done by my method named respond_ followed by the name
    of the keycard's class.

    @param keycard: the keycard with the challenge to respond to
    @type  keycard: L{keycards.Keycard}

    @rtype:   L{twisted.internet.defer.Deferred} firing
              a {keycards.Keycard}
    @returns: a deferred firing the keycard with a response set
    """
    self.debug('responding to challenge on keycard %r' % keycard)
    responder = getattr(self, "respond_%s" % keycard.__class__.__name__)
    answered = responder(keycard)
    return defer.succeed(answered)
499
def respond_KeycardUACPCC(self, keycard):
    """
    Set my password on the given keycard and return the keycard.
    """
    self.log('setting password')
    secret = self.password
    keycard.setPassword(secret)
    return keycard
504
def respond_KeycardUASPCC(self, keycard):
    """
    Set my password on the given keycard and return the keycard.
    """
    self.log('setting password')
    secret = self.password
    keycard.setPassword(secret)
    return keycard
509 510 ### pb.Referenceable methods 511
def remote_issue(self, interfaces):
    """
    Remotely callable version of issue; returns whatever issue returns.
    """
    issued = self.issue(interfaces)
    return issued
514
def remote_respond(self, keycard):
    """
    Remotely callable version of respond; returns whatever respond returns.
    """
    answered = self.respond(keycard)
    return answered
517 518
class RemoteAuthenticator:
    """
    I am an adapter for a pb.RemoteReference to present the same interface
    as L{Authenticator}
    """

    avatarId = None # not serialized
    username = None # for convenience, will always be None
    password = None # for convenience, will always be None

    def __init__(self, remoteReference):
        """
        @param remoteReference: reference to the remote authenticator; its
                                remote 'issue' and 'respond' methods are
                                called
        """
        self._remote = remoteReference

    def copy(self, avatarId=None):
        """
        Create a new adapter for the same remote reference.

        @param avatarId: the avatarId for the copy; when not given (or
                         otherwise false), my own avatarId is used

        @rtype: L{RemoteAuthenticator}
        """
        ret = RemoteAuthenticator(self._remote)
        ret.avatarId = avatarId or self.avatarId
        return ret

    def issue(self, interfaces):
        """
        Ask the remote authenticator to issue a keycard, and set my
        avatarId on it.

        @param interfaces: list of fully qualified keycard classes

        @returns: a deferred firing the keycard, or firing None when the
                  remote side did not issue one
        """

        def issueCb(keycard):
            # L{Authenticator.issue} fires None when it cannot issue a
            # keycard; pass that on instead of failing on the attribute
            # assignment
            if keycard is not None:
                keycard.avatarId = self.avatarId
            return keycard

        d = self._remote.callRemote('issue', interfaces)
        d.addCallback(issueCb)
        return d

    def respond(self, keycard):
        """
        Ask the remote authenticator to respond to the challenge on the
        given keycard.

        @returns: the deferred of the remote call
        """
        return self._remote.callRemote('respond', keycard)
549 550
class Referenceable(pb.Referenceable, flog.Loggable):
    """
    I am a referenceable that logs the remote messages I receive, and
    their results.

    @cvar remoteLogName: name to use to log the other side of the connection
    @type remoteLogName: str
    """
    logCategory = 'referenceable'
    remoteLogName = 'remote'


    # a referenceable that logs receiving remote messages

    def remoteMessageReceived(self, broker, message, args, kwargs):
        """
        Unserialize the arguments, call my remote_<message> method, and log
        both the call and its result or failure.

        Pings are logged at LOG level, everything else at DEBUG level.

        @raises pb.NoSuchMethod: when I have no remote_<message> method
        @returns: the serialized deferred of the method call
        """
        args = broker.unserialize(args)
        kwargs = broker.unserialize(kwargs)
        handler = getattr(self, "remote_%s" % message, None)
        if handler is None:
            raise pb.NoSuchMethod("No such method: remote_%s" % (message, ))

        if message == 'ping':
            level = flog.LOG
        else:
            level = flog.DEBUG

        # all this malarkey is to avoid actually interpolating variables
        # if it is not needed
        startArgs = [self.remoteLogName, self.logCategory.upper(), message]
        enterFormat, enterArgs = flog.getFormatArgs(
            '%s --> %s: remote_%s(', startArgs,
            ')', (), args, kwargs)
        # log going into the method
        logKwArgs = self.doLog(level, handler, enterFormat, *enterArgs)

        # invoke the remote_ method
        d = defer.maybeDeferred(handler, *args, **kwargs)

        # log coming out of the method

        def callback(result):
            leaveFormat, leaveArgs = flog.getFormatArgs(
                '%s <-- %s: remote_%s(', startArgs,
                '): %r', (flog.ellipsize(result), ), args, kwargs)
            self.doLog(level, -1, leaveFormat, *leaveArgs, **logKwArgs)
            return result

        def errback(failure):
            leaveFormat, leaveArgs = flog.getFormatArgs(
                '%s <-- %s: remote_%s(', startArgs,
                '): failure %r', (failure, ), args, kwargs)
            self.doLog(level, -1, leaveFormat, *leaveArgs, **logKwArgs)
            return failure

        d.addCallbacks(callback, errback)
        return broker.serialize(d, self.perspective)
class Avatar(pb.Avatar, flog.Loggable):
    """
    I am an avatar that logs the perspective messages I receive and the
    remote calls I make on my mind.

    @cvar remoteLogName: name to use to log the other side of the connection
    @type remoteLogName: str
    @ivar mind:          the remote reference set with L{setMind}; None when
                         not set or after it disconnected
    """
    logCategory = 'avatar'
    remoteLogName = 'remote'

    def __init__(self, avatarId):
        """
        @param avatarId: my id; also used as my log name
        """
        self.avatarId = avatarId
        self.logName = avatarId
        self.mind = None
        self.debug("created new Avatar with id %s", avatarId)

    # a referenceable that logs receiving remote messages

    def perspectiveMessageReceived(self, broker, message, args, kwargs):
        """
        Unserialize the arguments and hand over to
        L{perspectiveMessageReceivedUnserialised}.
        """
        args = broker.unserialize(args)
        kwargs = broker.unserialize(kwargs)
        return self.perspectiveMessageReceivedUnserialised(broker, message,
                                                           args, kwargs)

    def perspectiveMessageReceivedUnserialised(self, broker, message,
                                               args, kwargs):
        """
        Call my perspective_<message> method, and log both the call and its
        result or failure.

        Pings are logged at LOG level, everything else at DEBUG level.

        @raises pb.NoSuchMethod: when I have no perspective_<message> method
        @returns: the serialized deferred of the method call
        """
        method = getattr(self, "perspective_%s" % message, None)
        if method is None:
            raise pb.NoSuchMethod("No such method: perspective_%s" % (
                message, ))

        level = flog.DEBUG
        if message == 'ping':
            level = flog.LOG
        debugClass = self.logCategory.upper()
        startArgs = [self.remoteLogName, debugClass, message]
        formatString, debugArgs = flog.getFormatArgs(
            '%s --> %s: perspective_%s(', startArgs,
            ')', (), args, kwargs)
        # log going into the method
        logKwArgs = self.doLog(level, method, formatString, *debugArgs)

        # invoke the perspective_ method
        d = defer.maybeDeferred(method, *args, **kwargs)

        # log coming out of the method

        def callback(result):
            formatString, debugArgs = flog.getFormatArgs(
                '%s <-- %s: perspective_%s(', startArgs,
                '): %r', (flog.ellipsize(result), ), args, kwargs)
            self.doLog(level, -1, formatString, *debugArgs, **logKwArgs)
            return result

        def errback(failure):
            formatString, debugArgs = flog.getFormatArgs(
                '%s <-- %s: perspective_%s(', startArgs,
                '): failure %r', (failure, ), args, kwargs)
            self.doLog(level, -1, formatString, *debugArgs, **logKwArgs)
            return failure

        d.addCallbacks(callback, errback)

        return broker.serialize(d, self, method, args, kwargs)

    def setMind(self, mind):
        """
        Tell the avatar that the given mind has been attached.
        This gives the avatar a way to call remotely to the client that
        requested this avatar.

        It is best to call setMind() from within the avatar's __init__
        method. Some old code still does this via a callLater, however.

        @type mind: L{twisted.spread.pb.RemoteReference}
        """
        self.mind = mind

        def nullMind(x):
            self.debug('%r: disconnected from %r' % (self, self.mind))
            self.mind = None
        self.mind.notifyOnDisconnect(nullMind)

        transport = self.mind.broker.transport
        tarzan = transport.getHost()
        jane = transport.getPeer()
        if tarzan and jane:
            self.debug(
                "PB client connection seen by me is from me %s to %s" % (
                addressGetHost(tarzan),
                addressGetHost(jane)))
        self.log('Client attached is mind %s', mind)

    def mindCallRemoteLogging(self, level, stackDepth, name, *args,
                              **kwargs):
        """
        Call the given remote method, and log calling and returning nicely.

        @param level:      the level we should log at (log.DEBUG, log.INFO,
                           etc); None to not log the call and its result
        @type  level:      int
        @param stackDepth: the number of stack frames to go back to get
                           file and line information, negative or zero.
        @type  stackDepth: non-positive int
        @param name:       name of the remote method
        @type  name:       str

        @returns: the deferred of the remote call, or a deferred failing
                  with L{errors.NotConnectedError} when I have no mind
        """
        if level is not None:
            # use the plain class name; splitting str() of the class on
            # dots leaves "'>" at the end for new-style classes
            debugClass = self.__class__.__name__.upper()
            startArgs = [self.remoteLogName, debugClass, name]
            formatString, debugArgs = flog.getFormatArgs(
                '%s --> %s: callRemote(%s, ', startArgs,
                ')', (), args, kwargs)
            logKwArgs = self.doLog(level, stackDepth - 1, formatString,
                                   *debugArgs)

        if not self.mind:
            self.warning('Tried to mindCallRemote(%s), but we are '
                         'disconnected', name)
            return defer.fail(errors.NotConnectedError())

        # startArgs and logKwArgs only exist when level is not None; the
        # two callbacks below are only added in that case

        def callback(result):
            formatString, debugArgs = flog.getFormatArgs(
                '%s <-- %s: callRemote(%s, ', startArgs,
                '): %r', (flog.ellipsize(result), ), args, kwargs)
            self.doLog(level, -1, formatString, *debugArgs, **logKwArgs)
            return result

        def errback(failure):
            formatString, debugArgs = flog.getFormatArgs(
                '%s <-- %s: callRemote(%s, ', startArgs,
                '): %r', (failure, ), args, kwargs)
            self.doLog(level, -1, formatString, *debugArgs, **logKwArgs)
            return failure

        d = self.mind.callRemote(name, *args, **kwargs)
        if level is not None:
            d.addCallbacks(callback, errback)
        return d

    def mindCallRemote(self, name, *args, **kwargs):
        """
        Call the given remote method, and log calling and returning nicely.

        @param name: name of the remote method
        @type  name: str
        """
        return self.mindCallRemoteLogging(flog.DEBUG, -1, name, *args,
                                          **kwargs)

    def disconnect(self):
        """
        Disconnect the remote PB client. If we are already disconnected,
        do nothing.
        """
        if self.mind:
            return self.mind.broker.transport.loseConnection()
760 761
class PingableAvatar(Avatar):
    """
    I am an avatar that disconnects my client when nothing has been heard
    from it for too long.

    Every perspective message received and every successful remote call on
    my mind counts as a sign of life.

    @cvar _pingCheckInterval: number of seconds between checks, which is
                              also the longest silence allowed; the
                              configured heartbeat interval times the
                              configured ping timeout multiplier
    """
    _pingCheckInterval = (configure.heartbeatInterval *
                          configure.pingTimeoutMultiplier)

    # defaults, so that stopPingChecking is safe to call before
    # startPingChecking was ever called
    _pingCheckDC = None
    _pingCheckDisconnect = None

    def __init__(self, avatarId, clock=reactor):
        """
        @param clock: provider of seconds() and callLater(); the reactor
                      by default
        """
        self._clock = clock
        Avatar.__init__(self, avatarId)

    def perspectiveMessageReceivedUnserialised(self, broker, message,
                                               args, kwargs):
        # any message from the client counts as a ping
        self._lastPing = self._clock.seconds()
        return Avatar.perspectiveMessageReceivedUnserialised(
            self, broker, message, args, kwargs)

    def perspective_ping(self):
        """
        @returns: a deferred firing True
        """
        return defer.succeed(True)

    def mindCallRemoteLogging(self, level, stackDepth, name, *args,
                              **kwargs):
        d = Avatar.mindCallRemoteLogging(self, level, stackDepth, name, *args,
                                         **kwargs)

        def cb(result):
            # a successful remote call counts as a ping too
            self._lastPing = self._clock.seconds()
            return result
        d.addCallback(cb)
        return d

    def startPingChecking(self, disconnect):
        """
        Start checking periodically whether the client is still alive.

        @param disconnect: called without arguments when no sign of life
                           was seen for more than the check interval
        @type  disconnect: callable
        """
        self._lastPing = self._clock.seconds()
        self._pingCheckDisconnect = disconnect
        self._pingCheck()

    def _pingCheck(self):
        """
        Disconnect when the last sign of life is too long ago; schedule the
        next check otherwise.
        """
        self._pingCheckDC = None
        if self._clock.seconds() - self._lastPing > self._pingCheckInterval:
            self.info('no ping in %f seconds, closing connection',
                      self._pingCheckInterval)
            self._pingCheckDisconnect()
        else:
            self._pingCheckDC = self._clock.callLater(self._pingCheckInterval,
                                                      self._pingCheck)

    def stopPingChecking(self):
        """
        Cancel the pending check, if any, and release the disconnect
        function.
        """
        if self._pingCheckDC:
            self._pingCheckDC.cancel()
        self._pingCheckDC = None

        # release the disconnect function, too, to help break any
        # potential cycles
        self._pingCheckDisconnect = None

    def setMind(self, mind):
        # chain up
        Avatar.setMind(self, mind)

        def stopPingCheckingCb(x):
            self.debug('stop pinging')
            self.stopPingChecking()
        self.mind.notifyOnDisconnect(stopPingCheckingCb)

        # Now we have a remote reference, so start checking pings.

        def _disconnect():
            if self.mind:
                self.mind.broker.transport.loseConnection()
        self.startPingChecking(_disconnect)