1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 worker-side objects to handle worker clients
20 """
21
22 import signal
23
24 from twisted.internet import reactor, error
25 from twisted.spread import flavors
26 from zope.interface import implements
27
28 from flumotion.common import errors, interfaces, debug
29 from flumotion.common import medium
30 from flumotion.common.vfs import listDirectory, registerVFSJelly
31 from flumotion.twisted.pb import ReconnectingFPBClientFactory
32
# Revision keyword placeholder; presumably expanded by the version control
# system on checkout -- TODO confirm.
__version__ = "$Rev$"
# Timeout used when shutting down jobs. It is not referenced in the visible
# part of this file; the unit is presumably seconds -- TODO confirm.
JOB_SHUTDOWN_TIMEOUT = 5
35
36
38 """
39 I am a client factory for the worker to log in to the manager.
40 """
41 logCategory = 'worker'
42 perspectiveInterface = interfaces.IWorkerMedium
43
57
68
69
70
72
73
74
def remoteDisconnected(remoteReference):
    """
    Log that the connection to the manager has gone away.

    @param remoteReference: the reference that got disconnected
                            (not used by this handler)
    """
    # A set reactor.killed flag is reported as a shutdown, at log level;
    # anything else is reported as an unexpected loss, at warning level.
    if reactor.killed:
        self.log('Connection to manager lost due to shutdown')
        return

    self.warning('Lost connection to manager, '
                 'will attempt to reconnect')
81
def loginCallback(reference):
    """
    Handle a successful login to the manager.

    Passes the reference on to the medium and registers
    remoteDisconnected to be notified when the reference disconnects.

    @param reference: the reference obtained from the login
    """
    self.info("Logged in to manager")
    referenceMessage = "remote reference %r" % reference
    self.debug(referenceMessage)

    workerMedium = self.medium
    workerMedium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)
88
def alreadyConnectedErrback(failure):
    """
    Warn when the login failed with an AlreadyConnectedError.

    Any other failure type is left to the next errback by trap().

    @param failure: the failure of the login attempt
    """
    failure.trap(errors.AlreadyConnectedError)
    template = 'A worker with the name "%s" is already connected.'
    self.warning(template % failure.value)
93
def accessDeniedErrback(failure):
    """
    Warn when the login failed with a NotAuthenticatedError.

    Any other failure type is left to the next errback by trap().

    @param failure: the failure of the login attempt
    """
    deniedError = errors.NotAuthenticatedError
    failure.trap(deniedError)
    self.warning('Access denied.')
97
def connectionRefusedErrback(failure):
    """
    Warn when the connection to the manager was refused.

    Any other failure type is left to the next errback by trap().

    @param failure: the failure of the login attempt
    """
    failure.trap(error.ConnectionRefusedError)
    managerAddress = (self._managerHost, self._managerPort)
    self.warning('Connection to %s:%d refused.' % managerAddress)
102
def NoSuchMethodErrback(failure):
    """
    Warn when the manager lacks remote_getKeycardClasses.

    Any failure that is not a NoSuchMethod is left to the next errback
    by trap(). A NoSuchMethod about a different method is returned so
    that it keeps travelling down the errback chain.

    @param failure: the failure of the login attempt

    @returns: None when the failure was reported here, else the failure
    """
    failure.trap(flavors.NoSuchMethod)

    # Coerce to text before searching: the value is typically a plain
    # string for a failure copied over the wire, but it can be an
    # exception instance, which has no find() and would raise
    # AttributeError here, hiding the real failure.
    reason = str(failure.value)
    if 'remote_getKeycardClasses' in reason:
        self.warning(
            "Manager %s:%d is older than version 0.3.0.  "
            "Please upgrade." % (self._managerHost, self._managerPort))
        return None

    return failure
113
def loginFailedErrback(failure):
    """
    Warn about a login failure, whatever its type.

    Unlike the other errbacks this one does not trap a specific error
    type.

    @param failure: the failure of the login attempt
    """
    reason = str(failure)
    self.warning('Login failed, reason: %s' % reason)
116
# Success handler first, then the errbacks in a fixed order: each of the
# first four traps one error type and lets anything else through, and
# loginFailedErrback, which traps nothing, comes last.
d.addCallback(loginCallback)
for errback in (accessDeniedErrback,
                connectionRefusedErrback,
                alreadyConnectedErrback,
                NoSuchMethodErrback,
                loginFailedErrback):
    d.addErrback(errback)
123
124
126 """
127 I am a medium interfacing with the manager-side WorkerAvatar.
128
129 @ivar brain: the worker brain
130 @type brain: L{worker.WorkerBrain}
131 @ivar factory: the worker client factory
132 @type factory: L{WorkerClientFactory}
133 """
134
135 logCategory = 'workermedium'
136
137 implements(interfaces.IWorkerMedium)
138
140 """
141 @type brain: L{worker.WorkerBrain}
142 """
143 self.brain = brain
144 self.factory = None
145 registerVFSJelly()
146
161
166
167
168
170 """
171 Gets the set of TCP ports that this worker is configured to use.
172
173 @rtype: 2-tuple: (list of int, bool)
174 @return: list of ports, and a boolean if we allocate ports
175 randomly
176 """
177 return self.brain.getPorts()
178
180 """
181 Return the TCP port the Feed Server is listening on.
182
183 @rtype: int, or NoneType
184 @return: TCP port number, or None if there is no feed server
185 """
186 return self.brain.getFeedServerPort()
187
def remote_create(self, avatarId, type, moduleName, methodName,
                  nice, conf):
    """
    Create a component of the given type, running at the given nice
    level.  A new job process is spawned to run the component in.

    All arguments are handed unchanged to the worker brain.

    @param avatarId:   avatar identification string
    @type  avatarId:   str
    @param type:       type of the component to create
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       nice level
    @type  nice:       int
    @param conf:       component config
    @type  conf:       dict

    @returns: a deferred fired when the process has started and created
              the component
    """
    # 'type' shadows the builtin, but the name is part of this method's
    # interface, so it is kept as is.
    creationArgs = (avatarId, type, moduleName, methodName, nice, conf)
    return self.brain.create(*creationArgs)
212
214 """
215 Checks if one or more GStreamer elements are present and can be
216 instantiated.
217
@param elementNames: names of the GStreamer elements
219 @type elementNames: list of str
220
221 @rtype: list of str
222 @returns: a list of instantiatable element names
223 """
224 return self.brain.runCheck('flumotion.worker.checks.check',
225 'checkElements', elementNames)
226
228 """
229 Checks if the given module can be imported.
230
231 @param moduleName: name of the module to check
232 @type moduleName: str
233
234 @returns: None or Failure
235 """
236 return self.brain.runCheck(
237 'flumotion.worker.checks.check', 'checkImport',
238 moduleName)
239
241 """
242 Runs the given function in the given module with the given arguments.
243
244 @param module: module the function lives in
245 @type module: str
246 @param function: function to run
247 @type function: str
248
249 @returns: the return value of the given function in the module.
250 """
251 return self.brain.runCheck(module, function, *args, **kwargs)
252 remote_runFunction = remote_runCheck
253
255 """
I return a list of the componentAvatarIds I have. I am called by the
257 manager soon after I attach to it. This is needed on reconnects
258 so that the manager knows what components it needs to start on me.
259
260 @returns: a list of componentAvatarIds
261 """
262 return self.brain.getComponents()
263
265 """Kill one of the worker's jobs.
266
267 This method is intended for exceptional purposes only; a normal
268 component shutdown is performed by the manager via calling
269 remote_stop() on the component avatar.
270
271 Raises L{flumotion.common.errors.UnknownComponentError} if the
272 job is unknown.
273
274 @param avatarId: the avatar Id of the component, e.g.
275 '/default/audio-encoder'
276 @type avatarId: string
277 @param signum: Signal to send, optional. Defaults to SIGKILL.
278 @type signum: int
279 """
280 self.brain.killJob(avatarId, signum)
281
284
"""List the directory named by directoryName.
287
288 Raises L{flumotion.common.errors.NotDirectoryError} if directoryName is
289 not a directory.
290
291 @param directoryName: the name of the directory to list
292 @type directoryName: string
293 @returns: the directory
294 @rtype: deferred that will fire an object implementing L{IDirectory}
295 """
296 return listDirectory(directoryName)
297