1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 implementation of a PB Client to interface with feedserver.py
20 """
21
22 import socket
23 import os
24
25 from twisted.internet import reactor, main, defer, tcp
26 from twisted.python import failure
27 from zope.interface import implements
28
29 from flumotion.common import log, common, interfaces
30 from flumotion.twisted import pb as fpb
31
32 __version__ = "$Rev$"
33
34
35
36
37
54
55
58
59
69
70
82
83
84
85
87 """
88 I am a client for a Feed Server.
89
90 I am used as the remote interface between a component and another
91 component.
92
93 @ivar component: the component this is a feed client for
94 @type component: L{flumotion.component.feedcomponent.FeedComponent}
95 @ivar remote: a reference to a
96 L{flumotion.worker.feedserver.FeedAvatar}
97 @type remote: L{twisted.spread.pb.RemoteReference}
98 """
99 logCategory = 'feedmedium'
100 remoteLogName = 'feedserver'
101 implements(interfaces.IFeedMedium)
102
103 remote = None
104
111
def startConnecting(self, host, port, authenticator, timeout=30,
                    bindAddress=None):
    """Optional helper method to connect to a remote feed server.

    This method starts a client factory connecting via a
    L{PassableClientConnector}. It offers the possibility of
    cancelling an in-progress connection via the stopConnecting()
    method.

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param timeout:       connection timeout, handed unchanged to the
                          connector (presumably seconds, as for
                          Twisted's tcp.Connector -- confirm)
    @param bindAddress:   local address to bind to, handed unchanged
                          to the connector; None by default

    @returns: a deferred that will fire with the remote reference,
              once we have authenticated.
    @raises AssertionError: if a factory from an earlier call is still
                            set on this object.
    """
    # Explicit check instead of a bare assert so the guard is not
    # stripped under "python -O"; the exception type stays the same.
    if self._factory is not None:
        raise AssertionError('startConnecting() called while a previous '
                             'connection is still set up')
    self._factory = FeedClientFactory(self)
    # reactor.connectWith() is deprecated in Twisted and missing from
    # newer releases. All it did was instantiate the connector with
    # reactor=<the reactor> and call connect() on it, so do that here.
    connector = PassableClientConnector(host, port, self._factory,
                                        timeout, bindAddress,
                                        reactor=reactor)
    connector.connect()
    return self._factory.login(authenticator)
137
def requestFeed(self, host, port, authenticator, fullFeedId):
    """Ask a remote feed server to send us one of its feeds.

    Connects and authenticates through startConnecting(), then issues
    the remote 'sendFeed' call for the requested feed. A pending
    connection attempt can be cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:feed)
                          offered by the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd). In an error case it will errback and close
              the remote connection.
    """

    def onFailure(reason):
        # Log, tear the connection down, then keep the failure
        # travelling down the errback chain to the caller.
        self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        return reason

    def onFeedRequested(_result):
        # Returning a deferred from a callback chains it: the caller's
        # deferred now waits on the one stored in self._feedToDeferred.
        return self._feedToDeferred

    def onConnected(remoteRef):
        self.setRemoteReference(remoteRef)
        return remoteRef.callRemote('sendFeed', fullFeedId)

    pending = self.startConnecting(host, port, authenticator)
    pending.addCallback(onConnected)
    pending.addCallback(onFeedRequested)
    pending.addErrback(onFailure)
    return pending
186
def sendFeed(self, host, port, authenticator, fullFeedId):
    """Send a feed to a remote feed server.

    This helper method calls startConnecting() to make the
    connection and authenticate, and will return the feed file
    descriptor or an error. A pending connection attempt can be
    cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:eaterAlias)
                          to feed to on the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd), where fd is a duplicate of the connection's
              file descriptor. In an error case it will errback and
              close the remote connection.
    """

    def connected(remote):
        assert isinstance(remote.broker.transport, _SocketMaybeCloser)
        self.setRemoteReference(remote)
        return remote.callRemote('receiveFeed', fullFeedId)

    def feedSent(res):
        t = self.remote.broker.transport
        self.debug('stop reading from transport')
        t.stopReading()

        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # NOTE(review): keepSocketAlive presumably tells
        # _SocketMaybeCloser (defined elsewhere in this file) not to
        # close the socket when the transport goes away -- confirm.
        t.keepSocketAlive = True
        # Take our own duplicate of the descriptor to hand to the caller.
        fd = os.dup(t.fileno())

        # The PB connection is no longer usable as a remote reference.
        self.setRemoteReference(None)

        d = defer.Deferred()

        def loseConnection():
            # Tear the transport down first; only then hand out the fd.
            t.connectionLost(failure.Failure(main.CONNECTION_DONE))
            d.callback((fullFeedId, fd))

        # Run the teardown from a later reactor iteration rather than
        # from inside this callback.
        reactor.callLater(0, loseConnection)
        return d

    def error(reason):
        # Named 'reason' so it does not shadow the imported 'failure'
        # module that feedSent() relies on.
        self.warning('failed to send %s to %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        return reason

    d = self.startConnecting(host, port, authenticator)
    d.addCallback(connected)
    d.addCallback(feedSent)
    d.addErrback(error)
    return d
254
256 """Stop a pending or established connection made via
257 startConnecting().
258
259 Stops any established or pending connection to a remote feed
260 server started via the startConnecting() method. Safe to call
261 even if connection has not been started.
262 """
263 if self._factory:
264 self._factory.disconnect()
265 self._factory = None
266
267
268 self.setRemoteReference(None)
269
270
271
273 self.remote = remoteReference
274
276 return self.remote is not None
277
280
282 t = self.remote.broker.transport
283
284 self.debug('stop reading from transport')
285 t.stopReading()
286 reactor.callLater(0, self._doFeedTo, fullFeedId, t)
287
314