1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import os
19 import random
20 import socket
21 import string
22 import time
23 from urllib2 import urlparse
24
25 from twisted.cred import portal
26 from twisted.internet import protocol, reactor, error, defer
27 from twisted.spread import pb
28 from zope.interface import implements
29
30 from flumotion.common import medium, log, messages, errors
31 from flumotion.common.i18n import N_, gettexter
32 from flumotion.component import component
33 from flumotion.component.component import moods
34 from flumotion.twisted import fdserver, checkers
35 from flumotion.twisted import reflect
36
37 __version__ = "$Rev$"
38 T_ = gettexter()
39
40
42 """
43 An Avatar in the porter representing a streamer
44 """
45
46 - def __init__(self, avatarId, porter, mind):
53
def isAttached(self):
    """
    Tell whether this avatar still holds a mind.

    @returns: True while self.mind is set, False once it has been reset
              to None.
    @rtype:   bool
    """
    # Identity test, so no custom comparison on the mind object is invoked.
    return self.mind is not None
56
58 self.debug("porter client %s logging out", self.avatarId)
59 self.mind = None
60
64
68
72
76
78 return self.porter._iptablesPort
79
80
82 """
83 A Realm within the Porter that creates Avatars for streamers logging into
84 the porter.
85 """
86 implements(portal.IRealm)
87
89 """
90 @param porter: The porter that avatars created from here should use.
91 @type porter: L{Porter}
92 """
93 self.porter = porter
94
103
104
106
108 """
109 Return the location, login username/password, and listening port
110 and interface for the porter as a tuple (path, username,
111 password, port, interface, external-interface).
112 """
113 return (self.comp._socketPath, self.comp._username,
114 self.comp._password, self.comp._iptablesPort,
115 self.comp._interface, self.comp._external_interface)
116
117
118 -class Porter(component.BaseComponent, log.Loggable):
119 """
120 The porter optionally sits in front of a set of streamer components.
121 The porter is what actually deals with incoming connections on a socket.
122 It decides which streamer to direct the connection to, then passes the FD
123 (along with some amount of already-read data) to the appropriate streamer.
124 """
125
126 componentMediumClass = PorterMedium
127
129
130
131 self._mappings = {}
132 self._prefixes = {}
133
134 self._socketlistener = None
135
136 self._socketPath = None
137 self._username = None
138 self._password = None
139 self._port = None
140 self._iptablesPort = None
141 self._porterProtocol = None
142
143 self._interface = ''
144 self._external_interface = ''
145
147 """
148 Register a path as being served by a streamer represented by this
149 avatar. Will remove any previous registration at this path.
150
151 @param path: The path to register
152 @type path: str
153 @param avatar: The avatar representing the streamer to direct this path
154 to
155 @type avatar: L{PorterAvatar}
156 """
157 self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
158 if path in self._mappings:
159 self.warning("Replacing existing mapping for path \"%s\"" % path)
160
161 self._mappings[path] = avatar
162
164 """
165 Attempt to deregister the given path. A deregistration will only be
166 accepted if the mapping is to the avatar passed.
167
168 @param path: The path to deregister
169 @type path: str
170 @param avatar: The avatar representing the streamer being deregistered
171 @type avatar: L{PorterAvatar}
172 """
173 if path in self._mappings:
174 if self._mappings[path] == avatar:
175 self.debug("Removing porter mapping for \"%s\"" % path)
176 del self._mappings[path]
177 else:
178 self.warning(
179 "Mapping not removed: refers to a different avatar")
180 else:
181 self.warning("Mapping not removed: no mapping found")
182
184 """
185 Register a destination for all requests directed to anything beginning
186 with a specified prefix. Where there are multiple matching prefixes,
187 the longest is selected.
188
189 @param avatar: The avatar being registered
190 @type avatar: L{PorterAvatar}
191 """
192
193 self.debug("Setting prefix \"%s\" for porter", prefix)
194 if prefix in self._prefixes:
195 self.warning("Overwriting prefix")
196
197 self._prefixes[prefix] = avatar
198
200 """
201 Attempt to deregister the destination registered for the given prefix.
202 This will only succeed if the prefix is currently mapped to this
203 avatar.
204
205 @param avatar: The avatar being deregistered
206 @type avatar: L{PorterAvatar}
207 """
208 if prefix not in self._prefixes:
209 self.warning("Mapping not removed: no mapping found")
210 return
211
212 if self._prefixes[prefix] == avatar:
213 self.debug("Removing prefix destination from porter")
214 del self._prefixes[prefix]
215 else:
216 self.warning(
217 "Not removing prefix destination: expected avatar not found")
218
def findPrefixMatch(self, path):
    """
    Find the avatar registered for the longest prefix that path starts with.

    @param path: The requested path
    @type  path: str

    @returns: The avatar mapped to the longest matching prefix, or None if
              no registered prefix matches.
    """
    found = None

    for prefix in self._prefixes:
        self.log("Checking: %r, %r", prefix, path)
        # Compare against None rather than relying on truthiness: an empty
        # prefix matches every path and must remain a valid result.
        if (path.startswith(prefix) and
                (found is None or len(found) < len(prefix))):
            found = prefix

    if found is None:
        return None
    return self._prefixes[found]
