1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 model abstraction for administration clients supporting different views
20 """
21
22 from twisted.internet import error, defer, reactor
23 from zope.interface import implements
24
25 from flumotion.common import common, errors, interfaces, log
26 from flumotion.common import medium
27 from flumotion.common import messages, signals
28 from flumotion.common import planet, worker
29 from flumotion.common.i18n import N_, gettexter
30 from flumotion.configure import configure
31 from flumotion.twisted import pb as fpb
32
33 __version__ = "$Rev$"
34 T_ = gettexter()
35
36
38 perspectiveInterface = interfaces.IAdminMedium
39
40 - def __init__(self, medium, extraTenacious=False, maxDelay=20):
53
58
63
65 """
66 @type connector: implementation of
67 L{twisted.internet.interfaces.IConnector}
68 @type reason: L{twisted.python.failure.Failure}
69 """
70 self.debug("Lost connection to %s: %s",
71 connector.getDestination(), log.getFailureMessage(reason))
72 if self.hasBeenAuthenticated:
73 self.log("Have been authenticated before. Trying again.")
74 elif self.extraTenacious:
75 self.log("We are extra tenacious, trying again")
76 else:
77 self.log("Telling medium about connection failure")
78 self.medium.connectionFailed(reason)
79 return
80
81 RFC = fpb.ReconnectingFPBClientFactory
82 RFC.clientConnectionLost(self, connector, reason)
83
85 """
86 @type connector: implementation of
87 L{twisted.internet.interfaces.IConnector}
88 @type reason: L{twisted.python.failure.Failure}
89 """
90 if reason.check(error.DNSLookupError):
91 self.debug('DNS lookup error')
92 if not self.extraTenacious:
93 self.medium.connectionFailed(reason)
94 return
95 elif (reason.check(error.ConnectionRefusedError)
96 or reason.check(error.ConnectError)):
97
98
99
100
101
102 self.debug("Error connecting to %s: %s",
103 connector.getDestination(),
104 log.getFailureMessage(reason))
105 if self.hasBeenConnected:
106 self.log("we've been connected before though, so going "
107 "to retry")
108
109 elif self.extraTenacious:
110 self.log("trying again due to +100 tenacity")
111
112 else:
113 self.log("telling medium about connection failure")
114 self.medium.connectionFailed(reason)
115 return
116
117 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self,
118 connector, reason)
119
120
121
127
def error(failure):
    """
    Errback: either retry quietly or tell the medium what went wrong.

    The failure is consumed in every branch (nothing is returned), so
    callbacks added after this one see a successful C{None} result.

    @param failure: the failure describing the connection problem
    @type  failure: L{twisted.python.failure.Failure}
    """
    # A tenacious factory never reports problems; it only drops the
    # current connection.
    if self.extraTenacious:
        destination = self._connector.getDestination()
        self.debug('connection problem to %s: %s',
                   destination, log.getFailureMessage(failure))
        self.debug('we are tenacious, so trying again later')
        self.disconnect()
        return

    if failure.check(errors.ConnectionFailedError):
        self.debug("emitting connection-failed")
        self.medium.emit('connection-failed', "I failed my master")
        self.debug("emitted connection-failed")
        return

    # A refused connection and a failed authentication are reported to
    # the medium through the same signal.
    if failure.check(errors.ConnectionRefusedError,
                     errors.NotAuthenticatedError):
        self.debug("emitting connection-refused")
        self.medium.emit('connection-refused')
        self.debug("emitted connection-refused")
        return

    # Anything else is passed on to the medium as a generic error.
    self.medium.emit('connection-error', failure)
    destination = self._connector.getDestination()
    self.warning('connection error to %s:: %s',
                 destination, log.getFailureMessage(failure))
153
154
155 d.addCallbacks(success, error)
156 return d
157
158
159
160
161
162 -class AdminModel(medium.PingingMedium, signals.SignalMixin):
163 """
164 I live in the admin client.
165 I am a data model for any admin view implementing a UI to
166 communicate with one manager.
167 I send signals when things happen.
168
169 Manager calls on us through L{flumotion.manager.admin.AdminAvatar}
170 """
171 __signals__ = ('connected', 'disconnected', 'connection-refused',
172 'connection-failed', 'connection-error', 'reloading',
173 'message', 'update')
174
175 logCategory = 'adminmodel'
176
177 implements(interfaces.IAdminMedium)
178
179
180 planet = None
181
183
184 self.connectionInfo = None
185 self.keepTrying = None
186 self._writeConnection = True
187
188 self.managerId = '<uninitialized>'
189
190 self.connected = False
191 self.clientFactory = None
192
193 self._deferredConnect = None
194
195 self._components = {}
196 self.planet = None
197 self._workerHeavenState = None
198
200 """
201 Disconnects from the actual manager and frees the connection.
202 """
203 if self.clientFactory:
204
205
206 if self.remote:
207 self.remote.dontNotifyOnDisconnect(self._remoteDisconnected)
208
209 self.clientFactory.stopTrying()
210
211 self.clientFactory.disconnect()
212 self.clientFactory = None
213
def connectToManager(self, connectionInfo, keepTrying=False,
                     writeConnection=True):
    """
    Start connecting to the manager described by connectionInfo.

    A new L{AdminClientFactory} is created, login is started with the
    authenticator found in connectionInfo, and the reactor is asked to
    connect over SSL or plain TCP depending on connectionInfo.use_ssl.

    @param connectionInfo:  data for establishing the connection
    @type  connectionInfo:  a L{PBConnectionInfo}
    @param keepTrying:      handed to the factory as extraTenacious;
                            when true, the returned deferred is also not
                            errbacked on disconnect/refused/failed/error
                            signals
    @type  keepTrying:      bool
    @param writeConnection: stored on the model as _writeConnection
    @type  writeConnection: bool

    @returns: a deferred that fires with this model when the
              'connected' signal is emitted and, unless keepTrying is
              set, errbacks on the first 'disconnected',
              'connection-refused', 'connection-failed' or
              'connection-error' signal
    @rtype:   L{twisted.internet.defer.Deferred}
    """
    assert self.clientFactory is None

    self.connectionInfo = connectionInfo
    self._writeConnection = writeConnection

    # The string form of the connection info doubles as the manager id
    # and as the name used when logging.
    self.managerId = str(connectionInfo)
    self.logName = self.managerId

    if connectionInfo.use_ssl:
        transport = 'SSL'
    else:
        transport = 'TCP'
    self.info('Connecting to manager %s with %s',
              self.managerId, transport)

    factory = AdminClientFactory(self, extraTenacious=keepTrying,
                                 maxDelay=20)
    self.clientFactory = factory
    factory.startLogin(connectionInfo.authenticator)

    if not connectionInfo.use_ssl:
        reactor.connectTCP(connectionInfo.host, connectionInfo.port,
                           factory)
    else:
        common.assertSSLAvailable()
        from twisted.internet import ssl
        reactor.connectSSL(connectionInfo.host, connectionInfo.port,
                           factory, ssl.ClientContextFactory())

    # Signal handlers; each one receives the pending deferred as its
    # trailing user argument.

    def onConnected(model, pending):
        pending.callback(model)

    def onDisconnected(model, pending):
        if keepTrying:
            return
        pending.errback(errors.ConnectionFailedError('Lost connection'))

    def onConnectionRefused(model, pending):
        if keepTrying:
            return
        pending.errback(errors.ConnectionRefusedError())

    def onConnectionFailed(model, reason, pending):
        if keepTrying:
            return
        pending.errback(errors.ConnectionFailedError(reason))

    def onConnectionError(model, failure, pending):
        if keepTrying:
            return
        pending.errback(failure)

    d = defer.Deferred()
    wiring = (
        ('connected', onConnected),
        ('disconnected', onDisconnected),
        ('connection-refused', onConnectionRefused),
        ('connection-failed', onConnectionFailed),
        ('connection-error', onConnectionError),
    )
    ids = [self.connect(signalName, handler, d)
           for signalName, handler in wiring]

    def cleanup(result):
        # Whatever the outcome, drop the temporary signal handlers and
        # forget the pending deferred; the result or failure is passed
        # through untouched.
        for handlerId in ids:
            self.disconnect(handlerId)
        self._deferredConnect = None
        return result

    d.addBoth(cleanup)
    self._deferredConnect = d
    return d
301
303 """
304 Handle all coding mistakes that could be triggered by loading bundles.
305 This is a convenience method to help in properly reporting problems.
306 The EntrySyntaxError should be caught and wrapped in a UI message,
307 with the message generated here as debug information.
308
309 @param failure: the failure to be handled
310 @type failure: L{twisted.python.failure.Failure}
311 @param filename: name of the file being loaded
312 @type filename: str
313
314 @raises: L{errors.EntrySyntaxError}
315 """
316 try:
317 raise failure.value
318 except SyntaxError, e:
319
320 where = getattr(e, 'filename', "<entry file>")
321 lineno = getattr(e, 'lineno', 0)
322 msg = "Syntax Error at %s:%d while executing %s" % (
323 where, lineno, fileName)
324 self.warning(msg)
325 raise errors.EntrySyntaxError(msg)
326 except NameError, e:
327 msg = "NameError while executing %s: %s" % (
328 fileName, " ".join(e.args))
329 self.warning(msg)
330 raise errors.EntrySyntaxError(msg)
331 except ImportError, e:
332 msg = "ImportError while executing %s: %s" % (fileName,
333 " ".join(e.args))
334 self.warning(msg)
335 raise errors.EntrySyntaxError(msg)
336
338 self.debug('shutting down')
339 if self.clientFactory is not None:
340
341
342 self.clientFactory.stopTrying()
343 self.clientFactory.disconnect()
344 self.clientFactory = None
345
346 if self._deferredConnect is not None:
347
348 self.debug('cancelling connection attempt')
349 self._deferredConnect.errback(errors.ConnectionCancelledError())
350
352 """Close any existing connection to the manager and
353 reconnect."""
354 self.debug('asked to log in again')
355 self.shutdown()
356 return self.connectToManager(self.connectionInfo, keepTrying)
357
358
359
361 return self.managerId
362
364 return '%s:%s (%s)' % (self.connectionInfo.host,
365 self.connectionInfo.port,
366 self.connectionInfo.use_ssl
367 and 'https' or 'http')
368
369
370
372 assert self.planet
373 return '%s (%s)' % (self.planet.get('name'), self.managerId)
374
390
392 self.debug("setRemoteReference %r", remoteReference)
393
def gotPlanetState(planet):
    """
    Store the planet state and go on to fetch the worker heaven state.

    @param planet: the result of the 'getPlanetState' remote call

    @returns: the result of callRemote('getWorkerHeavenState')
    """
    self.planet = planet
    # Give the planet state a back-reference to this model.
    self.planet.admin = self
    self.debug('got planet state')
    workerStateRequest = self.callRemote('getWorkerHeavenState')
    return workerStateRequest
400
def gotWorkerHeavenState(whs):
    """
    Store the worker heaven state and announce that we are connected.

    The model is flagged as connected, the connection is cached when
    _writeConnection is set, and only then is 'connected' emitted.

    @param whs: the result of the 'getWorkerHeavenState' remote call
    """
    self._workerHeavenState = whs
    self.debug('got worker state')

    self.debug('Connected to manager and retrieved all state')
    self.connected = True
    shouldCache = self._writeConnection
    if shouldCache:
        writeConnection()
    self.emit('connected')
410
411 def writeConnection():
412 i = self.connectionInfo
413 if not (i.authenticator.username
414 and i.authenticator.password):
415 self.log('not caching connection information')
416 return
417 s = ''.join(['<connection>',
418 '<host>%s</host>' % i.host,
419 '<manager>%s</manager>' % self.planet.get('name'),
420 '<port>%d</port>' % i.port,
421 '<use_insecure>%d</use_insecure>'
422 % ((not i.use_ssl) and 1 or 0),
423 '<user>%s</user>' % i.authenticator.username,
424 '<passwd>%s</passwd>' % i.authenticator.password,
425 '</connection>'])
426
427 import os
428 from flumotion.common import python
429 md5sum = python.md5(s).hexdigest()
430 f = os.path.join(configure.registrydir, '%s.connection' % md5sum)
431 try:
432 h = open(f, 'w')
433 h.write(s)
434 h.close()
435 except Exception, e:
436 self.info('failed to write connection cache file %s: %s',
437 f, log.getExceptionMessage(e))
438
439
440 medium.PingingMedium.setRemoteReference(self, remoteReference)
441
442
443
444 self.remote.notifyOnDisconnect(self._remoteDisconnected)
445
446 d = self.callRemote('getPlanetState')
447 d.addCallback(gotPlanetState)
448 d.addCallback(gotWorkerHeavenState)
449 return d
450
451
452
453
454
457
458
459
461 """
462 Call the given method on the given component with the given args.
463
464 @param componentState: component to call the method on
465 @type componentState: L{flumotion.common.planet.AdminComponentState}
466 @param methodName: name of method to call; serialized to a
467 remote_methodName on the worker's medium
468
469 @rtype: L{twisted.internet.defer.Deferred}
470 """
471 d = self.callRemote('componentCallRemote',
472 componentState, methodName,
473 *args, **kwargs)
474
def errback(failure):
    """
    Log a failed component call and attach a warning to the component.

    @param failure: the failure raised by the remote call
    @type  failure: L{twisted.python.failure.Failure}

    @returns: the same failure, so that later errbacks still see it
    """
    if failure.check(errors.NoMethodError):
        # %-formatting instead of string concatenation: failure.value
        # may be an exception instance rather than a plain string, in
        # which case '"\n" + failure.value' raises TypeError.
        msg = "Remote method '%s' does not exist.\n%s" % (
            methodName, failure.value)
    else:
        msg = log.getFailureMessage(failure)

    self.warning(msg)
    m = messages.Warning(T_(N_("Internal error in component.")),
                         debug=msg)
    componentState.observe_append('messages', m)
    return failure
491
492 d.addErrback(errback)
493
494 return d
495
497 """
498 Call the given method on the given worker with the given args.
499
500 @param workerName: name of the worker to call the method on
501 @param methodName: name of method to call; serialized to a
502 remote_methodName on the worker's medium
503
504 @rtype: L{twisted.internet.defer.Deferred}
505 """
506 return self.callRemote('workerCallRemote', workerName,
507 methodName, *args, **kwargs)
508
509
510
512 return self.callRemote('loadConfiguration', xml_string)
513
516
518 """
519 Obtains the available scenarios from the manager.
520
521 @rtype: L{twisted.internet.defer.Deferred}
522 """
523 return self.callRemote('getScenarios')
524
526 """
527 Obtains a scenario given its type.
528
529 @rtype: L{twisted.internet.defer.Deferred}
530 """
531 return self.callRemote('getScenarioByType', type)
532
535
536
537
540
543
def workerRun(self, workerName, moduleName, functionName, *args, **kwargs):
    """
    Ask the given worker to run a function from a module.

    This is a thin wrapper that issues the worker's 'runFunction'
    remote method through L{workerCallRemote}, passing the module name,
    the function name and any extra arguments along.

    @param workerName:   name of the worker to run the function on
    @param moduleName:   name of the module containing the function
    @param functionName: name of the function to run

    @rtype: L{twisted.internet.defer.Deferred} firing an
            L{flumotion.common.messages.Result}
    """
    runFunctionArgs = (moduleName, functionName) + args
    return self.workerCallRemote(workerName, 'runFunction',
                                 *runFunctionArgs, **kwargs)
555
557 return self.callRemote('getWizardEntries',
558 wizardTypes, provides, accepts)
559
561 return self._workerHeavenState
562
564 self.debug("emitting disconnected")
565 self.connected = False
566 self.emit('disconnected')
567 self.debug("emitted disconnected")
568