Package flumotion :: Package manager :: Module admin
[hide private]

Source Code for Module flumotion.manager.admin

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_admin -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  manager-side objects to handle administrative clients 
 20  """ 
 21   
 22  import re 
 23  import os 
 24  import errno 
 25  from StringIO import StringIO 
 26   
 27  from twisted.internet import reactor 
 28  from twisted.python import failure 
 29  from zope.interface import implements 
 30   
 31  from flumotion.manager import base 
 32  from flumotion.common import errors, interfaces, log, planet, registry, debug 
 33  from flumotion.common import common 
 34  from flumotion.common.python import makedirs 
 35  from flumotion.monitor.nagios import util 
 36   
 37  # make Result and Message proxyable 
 38  from flumotion.common import messages 
 39   
 40  # make ComponentState proxyable 
 41  from flumotion.common import componentui 
 42   
 43  __version__ = "$Rev$" 
 44   
 45   
 46  # FIXME: rename to Avatar since we are in the admin. namespace ? 
 47   
 48   
49 -class AdminAvatar(base.ManagerAvatar):
50 """ 51 I am an avatar created for an administrative client interface. 52 A reference to me is given (for example, to gui.AdminInterface) 53 when logging in and requesting an "admin" avatar. 54 I live in the manager. 55 """ 56 logCategory = 'admin-avatar' 57 58 # override pb.Avatar implementation so we can run admin actions 59
60 - def perspectiveMessageReceived(self, broker, message, args, kwargs):
61 benignMethods = ('ping', ) 62 63 args = broker.unserialize(args) 64 kwargs = broker.unserialize(kwargs) 65 66 if message not in benignMethods: 67 self.vishnu.adminAction(self.remoteIdentity, message, args, kwargs) 68 69 return base.ManagerAvatar.perspectiveMessageReceivedUnserialised( 70 self, broker, message, args, kwargs)
71 72 ### pb.Avatar IPerspective methods 73
75 """ 76 Get the planet state. 77 78 @rtype: L{flumotion.common.planet.ManagerPlanetState} 79 """ 80 self.debug("returning planet state %r" % self.vishnu.state) 81 return self.vishnu.state
82
84 """ 85 Get the worker heaven state. 86 87 @rtype: L{flumotion.common.worker.ManagerWorkerHeavenState} 88 """ 89 self.debug("returning worker heaven state %r" % self.vishnu.state) 90 return self.vishnu.workerHeaven.state
91
92 - def perspective_componentStart(self, componentState):
93 """ 94 Start the given component. The component should be sleeping before 95 this. 96 97 @type componentState: L{planet.ManagerComponentState} 98 """ 99 self.debug('perspective_componentStart(%r)' % componentState) 100 return self.vishnu.componentCreate(componentState)
101
102 - def perspective_componentStop(self, componentState):
103 """ 104 Stop the given component. 105 If the component was sad, we clear its sad state as well, 106 since the stop was explicitly requested by the admin. 107 108 @type componentState: L{planet.ManagerComponentState} 109 """ 110 self.debug('perspective_componentStop(%r)' % componentState) 111 return self.vishnu.componentStop(componentState)
112
113 - def perspective_componentRestart(self, componentState):
114 """ 115 Restart the given component. 116 117 @type componentState: L{planet.ManagerComponentState} 118 """ 119 self.debug('perspective_componentRestart(%r)' % componentState) 120 d = self.perspective_componentStop(componentState) 121 d.addCallback(lambda *x: self.perspective_componentStart( 122 componentState)) 123 return d
124 125 # Generic interface to call into a component 126
127 - def perspective_componentCallRemote(self, componentState, methodName, 128 *args, **kwargs):
129 """ 130 Call a method on the given component on behalf of an admin client. 131 132 @param componentState: state of the component to call the method on 133 @type componentState: L{planet.ManagerComponentState} 134 @param methodName: name of the method to call. Gets proxied to 135 L{flumotion.component.component.""" \ 136 """BaseComponentMedium}'s remote_(methodName) 137 @type methodName: str 138 139 @rtype: L{twisted.internet.defer.Deferred} 140 """ 141 assert isinstance(componentState, planet.ManagerComponentState), \ 142 "%r is not a componentState" % componentState 143 144 if methodName == "start": 145 self.warning('forwarding "start" to perspective_componentStart') 146 return self.perspective_componentStart(componentState) 147 148 m = self.vishnu.getComponentMapper(componentState) 149 if not m: 150 self.warning('Component not mapped. Maybe deleted.') 151 raise errors.UnknownComponentError(componentState) 152 153 avatar = m.avatar 154 155 if not avatar: 156 self.warning('No avatar for %s, cannot call remote' % 157 componentState.get('name')) 158 raise errors.SleepingComponentError(componentState) 159 160 # XXX: Maybe we need to have a prefix, so we can limit what an 161 # admin interface can call on a component 162 try: 163 return avatar.mindCallRemote(methodName, *args, **kwargs) 164 except Exception, e: 165 msg = "exception on remote call %s: %s" % (methodName, 166 log.getExceptionMessage(e)) 167 self.warning(msg) 168 raise errors.RemoteMethodError(methodName, 169 log.getExceptionMessage(e))
170
172 """ 173 List components in the planet. Returns a list of avatar ids. 174 """ 175 componentStates = self.vishnu.state.getComponents() 176 avatar_ids = [common.componentId(c.get('parent').get('name'), 177 c.get('name')) 178 for c in componentStates] 179 return avatar_ids
180
181 - def perspective_componentInvoke(self, avatarId, methodName, 182 *args, **kwargs):
183 """ 184 Call a remote method on the component. 185 186 @param avatarId: the component avatar id 187 @type avatarId: str 188 @param methodName: name of the method to call 189 @type methodName: str 190 """ 191 component = util.findComponent(self.vishnu.state, avatarId) 192 if not component: 193 self.warning('No component with avatar id %s' % avatarId) 194 raise errors.UnknownComponentError(avatarId) 195 return self.perspective_componentCallRemote(component, methodName, 196 *args, **kwargs)
197
198 - def perspective_upstreamList(self, avatarId):
199 """ 200 List a component and its upstream components along with 201 types and worker hosts. 202 203 @param avatarId: the component avatar id 204 @type avatarId: str 205 """ 206 207 def get_eaters_ids(eaters_dic): 208 avatars = [] 209 for flow in eaters_dic.keys(): 210 comps = eaters_dic[flow] 211 for c in comps: 212 (name, what) = c[0].split(':') 213 avatars.append('/%s/%s' % (flow, name)) 214 return avatars
215 216 def create_response(components, workers): 217 comps = [] 218 for c in components: 219 workerName = c.get('workerName') 220 host = "unknown" 221 for w in workers: 222 if workerName == w.get('name'): 223 host = w.get('host') 224 break 225 comps.append((c.get('name'), c.get('type'), host)) 226 return comps
227 228 component = util.findComponent(self.vishnu.state, avatarId) 229 if not component: 230 self.warning('No component with avatar id %s' % avatarId) 231 raise errors.UnknownComponentError(avatarId) 232 233 eaters = component.get('config').get('eater', {}) 234 eaters_id = get_eaters_ids(eaters) 235 comps = [component] 236 while len(eaters_id) > 0: 237 eaters = {} 238 for i in eaters_id: 239 try: 240 compState = util.findComponent(self.vishnu.state, i) 241 comps.append(compState) 242 eaters.update(compState.get('config').get('eater', {})) 243 except Exception, e: 244 self.debug(log.getExceptionMessage(e)) 245 emsg = "Error retrieving component '%s'" % i 246 raise errors.UnknownComponentError(emsg) 247 eaters_id = get_eaters_ids(eaters) 248 249 workers = self.vishnu.workerHeaven.state.get('workers') 250 return create_response(comps, workers) 251
252 - def perspective_workerCallRemote(self, workerName, methodName, 253 *args, **kwargs):
254 """ 255 Call a remote method on the worker. 256 This is used so that admin clients can call methods from the interface 257 to the worker. 258 259 @param workerName: the worker to call 260 @type workerName: str 261 @param methodName: Name of the method to call. Gets proxied to 262 L{flumotion.worker.medium.WorkerMedium} 's 263 remote_(methodName) 264 @type methodName: str 265 """ 266 267 self.debug('AdminAvatar.workerCallRemote(%r, %r)' % ( 268 workerName, methodName)) 269 workerAvatar = self.vishnu.workerHeaven.getAvatar(workerName) 270 271 # XXX: Maybe we need to a prefix, so we can limit what an admin 272 # interface can call on a worker 273 try: 274 return workerAvatar.mindCallRemote(methodName, *args, **kwargs) 275 except Exception, e: 276 self.warning("exception on remote call: %s" % 277 log.getExceptionMessage(e)) 278 return failure.Failure(errors.RemoteMethodError(methodName, 279 log.getExceptionMessage(e)))
280
281 - def perspective_getEntryByType(self, componentType, entryType):
282 """ 283 Get the entry point for a piece of bundled code in a component by type. 284 @param componentType: the component 285 @type componentType: a string 286 @param entryType: location of the entry point 287 @type entryType: a string 288 Returns: a (filename, methodName) tuple, or raises:: 289 - NoBundleError if the entry location does not exist 290 """ 291 assert componentType is not None 292 293 self.debug('getting entry of type %s for component type %s', 294 entryType, componentType) 295 296 try: 297 componentRegistryEntry = registry.getRegistry().getComponent( 298 componentType) 299 # FIXME: add logic here for default entry points and functions 300 entry = componentRegistryEntry.getEntryByType(entryType) 301 except KeyError: 302 self.warning("Could not find bundle for %s(%s)" % ( 303 componentType, entryType)) 304 raise errors.NoBundleError("entry type %s in component type %s" % 305 (entryType, componentType)) 306 307 filename = os.path.join(componentRegistryEntry.base, entry.location) 308 self.debug('entry point is in file path %s and function %s' % ( 309 filename, entry.function)) 310 return (filename, entry.function)
311
312 - def perspective_getPlugEntry(self, plugType, entryType):
313 """ 314 Get the entry point for a piece of bundled code in a plug by type. 315 @param plugType: the plug 316 @type plugType: a string 317 @param entryType: location of the entry point 318 @type entryType: a string 319 Returns: a (filename, methodName) tuple, or raises:: 320 - NoBundleError if the entry location does not exist 321 """ 322 assert plugType is not None 323 324 self.debug('getting entry of type %s for plug type %s', 325 entryType, plugType) 326 327 try: 328 plugRegistryEntry = registry.getRegistry().getPlug(plugType) 329 entry = plugRegistryEntry.getEntryByType(entryType) 330 except KeyError: 331 self.warning("Could not find bundle for %s(%s)" % ( 332 plugType, entryType)) 333 raise errors.NoBundleError("entry type %s in plug type %s" % 334 (entryType, plugType)) 335 336 self.debug('entry point is in file path %s and function %s' % ( 337 entry.location, entry.function)) 338 return (entry.location, entry.function)
339
340 - def perspective_getConfiguration(self):
341 """ 342 Get the configuration of the manager as an XML string. 343 344 @rtype: str 345 """ 346 return self.vishnu.getConfiguration()
347
348 - def perspective_getScenarioByType(self, scenarioType, entryType):
349 """ 350 Remote method that gets the scenario of a given type. 351 352 @param scenarioType: the component 353 @type scenarioType: a string 354 Returns: a (filename, methodName) tuple, or raises:: 355 - NoBundleError if the entry location does not exist 356 """ 357 assert scenarioType is not None 358 359 self.debug('getting entry of type %s for scenario type %s', 360 entryType, scenarioType) 361 362 try: 363 scenarioRegistryEntry = registry.getRegistry().getScenarioByType( 364 scenarioType) 365 # FIXME: add logic here for default entry points and functions 366 entry = scenarioRegistryEntry.getEntryByType(entryType) 367 except KeyError: 368 self.warning("Could not find bundle for %s(%s)" % ( 369 scenarioType, entryType)) 370 raise errors.NoBundleError("entry type %s in component type %s" % 371 (entryType, scenarioType)) 372 373 filename = os.path.join(scenarioRegistryEntry.getBase(), 374 entry.getLocation()) 375 self.debug('entry point is in file path %s and function %s' % ( 376 filename, entry.function)) 377 378 return (filename, entry.getFunction())
379
380 - def perspective_getScenarios(self):
381 """ 382 Get all the scenarios defined on the registry. 383 384 @rtype : List of L{IScenarioAssistantPlugin} 385 """ 386 r = registry.getRegistry() 387 return r.getScenarios()
388
389 - def _saveFlowFile(self, filename):
390 """Opens a file that the flow should be written to. 391 392 Note that the returned file object might be an existing file, 393 opened in append mode; if the loadConfiguration operation 394 succeeds, the file should first be truncated before writing. 395 """ 396 self.vishnu.adminAction(self.remoteIdentity, 397 '_saveFlowFile', (), {}) 398 399 def ensure_sane(name, extra=''): 400 if not re.match('^[a-zA-Z0-9_' + extra + '-]+$', name): 401 raise errors.ConfigError, \ 402 'Invalid planet or saveAs name: %s' % name
403 404 ensure_sane(self.vishnu.configDir, '/') 405 ensure_sane(filename) 406 directory = os.path.join(self.vishnu.configDir, "flows") 407 self.debug('told to save flow as %s/%s.xml', directory, filename) 408 try: 409 makedirs(directory, 0770) 410 except OSError, e: 411 if e.errno != errno.EEXIST: 412 raise e 413 prev = os.umask(0007) 414 output = open(os.path.join(directory, filename + '.xml'), 'a') 415 os.umask(prev) 416 return output 417
418 - def perspective_loadConfiguration(self, xml, saveAs=None):
419 """ 420 Load the given XML configuration into the manager. If the 421 optional saveAs parameter is passed, the XML snippet will be 422 saved to disk in the manager's flows directory. 423 424 @param xml: the XML configuration snippet. 425 @type xml: str 426 @param saveAs: The name of a file to save the XML as. 427 @type saveAs: str 428 """ 429 430 if saveAs: 431 output = self._saveFlowFile(saveAs) 432 433 # Update the registry if needed, so that new/changed component types 434 # can be parsed. 435 registry.getRegistry().verify() 436 437 f = StringIO(xml) 438 res = self.vishnu.loadComponentConfigurationXML(f, self.remoteIdentity) 439 f.close() 440 441 if saveAs: 442 443 def success(res): 444 self.debug('loadConfiguration succeeded, writing flow to %r', 445 output) 446 output.truncate(0) 447 output.write(xml) 448 output.close() 449 return res
450 451 def failure(res): 452 self.debug('loadConfiguration failed, leaving %r as it was', 453 output) 454 output.close() 455 return res 456 res.addCallbacks(success, failure) 457 458 return res 459
460 - def perspective_loadComponent(self, componentType, componentId, 461 componentLabel, properties, workerName, 462 plugs=None, eaters=None, 463 isClockMaster=None, virtualFeeds=None):
464 """ 465 Load a component into the manager configuration. 466 Returns a deferred that will be called with the component state. 467 468 @param componentType: The registered type of the component to be added 469 @type componentType: str 470 @param componentId: The identifier of the component to add, 471 should be created by the function 472 L{flumotion.common.common.componentId} 473 @type componentId: str 474 @param componentLabel: The human-readable label of the component. 475 if None, no label will be set. 476 @type componentLabel: str or None 477 @param properties: List of property name-value pairs. 478 See L{flumotion.common.config.buildPropertyDict} 479 @type properties: list of (str, object) 480 @param workerName: the name of the worker where the added 481 component should run. 482 @type workerName: str 483 @param plugs: List of plugs, as type-propertyList pairs. 484 See {flumotion.manager.config.buildPlugsSet}. 485 @type plugs: [(str, [(str, object)])] 486 @param eaters: List of (eater name, feed ID) pairs. 487 See L{flumotion.manager.config.buildEatersDict} 488 @type eaters: [(str, str)] 489 @param isClockMaster: True if the component to be added must be 490 a clock master. Passing False here means 491 that the manager will choose what 492 component, if any, will be clock master 493 for this flow. 494 @type isClockMaster: bool 495 @param virtualFeeds: List of (virtual feed, feeder name) pairs. 496 See L{flumotion.manager.config.buildVirtualFeeds} 497 @type virtualFeeds: [(str, str)] 498 """ 499 return self.vishnu.loadComponent(self.remoteIdentity, componentType, 500 componentId, componentLabel, 501 properties, workerName, 502 plugs or [], eaters or [], 503 isClockMaster, virtualFeeds or [])
504
505 - def perspective_deleteFlow(self, flowName):
506 return self.vishnu.deleteFlow(flowName)
507
508 - def perspective_deleteComponent(self, componentState):
509 """Delete a component from the manager. 510 511 A component can only be deleted when it is sleeping or sad. It 512 is the caller's job to ensure this is the case; calling this 513 function on a running component will raise a ComponentBusyError. 514 515 @returns: a deferred that will fire when all listeners have been 516 notified of the component removal 517 """ 518 return self.vishnu.deleteComponent(componentState)
519
520 - def perspective_getVersions(self):
521 return debug.getVersions()
522
523 - def perspective_cleanComponents(self):
524 return self.vishnu.emptyPlanet()
525
526 - def perspective_getWizardEntries(self, types=None, provides=None, 527 accepts=None):
528 """ 529 Fetches the wizard entries which matches the parameters sent in 530 531 @param types: list of component types to fetch, is usually 532 something like ['video-producer'] or ['audio-encoder'] 533 @type types: list of strings 534 @param provides: formats provided, eg ['jpeg', 'speex'] 535 @type provides: list of strings 536 @param accepts: formats accepted, eg ['theora'] 537 @type accepts: list of strings 538 @returns: L{componentui.WizardEntryState} 539 """ 540 541 def extract(wizards): 542 for wizard in wizards: 543 if types is not None: 544 if wizard.type not in types: 545 continue 546 if provides is not None: 547 for formatProvided in wizard.provides: 548 if formatProvided.media_type in provides: 549 break 550 else: 551 continue 552 if accepts is not None: 553 for formatAccepted in wizard.accepts: 554 if formatAccepted.media_type in accepts: 555 break 556 else: 557 continue 558 yield wizard
559 560 retval = [] 561 r = registry.getRegistry() 562 for component in r.getComponents(): 563 retval += extract(component.wizards) 564 for plug in r.getPlugs(): 565 retval += extract(plug.wizards) 566 del r 567 568 return retval 569
570 - def perspective_getComponentEntry(self, componentType):
571 """Fetches a ComponentRegistryEntry given a componentType 572 @param componentType: component type 573 @type componentType: string 574 @returns: the component 575 @rtype: L{ComponentRegistryEntry} 576 """ 577 try: 578 componentRegistryEntry = registry.getRegistry().getComponent( 579 componentType) 580 except KeyError: 581 return None 582 return componentRegistryEntry
583
584 - def perspective_invokeOnComponents(self, componentType, methodName, 585 *args, **kwargs):
586 return self.vishnu.invokeOnComponents(componentType, methodName, 587 *args, **kwargs)
588 589
590 -class AdminHeaven(base.ManagerHeaven):
591 """ 592 I interface between the Manager and administrative clients. 593 For each client I create an L{AdminAvatar} to handle requests. 594 I live in the manager. 595 """ 596 597 logCategory = "admin-heaven" 598 implements(interfaces.IHeaven) 599 avatarClass = AdminAvatar
600