1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 manager-side objects to handle worker clients
20 """
21
22 from twisted.internet import defer
23
24 from flumotion.manager import base
25 from flumotion.common import errors, log, registry
26 from flumotion.common import worker, common
27 from flumotion.common.vfs import registerVFSJelly
28
29 __version__ = "$Rev$"
30
31
"""
I am an avatar created for a worker.
A reference to me is given when logging in and requesting a worker avatar.
I live in the manager.

@ivar feedServerPort: TCP port the feed server is listening on
@type feedServerPort: int
"""
# NOTE(review): presumably picked up by the logging base class as the
# category for self.debug()/self.warning() output -- confirm.
logCategory = 'worker-avatar'

# Class-level default. The object stored here must provide reservePorts()
# and releasePorts(), which are called on self._portSet further down in
# this file; where it gets assigned is not visible in this excerpt.
_portSet = None
# Class-level default for the instance variable documented above.
feedServerPort = None
45
46 - def __init__(self, heaven, avatarId, remoteIdentity, mind,
47 feedServerPort, ports, randomPorts):
58
61
64
def havePorts(res):
    # res is the DeferredList result: one (success, value) pair per
    # remote call, in the order the calls are listed below. The second
    # value is itself a (ports, random) pair.
    log.debug('worker-avatar', 'got port information')
    (_s1, feedServerPort), (_s2, (ports, random)) = res
    # Same order as the __init__ parameters after self.
    return (heaven, avatarId, remoteIdentity, mind,
            feedServerPort, ports, random)
# Issue both remote calls up front and wait for them together;
# fireOnOneErrback makes the combined deferred errback as soon as either
# call fails instead of reporting the failure inside the result list.
log.debug('worker-avatar', 'calling mind for port information')
d = defer.DeferredList([mind.callRemote('getFeedServerPort'),
                        mind.callRemote('getPorts')],
                       fireOnOneErrback=True)
d.addCallback(havePorts)
# The deferred fires with the argument tuple built by havePorts.
return d
# Pre-decorator spelling of @classmethod: rebind makeAvatarInitArgs as a
# class method.
makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
77
82
"""
Reserve the given number of ports on the worker.

The request is delegated to self._portSet.

@param numPorts: how many ports to reserve
@type numPorts: int

@returns: whatever self._portSet.reservePorts() returns (presumably
          the reserved ports -- confirm against the port set
          implementation)
"""
return self._portSet.reservePorts(numPorts)
91
"""
Release the given list of ports on the worker.

The request is delegated to self._portSet; nothing is returned.

@param ports: list of ports to release
@type ports: list of int
"""
self._portSet.releasePorts(ports)
100
"""
Create a component of the given type with the given nice level.

The function to call is looked up in the registry: the "component"
entry of the component type supplies the function name, and the
component's source supplies the module name. If the type has no
"component" entry, a warning is logged and the function name
"createComponent" is used instead.

@param avatarId: avatarId the component should use to log in
@type avatarId: str
@param type: type of the component to create
@type type: str
@param nice: the nice level to create the component at
@type nice: int
@param conf: the component's config dict
@type conf: dict

@returns: a deferred that will give the avatarId the component
          will use to log in to the manager
"""
self.debug('creating %s (%s) on worker %s with nice level %d',
           avatarId, type, self.avatarId, nice)
# Registry definition for this component type.
defs = registry.getRegistry().getComponent(type)
try:
    entry = defs.getEntryByType('component')

    moduleName = defs.getSource()
    methodName = entry.getFunction()
except KeyError:
    # No "component" entry for this type: warn and fall back to the
    # fixed function name in the same source module.
    self.warning('no "component" entry in registry of type %s, %s',
                 type, 'falling back to createComponent')
    moduleName = defs.getSource()
    methodName = "createComponent"

# The result of the remote 'create' call is returned to the caller
# unchanged.
self.debug('call remote create')
return self.mindCallRemote('create', avatarId, type, moduleName,
                           methodName, nice, conf)
134
"""
Get a list of components that the worker is running.

Issues the remote 'getComponents' call through mindCallRemote and
returns its result as is.

@returns: a deferred that will give the avatarIds running on the
          worker
"""
self.debug('getting component list from worker %s' %
           self.avatarId)
return self.mindCallRemote('getComponents')
145
146
147
"""
Called by the worker to tell the manager to add a given message to
the given component.

Useful in cases where the component can't report messages itself,
for example because it crashed.

@param avatarId: avatarId of the component the message is about
@param message: the message to add to that component
@type message: L{flumotion.common.messages.Message}
"""
self.debug('received message from component %s' % avatarId)
# Forward the message to self.vishnu.componentAddMessage().
self.vishnu.componentAddMessage(avatarId, message)
161
162
"""
I interface between the Manager and worker clients.
For each worker client I create a L{WorkerAvatar} to handle requests.
I live in the manager.
"""

logCategory = "workerheaven"
# The avatar class named in the docstring above.
avatarClass = WorkerAvatar
172
176
177
178
196
"""
Notify the heaven that the given worker has logged out.

Removes the worker's name from the 'names' key of my state, and
removes every entry under 'workers' whose 'name' matches. A warning
is logged instead if a ValueError is raised along the way.

@param workerAvatar: avatar of the worker that logged out
@type workerAvatar: L{WorkerAvatar}
"""
workerName = workerAvatar.getName()
try:
    self.state.remove('names', workerName)
    # Iterate over a copy: entries are removed from 'workers' inside
    # the loop.
    for state in list(self.state.get('workers')):
        if state.get('name') == workerName:
            self.state.remove('workers', state)
except ValueError:
    # NOTE(review): presumably raised by state.remove() when the value
    # is not present -- confirm against the state implementation.
    self.warning('worker %s was never registered in the heaven',
                 workerName)
212