1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 worker-side objects to handle worker clients
20 """
21
22 import os
23 import signal
24 import sys
25
26 from twisted.internet import defer, reactor
27
28 from flumotion.common import errors, log
29 from flumotion.common import messages
30 from flumotion.common.i18n import N_, gettexter
31 from flumotion.configure import configure
32 from flumotion.worker import base
33
34 __version__ = "$Rev$"
35 T_ = gettexter()
36
37
39
44
45 def create(_, job):
46 self.debug("asking job to create component with avatarId %s,"
47 " type %s", job.avatarId, job.type)
48 return self.mindCallRemote('create', job.avatarId, job.type,
49 job.moduleName, job.methodName,
50 job.nice, job.conf)
51
52 def success(_, avatarId):
53 self.debug('job started component with avatarId %s',
54 avatarId)
55
56 self._heaven._startSet.createSuccess(avatarId)
57
58 def error(failure, job):
59 msg = log.getFailureMessage(failure)
60 if failure.check(errors.ComponentCreateError):
61 self.warning('could not create component %s of type %s:'
62 ' %s', job.avatarId, job.type, msg)
63 else:
64 self.warning('unhandled error creating component %s: %s',
65 job.avatarId, msg)
66
67 self._heaven._startSet.createFailed(job.avatarId, failure)
68
69 def gotPid(pid):
70 self.pid = pid
71 info = self._heaven.getManagerConnectionInfo()
72 if info.use_ssl:
73 transport = 'ssl'
74 else:
75 transport = 'tcp'
76 job = self._heaven.getJobInfo(pid)
77 workerName = self._heaven.getWorkerName()
78
79 d = bootstrap(workerName, info.host, info.port, transport,
80 info.authenticator, job.bundles)
81 d.addCallback(create, job)
82 d.addCallback(success, job.avatarId)
83 d.addErrback(error, job)
84 return d
85 d = self.mindCallRemote("getPid")
86 d.addCallback(gotPid)
87 return d
88
90 """
91 returns: a deferred marking completed stop.
92 """
93 if not self.mind:
94 self.debug('already logged out')
95 return defer.succeed(None)
96 else:
97 self.debug('stopping')
98 return self.mindCallRemote('stop')
99
def sendFeed(self, feedName, fd, eaterId):
    """
    Ask the component job to feed C{feedName} to the given file descriptor.

    @param feedName: name of the feed the job should send.
    @param fd:       file descriptor to hand over to the job (logged
                     with a %d format).
    @param eaterId:  identifier sent together with the feed name in the
                     "sendFeed" message.

    @returns: False when this avatar has no mind; otherwise the result
              of C{self._sendFileDescriptor}, i.e. whether the fd was
              successfully handed off to the component.
    """
    self.debug('Sending FD %d to component job to feed %s to fd',
               fd, feedName)

    # Without a mind there is nobody to hand the fd to.  Report failure
    # so the caller can react (the log message says "trigger disconnect").
    if not self.mind:
        self.debug('my mind is gone, trigger disconnect')
        return False

    command = "sendFeed %s %s" % (feedName, eaterId)
    return self._sendFileDescriptor(fd, command)
119
121 """
122 Tell the feeder to receive the given feed from the given fd.
123
124 @returns: whether the fd was successfully handed off to the component.
125 """
126 self.debug('Sending FD %d to component job to eat %s from fd',
127 fd, eaterAlias)
128
129
130 if self.mind:
131 message = "receiveFeed %s %s" % (eaterAlias, feedId)
132 return self._sendFileDescriptor(fd, message)
133 else:
134 self.debug('my mind is gone, trigger disconnect')
135 return False
136
138 """
139 This notification from the job process will be fired when it is
140 shutting down, so that although the process might still be
141 around, we know it's OK to accept new start requests for this
142 avatar ID.
143 """
144 self.info("component %s shutting down cleanly", self.avatarId)
145
146 self._heaven._startSet.shutdownStart(self.avatarId)
147
148
150 __slots__ = ('conf', )
151
152 - def __init__(self, pid, avatarId, type, moduleName, methodName,
153 nice, bundles, conf):
157
158
160 avatarClass = ComponentJobAvatar
161 logCategory = 'component-job-heaven'
162
164 """
165 Gets the L{flumotion.common.connection.PBConnectionInfo}
166 describing how to connect to the manager.
167
168 @rtype: L{flumotion.common.connection.PBConnectionInfo}
169 """
170 return self.brain.managerConnectionInfo
171
def spawn(self, avatarId, type, moduleName, methodName, nice,
          bundles, conf):
    """
    Spawn a new job process for a component.

    Starts the C{flumotion-job} executable found in C{configure.bindir},
    passing it the avatarId and this heaven's socket path, and records a
    L{ComponentJobInfo} for the new process under its pid.

    If the environment variable C{FLU_VALGRIND_JOB} holds a
    comma-separated list that contains C{avatarId}, the job is run under
    valgrind instead.

    @param avatarId:   avatarId the component should use to log in
    @type  avatarId:   str
    @param type:       type of component to start
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       nice level; only stored in the job info here
    @type  nice:       int
    @param bundles:    ordered list of (bundleName, bundlePath) for this
                       component
    @type  bundles:    list of (str, str)
    @param conf:       component configuration
    @type  conf:       dict

    @returns: the deferred obtained from
              C{self._startSet.createStart(avatarId)}.
    """
    startDeferred = self._startSet.createStart(avatarId)

    protocol = base.JobProcessProtocol(self, avatarId, self._startSet)
    jobPath = os.path.join(configure.bindir, 'flumotion-job')
    if not os.path.exists(jobPath):
        # NOTE(review): presumably self.error aborts by raising; if it
        # only logs, we carry on with a missing executable -- confirm.
        self.error("Trying to spawn job process, but '%s' does not "
                   "exist", jobPath)

    # By default the job executable is run directly.
    program = jobPath
    arguments = [jobPath, avatarId, self._socketPath]

    # Optionally wrap selected jobs in valgrind; the original command
    # line is then executed through 'python' under valgrind.
    valgrindJobs = os.environ.get('FLU_VALGRIND_JOB')
    if valgrindJobs is not None and avatarId in valgrindJobs.split(','):
        program = 'valgrind'
        valgrindPrefix = [
            'valgrind', '--leak-check=full', '--num-callers=24',
            '--leak-resolution=high', '--show-reachable=yes',
            'python']
        arguments = valgrindPrefix + arguments

    # The child gets a copy of our environment plus the current debug
    # setting, and shares our stdin/stdout/stderr.
    childEnv = dict(os.environ)
    childEnv['FLU_DEBUG'] = log.getDebug()
    process = reactor.spawnProcess(protocol, program, env=childEnv,
                                   args=arguments,
                                   childFDs={0: 0, 1: 1, 2: 2})

    pid = process.pid
    protocol.setPid(pid)

    jobInfo = ComponentJobInfo(pid, avatarId, type, moduleName,
                               methodName, nice, bundles, conf)
    self.addJobInfo(pid, jobInfo)
    return startDeferred
238
239
241
249
250 d = self.mindCallRemote("getPid")
251 d.addCallback(gotPid)
252 return d
253
260
262 self.debug("job is stopping")
263
264
266 avatarClass = CheckJobAvatar
267 logCategory = 'check-job-heaven'
268
269 _checkCount = 0
270 _timeout = 45
271
278
280 if self.jobPool:
281 job, expireDC = self.jobPool.pop(0)
282 expireDC.cancel()
283 self.debug('running check in already-running job %s',
284 job.avatarId)
285 return defer.succeed(job)
286
287 avatarId = 'check-%d' % (self._checkCount, )
288 self._checkCount += 1
289
290 self.debug('spawning new job %s to run a check', avatarId)
291 d = self._startSet.createStart(avatarId)
292
293 p = base.JobProcessProtocol(self, avatarId, self._startSet)
294 executable = os.path.join(configure.bindir, 'flumotion-job')
295 argv = [executable, avatarId, self._socketPath]
296
297 childFDs = {0: 0, 1: 1, 2: 2}
298 env = {}
299 env.update(os.environ)
300 env['FLU_DEBUG'] = log.getDebug()
301 process = reactor.spawnProcess(p, executable, env=env, args=argv,
302 childFDs=childFDs)
303
304 p.setPid(process.pid)
305 jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
306 None, [])
307 self._jobInfos[process.pid] = jobInfo
308
309 def haveMind(_):
310
311 return self.avatars[avatarId]
312
313 d.addCallback(haveMind)
314 return d
315
316 - def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
323
324 def timeout(sig):
325 self.killJobByPid(job.pid, sig)
326
327 def haveResult(res):
328 if not termtimeout.active():
329 self.info("Discarding error %s", res)
330 res = messages.Result()
331 res.add(messages.Error(
332 T_(N_("Check timed out.")),
333 debug=("Timed out running %s."%methodName)))
334 else:
335
336 def expire():
337 if (job, expireDC) in self.jobPool:
338 self.debug('stopping idle check job process %s',
339 job.avatarId)
340 self.jobPool.remove((job, expireDC))
341 job.mindCallRemote('stop')
342 expireDC = reactor.callLater(self._timeout, expire)
343 self.jobPool.append((job, expireDC))
344
345 if termtimeout.active():
346 termtimeout.cancel()
347 if killtimeout.active():
348 killtimeout.cancel()
349 return res
350
351
352
353 termtimeout = reactor.callLater(self._timeout, timeout,
354 signal.SIGTERM)
355 killtimeout = reactor.callLater(self._timeout, timeout,
356 signal.SIGKILL)
357
358 d = job.mindCallRemote('bootstrap', self.getWorkerName(),
359 None, None, None, None, bundles)
360 d.addCallback(callProc)
361 d.addCallbacks(haveResult, haveResult)
362 return d
363
364 d = self.getCheckJobFromPool()
365 d.addCallback(haveJob)
366
367 return d
368