Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import os 
 19  import random 
 20  import socket 
 21  import string 
 22  import time 
 23  from urllib2 import urlparse 
 24   
 25  from twisted.cred import portal 
 26  from twisted.internet import protocol, reactor, error, defer 
 27  from twisted.spread import pb 
 28  from zope.interface import implements 
 29   
 30  from flumotion.common import medium, log, messages, errors 
 31  from flumotion.common.i18n import N_, gettexter 
 32  from flumotion.component import component 
 33  from flumotion.component.component import moods 
 34  from flumotion.twisted import fdserver, checkers 
 35  from flumotion.twisted import reflect 
 36   
 37  __version__ = "$Rev$" 
 38  T_ = gettexter() 
 39   
 40   
class PorterAvatar(pb.Avatar, log.Loggable):
    """
    An Avatar in the porter representing a streamer

    @ivar avatarId: the identifier this avatar was created with
    @ivar porter:   the porter that registrations are forwarded to
    @ivar mind:     the client-side reference handed over at login; reset
                    to None by L{logout}
    """

    def __init__(self, avatarId, porter, mind):
        """
        @param avatarId: identifier of the client that logged in
        @param porter:   the porter to forward (de)registrations to
        @type  porter:   L{Porter}
        @param mind:     the client-side reference for this avatar
        """
        self.avatarId = avatarId
        self.porter = porter

        # The underlying transport is now accessible as
        # self.mind.broker.transport, on which we can call sendFileDescriptor
        self.mind = mind

    def isAttached(self):
        """
        @returns: True as long as L{logout} has not cleared the mind.
        @rtype:   bool
        """
        # Identity test: "!= None" would go through the mind object's own
        # comparison methods, which is neither needed nor wanted here.
        return self.mind is not None

    def logout(self):
        """
        Drop the reference to the mind; L{isAttached} returns False afterwards.
        """
        self.debug("porter client %s logging out", self.avatarId)
        self.mind = None

    def perspective_registerPath(self, path):
        """
        Remote call: register this avatar as the destination for path.
        """
        self.log("Perspective called: registering path \"%s\"" % path)
        self.porter.registerPath(path, self)

    def perspective_deregisterPath(self, path):
        """
        Remote call: ask the porter to drop this avatar's mapping for path.
        """
        self.log("Perspective called: deregistering path \"%s\"" % path)
        self.porter.deregisterPath(path, self)

    def perspective_registerPrefix(self, prefix):
        """
        Remote call: register this avatar as the destination for prefix.
        """
        self.log("Perspective called: registering default")
        self.porter.registerPrefix(prefix, self)

    def perspective_deregisterPrefix(self, prefix):
        """
        Remote call: ask the porter to drop this avatar's mapping for prefix.
        """
        self.log("Perspective called: deregistering default")
        self.porter.deregisterPrefix(prefix, self)

    def perspective_getPort(self):
        """
        Remote call.

        @returns: the porter's _iptablesPort value.
        """
        return self.porter._iptablesPort
