Package flumotion :: Package admin :: Module admin
[hide private]

Source Code for Module flumotion.admin.admin

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  model abstraction for administration clients supporting different views 
 20  """ 
 21   
 22  from twisted.internet import error, defer, reactor 
 23  from zope.interface import implements 
 24   
 25  from flumotion.common import common, errors, interfaces, log 
 26  from flumotion.common import medium 
 27  from flumotion.common import messages, signals 
 28  from flumotion.common import planet, worker # register jelly 
 29  from flumotion.common.i18n import N_, gettexter 
 30  from flumotion.configure import configure 
 31  from flumotion.twisted import pb as fpb 
 32   
 33  __version__ = "$Rev$" 
 34  T_ = gettexter() 
 35   
 36   
class AdminClientFactory(fpb.ReconnectingFPBClientFactory):
    """
    I am the reconnecting client factory used by an L{AdminModel} to log
    in to a manager.

    On every lost or failed connection I decide whether to retry silently
    or to hand the failure to my medium, based on whether I have been
    connected/authenticated before and on my C{extraTenacious} flag.
    """
    perspectiveInterface = interfaces.IAdminMedium

    def __init__(self, medium, extraTenacious=False, maxDelay=20):
        """
        @param medium:         the model that gets told about connection
                               failures and receives the remote reference
        @type  medium:         AdminModel
        @param extraTenacious: when true, keep retrying instead of
                               reporting failures to the medium
        @type  extraTenacious: bool
        @param maxDelay:       stored as my C{maxDelay} attribute
        """
        fpb.ReconnectingFPBClientFactory.__init__(self)
        self.medium = medium
        self.maxDelay = maxDelay

        self.extraTenacious = extraTenacious
        # set to 1 once a connection was made / a login succeeded
        self.hasBeenConnected = 0
        self.hasBeenAuthenticated = 0

        # last connector handed to startedConnecting; used for logging
        self._connector = None

    def startedConnecting(self, connector):
        """
        Remember the connector in use, then chain up to the parent class.
        """
        self._connector = connector
        parent = fpb.ReconnectingFPBClientFactory
        return parent.startedConnecting(self, connector)

    def clientConnectionMade(self, broker):
        """
        Record that a connection has been made, then chain up.
        """
        self.hasBeenConnected = 1

        fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)

    def clientConnectionLost(self, connector, reason):
        """
        Retry if we were authenticated before or are extra tenacious;
        otherwise report the failure to the medium and stop.

        @type  connector: implementation of
                          L{twisted.internet.interfaces.IConnector}
        @param reason:    why the connection was lost
        @type  reason:    L{twisted.python.failure.Failure}
        """
        destination = connector.getDestination()
        self.debug("Lost connection to %s: %s",
                   destination, log.getFailureMessage(reason))

        if not (self.hasBeenAuthenticated or self.extraTenacious):
            self.log("Telling medium about connection failure")
            self.medium.connectionFailed(reason)
            return

        if self.hasBeenAuthenticated:
            self.log("Have been authenticated before. Trying again.")
        else:
            self.log("We are extra tenacious, trying again")

        # chain up so the reconnecting machinery schedules a retry
        fpb.ReconnectingFPBClientFactory.clientConnectionLost(
            self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        """
        Decide between retrying and reporting a failed connection attempt.

        @type  connector: implementation of
                          L{twisted.internet.interfaces.IConnector}
        @param reason:    why the connection attempt failed
        @type  reason:    L{twisted.python.failure.Failure}
        """
        giveUp = False

        if reason.check(error.DNSLookupError):
            self.debug('DNS lookup error')
            giveUp = not self.extraTenacious
        elif reason.check(error.ConnectionRefusedError, error.ConnectError):
            # On a first login this is a real error that the medium must
            # hear about (it presents a dialog, etc.).  If we fail later
            # on (e.g. the manager was shut down and is not back yet) we
            # only log and keep trying to reconnect.
            self.debug("Error connecting to %s: %s",
                       connector.getDestination(),
                       log.getFailureMessage(reason))
            if self.hasBeenConnected:
                self.log("we've been connected before though, so going "
                         "to retry")
            elif self.extraTenacious:
                self.log("trying again due to +100 tenacity")
            else:
                self.log("telling medium about connection failure")
                giveUp = True

        if giveUp:
            self.medium.connectionFailed(reason)
            return

        # any other failure, or a retry decision: let the parent handle it
        fpb.ReconnectingFPBClientFactory.clientConnectionFailed(
            self, connector, reason)

    # vmethod implementation

    def gotDeferredLogin(self, d):
        """
        Attach the login result handlers to the login deferred.

        On success the remote reference is handed to the medium; on
        failure the matching signal is emitted on the medium (or, when
        extra tenacious, we just disconnect).  The failure is swallowed.

        @type d: L{twisted.internet.defer.Deferred}

        @returns: the same deferred, with the handlers attached
        """

        def _loggedIn(remote):
            self.hasBeenAuthenticated = 1
            self.medium.setRemoteReference(remote)

        def _loginFailed(failure):
            if self.extraTenacious:
                self.debug('connection problem to %s: %s',
                           self._connector.getDestination(),
                           log.getFailureMessage(failure))
                self.debug('we are tenacious, so trying again later')
                self.disconnect()
            elif failure.check(errors.ConnectionFailedError):
                self.debug("emitting connection-failed")
                self.medium.emit('connection-failed', "I failed my master")
                self.debug("emitted connection-failed")
            elif failure.check(errors.ConnectionRefusedError,
                               errors.NotAuthenticatedError):
                # FIXME: unauthorized login (NotAuthenticatedError) should
                # get its own emit !
                self.debug("emitting connection-refused")
                self.medium.emit('connection-refused')
                self.debug("emitted connection-refused")
            else:
                self.medium.emit('connection-error', failure)
                self.warning('connection error to %s:: %s',
                             self._connector.getDestination(),
                             log.getFailureMessage(failure))
            # swallow error

        d.addCallbacks(_loggedIn, _loginFailed)
        return d

# FIXME: stop using signals, we can provide a richer interface with actual
# objects and real interfaces for the views a model communicates with


162 -class AdminModel(medium.PingingMedium, signals.SignalMixin):
163 """ 164 I live in the admin client. 165 I am a data model for any admin view implementing a UI to 166 communicate with one manager. 167 I send signals when things happen. 168 169 Manager calls on us through L{flumotion.manager.admin.AdminAvatar} 170 """ 171 __signals__ = ('connected', 'disconnected', 'connection-refused', 172 'connection-failed', 'connection-error', 'reloading', 173 'message', 'update') 174 175 logCategory = 'adminmodel' 176 177 implements(interfaces.IAdminMedium) 178 179 # Public instance variables (read-only) 180 planet = None 181
182 - def __init__(self):
183 # All of these instance variables are private. Cuidado cabrones! 184 self.connectionInfo = None 185 self.keepTrying = None 186 self._writeConnection = True 187 188 self.managerId = '<uninitialized>' 189 190 self.connected = False 191 self.clientFactory = None 192 193 self._deferredConnect = None 194 195 self._components = {} # dict of components 196 self.planet = None 197 self._workerHeavenState = None
198
199 - def disconnectFromManager(self):
200 """ 201 Disconnects from the actual manager and frees the connection. 202 """ 203 if self.clientFactory: 204 # We are disconnecting, so we don't want to be 205 # notified by the model about it. 206 if self.remote: 207 self.remote.dontNotifyOnDisconnect(self._remoteDisconnected) 208 209 self.clientFactory.stopTrying() 210 211 self.clientFactory.disconnect() 212 self.clientFactory = None
213
214 - def connectToManager(self, connectionInfo, keepTrying=False, 215 writeConnection=True):
216 """ 217 Connects to the specified manager. 218 219 @param connectionInfo: data for establishing the connection 220 @type connectionInfo: a L{PBConnectionInfo} 221 @param keepTrying: when this is L{True} the Factory will try to 222 reconnect when it loses the connection 223 @type keepTrying: bool 224 @param writeConnection: when this is L{True} the connection is saved 225 for future uses on cache 226 @type writeConnection: bool 227 228 @rtype: L{twisted.internet.defer.Deferred} 229 """ 230 assert self.clientFactory is None 231 232 self.connectionInfo = connectionInfo 233 self._writeConnection = writeConnection 234 235 # give the admin an id unique to the manager -- if a program is 236 # adminning multiple managers, this id should tell them apart 237 # (and identify duplicates) 238 self.managerId = str(connectionInfo) 239 self.logName = self.managerId 240 241 self.info('Connecting to manager %s with %s', 242 self.managerId, connectionInfo.use_ssl and 'SSL' or 'TCP') 243 244 self.clientFactory = AdminClientFactory(self, 245 extraTenacious=keepTrying, 246 maxDelay=20) 247 self.clientFactory.startLogin(connectionInfo.authenticator) 248 249 if connectionInfo.use_ssl: 250 common.assertSSLAvailable() 251 from twisted.internet import ssl 252 reactor.connectSSL(connectionInfo.host, connectionInfo.port, 253 self.clientFactory, ssl.ClientContextFactory()) 254 else: 255 reactor.connectTCP(connectionInfo.host, connectionInfo.port, 256 self.clientFactory) 257 258 def connected(model, d): 259 # model is really "self". yay gobject? 260 d.callback(model)
261 262 def disconnected(model, d): 263 # can happen after setRemoteReference but before 264 # getPlanetState or getWorkerHeavenState returns 265 if not keepTrying: 266 d.errback(errors.ConnectionFailedError('Lost connection'))
267 268 def connection_refused(model, d): 269 if not keepTrying: 270 d.errback(errors.ConnectionRefusedError()) 271 272 def connection_failed(model, reason, d): 273 if not keepTrying: 274 d.errback(errors.ConnectionFailedError(reason)) 275 276 def connection_error(model, failure, d): 277 if not keepTrying: 278 d.errback(failure) 279 280 d = defer.Deferred() 281 ids = [] 282 ids.append(self.connect('connected', connected, d)) 283 ids.append(self.connect('disconnected', disconnected, d)) 284 ids.append(self.connect('connection-refused', connection_refused, d)) 285 ids.append(self.connect('connection-failed', connection_failed, d)) 286 ids.append(self.connect('connection-error', connection_error, d)) 287 288 def success(model): 289 map(self.disconnect, ids) 290 self._deferredConnect = None 291 return model 292 293 def failure(f): 294 map(self.disconnect, ids) 295 self._deferredConnect = None 296 return f 297 298 d.addCallbacks(success, failure) 299 self._deferredConnect = d 300 return d 301
302 - def bundleErrback(self, failure, fileName='<unknown>'):
303 """ 304 Handle all coding mistakes that could be triggered by loading bundles. 305 This is a convenience method to help in properly reporting problems. 306 The EntrySyntaxError should be caught and wrapped in a UI message, 307 with the message generated here as debug information. 308 309 @param failure: the failure to be handled 310 @type failure: L{twisted.python.failure.Failure} 311 @param filename: name of the file being loaded 312 @type filename: str 313 314 @raises: L{errors.EntrySyntaxError} 315 """ 316 try: 317 raise failure.value 318 except SyntaxError, e: 319 # the syntax error can happen in the entry file, or any import 320 where = getattr(e, 'filename', "<entry file>") 321 lineno = getattr(e, 'lineno', 0) 322 msg = "Syntax Error at %s:%d while executing %s" % ( 323 where, lineno, fileName) 324 self.warning(msg) 325 raise errors.EntrySyntaxError(msg) 326 except NameError, e: 327 msg = "NameError while executing %s: %s" % ( 328 fileName, " ".join(e.args)) 329 self.warning(msg) 330 raise errors.EntrySyntaxError(msg) 331 except ImportError, e: 332 msg = "ImportError while executing %s: %s" % (fileName, 333 " ".join(e.args)) 334 self.warning(msg) 335 raise errors.EntrySyntaxError(msg)
336
337 - def shutdown(self):
338 self.debug('shutting down') 339 if self.clientFactory is not None: 340 # order not semantically important, but this way we avoid a 341 # "reconnecting in X seconds" in the log 342 self.clientFactory.stopTrying() 343 self.clientFactory.disconnect() 344 self.clientFactory = None 345 346 if self._deferredConnect is not None: 347 # this can happen with keepTrying=True 348 self.debug('cancelling connection attempt') 349 self._deferredConnect.errback(errors.ConnectionCancelledError())
350
351 - def reconnect(self, keepTrying=False):
352 """Close any existing connection to the manager and 353 reconnect.""" 354 self.debug('asked to log in again') 355 self.shutdown() 356 return self.connectToManager(self.connectionInfo, keepTrying)
357 358 # FIXME: give these three sensible names 359
360 - def adminInfoStr(self):
361 return self.managerId
362
363 - def connectionInfoStr(self):
364 return '%s:%s (%s)' % (self.connectionInfo.host, 365 self.connectionInfo.port, 366 self.connectionInfo.use_ssl 367 and 'https' or 'http')
368 369 # used in fgc 370
371 - def managerInfoStr(self):
372 assert self.planet 373 return '%s (%s)' % (self.planet.get('name'), self.managerId)
374
375 - def connectionFailed(self, failure):
376 # called by client factory 377 if failure.check(error.DNSLookupError): 378 message = ("Could not look up host '%s'." 379 % self.connectionInfo.host) 380 elif failure.check(error.ConnectionRefusedError): 381 message = ("Could not connect to host '%s' on port %d." 382 % (self.connectionInfo.host, 383 self.connectionInfo.port)) 384 else: 385 message = ("Unexpected failure.\nDebug information: %s" 386 % log.getFailureMessage(failure)) 387 self.debug('emitting connection-failed') 388 self.emit('connection-failed', message) 389 self.debug('emitted connection-failed')
390
391 - def setRemoteReference(self, remoteReference):
392 self.debug("setRemoteReference %r", remoteReference) 393 394 def gotPlanetState(planet): 395 self.planet = planet 396 # monkey, Monkey, MONKEYPATCH!!!!! 397 self.planet.admin = self 398 self.debug('got planet state') 399 return self.callRemote('getWorkerHeavenState')
400 401 def gotWorkerHeavenState(whs): 402 self._workerHeavenState = whs 403 self.debug('got worker state') 404 405 self.debug('Connected to manager and retrieved all state') 406 self.connected = True 407 if self._writeConnection: 408 writeConnection() 409 self.emit('connected') 410 411 def writeConnection(): 412 i = self.connectionInfo 413 if not (i.authenticator.username 414 and i.authenticator.password): 415 self.log('not caching connection information') 416 return 417 s = ''.join(['<connection>', 418 '<host>%s</host>' % i.host, 419 '<manager>%s</manager>' % self.planet.get('name'), 420 '<port>%d</port>' % i.port, 421 '<use_insecure>%d</use_insecure>' 422 % ((not i.use_ssl) and 1 or 0), 423 '<user>%s</user>' % i.authenticator.username, 424 '<passwd>%s</passwd>' % i.authenticator.password, 425 '</connection>']) 426 427 import os 428 from flumotion.common import python 429 md5sum = python.md5(s).hexdigest() 430 f = os.path.join(configure.registrydir, '%s.connection' % md5sum) 431 try: 432 h = open(f, 'w') 433 h.write(s) 434 h.close() 435 except Exception, e: 436 self.info('failed to write connection cache file %s: %s', 437 f, log.getExceptionMessage(e)) 438 439 # chain up 440 medium.PingingMedium.setRemoteReference(self, remoteReference) 441 442 # fixme: push the disconnect notification upstream 443 444 self.remote.notifyOnDisconnect(self._remoteDisconnected) 445 446 d = self.callRemote('getPlanetState') 447 d.addCallback(gotPlanetState) 448 d.addCallback(gotWorkerHeavenState) 449 return d 450 451 ### model functions; called by UI's to send requests to manager or comp 452 453 ## view management functions 454
455 - def isConnected(self):
456 return self.connected
457 458 ## generic remote call methods 459
460 - def componentCallRemote(self, componentState, methodName, *args, **kwargs):
461 """ 462 Call the given method on the given component with the given args. 463 464 @param componentState: component to call the method on 465 @type componentState: L{flumotion.common.planet.AdminComponentState} 466 @param methodName: name of method to call; serialized to a 467 remote_methodName on the worker's medium 468 469 @rtype: L{twisted.internet.defer.Deferred} 470 """ 471 d = self.callRemote('componentCallRemote', 472 componentState, methodName, 473 *args, **kwargs) 474 475 def errback(failure): 476 msg = None 477 if failure.check(errors.NoMethodError): 478 msg = "Remote method '%s' does not exist." % methodName 479 msg += "\n" + failure.value 480 else: 481 msg = log.getFailureMessage(failure) 482 483 # FIXME: we probably need a nicer way of getting component 484 # messages shown from the admin model, but this allows us to 485 # make sure every type of admin has these messages 486 self.warning(msg) 487 m = messages.Warning(T_(N_("Internal error in component.")), 488 debug=msg) 489 componentState.observe_append('messages', m) 490 return failure
491 492 d.addErrback(errback) 493 # FIXME: dialog for other errors ? 494 return d 495
496 - def workerCallRemote(self, workerName, methodName, *args, **kwargs):
497 """ 498 Call the the given method on the given worker with the given args. 499 500 @param workerName: name of the worker to call the method on 501 @param methodName: name of method to call; serialized to a 502 remote_methodName on the worker's medium 503 504 @rtype: L{twisted.internet.defer.Deferred} 505 """ 506 return self.callRemote('workerCallRemote', workerName, 507 methodName, *args, **kwargs)
508 509 ## manager remote methods 510
511 - def loadConfiguration(self, xml_string):
512 return self.callRemote('loadConfiguration', xml_string)
513
514 - def getConfiguration(self):
515 return self.callRemote('getConfiguration')
516
517 - def getScenarios(self):
518 """ 519 Obtains the available scenarios from the manager. 520 521 @rtype: L{twisted.internet.defer.Deferred} 522 """ 523 return self.callRemote('getScenarios')
524
525 - def getScenarioByType(self, type):
526 """ 527 Obtains an scenario given its type. 528 529 @rtype: L{twisted.internet.defer.Deferred} 530 """ 531 return self.callRemote('getScenarioByType', type)
532
533 - def cleanComponents(self):
534 return self.callRemote('cleanComponents')
535 536 ## worker remote methods 537
538 - def checkElements(self, workerName, elements):
539 return self.workerCallRemote(workerName, 'checkElements', elements)
540
541 - def checkImport(self, workerName, moduleName):
542 return self.workerCallRemote(workerName, 'checkImport', moduleName)
543
544 - def workerRun(self, workerName, moduleName, functionName, *args, **kwargs):
545 """ 546 Run the given function and args on the given worker. If the 547 worker does not already have the module, or it is out of date, 548 it will be retrieved from the manager. 549 550 @rtype: L{twisted.internet.defer.Deferred} firing an 551 L{flumotion.common.messages.Result} 552 """ 553 return self.workerCallRemote(workerName, 'runFunction', moduleName, 554 functionName, *args, **kwargs)
555
556 - def getWizardEntries(self, wizardTypes=None, provides=None, accepts=None):
557 return self.callRemote('getWizardEntries', 558 wizardTypes, provides, accepts)
559
560 - def getWorkerHeavenState(self):
561 return self._workerHeavenState
562
563 - def _remoteDisconnected(self, remoteReference):
564 self.debug("emitting disconnected") 565 self.connected = False 566 self.emit('disconnected') 567 self.debug("emitted disconnected")
568