1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 the job-side half of the worker-job connection
20 """
21
22 import os
23 import resource
24 import sys
25
26
27
28
29
30
31 from twisted.cred import credentials
32 from twisted.internet import reactor, defer
33 from twisted.python import failure
34 from twisted.spread import pb
35 from zope.interface import implements
36
37 from flumotion.common import errors, interfaces, log
38
39 from flumotion.common import medium, package
40 from flumotion.common.reflectcall import createComponent, reflectCallCatching
41 from flumotion.component import component
42
43 from flumotion.twisted import fdserver
44 from flumotion.twisted import pb as fpb
45 from flumotion.twisted import defer as fdefer
46
47
48 from flumotion.common import keycards
49
50 __version__ = "$Rev$"
51
52
54 """
55 I am a medium between the job and the worker's job avatar.
56 I live in the job process.
57
58 @cvar component: the component this is a medium for; created as part of
59 L{remote_create}
60 @type component: L{flumotion.component.component.BaseComponent}
61 """
62 logCategory = 'jobmedium'
63 remoteLogName = 'jobavatar'
64
65 implements(interfaces.IJobMedium)
66
68 self.avatarId = None
69 self.logName = None
70 self.component = None
71
72 self._workerName = None
73 self._managerHost = None
74 self._managerPort = None
75 self._managerTransport = None
76 self._managerKeycard = None
77 self._componentClientFactory = None
78
79 self._hasStoppedReactor = False
80
81
82
def remote_bootstrap(self, workerName, host, port,
                     transport, authenticator, packagePaths):
    """
    Store the manager connection details and register the package paths
    needed to run the component.

    Called by the worker's JobAvatar.

    @param workerName:    the name of the worker running this job
    @type  workerName:    str
    @param host:          the host that is running the manager
    @type  host:          str
    @param port:          port on which the manager is listening
    @type  port:          int
    @param transport:     'tcp' or 'ssl'
    @type  transport:     str
    @param authenticator: remote reference to the worker-side
                          authenticator; when false, no authenticator
                          is kept
    @type  authenticator: L{twisted.spread.pb.RemoteReference} to a
                          L{flumotion.twisted.pb.Authenticator}
    @param packagePaths:  ordered list of
                          (package name, package path) tuples
    @type  packagePaths:  list of (str, str)
    """
    # remember where and how the manager can be reached
    self._workerName = workerName
    self._managerHost, self._managerPort = host, port
    self._managerTransport = transport

    if not authenticator:
        self.debug('no authenticator, will not be able to log '
                   'into manager')
        self._authenticator = None
    else:
        # wrap the remote reference so it can be used for the login
        self._authenticator = fpb.RemoteAuthenticator(authenticator)

    # register the paths in the order the worker handed them over
    registry = package.getPackager()
    for pkgName, pkgPath in packagePaths:
        self.debug('registering package path for %s' % pkgName)
        self.log('... from path %s' % pkgPath)
        registry.registerPackagePath(pkgPath, pkgName)
122
125
127 """
128 I am called on by the worker's JobAvatar to run a function,
129 normally on behalf of the flumotion.admin.gtk.
130
131 @param moduleName: name of the module containing the function
132 @type moduleName: str
133 @param methodName: the method to run
134 @type methodName: str
135 @param args: args to pass to the method
136 @type args: tuple
137 @param kwargs: kwargs to pass to the method
138 @type kwargs: dict
139
140 @returns: the result of invoking the method
141 """
142 self.info('Running %s.%s(*%r, **%r)' % (moduleName, methodName,
143 args, kwargs))
144
145 self._enableCoreDumps()
146
147 return reflectCallCatching(errors.RemoteRunError, moduleName,
148 methodName, *args, **kwargs)
149
def remote_create(self, avatarId, type, moduleName, methodName,
                  nice, conf):
    """
    Create the component for this job on request of the worker's
    JobAvatar, and hook its shutdown up to this medium.

    @param avatarId:   avatarId for component to log in to manager
    @type  avatarId:   str
    @param type:       type of component to start
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       the nice level
    @type  nice:       int
    @param conf:       the component configuration
    @type  conf:       dict
    """
    # the avatarId doubles as the log name of this medium
    self.avatarId = self.logName = avatarId

    created = self._createComponent(avatarId, type, moduleName,
                                    methodName, nice, conf)
    # publish the component before installing the hook, so the attribute
    # is already set by the time the hook is registered
    self.component = created
    created.setShutdownHook(self._componentStopped)
174
176
177 reactor.callLater(0, self.shutdown)
178
185
200
201
202
204 """
205 Shut down the job process completely, cleaning up the component
206 so the reactor can be left from.
207 """
208 if self._hasStoppedReactor:
209 self.debug("Not stopping reactor again, already shutting down")
210 else:
211 self._hasStoppedReactor = True
212 self.info("Stopping reactor in job process")
213 reactor.stop()
214
216 if not nice:
217 return
218
219 try:
220 os.nice(nice)
221 except OSError, e:
222 self.warning('Failed to set nice level: %s' % str(e))
223 else:
224 self.debug('Nice level set to %d' % nice)
225
227 soft, hard = resource.getrlimit(resource.RLIMIT_CORE)
228 if hard != resource.RLIM_INFINITY:
229 self.warning('Could not set unlimited core dump sizes, '
230 'setting to %d instead' % hard)
231 else:
232 self.debug('Enabling core dumps of unlimited size')
233
234 resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))
235
def _createComponent(self, avatarId, type, moduleName, methodName,
                     nice, conf):
    """
    Create a component of the given type.
    Log in to the manager with the given avatarId.

    The nice level and the core dump limit are applied before the
    component is created. When creation fails, a shutdown of the job is
    scheduled and the failure is re-raised as a ComponentCreateError.

    @param avatarId:   avatarId component will use to log in to manager
    @type  avatarId:   str
    @param type:       type of component to start
    @type  type:       str
    @param moduleName: name of the module that contains the entry point
    @type  moduleName: str
    @param methodName: name of the factory method to create the component
    @type  methodName: str
    @param nice:       the nice level to run with
    @type  nice:       int
    @param conf:       the component configuration
    @type  conf:       dict

    @returns: the created component
    @raises errors.ComponentCreateError: if createComponent raised any
                                         exception
    """
    self.info('Creating component "%s" of type "%s"', avatarId, type)

    self._setNice(nice)
    self._enableCoreDumps()

    try:
        comp = createComponent(moduleName, methodName, conf)
    except Exception:
        # fetch the exception through sys.exc_info() so this handler
        # parses on every Python version, old or new
        e = sys.exc_info()[1]

        if isinstance(e, errors.ComponentCreateError) and e.args:
            # already the right kind of error; reuse its message
            msg = e.args[0]
        else:
            # exception args are not guaranteed to be strings (for
            # example OSError(errno, text)), so format each one before
            # joining; a bare join would raise TypeError here and skip
            # the shutdown below
            details = " ".join(["%s" % (arg, ) for arg in e.args])
            msg = "Exception %s during createComponent: %s" % (
                e.__class__.__name__, details)

        # wrap msg in a tuple so a tuple-valued message cannot be
        # mistaken for the format arguments
        self.warning(
            "raising ComponentCreateError(%s) and stopping job" % (msg, ))

        # NOTE(review): presumably delayed so that the error raised below
        # can still be delivered to the caller before the reactor stops
        reactor.callLater(0.1, self.shutdown)
        raise errors.ComponentCreateError(msg)

    comp.setWorkerName(self._workerName)

    self.debug('creating ComponentClientFactory')
    managerClientFactory = component.ComponentClientFactory(comp)
    self._componentClientFactory = managerClientFactory
    self.debug('created ComponentClientFactory %r' % managerClientFactory)
    # NOTE(review): remote_bootstrap stores None when it was given no
    # authenticator; in that case the next line raises AttributeError.
    # Behaviour left unchanged here - confirm what callers expect.
    self._authenticator.avatarId = avatarId
    managerClientFactory.startLogin(self._authenticator)

    host = self._managerHost
    port = self._managerPort
    transport = self._managerTransport
    self.debug('logging in with authenticator %r' % self._authenticator)
    if transport == "ssl":
        # imported lazily: only needed when SSL is actually requested
        from flumotion.common import common
        common.assertSSLAvailable()
        from twisted.internet import ssl
        self.info('Connecting to manager %s:%d with SSL' % (host, port))
        reactor.connectSSL(host, port, managerClientFactory,
                           ssl.ClientContextFactory())
    elif transport == "tcp":
        self.info('Connecting to manager %s:%d with TCP' % (host, port))
        reactor.connectTCP(host, port, managerClientFactory)
    else:
        # no connection is attempted for an unrecognised transport; the
        # component is still returned to the caller
        self.warning(
            'Unknown transport protocol %s' % self._managerTransport)

    return comp
311
312
314 """
315 A pb.Broker subclass that handles FDs being passed (with associated data)
316 over the same connection as the normal PB data stream.
317 When an FD is seen, the FD should be added to a given eater or feeder
318 element.
319 """
320
def __init__(self, connectionClass, **kwargs):
    """
    Set up the underlying pb.Broker and remember the connection class.

    @param connectionClass: subclass of L{twisted.internet.tcp.Connection}
    @param kwargs:          keyword arguments handed on unchanged to
                            L{twisted.spread.pb.Broker}
    """
    # initialise the base broker first, exactly as before
    initBroker = pb.Broker.__init__
    initBroker(self, **kwargs)

    self._connectionClass = connectionClass
328
330
331 self.debug('received fds %r, message %r' % (fds, message))
332 if message.startswith('sendFeed '):
333
334 def parseargs(_, feedName, eaterId=None):
335 return feedName, eaterId
336 feedName, eaterId = parseargs(*message.split(' '))
337 self.factory.medium.component.feedToFD(feedName, fds[0],
338 os.close, eaterId)
339 elif message.startswith('receiveFeed '):
340
341 def parseargs2(_, eaterAlias, feedId=None):
342 return eaterAlias, feedId
343 eaterAlias, feedId = parseargs2(*message.split(' '))
344 self.factory.medium.component.eatFromFD(eaterAlias, feedId,
345 fds[0])
346 elif message == 'redirectStdout':
347 self.debug('told to rotate stdout to fd %d', fds[0])
348 os.dup2(fds[0], sys.stdout.fileno())
349 os.close(fds[0])
350 self.debug('rotated stdout')
351 elif message == 'redirectStderr':
352 self.debug('told to rotate stderr to fd %d', fds[0])
353 os.dup2(fds[0], sys.stderr.fileno())
354 os.close(fds[0])
355 self.info('rotated stderr')
356 else:
357 self.warning('Unknown message received: %r' % message)
358
359
361 """
362 I am a client factory that logs in to the WorkerBrain.
363 I live in the flumotion-job process spawned by the worker.
364
365 @cvar medium: the medium for the JobHeaven to access us through
366 @type medium: L{JobMedium}
367 """
368 logCategory = "job"
369 perspectiveInterface = interfaces.IJobMedium
370
384
385
386
391
392
393
def login(self, username):
    """
    Log in to the worker with the given username and an empty password,
    using our medium as the client reference.

    @param username: the username to log in with
    @type  username: str

    @returns: the deferred of the login, with a callback added that
              hands the received remote reference to the medium
    """

    def onLoggedIn(perspective):
        # the login succeeded; give the medium its remote reference
        self.info('Logged in to worker')
        self.debug('perspective %r connected', perspective)
        self.medium.setRemoteReference(perspective)

    self.info('Logging in to worker')
    workerCredentials = credentials.UsernamePassword(username, '')
    loginDeferred = pb.PBClientFactory.login(self, workerCredentials,
                                             self.medium)
    loginDeferred.addCallback(onLoggedIn)
    return loginDeferred
407
408
409
410
411
412
417