231
def findDestination(self, path):
    """
    Find a destination Avatar for this path.

    An exact entry in the path mappings takes precedence; when there is
    none, the result of findPrefixMatch() for the path is returned.

    @param path: The requested path
    @type  path: str

    @returns: The Avatar for this mapping, or None.
    """
    if path in self._mappings:
        return self._mappings[path]

    return self.findPrefixMatch(path)
242
def generateSocketPath(self):
    """
    Generate a socket pathname in an appropriate location.

    @returns: The path of a newly created, empty temporary file whose name
              starts with 'flumotion.porter.' and ends with the current
              process id.
    @rtype:   str
    """
    import tempfile

    # mkstemp creates the file to reserve a unique name; only the name is
    # needed here, so the descriptor is closed straight away.
    fd, name = tempfile.mkstemp(suffix='.%d' % os.getpid(),
                                prefix='flumotion.porter.')
    os.close(fd)

    return name
254
def generateRandomString(self, numchars):
    """
    Generate a random US-ASCII string of length numchars.

    @param numchars: The number of characters to generate
    @type  numchars: int

    @returns: A string of numchars upper- and lower-case ASCII letters
    @rtype:   str
    """
    chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    # The porter uses these strings as its generated username and password,
    # so draw from the OS entropy source rather than the default generator.
    rng = random.SystemRandom()

    return "".join([rng.choice(chars) for _ in range(numchars)])
