1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """worker-side objects to handle worker clients
19 """
20
21 import signal
22
23 from twisted.internet import defer, error, reactor
24 from zope.interface import implements
25
26 from flumotion.common import errors, interfaces, log
27 from flumotion.worker import medium, job, feedserver
28 from flumotion.twisted.defer import defer_call_later
29
30 __version__ = "$Rev$"
31
32
34 logCategory = "proxybouncer"
35
36 """
37 I am a bouncer that proxies authenticate calls to a remote FPB root
38 object.
39 """
40
42 """
43 @param remote: an object that has .callRemote()
44 """
45 self._remote = remote
46
48 """
49 Call me before asking me to authenticate, so I know what I can
50 authenticate.
51 """
52 return self._remote.callRemote('getKeycardClasses')
53
58
59
60
61
63 """
64 I am the main object in the worker process, managing jobs and everything
65 related.
66 I live in the main worker process.
67
68 @ivar authenticator: authenticator worker used to log in to manager
69 @type authenticator L{flumotion.twisted.pb.Authenticator}
70 @ivar medium:
71 @type medium: L{medium.WorkerMedium}
72 @ivar jobHeaven:
73 @type jobHeaven: L{job.ComponentJobHeaven}
74 @ivar checkHeaven:
75 @type checkHeaven: L{job.CheckJobHeaven}
76 @ivar workerClientFactory:
77 @type workerClientFactory: L{medium.WorkerClientFactory}
78 @ivar feedServerPort: TCP port the Feed Server is listening on
79 @type feedServerPort: int
80 """
81
82 implements(interfaces.IFeedServerParent)
83
84 logCategory = 'workerbrain'
85
87 """
88 @param options: the optparsed dictionary of command-line options
89 @type options: an object with attributes
90 """
91 self.options = options
92 self.workerName = options.name
93
94
95 if not self.options.randomFeederports:
96 self.ports = self.options.feederports[:-1]
97 else:
98 self.ports = []
99
100 self.medium = medium.WorkerMedium(self)
101
102
103 self.jobHeaven = job.ComponentJobHeaven(self)
104
105 self.checkHeaven = job.CheckJobHeaven(self)
106
107 self.managerConnectionInfo = None
108
109
110
111 self.feedServer = None
112
113 self.stopping = False
114 reactor.addSystemEventTrigger('before', 'shutdown',
115 self.shutdownHandler)
116 self._installHUPHandler()
117
119
120 def sighup(signum, frame):
121 if self._oldHUPHandler:
122 self.log('got SIGHUP, calling previous handler %r',
123 self._oldHUPHandler)
124 self._oldHUPHandler(signum, frame)
125 self.debug('telling kids about new log file descriptors')
126 self.jobHeaven.rotateChildLogFDs()
127
128 handler = signal.signal(signal.SIGHUP, sighup)
129 if handler == signal.SIG_DFL or handler == signal.SIG_IGN:
130 self._oldHUPHandler = None
131 else:
132 self._oldHUPHandler = handler
133
135 """
136 Start listening on FeedServer (incoming eater requests) and
137 JobServer (through which we communicate with our children) ports
138
139 @returns: True if we successfully listened on both ports
140 """
141
142 try:
143 self.feedServer = self._makeFeedServer()
144 except error.CannotListenError, e:
145 self.warning("Failed to listen on feed server port: %r", e)
146 return False
147
148 try:
149 self.jobHeaven.listen()
150 except error.CannotListenError, e:
151 self.warning("Failed to listen on job server port: %r", e)
152 return False
153
154 try:
155 self.checkHeaven.listen()
156 except error.CannotListenError, e:
157 self.warning("Failed to listen on check server port: %r", e)
158 return False
159
160 return True
161
163 """
164 @returns: L{flumotion.worker.feedserver.FeedServer}
165 """
166 port = None
167 if self.options.randomFeederports:
168 port = 0
169 elif not self.options.feederports:
170 self.info('Not starting feed server because no port is '
171 'configured')
172 return None
173 else:
174 port = self.options.feederports[-1]
175
176 return feedserver.FeedServer(self, ProxyBouncer(self), port)
177
178 - def login(self, managerConnectionInfo):
179 self.managerConnectionInfo = managerConnectionInfo
180 self.medium.startConnecting(managerConnectionInfo)
181
182 - def callRemote(self, methodName, *args, **kwargs):
184
186 if self.stopping:
187 self.warning("Already shutting down, ignoring shutdown request")
188 return
189
190 self.info("Reactor shutting down, stopping jobHeaven")
191 self.stopping = True
192
193 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()]
194 if self.feedServer:
195 l.append(self.feedServer.shutdown())
196
197 return defer_call_later(defer.DeferredList(l))
198
199
200
201 - def feedToFD(self, componentId, feedName, fd, eaterId):
202 """
203 Called from the FeedAvatar to pass a file descriptor on to
204 the job running the component for this feeder.
205
206 @returns: whether the fd was successfully handed off to the component.
207 """
208 if componentId not in self.jobHeaven.avatars:
209 self.warning("No such component %s running", componentId)
210 return False
211
212 avatar = self.jobHeaven.avatars[componentId]
213 return avatar.sendFeed(feedName, fd, eaterId)
214
215 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
216 """
217 Called from the FeedAvatar to pass a file descriptor on to
218 the job running the given component.
219
220 @returns: whether the fd was successfully handed off to the component.
221 """
222 if componentId not in self.jobHeaven.avatars:
223 self.warning("No such component %s running", componentId)
224 return False
225
226 avatar = self.jobHeaven.avatars[componentId]
227 return avatar.receiveFeed(eaterAlias, fd, feedId)
228
229
230
232 return self.ports, self.options.randomFeederports
233
235 if self.feedServer:
236 return self.feedServer.getPortNum()
237 else:
238 return None
239
240 - def create(self, avatarId, type, moduleName, methodName, nice,
241 conf):
253
254 def spawnJob(bundles):
255 return self.jobHeaven.spawn(avatarId, type, moduleName,
256 methodName, nice, bundles, conf)
257
258 def createError(failure):
259 failure.trap(errors.ComponentCreateError)
260 self.debug('create deferred for %s failed, forwarding error',
261 avatarId)
262 return failure
263
264 def success(res):
265 self.debug('create deferred for %s succeeded (%r)',
266 avatarId, res)
267 return res
268
269 self.info('Starting component "%s" of type "%s"', avatarId,
270 type)
271 d = getBundles()
272 d.addCallback(spawnJob)
273 d.addCallback(success)
274 d.addErrback(createError)
275 return d
276
277 - def runCheck(self, module, function, *args, **kwargs):
282
283 def runCheck(bundles):
284 return self.checkHeaven.runCheck(bundles, module, function,
285 *args, **kwargs)
286
287 d = getBundles()
288 d.addCallback(runCheck)
289 return d
290
293
294 - def killJob(self, avatarId, signum):
296