1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """base classes for PB client-side mediums.
19 """
20
21 from twisted.spread import pb
22 from twisted.internet import defer
23 from zope.interface import implements
24
25 from flumotion.common import log, interfaces, bundleclient, errors, netutils
26 from flumotion.configure import configure
27 from flumotion.twisted import pb as fpb
28 from flumotion.twisted.compat import reactor
29
30
31 from flumotion.common import messages
32
33 __version__ = "$Rev$"
34
35
37 """
38 I am a base interface for PB clients interfacing with PB server-side
39 avatars.
40 Used by admin/worker/component to talk to manager's vishnu,
41 and by job to talk to worker's brain.
42
43 @ivar remote: a remote reference to the server-side object on
44 which perspective_(methodName) methods can be called
45 @type remote: L{twisted.spread.pb.RemoteReference}
46 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
47 """
48
49
50
51 implements(interfaces.IMedium)
52 logCategory = "basemedium"
53 remoteLogName = "baseavatar"
54
55 remote = None
56 bundleLoader = None
57
59 """
60 Set the given remoteReference as the reference to the server-side
61 avatar.
62
63 @param remoteReference: L{twisted.spread.pb.RemoteReference}
64 """
65 self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
66 self.remote = remoteReference
67
68 def nullRemote(x):
69 self.debug('%r: disconnected from %r' % (self, self.remote))
70 self.remote = None
71 self.remote.notifyOnDisconnect(nullRemote)
72
73 self.bundleLoader = bundleclient.BundleLoader(self.callRemote)
74
75
76 tarzan = None
77 jane = None
78 try:
79 transport = remoteReference.broker.transport
80 tarzan = transport.getHost()
81 jane = transport.getPeer()
82 except Exception, e:
83 self.debug("could not get connection info, reason %r" % e)
84 if tarzan and jane:
85 self.debug("connection is from me on %s to remote on %s" % (
86 netutils.addressGetHost(tarzan),
87 netutils.addressGetHost(jane)))
88
90 """
91 Does the medium have a remote reference to a server-side avatar ?
92 """
93 return self.remote != None
94
96 """
97 Call the given method with the given arguments remotely on the
98 server-side avatar.
99
100 Gets serialized to server-side perspective_ methods.
101
102 @param level: the level we should log at (log.DEBUG, log.INFO, etc)
103 @type level: int
104 @param stackDepth: the number of stack frames to go back to get
105 file and line information, negative or zero.
106 @type stackDepth: non-positive int
107 @param name: name of the remote method
108 @type name: str
109 """
110 if level is not None:
111 debugClass = str(self.__class__).split(".")[-1].upper()
112 startArgs = [self.remoteLogName, debugClass, name]
113 formatString, debugArgs = log.getFormatArgs(
114 '%s --> %s: callRemote(%s, ', startArgs,
115 ')', (), args, kwargs)
116 logKwArgs = self.doLog(level, stackDepth - 1,
117 formatString, *debugArgs)
118
119 if not self.remote:
120 self.warning('Tried to callRemote(%s), but we are disconnected'
121 % name)
122 return defer.fail(errors.NotConnectedError())
123
124 def callback(result):
125 formatString, debugArgs = log.getFormatArgs(
126 '%s <-- %s: callRemote(%s, ', startArgs,
127 '): %s', (log.ellipsize(result), ), args, kwargs)
128 self.doLog(level, -1, formatString, *debugArgs, **logKwArgs)
129 return result
130
131 def errback(failure):
132 formatString, debugArgs = log.getFormatArgs(
133 '%s <-- %s: callRemote(%s, ', startArgs,
134 '): %r', (failure, ), args, kwargs)
135 self.doLog(level, -1, formatString, *debugArgs, **logKwArgs)
136 return failure
137
138 d = self.remote.callRemote(name, *args, **kwargs)
139 if level is not None:
140 d.addCallbacks(callback, errback)
141 return d
142
144 """
145 Call the given method with the given arguments remotely on the
146 server-side avatar.
147
148 Gets serialized to server-side perspective_ methods.
149 """
150 return self.callRemoteLogging(log.DEBUG, -1, name, *args,
151 **kwargs)
152
154 """
155 Returns the given function in the given module, loading the
156 module from a bundle.
157
158 If we can't find the bundle for the given module, or if the
159 given module does not contain the requested function, we will
160 raise L{flumotion.common.errors.RemoteRunError} (perhaps a
161 poorly chosen error). If importing the module raises an
162 exception, that exception will be passed through unmodified.
163
164 @param module: module the function lives in
165 @type module: str
166 @param function: function to run
167 @type function: str
168
169 @returns: a callable, the given function in the given module.
170 """
171
172 def gotModule(mod):
173 if hasattr(mod, function):
174 return getattr(mod, function)
175 else:
176 msg = 'No procedure named %s in module %s' % (function,
177 module)
178 self.warning('%s', msg)
179 raise errors.RemoteRunError(msg)
180
181 def gotModuleError(failure):
182 failure.trap(errors.NoBundleError)
183 msg = 'Failed to find bundle for module %s' % module
184 self.warning('%s', msg)
185 raise errors.RemoteRunError(msg)
186
187 d = self.bundleLoader.loadModule(module)
188 d.addCallbacks(gotModule, gotModuleError)
189 return d
190
192 """
193 Runs the given function in the given module with the given
194 arguments.
195
196 This method calls getBundledFunction and then invokes the
197 function. Any error raised by getBundledFunction or by invoking
198 the function will be passed through unmodified.
199
200 Callers that expect to return their result over a PB connection
201 should catch nonserializable exceptions so as to prevent nasty
202 backtraces in the logs.
203
204 @param module: module the function lives in
205 @type module: str
206 @param function: function to run
207 @type function: str
208
209 @returns: the return value of the given function in the module.
210 """
211 self.debug('runBundledFunction(%r, %r)', module, function)
212
213 def gotFunction(proc):
214
215 def invocationError(failure):
216 self.warning('Exception raised while calling '
217 '%s.%s(*args=%r, **kwargs=%r): %s',
218 module, function, args, kwargs,
219 log.getFailureMessage(failure))
220 return failure
221
222 self.debug('calling %s.%s(%r, %r)', module, function, args,
223 kwargs)
224 d = defer.maybeDeferred(proc, *args, **kwargs)
225 d.addErrback(invocationError)
226 return d
227
228 d = self.getBundledFunction(module, function)
229 d.addCallback(gotFunction)
230 return d
231
232
265
266 if self.remote:
267 self.log('pinging')
268 d = self.callRemoteLogging(log.LOG, 0, 'ping')
269 d.addCallbacks(pingback, pingFailed)
270 else:
271 self.info('tried to ping, but disconnected yo')
272
273 self._pingDC = self._clock.callLater(self._pingInterval,
274 self._ping)
275
280
288 d.addCallback(cb)
289 return d
290
302
311
315
324 self.remote.notifyOnDisconnect(stopPingingCb)
325
326 self.startPinging(self._disconnect)
327
329 """
330 Sets a marker that will be prefixed to the log strings. Setting this
331 marker to multiple elements at a time helps debugging.
332 @param marker: A string to prefix all the log strings.
333 @type marker: str
334 """
335 self.writeMarker(marker, level)
336