79 80
class PorterRealm(log.Loggable):
    """
    The Realm of the Porter: it hands out a L{PorterAvatar} to every
    streamer that logs into the porter.
    """
    implements(portal.IRealm)

    def __init__(self, porter):
        """
        @param porter: The porter that avatars created from here should use.
        @type  porter: L{Porter}
        """
        self.porter = porter

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create the avatar for a client that is logging in.

        @returns: an (interface, avatar, logout callable) tuple
        @raises NotImplementedError: if pb.IPerspective was not requested
        """
        self.log("Avatar requested for avatarId %s, mind %r, interfaces %r",
                 avatarId, mind, interfaces)

        # Only the PB perspective interface is supported.
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        streamerAvatar = PorterAvatar(avatarId, self.porter, mind)
        return pb.IPerspective, streamerAvatar, streamerAvatar.logout
103 104
class PorterMedium(component.BaseComponentMedium):
    """
    Component medium for the porter; exposes the porter's connection
    details through a remote call.
    """

    def remote_getPorterDetails(self):
        """
        Return the location, login username/password, and listening port
        and interface for the porter as a tuple (path, username,
        password, port, interface, external-interface).
        """
        porter = self.comp
        details = (
            porter._socketPath,
            porter._username,
            porter._password,
            porter._iptablesPort,
            porter._interface,
            porter._external_interface,
        )
        return details
116 117
class Porter(component.BaseComponent, log.Loggable):
    """
    The porter optionally sits in front of a set of streamer components.
    The porter is what actually deals with incoming connections on a socket.
    It decides which streamer to direct the connection to, then passes the FD
    (along with some amount of already-read data) to the appropriate streamer.
    """

    componentMediumClass = PorterMedium

    def init(self):
        """
        Reset the registration tables and the configuration attributes; the
        latter are filled in by L{have_properties}.
        """
        # We maintain a map of path -> avatar (the underlying transport is
        # accessible from the avatar, we need this for FD-passing)
        self._mappings = {}
        self._prefixes = {}

        self._socketlistener = None

        self._socketPath = None
        self._username = None
        self._password = None
        self._port = None
        self._iptablesPort = None
        self._porterProtocol = None

        self._interface = ''
        self._external_interface = ''

    def registerPath(self, path, avatar):
        """
        Register a path as being served by a streamer represented by this
        avatar. Will remove any previous registration at this path.

        @param path:   The path to register
        @type  path:   str
        @param avatar: The avatar representing the streamer to direct this
                       path to
        @type  avatar: L{PorterAvatar}
        """
        self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
        if path in self._mappings:
            self.warning("Replacing existing mapping for path \"%s\"" % path)

        self._mappings[path] = avatar

    def deregisterPath(self, path, avatar):
        """
        Attempt to deregister the given path. A deregistration will only be
        accepted if the mapping is to the avatar passed.

        @param path:   The path to deregister
        @type  path:   str
        @param avatar: The avatar representing the streamer being deregistered
        @type  avatar: L{PorterAvatar}
        """
        if path not in self._mappings:
            self.warning("Mapping not removed: no mapping found")
            return

        if self._mappings[path] == avatar:
            self.debug("Removing porter mapping for \"%s\"" % path)
            del self._mappings[path]
        else:
            self.warning(
                "Mapping not removed: refers to a different avatar")

    def registerPrefix(self, prefix, avatar):
        """
        Register a destination for all requests directed to anything beginning
        with a specified prefix. Where there are multiple matching prefixes,
        the longest is selected.

        @param prefix: The prefix to register
        @type  prefix: str
        @param avatar: The avatar being registered
        @type  avatar: L{PorterAvatar}
        """

        self.debug("Setting prefix \"%s\" for porter", prefix)
        if prefix in self._prefixes:
            self.warning("Overwriting prefix")

        self._prefixes[prefix] = avatar

    def deregisterPrefix(self, prefix, avatar):
        """
        Attempt to deregister the destination registered for a prefix. This
        will only succeed if the prefix is currently mapped to this avatar.

        @param prefix: The prefix to deregister
        @type  prefix: str
        @param avatar: The avatar being deregistered
        @type  avatar: L{PorterAvatar}
        """
        if prefix not in self._prefixes:
            self.warning("Mapping not removed: no mapping found")
            return

        if self._prefixes[prefix] == avatar:
            self.debug("Removing prefix destination from porter")
            del self._prefixes[prefix]
        else:
            self.warning(
                "Not removing prefix destination: expected avatar not found")

    def findPrefixMatch(self, path):
        """
        Find the avatar registered for the longest prefix that path starts
        with.

        @param path: The requested path
        @type  path: str
        @returns: The matching avatar, or None if no registered prefix matches.
        """
        found = None
        # TODO: Horribly inefficient. Replace with pathtree code.
        for prefix in self._prefixes:
            self.log("Checking: %r, %r" % (prefix, path))
            # Compare against None rather than relying on truthiness: an
            # empty-string prefix is a valid (catch-all) registration and
            # must not be mistaken for "nothing found yet".
            if (path.startswith(prefix) and
                    (found is None or len(found) < len(prefix))):
                found = prefix

        if found is None:
            return None
        return self._prefixes[found]

    def findDestination(self, path):
        """
        Find a destination Avatar for this path. An exact path registration
        takes precedence over any prefix registration.

        @returns: The Avatar for this mapping, or None.
        """

        if path in self._mappings:
            return self._mappings[path]
        else:
            return self.findPrefixMatch(path)

    def generateSocketPath(self):
        """
        Generate a socket pathname in an appropriate location

        @returns: the name of a freshly created (and closed) temporary file
        @rtype:   str
        """
        # Also see worker/worker.py:_getSocketPath(), and note that
        # this suffers from the same potential race.
        import tempfile
        fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.')
        os.close(fd)

        return name

    def generateRandomString(self, numchars):
        """
        Generate a random US-ASCII string of length numchars

        @param numchars: number of characters to generate
        @type  numchars: int
        @rtype: str
        """
        chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        # The result is used as a login credential (see have_properties), so
        # draw from the OS entropy source instead of the predictable default
        # generator.
        rng = random.SystemRandom()
        return ''.join([rng.choice(chars) for _ in range(numchars)])

    def have_properties(self):
        """
        Read the component properties from self.config and store them in the
        corresponding private attributes.
        """
        props = self.config['properties']

        self.fixRenamedProperties(props,
            [('socket_path', 'socket-path')])

        # We can operate in two modes: explicitly configured (necessary if you
        # want to handle connections from components in other managers), and
        # self-configured (which is sufficient for slaving only streamers
        # within this manager)
        if 'socket-path' in props:
            # Explicitly configured
            self._socketPath = props['socket-path']
            self._username = props['username']
            self._password = props['password']
        else:
            # Self-configuring. Use a randomly created username/password, and
            # a socket with a random name.
            self._username = self.generateRandomString(12)
            self._password = self.generateRandomString(12)
            self._socketPath = self.generateSocketPath()

        self._requirePassword = props.get('require-password', True)
        # 0o666 is the spelling of the octal literal accepted by both
        # Python 2.6+ and Python 3.
        self._socketMode = props.get('socket-mode', 0o666)
        self._port = int(props['port'])
        self._iptablesPort = int(props.get('iptables-port', self._port))
        self._porterProtocol = props.get('protocol',
            'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
        self._interface = props.get('interface', '')
        # if a config has no external-interface set, set it to the same as
        # interface
        self._external_interface = props.get('external-interface',
            self._interface)

    def do_stop(self):
        """
        Stop listening on the PB/fd-passing socket.

        @returns: whatever stopListening() returned, or None if the socket
                  listener was never created.
        """
        d = None
        if self._socketlistener:
            # stopListening() calls (via a callLater) connectionLost(), which
            # will unlink our socket, so we don't need to explicitly delete it.
            d = self._socketlistener.stopListening()
        self._socketlistener = None
        return d

    def do_setup(self):
        """
        Start listening on the PB/fd-passing socket and on the public port.

        @returns: a failed deferred (L{errors.ComponentSetupHandledError}) if
                  either listener could not be created, None otherwise.
        """
        # Create our combined PB-server/fd-passing channel
        self.have_properties()
        realm = PorterRealm(self)
        checker = checkers.FlexibleCredentialsChecker()
        checker.addUser(self._username, self._password)
        if not self._requirePassword:
            checker.allowPasswordless(True)

        p = portal.Portal(realm, [checker])
        serverfactory = pb.PBServerFactory(p)

        try:
            # Rather than a normal listenTCP() or listenUNIX(), we use
            # listenWith so that we can specify our particular Port, which
            # creates Transports that we know how to pass FDs over.
            try:
                os.unlink(self._socketPath)
            except OSError:
                # Best effort: failing to remove the path is not fatal here;
                # a real problem surfaces as CannotListenError below.
                pass

            self._socketlistener = reactor.listenWith(
                fdserver.FDPort, self._socketPath,
                serverfactory, mode=self._socketMode)
            self.info("Now listening on socketPath %s", self._socketPath)
        except error.CannotListenError:
            self.warning("Failed to create socket %s" % self._socketPath)
            m = messages.Error(T_(N_(
                "Network error: socket path %s is not available."),
                self._socketPath))
            self.addMessage(m)
            self.setMood(moods.sad)
            return defer.fail(errors.ComponentSetupHandledError())

        # Create the class that deals with the specific protocol we're proxying
        # in this porter.
        try:
            proto = reflect.namedAny(self._porterProtocol)
            self.debug("Created proto %r" % proto)
        except (ImportError, AttributeError):
            self.warning("Failed to import protocol '%s', defaulting to HTTP" %
                self._porterProtocol)
            proto = HTTPPorterProtocol

        # And of course we also want to listen for incoming requests in the
        # appropriate protocol (HTTP, RTSP, etc.)
        factory = PorterProtocolFactory(self, proto)
        try:
            # NOTE(review): the returned port object is not kept, so do_stop()
            # never stops this listener explicitly - confirm this is intended.
            reactor.listenWith(
                fdserver.PassableServerPort, self._port, factory,
                interface=self._interface)
            self.info("Now listening on interface %r on port %d",
                      self._interface, self._port)
        except error.CannotListenError:
            self.warning("Failed to listen on interface %r on port %d",
                         self._interface, self._port)
            m = messages.Error(T_(N_(
                "Network error: TCP port %d is not available."), self._port))
            self.addMessage(m)
            self.setMood(moods.sad)
            return defer.fail(errors.ComponentSetupHandledError())
370 371
class PorterProtocolFactory(protocol.Factory):
    """
    Factory creating one porter protocol instance per incoming connection.
    """

    def __init__(self, porter, protocol):
        """
        @param porter:   the porter handed to every protocol instance
        @param protocol: the callable used to build protocol instances; it is
                         called with the porter as its only argument
        """
        self.protocol = protocol
        self._porter = porter

    def buildProtocol(self, addr):
        """
        Build a protocol instance bound to the porter and to this factory.
        """
        instance = self.protocol(self._porter)
        instance.factory = self
        return instance
382 383
class PorterProtocol(protocol.Protocol, log.Loggable):
    """
    The base porter is capable of accepting HTTP-like protocols (including
    RTSP) - it reads the first line of a request, and makes the decision
    solely on that.

    We can't guarantee that we read precisely a line, so the buffer we
    accumulate will actually be larger than what we actually parse.

    @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
    @cvar delimiters: a list of valid line delimiters I check for
    """

    logCategory = 'porterprotocol'

    # Don't permit a first line longer than this.
    MAX_SIZE = 4096

    # Timeout any client connected to the porter for longer than this. A normal
    # client should only ever be connected for a fraction of a second.
    PORTER_CLIENT_TIMEOUT = 30

    # The first line ends at whichever of these occurs earliest in the buffer;
    # when two of them start at the same offset, the one listed first wins, so
    # a full \r\n is preferred over its leading \r. At the other end, this
    # gets processed by a full protocol implementation, so being flexible hurts
    # us not at all
    delimiters = ['\r\n', '\n', '\r']

    def __init__(self, porter):
        """
        @param porter: the porter used to look up destinations
        @type  porter: L{Porter}
        """
        self._buffer = ''
        self._porter = porter
        self.requestId = None # a string that should identify the request

        self._timeoutDC = reactor.callLater(self.PORTER_CLIENT_TIMEOUT,
                                            self._timeout)

    def connectionMade(self):
        """
        Assign a request-id to the freshly accepted connection and log it.
        """

        self.requestId = self.generateRequestId()
        # PROBE: accepted connection
        self.debug("[fd %5d] (ts %f) (request-id %r) accepted connection",
                   self.transport.fileno(), time.time(), self.requestId)

        protocol.Protocol.connectionMade(self)

    def _timeout(self):
        """
        Delayed-call target: drop a client that stayed connected too long.
        """
        self._timeoutDC = None
        self.debug("Timing out porter client after %d seconds",
                   self.PORTER_CLIENT_TIMEOUT)
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """
        Cancel the pending client timeout, if it has not fired yet.
        """
        if self._timeoutDC:
            self._timeoutDC.cancel()
            self._timeoutDC = None

    def _splitFirstLine(self):
        """
        Split the buffer at the delimiter that occurs earliest in it.

        @returns: a (line, delimiter, remaining) tuple, or None if the buffer
                  contains none of the delimiters yet.
        """
        best = None
        for delim in self.delimiters:
            if not delim:
                # An empty delimiter can never terminate a line.
                continue
            index = self._buffer.find(delim)
            # Strict comparison: on a tie the delimiter listed first wins.
            if index != -1 and (best is None or index < best[0]):
                best = (index, delim)

        if best is None:
            return None

        index, delim = best
        return (self._buffer[:index], delim,
                self._buffer[index + len(delim):])

    def dataReceived(self, data):
        """
        Accumulate data until the first line is complete, then look up the
        destination for it and hand the connection's FD over, together with
        everything read so far.
        """
        self._buffer = self._buffer + data
        # Pass the buffer as a log argument so that it is only formatted when
        # this log level is actually enabled.
        self.log("Got data, buffer now \"%s\"", self._buffer)

        # We accept more than just '\r\n' (the true HTTP line end) in the
        # interests of compatibility. The line is cut at the earliest
        # delimiter, so a first line ending in a bare '\n' is still handled
        # correctly when later lines in the same chunk end in '\r\n'.
        split = self._splitFirstLine()
        if split is None:
            # Failed to find a valid delimiter.
            self.log("No valid delimiter found")
            if len(self._buffer) > self.MAX_SIZE:

                # PROBE: dropping
                self.debug("[fd %5d] (ts %f) (request-id %r) dropping, "
                           "buffer exceeded",
                           self.transport.fileno(), time.time(),
                           self.requestId)

                return self.transport.loseConnection()

            # No delimiter found; haven't reached the length limit yet.
            # Wait for more data.
            return

        line, delim, remaining = split

        # Got a line. self._buffer is still our entire buffer, should be
        # provided to the slaved process.
        parsed = self.parseLine(line)
        if not parsed:
            self.log("Couldn't parse the first line")
            return self.transport.loseConnection()

        identifier = self.extractIdentifier(parsed)
        if not identifier:
            self.log("Couldn't find identifier in first line")
            return self.transport.loseConnection()

        if self.requestId:
            self.log("Injecting request-id %r", self.requestId)
            parsed = self.injectRequestId(parsed, self.requestId)
            # Since injecting the token might have modified the parsed
            # representation of the request, we need to reconstruct the buffer.
            # Fortunately, we know which delimiter we split on, what the
            # remaining part is and that we only split the buffer in two parts
            self._buffer = delim.join((self.unparseLine(parsed), remaining))

        # PROBE: request
        self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s",
                   self.transport.fileno(), time.time(), self.requestId,
                   identifier)

        # Ok, we have an identifier. Is it one we know about, or do we have
        # a default destination?
        destinationAvatar = self._porter.findDestination(identifier)

        if not destinationAvatar or not destinationAvatar.isAttached():
            if destinationAvatar:
                self.debug("There was an avatar, but it logged out?")

            # PROBE: no destination; see send fd
            self.debug(
                "[fd %5d] (ts %f) (request-id %r) no destination avatar found",
                self.transport.fileno(), time.time(), self.requestId)

            self.writeNotFoundResponse()
            return self.transport.loseConnection()

        # Transfer control over this FD. Pass all the data so-far received
        # along in the same message. The receiver will push that data into
        # the Twisted Protocol object as if it had been normally received,
        # so it looks to the receiver like it has read the entire data stream
        # itself.

        # PROBE: send fd; see no destination and fdserver.py
        self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s",
                   self.transport.fileno(), time.time(), self.requestId,
                   destinationAvatar.avatarId)

        # TODO: Check out blocking characteristics of sendFileDescriptor, fix
        # if it blocks.
        try:
            destinationAvatar.mind.broker.transport.sendFileDescriptor(
                self.transport.fileno(), self._buffer)
        except OSError as e:
            self.warning("[fd %5d] failed to send FD: %s",
                         self.transport.fileno(), log.getExceptionMessage(e))
            self.writeServiceUnavailableResponse()
            return self.transport.loseConnection()

        # PROBE: sent fd; see no destination and fdserver.py
        self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s",
                   self.transport.fileno(), time.time(), self.requestId,
                   destinationAvatar.avatarId)

        # After this, we don't want to do anything with the FD, other than
        # close our reference to it - but not close the actual TCP connection.
        # We set keepSocketAlive to make loseConnection() only call close()
        # rather than shutdown() then close()
        self.transport.keepSocketAlive = True
        self.transport.loseConnection()

    def parseLine(self, line):
        """
        Parse the initial line of the request. Return an object that can be
        used to uniquely identify the stream being requested by passing it to
        extractIdentifier, or None if the request is unreadable.

        Subclasses should override this.
        """
        raise NotImplementedError

    def unparseLine(self, parsed):
        """
        Recreate the initial request line from the parsed representation. The
        recreated line does not need to be exactly identical, but both
        parseLine(unparseLine(parsed)) and parsed should contain the same
        information (i.e. unparseLine should not lose information).

        UnparseLine has to return a valid line from the porter protocol's
        scheme point of view (for instance, HTTP).

        Subclasses should override this.
        """
        raise NotImplementedError

    def extractIdentifier(self, parsed):
        """
        Extract a string that uniquely identifies the requested stream from the
        parsed representation of the first request line.

        Subclasses should override this, depending on how they implemented
        parseLine.
        """
        raise NotImplementedError

    def generateRequestId(self):
        """
        Return a string that will uniquely identify the request.

        Subclasses should override this if they want to use request-ids and
        also implement injectRequestId. Note that connectionMade calls this
        unconditionally, so a subclass that does not override it will raise
        NotImplementedError on every new connection.
        """
        raise NotImplementedError

    def injectRequestId(self, parsed, requestId):
        """
        Take the parsed representation of the first request line and a string
        token, return a parsed representation of the request line with the
        request-id possibly mixed into it.

        Subclasses should override this if they generate request-ids.
        """
        # by default, ignore the request-id
        return parsed

    def writeNotFoundResponse(self):
        """
        Write a response indicating that the requested resource was not found
        in this protocol.

        Subclasses should override this to use the correct protocol.
        """
        raise NotImplementedError

    def writeServiceUnavailableResponse(self):
        """
        Write a response indicating that the requested resource was
        temporarily unavailable in this protocol.

        Subclasses should override this to use the correct protocol.
        """
        raise NotImplementedError
616 617
class HTTPPorterProtocol(PorterProtocol):
    """
    Porter protocol for HTTP: requests are identified by the path of the URL
    on the request line, and the request-id is added to the query string.

    @cvar scheme:             name of the URL scheme handled
    @cvar protos:             the protocol versions accepted on the request
                              line
    @cvar requestIdParameter: name of the query parameter carrying the
                              request-id
    @cvar requestIdBitsNo:    number of random bits in a request-id
    """
    scheme = 'http'
    protos = ["HTTP/1.0", "HTTP/1.1"]
    requestIdParameter = 'FLUREQID'
    requestIdBitsNo = 256

    def parseLine(self, line):
        """
        Parse a "METHOD location PROTOCOL" request line.

        @returns: a (method, parsed_url, proto) tuple, or None if the line
                  does not have three parts, names a protocol not listed in
                  protos, or carries a location that cannot be parsed.
        """
        try:
            # Unpacking raises ValueError when there are fewer than 3 parts.
            (method, location, proto) = [
                part.strip() for part in line.split(' ', 2)]

            if proto not in self.protos:
                return None

            # Currently, we just use the URL parsing code from urllib2
            parsed_url = urlparse.urlparse(location)

            return method, parsed_url, proto

        except ValueError:
            return None

    def unparseLine(self, parsed):
        """
        Rebuild the request line from a tuple as returned by parseLine.
        """
        method, parsed_url, proto = parsed
        return ' '.join((method, urlparse.urlunparse(parsed_url), proto))

    def generateRequestId(self):
        """
        @returns: requestIdBitsNo random bits as a lowercase hexadecimal
                  string.
        """
        # Remember to return something that does not need quoting to be put in
        # a GET parameter. This way we spare ourselves the effort of quoting in
        # injectRequestId.
        # '%x' is used instead of hex()[2:] because hex() of a Python 2 long
        # ends in 'L', which would leak into the request-id.
        return '%x' % random.getrandbits(self.requestIdBitsNo)

    def injectRequestId(self, parsed, requestId):
        """
        Append requestIdParameter=requestId to the query string of the
        parsed request line.

        @returns: a (method, parsed_url, proto) tuple in which parsed_url is
                  a plain 6-tuple with the extended query string.
        """
        method, parsed_url, proto = parsed
        # assuming no need to escape the requestId, see generateRequestId
        sep = ''
        if parsed_url[4] != '':
            # there already is a query string; chain ours onto it
            sep = '&'
        query_string = ''.join((parsed_url[4],
                                sep, self.requestIdParameter, '=',
                                requestId))
        # index 4 of a urlparse result is the query component
        parsed_url = (parsed_url[:4] +
                      (query_string, )
                      + parsed_url[5:])
        return method, parsed_url, proto

    def extractIdentifier(self, parsed):
        """
        @returns: the path component of the parsed URL.
        """
        method, parsed_url, proto = parsed
        # Currently, we just return the path part of the URL.
        return parsed_url[2]

    def writeNotFoundResponse(self):
        """
        Write a HTTP 404 response to the client.
        """
        self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")

    def writeServiceUnavailableResponse(self):
        """
        Write a HTTP 503 response to the client.
        """
        self.transport.write("HTTP/1.0 503 Service Unavailable\r\n\r\n"
                             "Service temporarily unavailable")
674 675
class RTSPPorterProtocol(HTTPPorterProtocol):
    """
    Porter protocol for RTSP: request lines are handled as in
    L{HTTPPorterProtocol}, only the accepted protocol version and the error
    responses differ.
    """
    scheme = 'rtsp'
    protos = ["RTSP/1.0"]

    def writeNotFoundResponse(self):
        """
        Write a RTSP 404 response to the client.
        """
        self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")

    def writeServiceUnavailableResponse(self):
        """
        Write a RTSP 503 response to the client.
        """
        self.transport.write("RTSP/1.0 503 Service Unavailable\r\n\r\n"
                             "Service temporarily unavailable")
686