265
267 props = self.config['properties']
268
269 self.fixRenamedProperties(props,
270 [('socket_path', 'socket-path')])
271
272
273
274
275
276 if 'socket-path' in props:
277
278 self._socketPath = props['socket-path']
279 self._username = props['username']
280 self._password = props['password']
281 else:
282
283
284 self._username = self.generateRandomString(12)
285 self._password = self.generateRandomString(12)
286 self._socketPath = self.generateSocketPath()
287
288 self._requirePassword = props.get('require-password', True)
289 self._socketMode = props.get('socket-mode', 0666)
290 self._port = int(props['port'])
291 self._iptablesPort = int(props.get('iptables-port', self._port))
292 self._porterProtocol = props.get('protocol',
293 'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
294 self._interface = props.get('interface', '')
295
296
297 self._external_interface = props.get('external-interface',
298 self._interface)
299
301 d = None
302 if self._socketlistener:
303
304
305 d = self._socketlistener.stopListening()
306 self._socketlistener = None
307 return d
308
310
311 self.have_properties()
312 realm = PorterRealm(self)
313 checker = checkers.FlexibleCredentialsChecker()
314 checker.addUser(self._username, self._password)
315 if not self._requirePassword:
316 checker.allowPasswordless(True)
317
318 p = portal.Portal(realm, [checker])
319 serverfactory = pb.PBServerFactory(p)
320
321 try:
322
323
324
325 try:
326 os.unlink(self._socketPath)
327 except OSError:
328 pass
329
330 self._socketlistener = reactor.listenWith(
331 fdserver.FDPort, self._socketPath,
332 serverfactory, mode=self._socketMode)
333 self.info("Now listening on socketPath %s", self._socketPath)
334 except error.CannotListenError:
335 self.warning("Failed to create socket %s" % self._socketPath)
336 m = messages.Error(T_(N_(
337 "Network error: socket path %s is not available."),
338 self._socketPath))
339 self.addMessage(m)
340 self.setMood(moods.sad)
341 return defer.fail(errors.ComponentSetupHandledError())
342
343
344
345 try:
346 proto = reflect.namedAny(self._porterProtocol)
347 self.debug("Created proto %r" % proto)
348 except (ImportError, AttributeError):
349 self.warning("Failed to import protocol '%s', defaulting to HTTP" %
350 self._porterProtocol)
351 proto = HTTPPorterProtocol
352
353
354
355 factory = PorterProtocolFactory(self, proto)
356 try:
357 reactor.listenWith(
358 fdserver.PassableServerPort, self._port, factory,
359 interface=self._interface)
360 self.info("Now listening on interface %r on port %d",
361 self._interface, self._port)
362 except error.CannotListenError:
363 self.warning("Failed to listen on interface %r on port %d",
364 self._interface, self._port)
365 m = messages.Error(T_(N_(
366 "Network error: TCP port %d is not available."), self._port))
367 self.addMessage(m)
368 self.setMood(moods.sad)
369 return defer.fail(errors.ComponentSetupHandledError())
370
371
382
383
385 """
386 The base porter is capable of accepting HTTP-like protocols (including
387 RTSP) - it reads the first line of a request, and makes the decision
388 solely on that.
389
390 We can't guarantee that we read precisely a line, so the buffer we
391 accumulate will actually be larger than what we actually parse.
392
393 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
394 @cvar delimiters: a list of valid line delimiters I check for
395 """
396
397 logCategory = 'porterprotocol'
398
399
400 MAX_SIZE = 4096
401
402
403
404 PORTER_CLIENT_TIMEOUT = 30
405
406
407
408
409
410 delimiters = ['\r\n', '\n', '\r']
411
419
428
434
436 if self._timeoutDC:
437 self._timeoutDC.cancel()
438 self._timeoutDC = None
439
441 self._buffer = self._buffer + data
442 self.log("Got data, buffer now \"%s\"" % self._buffer)
443
444
445 for delim in self.delimiters:
446 try:
447 line, remaining = self._buffer.split(delim, 1)
448 break
449 except ValueError:
450
451 pass
452 else:
453
454 self.log("No valid delimiter found")
455 if len(self._buffer) > self.MAX_SIZE:
456
457
458 self.debug("[fd %5d] (ts %f) (request-id %r) dropping, "
459 "buffer exceeded",
460 self.transport.fileno(), time.time(),
461 self.requestId)
462
463 return self.transport.loseConnection()
464 else:
465
466
467 return
468
469
470
471 parsed = self.parseLine(line)
472 if not parsed:
473 self.log("Couldn't parse the first line")
474 return self.transport.loseConnection()
475
476 identifier = self.extractIdentifier(parsed)
477 if not identifier:
478 self.log("Couldn't find identifier in first line")
479 return self.transport.loseConnection()
480
481 if self.requestId:
482 self.log("Injecting request-id %r", self.requestId)
483 parsed = self.injectRequestId(parsed, self.requestId)
484
485
486
487
488 self._buffer = delim.join((self.unparseLine(parsed), remaining))
489
490
491 self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s",
492 self.transport.fileno(), time.time(), self.requestId,
493 identifier)
494
495
496
497 destinationAvatar = self._porter.findDestination(identifier)
498
499 if not destinationAvatar or not destinationAvatar.isAttached():
500 if destinationAvatar:
501 self.debug("There was an avatar, but it logged out?")
502
503
504 self.debug(
505 "[fd %5d] (ts %f) (request-id %r) no destination avatar found",
506 self.transport.fileno(), time.time(), self.requestId)
507
508 self.writeNotFoundResponse()
509 return self.transport.loseConnection()
510
511
512
513
514
515
516
517
518 self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s",
519 self.transport.fileno(), time.time(), self.requestId,
520 destinationAvatar.avatarId)
521
522
523
524 try:
525 destinationAvatar.mind.broker.transport.sendFileDescriptor(
526 self.transport.fileno(), self._buffer)
527 except OSError, e:
528 self.warning("[fd %5d] failed to send FD: %s",
529 self.transport.fileno(), log.getExceptionMessage(e))
530 self.writeServiceUnavailableResponse()
531 return self.transport.loseConnection()
532
533
534 self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s",
535 self.transport.fileno(), time.time(), self.requestId,
536 destinationAvatar.avatarId)
537
538
539
540
541
542 self.transport.keepSocketAlive = True
543 self.transport.loseConnection()
544
def parseLine(self, line):
    """
    Parse the initial line of the request. Return an object that can be
    used to uniquely identify the stream being requested by passing it to
    extractIdentifier, or None if the request is unreadable.

    Subclasses should override this.

    @param line: The first line of the request, without its delimiter
    @type  line: str

    @raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
554
def unparseLine(self, parsed):
    """
    Recreate the initial request line from the parsed representation. The
    recreated line does not need to be exactly identical, but both
    parseLine(unparseLine(parsed)) and parsed should contain the same
    information (i.e. unparseLine should not lose information).

    UnparseLine has to return a valid line from the porter protocol's
    scheme point of view (for instance, HTTP).

    Subclasses should override this.

    @param parsed: The parsed representation, as returned by parseLine

    @raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
