Package flumotion :: Package manager :: Module worker
[hide private]

Source Code for Module flumotion.manager.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  manager-side objects to handle worker clients 
 20  """ 
 21   
 22  from twisted.internet import defer 
 23   
 24  from flumotion.manager import base 
 25  from flumotion.common import errors, log, registry 
 26  from flumotion.common import worker, common 
 27  from flumotion.common.vfs import registerVFSJelly 
 28   
 29  __version__ = "$Rev$" 
 30   
 31   
class WorkerAvatar(base.ManagerAvatar):
    """
    I am an avatar created for a worker.
    A reference to me is given when logging in and requesting a worker avatar.
    I live in the manager.

    @ivar feedServerPort: TCP port the feed server is listening on
    @type feedServerPort: int
    """
    logCategory = 'worker-avatar'

    # worker.PortSet built in __init__ from the ports reported by the worker
    _portSet = None
    feedServerPort = None

    def __init__(self, heaven, avatarId, remoteIdentity, mind,
                 feedServerPort, ports, randomPorts):
        """
        Initialise the avatar and announce the worker to the heaven and
        to vishnu.

        The first four arguments are handed unchanged to
        L{base.ManagerAvatar.__init__}; the remaining ones are the values
        produced by L{makeAvatarInitArgs}.

        @param feedServerPort: TCP port the feed server is listening on
        @type  feedServerPort: int
        @param ports:          port information as returned by the worker's
                               remote 'getPorts' call; passed on to
                               L{worker.PortSet}
        @param randomPorts:    second element of the worker's 'getPorts'
                               answer; passed on to L{worker.PortSet}
                               (presumably a flag -- confirm against
                               PortSet)
        """
        base.ManagerAvatar.__init__(self, heaven, avatarId,
                                    remoteIdentity, mind)
        self.feedServerPort = feedServerPort

        self._portSet = worker.PortSet(self.avatarId, ports, randomPorts)

        # heaven.workerAttached raises if a worker with this name is
        # already registered, in which case vishnu is never notified
        self.heaven.workerAttached(self)
        self.vishnu.workerAttached(self)

        registerVFSJelly()

    def getName(self):
        """
        @returns: the name of the worker, which is its avatarId
        """
        return self.avatarId

    @classmethod
    def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity,
                           mind):
        """
        Ask the worker for its feed server port and its port set, and
        build the argument tuple for L{__init__} out of the answers.

        @returns: a deferred firing with the tuple (heaven, avatarId,
                  remoteIdentity, mind, feedServerPort, ports,
                  randomPorts); it errbacks as soon as one of the two
                  remote calls fails
        """

        def havePorts(res):
            log.debug('worker-avatar', 'got port information')
            # a DeferredList result is a list of (success, value) pairs
            # in the order the deferreds were given
            (_s1, feedServerPort), (_s2, (ports, randomPorts)) = res
            return (heaven, avatarId, remoteIdentity, mind,
                    feedServerPort, ports, randomPorts)

        log.debug('worker-avatar', 'calling mind for port information')
        # consumeErrors: the failure is already delivered through the
        # DeferredList's errback; without it the failing remote call
        # would additionally be logged as an unhandled error
        d = defer.DeferredList([mind.callRemote('getFeedServerPort'),
                                mind.callRemote('getPorts')],
                               fireOnOneErrback=True,
                               consumeErrors=True)
        d.addCallback(havePorts)
        return d

    def onShutdown(self):
        """
        Tell the heaven and vishnu that the worker went away, then let
        the base class do its own shutdown handling.
        """
        self.heaven.workerDetached(self)
        self.vishnu.workerDetached(self)
        base.ManagerAvatar.onShutdown(self)

    def reservePorts(self, numPorts):
        """
        Reserve the given number of ports on the worker.

        @param numPorts: how many ports to reserve
        @type  numPorts: int

        @returns: whatever the port set's reservePorts returns
        """
        return self._portSet.reservePorts(numPorts)

    def releasePorts(self, ports):
        """
        Release the given list of ports on the worker.

        @param ports: list of ports to release
        @type  ports: list of int
        """
        self._portSet.releasePorts(ports)

    def createComponent(self, avatarId, type, nice, conf):
        """
        Create a component of the given type with the given nice level.

        @param avatarId: avatarId the component should use to log in
        @type  avatarId: str
        @param type:     type of the component to create
        @type  type:     str
        @param nice:     the nice level to create the component at
        @type  nice:     int
        @param conf:     the component's config dict
        @type  conf:     dict

        @returns: a deferred that will give the avatarId the component
                  will use to log in to the manager
        """
        self.debug('creating %s (%s) on worker %s with nice level %d',
                   avatarId, type, self.avatarId, nice)
        defs = registry.getRegistry().getComponent(type)
        try:
            methodName = defs.getEntryByType('component').getFunction()
        except KeyError:
            self.warning('no "component" entry in registry of type %s, %s',
                         type, 'falling back to createComponent')
            methodName = "createComponent"
        # the module name is the same whether or not an entry was found
        # FIXME: use entry.getModuleName() (doesn't work atm?)
        moduleName = defs.getSource()

        self.debug('call remote create')
        return self.mindCallRemote('create', avatarId, type, moduleName,
                                   methodName, nice, conf)

    def getComponents(self):
        """
        Get a list of components that the worker is running.

        @returns: a deferred that will give the avatarIds running on the
                  worker
        """
        self.debug('getting component list from worker %s',
                   self.avatarId)
        return self.mindCallRemote('getComponents')

    ### IPerspective methods, called by the worker's component

    def perspective_componentAddMessage(self, avatarId, message):
        """
        Called by the worker to tell the manager to add a given message to
        the given component.

        Useful in cases where the component can't report messages itself,
        for example because it crashed.

        @param avatarId: avatarId of the component the message is about
        @type  avatarId: str
        @param message:  the message to add to the component
        @type  message:  L{flumotion.common.messages.Message}
        """
        self.debug('received message from component %s', avatarId)
        self.vishnu.componentAddMessage(avatarId, message)
161 162
163 -class WorkerHeaven(base.ManagerHeaven):
164 """ 165 I interface between the Manager and worker clients. 166 For each worker client I create an L{WorkerAvatar} to handle requests. 167 I live in the manager. 168 """ 169 170 logCategory = "workerheaven" 171 avatarClass = WorkerAvatar 172
173 - def __init__(self, vishnu):
176 177 ### my methods 178
def workerAttached(self, workerAvatar):
    """
    Record in the heaven's state that the given worker has logged in.

    The worker's name is appended to the state's 'names' list and a new
    L{worker.ManagerWorkerState}, carrying the name and the host of the
    peer the worker connected from, is appended to 'workers'.

    @type workerAvatar: L{WorkerAvatar}

    @raises errors.AlreadyConnectedError: when a worker with the same
                                          name is already listed
    """
    name = workerAvatar.getName()

    # refuse a second login under a name that is already known
    if name in self.state.get('names'):
        self.warning('worker %s was already registered in the heaven',
                     name)
        raise errors.AlreadyConnectedError()

    # wheee
    peer = workerAvatar.mind.broker.transport.getPeer()
    workerState = worker.ManagerWorkerState(name=name, host=peer.host)
    self.state.append('names', name)
    self.state.append('workers', workerState)
196
def workerDetached(self, workerAvatar):
    """
    Drop the given worker from the heaven's state after it logged out.

    The worker's name is removed from the state's 'names' list and every
    entry of 'workers' carrying that name is removed as well.  A
    ValueError raised while doing so is logged as a warning instead of
    being propagated.

    @type workerAvatar: L{WorkerAvatar}
    """
    name = workerAvatar.getName()
    try:
        self.state.remove('names', name)
        # walk a snapshot, since matching entries are removed on the way
        for workerState in tuple(self.state.get('workers')):
            if workerState.get('name') != name:
                continue
            self.state.remove('workers', workerState)
    except ValueError:
        self.warning('worker %s was never registered in the heaven',
                     name)
212