1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"""
Worker-side objects to handle worker clients.

This module holds the worker's job avatars and "heavens" for the
flumotion-job processes it spawns: component jobs, which load bundles and
create a component, and check jobs, which run check functions from bundles.
"""
25
26 import os
27 import signal
28 import sys
29
30 from twisted.internet import defer, reactor
31
32 from flumotion.common import errors, log
33 from flumotion.common import messages
34 from flumotion.common.i18n import N_, gettexter
35 from flumotion.configure import configure
36 from flumotion.worker import base
37
__version__ = "$Rev: 7162 $"
# Translator for user-visible messages; N_-marked strings are passed
# through it (e.g. the "Check timed out." error built in runCheck).
T_ = gettexter()
40
41
# NOTE(review): this part of the listing has lost its indentation and the
# enclosing method's `def` line (original lines 42 and 44 are absent);
# restore the real source from version control before editing it.
# The visible body asks the job for its pid (mindCallRemote("getPid")) and,
# in gotPid, bootstraps the job and asks it to create its component.
43
45
# Forward a 'bootstrap' remote call, with the given arguments, to the job.
46 def bootstrap(*args):
47 return self.mindCallRemote('bootstrap', *args)
48
# Callback: ask the job to create the component described by the job info
# `job` (avatarId, type, module/method name, nice level, conf).
49 def create(_, job):
50 self.debug("asking job to create component with avatarId %s,"
51 " type %s", job.avatarId, job.type)
52 return self.mindCallRemote('create', job.avatarId, job.type,
53 job.moduleName, job.methodName,
54 job.nice, job.conf)
55
# Callback: record a successful start for avatarId in the heaven's start set.
56 def success(_, avatarId):
57 self.debug('job started component with avatarId %s',
58 avatarId)
59
60 self._heaven._startSet.createSuccess(avatarId)
61
# Errback: log why the component could not be created, distinguishing
# errors.ComponentCreateError from any other failure, then record the
# failed start in the heaven's start set.
62 def error(failure, job):
63 msg = log.getFailureMessage(failure)
64 if failure.check(errors.ComponentCreateError):
65 self.warning('could not create component %s of type %s:'
66 ' %s', job.avatarId, job.type, msg)
67 else:
68 self.warning('unhandled error creating component %s: %s',
69 job.avatarId, msg)
70
71 self._heaven._startSet.createFailed(job.avatarId, failure)
72
# Callback for getPid: remember the job's pid, look up its job info and the
# manager connection details, then chain bootstrap -> create -> success.
73 def gotPid(pid):
74 self.pid = pid
75 info = self._heaven.getManagerConnectionInfo()
# Pass the job a transport name matching the manager connection info.
76 if info.use_ssl:
77 transport = 'ssl'
78 else:
79 transport = 'tcp'
80 job = self._heaven.getJobInfo(pid)
81 workerName = self._heaven.getWorkerName()
82
83 d = bootstrap(workerName, info.host, info.port, transport,
84 info.authenticator, job.bundles)
85 d.addCallback(create, job)
86 d.addCallback(success, job.avatarId)
# error() is added last, so it also handles failures raised by bootstrap,
# create and success.
87 d.addErrback(error, job)
88 return d
89 d = self.mindCallRemote("getPid")
90 d.addCallback(gotPid)
91 return d
92
# NOTE(review): the `def` line of this method is missing from the listing
# (original line 93) and indentation is lost; restore from version control.
# Asks the job to stop via a 'stop' remote call on its mind.
94 """
95 returns: a deferred marking completed stop.
96 """
# Without a mind the job has already logged out: return an already-fired
# deferred so callers can chain on it uniformly.
97 if not self.mind:
98 self.debug('already logged out')
99 return defer.succeed(None)
100 else:
101 self.debug('stopping')
102 return self.mindCallRemote('stop')
103
def sendFeed(self, feedName, fd, eaterId):
    """
    Tell the feeder to send the given feed to the given fd.

    The fd is handed to the component job together with the message
    C{"sendFeed <feedName> <eaterId>"}.

    @param feedName: name of the feed the component should send
    @param fd:       file descriptor to send the feed to
    @type  fd:       int
    @param eaterId:  identifier of the eater on the other end of the fd

    @returns: whether the fd was successfully handed off to the component.
    """
    self.debug('Sending FD %d to component job to feed %s to fd',
               fd, feedName)

    if not self.mind:
        # The job's mind is gone, so there is nobody to hand the fd to;
        # report the failed hand-off to the caller.
        self.debug('my mind is gone, trigger disconnect')
        return False

    message = "sendFeed %s %s" % (feedName, eaterId)
    return self._sendFileDescriptor(fd, message)