568
def extractIdentifier(self, parsed):
    """
    Extract a string that uniquely identifies the requested stream from the
    parsed representation of the first request line.

    Subclasses should override this, depending on how they implemented
    parseLine.

    @param parsed: The parsed representation, as returned by parseLine

    @raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
578
580 """
581 Return a string that will uniquely identify the request.
582
583 Subclasses should override this if they want to use request-ids and
584 also implement injectRequestId.
585 """
586 raise NotImplementedError
587
def injectRequestId(self, parsed, requestId):
    """
    Take the parsed representation of the first request line and a string
    token, return a parsed representation of the request line with the
    request-id possibly mixed into it.

    Subclasses should override this if they generate request-ids.

    @param parsed:    The parsed representation, as returned by parseLine
    @param requestId: The request-id token to mix in
    @type  requestId: str

    @returns: parsed, unchanged, in this base implementation.
    """
    # The base implementation ignores the request-id.
    return parsed
598
def writeNotFoundResponse(self):
    """
    Write a response indicating that the requested resource was not found
    in this protocol.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
607
def writeServiceUnavailableResponse(self):
    """
    Write a response indicating that the requested resource was
    temporarily unavailable in this protocol.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
616
617
619 scheme = 'http'
620 protos = ["HTTP/1.0", "HTTP/1.1"]
621 requestIdParameter = 'FLUREQID'
622 requestIdBitsNo = 256
623
def parseLine(self, line):
    """
    Parse a request line of the form "METHOD LOCATION PROTOCOL".

    @param line: The first line of the request, without its delimiter
    @type  line: str

    @returns: A (method, parsed_url, proto) tuple, where parsed_url is the
              result of urlparse.urlparse() on the location; or None if
              the line does not have three space-separated fields, the
              protocol is not listed in self.protos, or parsing raises
              ValueError.
    """
    try:
        # str.strip replaces the deprecated string.strip module function.
        method, location, proto = [
            field.strip() for field in line.split(' ', 2)]

        if proto not in self.protos:
            return None

        # Kept inside the try: a ValueError raised while parsing the
        # location is reported as an unreadable request too.
        parsed_url = urlparse.urlparse(location)

        return method, parsed_url, proto

    except ValueError:
        return None
638
def unparseLine(self, parsed):
    """
    Rebuild the request line from its parsed representation.

    @param parsed: A (method, parsed_url, proto) tuple, as returned by
                   parseLine

    @returns: "METHOD LOCATION PROTOCOL", with the location produced by
              urlparse.urlunparse().
    @rtype:   str
    """
    method, parsed_url, proto = parsed
    location = urlparse.urlunparse(parsed_url)

    return ' '.join((method, location, proto))
642
648
def injectRequestId(self, parsed, requestId):
    """
    Append the request-id to the query string of the parsed request line.

    The token is added as "<self.requestIdParameter>=<requestId>", joined
    to any existing query string with '&'.

    @param parsed:    A (method, parsed_url, proto) tuple, as returned by
                      parseLine
    @param requestId: The request-id token to append
    @type  requestId: str

    @returns: A (method, parsed_url, proto) tuple in which parsed_url is a
              plain tuple carrying the extended query string.
    """
    method, parsed_url, proto = parsed

    # Index 4 of the parsed URL holds the query string.
    query = parsed_url[4]
    token = self.requestIdParameter + '=' + requestId
    if query != '':
        query_string = query + '&' + token
    else:
        query_string = token

    parsed_url = parsed_url[:4] + (query_string, ) + parsed_url[5:]

    return method, parsed_url, proto
662
def extractIdentifier(self, parsed):
    """
    Return the identifier of the requested stream.

    @param parsed: A (method, parsed_url, proto) tuple, as returned by
                   parseLine

    @returns: Element 2 of the parsed URL (the path component of a
              urlparse result).
    """
    # Unpack all three fields so a malformed tuple still fails loudly;
    # only the URL is actually needed.
    _method, parsed_url, _proto = parsed

    return parsed_url[2]
667
def writeNotFoundResponse(self):
    """
    Write an HTTP/1.0 404 response to the transport.

    Only writes the response; the connection is left open.
    """
    self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
670
def writeServiceUnavailableResponse(self):
    """
    Write an HTTP/1.0 503 response to the transport.

    Only writes the response; the connection is left open.
    """
    self.transport.write("HTTP/1.0 503 Service Unavailable\r\n\r\n"
                         "Service temporarily unavailable")
674
675
677 scheme = 'rtsp'
678 protos = ["RTSP/1.0"]
679
def writeNotFoundResponse(self):
    """
    Write an RTSP/1.0 404 response to the transport.

    Only writes the response; the connection is left open.
    """
    self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
682
def writeServiceUnavailableResponse(self):
    """
    Write an RTSP/1.0 503 response to the transport.

    Only writes the response; the connection is left open.
    """
    self.transport.write("RTSP/1.0 503 Service Unavailable\r\n\r\n"
                         "Service temporarily unavailable")
686