123
# NOTE(review): the `def` line of this method is missing (original line
# 124). The body uses `eaterAlias`, `fd` and `feedId`, so those are
# presumably its parameters — confirm name and order from version control.
# NOTE(review): the docstring says "feeder", but the fd is handed off for
# the eater named by eaterAlias (see the debug message) — consider rewording.
125 """
126 Tell the feeder to receive the given feed from the given fd.
127
128 @returns: whether the fd was successfully handed off to the component.
129 """
130 self.debug('Sending FD %d to component job to eat %s from fd',
131 fd, eaterAlias)
132
133
# Hand the fd to the job with a "receiveFeed <eaterAlias> <feedId>" message;
# without a mind there is nobody to receive it, so report failure.
134 if self.mind:
135 message = "receiveFeed %s %s" % (eaterAlias, feedId)
136 return self._sendFileDescriptor(fd, message)
137 else:
138 self.debug('my mind is gone, trigger disconnect')
139 return False
140
# NOTE(review): the `def` line of this method is missing (original line
# 141); restore from version control. Logs the clean shutdown and calls
# shutdownStart(avatarId) on the heaven's start set.
142 """
143 This notification from the job process will be fired when it is
144 shutting down, so that although the process might still be
145 around, we know it's OK to accept new start requests for this
146 avatar ID.
147 """
148 self.info("component %s shutting down cleanly", self.avatarId)
149
150 self._heaven._startSet.shutdownStart(self.avatarId)
151
152
# NOTE(review): the class header (original line 153) and the body of
# __init__ (original lines 158-160) are missing from this listing. Only the
# extra `conf` slot and the constructor signature survive. spawn() builds
# ComponentJobInfo(pid, avatarId, type, moduleName, methodName, nice,
# bundles, conf) with exactly these arguments.
154 __slots__ = ('conf', )
155
156 - def __init__(self, pid, avatarId, type, moduleName, methodName,
157 nice, bundles, conf):
161
162
# NOTE(review): the class header (original line 163) is missing. avatarClass
# presumably tells the base heaven which avatar class to use for component
# jobs — confirm in flumotion.worker.base.
164 avatarClass = ComponentJobAvatar
165
# NOTE(review): the `def` line (original line 166) is missing. The job
# avatar calls self._heaven.getManagerConnectionInfo(), and this docstring
# matches that method.
167 """
168 Gets the L{flumotion.common.connection.PBConnectionInfo}
169 describing how to connect to the manager.
170
171 @rtype: L{flumotion.common.connection.PBConnectionInfo}
172 """
173 return self.brain.managerConnectionInfo
174
def spawn(self, avatarId, type, moduleName, methodName, nice,
          bundles, conf):
    """
    Spawn a new job.

    This will spawn a new flumotion-job process, running under the
    requested nice level. When the job logs in, it will be told to
    load bundles and run a function, which is expected to return a
    component.

    If the environment variable FLU_VALGRIND_JOB contains avatarId in its
    comma-separated list, the job is run under valgrind instead.

    @param avatarId:   avatarId the component should use to log in
    @type  avatarId:   str
    @param type:       type of component to start
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       nice level
    @type  nice:       int
    @param bundles:    ordered list of (bundleName, bundlePath) for this
                       component
    @type  bundles:    list of (str, str)
    @param conf:       component configuration
    @type  conf:       dict

    @returns: the result of C{self._startSet.createStart(avatarId)};
              presumably a deferred that fires when the start succeeds or
              fails (TODO confirm in flumotion.worker.base).
    """
    d = self._startSet.createStart(avatarId)

    p = base.JobProcessProtocol(self, avatarId, self._startSet)
    executable = os.path.join(configure.bindir, 'flumotion-job')
    if not os.path.exists(executable):
        # NOTE(review): if self.error() does not abort, spawnProcess below
        # still runs with a missing executable — confirm error() raises.
        self.error("Trying to spawn job process, but '%s' does not "
                   "exist", executable)
    argv = [executable, avatarId, self._socketPath]

    realexecutable = executable

    # FLU_VALGRIND_JOB: comma-separated avatarIds whose jobs run under
    # valgrind; the job script is then passed to an explicit 'python'.
    valgrindJobs = os.environ.get('FLU_VALGRIND_JOB')
    if valgrindJobs is not None and avatarId in valgrindJobs.split(','):
        realexecutable = 'valgrind'
        argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                '--leak-resolution=high', '--show-reachable=yes',
                'python'] + argv

    # The child shares the worker's stdin/stdout/stderr.
    childFDs = {0: 0, 1: 1, 2: 2}
    # Inherit the worker's environment and propagate its debug level.
    env = dict(os.environ)
    env['FLU_DEBUG'] = log.getDebug()
    process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                   childFDs=childFDs)

    p.setPid(process.pid)

    self.addJobInfo(process.pid,
                    ComponentJobInfo(process.pid, avatarId, type,
                                     moduleName, methodName, nice,
                                     bundles, conf))
    return d
241
242
# NOTE(review): most of this section is missing from the listing (original
# lines 243-251 and 257-264), including the class header — presumably
# CheckJobAvatar, which CheckJobHeaven uses as avatarClass — the enclosing
# `def` lines and the definition of `gotPid`. What remains asks the job for
# its pid and logs that the job is stopping. Restore from version control.
244
252
253 d = self.mindCallRemote("getPid")
254 d.addCallback(gotPid)
255 return d
256
263
265 self.debug("job is stopping")
266
267
# NOTE(review): the class header (original line 268) is missing.
269 avatarClass = CheckJobAvatar
270
# Counter used to build unique 'check-N' avatarIds for spawned check jobs.
271 _checkCount = 0
# Seconds, passed to reactor.callLater: both the idle-expiry delay for pooled
# check jobs and the check timeout in runCheck.
272 _timeout = 45
273
280
# NOTE(review): the `def` line (original line 281) is missing; runCheck
# calls this as self.getCheckJobFromPool(). It reuses an idle pooled job
# when one exists, otherwise it spawns a new flumotion-job process.
# Reuse the oldest idle job and cancel its pending expiry.
282 if self.jobPool:
283 job, expireDC = self.jobPool.pop(0)
284 expireDC.cancel()
285 self.debug('running check in already-running job %s',
286 job.avatarId)
287 return defer.succeed(job)
288
289 avatarId = 'check-%d' % (self._checkCount, )
290 self._checkCount += 1
291
292 self.debug('spawning new job %s to run a check', avatarId)
293 d = self._startSet.createStart(avatarId)
294
295 p = base.JobProcessProtocol(self, avatarId, self._startSet)
296 executable = os.path.join(configure.bindir, 'flumotion-job')
297 argv = [executable, avatarId, self._socketPath]
298
# The child shares the worker's stdio and inherits its environment plus the
# current debug level.
299 childFDs = {0: 0, 1: 1, 2: 2}
300 env = {}
301 env.update(os.environ)
302 env['FLU_DEBUG'] = log.getDebug()
303 process = reactor.spawnProcess(p, executable, env=env, args=argv,
304 childFDs=childFDs)
305
306 p.setPid(process.pid)
# NOTE(review): `type` is not bound anywhere in the visible body. Unless the
# missing `def` line declares it, this passes the builtin `type` as the job
# type, which is probably unintended. Also, spawn() registers job info via
# self.addJobInfo() rather than writing self._jobInfos directly.
307 jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
308 None, [])
309 self._jobInfos[process.pid] = jobInfo
310
# Once the start completes, resolve to the avatar the new job logged in as.
311 def haveMind(_):
312
313 return self.avatars[avatarId]
314
315 d.addCallback(haveMind)
316 return d
317
# NOTE(review): original lines 319-324 (the docstring and the start of the
# nested callback that binds `job`) are missing. The definitions of
# `callProc` and `haveJob` are also absent from this listing. Restore from
# version control before editing.
318 - def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
325
# Kill the check job's process with the given signal.
326 def timeout(sig):
327 self.killJobByPid(job.pid, sig)
328
# Used as both callback and errback. If the SIGTERM timer has already fired,
# the check timed out: replace whatever arrived (success or failure) with a
# "Check timed out." error result. Otherwise return the job to the pool with
# a fresh expiry timer. In both cases, cancel any pending kill timers.
329 def haveResult(res):
330 if not termtimeout.active():
331 self.info("Discarding error %s", res)
332 res = messages.Result()
333 res.add(messages.Error(
334 T_(N_("Check timed out.")),
335 debug=("Timed out running %s."%methodName)))
336 else:
337
# Stop the pooled job if it is still idle in the pool after self._timeout.
338 def expire():
339 if (job, expireDC) in self.jobPool:
340 self.debug('stopping idle check job process %s',
341 job.avatarId)
342 self.jobPool.remove((job, expireDC))
343 job.mindCallRemote('stop')
344 expireDC = reactor.callLater(self._timeout, expire)
345 self.jobPool.append((job, expireDC))
346
347 if termtimeout.active():
348 termtimeout.cancel()
349 if killtimeout.active():
350 killtimeout.cancel()
351 return res
352
353
354
# NOTE(review): both timers use the same delay (self._timeout), so SIGKILL
# is sent at the same moment as SIGTERM and the job gets no grace period;
# a later kill deadline was probably intended.
355 termtimeout = reactor.callLater(self._timeout, timeout,
356 signal.SIGTERM)
357 killtimeout = reactor.callLater(self._timeout, timeout,
358 signal.SIGKILL)
359
# Bootstrap the job with only the worker name and bundles (no manager
# connection details), then run the check via callProc.
360 d = job.mindCallRemote('bootstrap', self.getWorkerName(),
361 None, None, None, None, bundles)
362 d.addCallback(callProc)
363 d.addCallbacks(haveResult, haveResult)
364 return d
365
366 d = self.getCheckJobFromPool()
367 d.addCallback(haveJob)
368
369 return d
370