Package flumotion :: Package component :: Package common :: Package fgdp :: Module protocol
[hide private]

Source Code for Module flumotion.component.common.fgdp.protocol

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import base64 
 19  from random import Random 
 20   
 21  from twisted.internet import reactor 
 22  from twisted.internet.protocol import ReconnectingClientFactory, Factory 
 23  from twisted.protocols.basic import LineReceiver 
 24   
 25  from Crypto.Hash import SHA 
 26   
 27  from flumotion.common import log 
 28   
 29  __version__ = "$Rev$" 
 30   
 31  PROTOCOL_NAME = "FGDP" 
32 33 34 -class ProtocolException(Exception):
35 error = '' 36
37 - def __init__(self, reason=''):
38 self.reason = reason
39
40 - def __str__(self):
41 return "%s: %s" % (self.error, self.reason)
42
43 44 -class UnexpectedResponse(ProtocolException):
45 "Received an unexpected response, in a wrong state" 46 error = "Unexpected response"
47
48 49 -class UnexpectedCommand(ProtocolException):
50 "Received an unexpected command, in a wrong state" 51 error = "Unexpected command"
52
53 54 -class MalformedCommand(ProtocolException):
55 "The command is badly formed" 56 error = "Malformed command"
57
58 59 -class MalformedResponse(ProtocolException):
60 "The response is badly formed" 61 error = "Malformed response"
62
63 64 -class InvalidVersion(ProtocolException):
65 "The version of the protocol is not valid" 66 error = "Invalid version"
67
68 69 -class AuthenticationFailed(ProtocolException):
70 "Authentication is not valid and failed" 71 error = "Invalid authentication"
72
73 74 -class ErrorResponse(ProtocolException):
75 "Got an error response" 76 error = "Received error response"
77
78 79 -class Command(object):
80 ''' 81 Command for the FGDP procotol, sent by the client to the server 82 83 @type command: str 84 @type content: content 85 @type version: tuple 86 ''' 87
88 - def __init__(self, command, content, version):
89 self.command = command 90 self.content = content 91 self.version = version
92 93 @staticmethod
94 - def parseCommand(commandString, versionCheck=None):
95 ''' 96 Parses a command string 97 98 @param commandString: string with the command 99 @type commandString: str 100 @param versionCheck: version number to check 101 @type versionCheck: tuple 102 103 @return: A newly created L{Command} 104 @rtype: L{Command} 105 @raise MalformedCommand: raised when the command is not valid 106 ''' 107 try: 108 command, line = commandString.split(' ', 1) 109 content, protoVersion = line.rsplit(' ', 1) 110 protocol, versionStr = protoVersion.split('/') 111 version = tuple(map(int, versionStr.split('.'))) 112 except ValueError: 113 raise MalformedCommand(commandString) 114 if protocol != PROTOCOL_NAME: 115 raise MalformedCommand(commandString) 116 if versionCheck and version != versionCheck: 117 raise InvalidVersion('version %s is not compatible with %s ' % 118 (version, versionCheck)) 119 return Command(command, content, version)
120
121 - def __str__(self):
122 return "%s %s %s/%s" % (self.command, self.content, PROTOCOL_NAME, 123 '.'.join(map(str, self.version)))
124
125 - def __eq__(self, obj):
126 if not isinstance(obj, Command): 127 return False 128 return (str(self) == str(obj))
129
130 131 -class Response(object):
132 ''' 133 Response for the FGDP protocol, sent by the server to the client 134 135 @type command: str 136 @type content: str 137 @type version: tuple 138 ''' 139
140 - def __init__(self, response, content, version):
141 self.response = response 142 self.content = content 143 self.version = version
144 145 @staticmethod
146 - def parseResponse(responseString, versionCheck=None):
147 ''' 148 Parses a response string 149 150 @param responseString: string with the response 151 @type responseString: str 152 @param versionCheck: version number to check 153 @type versionCheck: tuple 154 155 @return: A newly created L{Response} 156 @rtype: L{Response} 157 @raise MalformedResponse: raised when the command is not valid 158 ''' 159 try: 160 protoVersion, line = responseString.split(' ', 1) 161 protocol, versionStr = protoVersion.split('/') 162 version = tuple(map(int, versionStr.split('.'))) 163 response, content = line.split(' ', 1) 164 except ValueError: 165 raise MalformedResponse(responseString) 166 if protocol != PROTOCOL_NAME: 167 raise MalformedResponse(responseString) 168 if versionCheck and version != versionCheck: 169 raise InvalidVersion('version %s is not compatible with %s ' % 170 (version, versionCheck)) 171 return Response(response, content, version)
172
173 - def __str__(self):
174 return "%s/%s %s %s" % (PROTOCOL_NAME, 175 '.'.join(map(str, self.version)), 176 self.response, self.content)
177
178 - def __eq__(self, obj):
179 if not isinstance(obj, Response): 180 return False 181 return (str(self) == str(obj))
182
183 184 -class FGDP_0_1(object):
185 ''' 186 Definitions for the version 0.1 of the FGDP protocol. 187 Future extensions of the protocol should subclass this class. 188 ''' 189 190 LOGIN_COMMAND = 'LOGIN' 191 AUTH_COMMAND = 'AUTH' 192 193 OK_RESPONSE = 'OK' 194 ERROR_RESPONSE = 'ERROR' 195 CHALLENGE_RESPONSE = 'CHALLENGE'
196
197 198 -class FGDPBaseProtocol(LineReceiver, log.Loggable):
199 """ 200 Base class for the twisted side of the FGDP protocol 201 """ 202 203 _transport = None 204 _gstElement = None 205 _fd = None 206 _user = '' 207 _password = '' 208
209 - def __init__(self, gstElement):
210 self._gstElement = gstElement 211 self._gstElement.protocol = self
212
213 - def startProtocol(self):
214 """ 215 Subclasses must implement this method to start the protocol after a 216 new connection has been made 217 """ 218 raise NotImplemented('Subclasses must implement "startProtocol"')
219
220 - def stopProtocol(self, reason):
221 """ 222 Subclasses must implement this method to stop the protocol after the 223 connection has been closed 224 225 @type reason: L{twsited.python.failure.Failure} 226 """ 227 raise NotImplemented('Subclasses must implement "stopProtocol"')
228
229 - def lineReceived(self, line):
230 """ 231 Subclasess must implement this method to process the messages of the 232 line-based protocol 233 234 @type line: str 235 """ 236 raise NotImplemented('Subclasses must implement "lineReceived"')
237
238 - def makeConnection(self, transport):
239 """ 240 Store a reference of the trasport and file descriptor of the 241 used by the new connection 242 """ 243 self._transport = transport 244 self._fd = transport.fileno() 245 self.connectionMade()
246
247 - def connectionMade(self):
248 self.info("Connection made with peer, starting protocol") 249 self.startProtocol()
250
251 - def connectionLost(self, reason):
252 self.info("Connection lost with FGDP peer: %s", 253 reason.getErrorMessage()) 254 self.stopProtocol(reason)
255
256 - def loseConnection(self):
257 """ 258 Loses the current connection and triggers the stop of the protocol. 259 Once the authentication has finished, the file descriptor is not 260 handled anymore by the twisted reactor. A disconnection in the 261 gstreamer element handling the file descriptor should call this method 262 to notify the protocol about it. 263 """ 264 if self._transport is not None: 265 self._transport.loseConnection()
266
267 - def _makeHash(self, values):
268 sha = SHA.new() 269 sha.update(':'.join(values)) 270 return sha.hexdigest()
271
272 - def _sendMessage(self, message, transport=None):
273 transport = transport or self._transport 274 self.debug('Sending message: "%s"', message) 275 transport.write("%s\r\n" % message)
276
277 - def _disconnectFD(self, reason):
278 if self._fd != None: 279 self._gstElement.disconnectFd(self._fd) 280 self._gstElement.emit('disconnected', reason.getErrorMessage())
281
282 - def _delegateFD(self):
283 # Take out the fd from twisted reactor and pass it to element 284 # using it 285 # See http://twistedmatrix.com/trac/ticket/1796 286 reactor.removeReader(self._transport) 287 self._gstElement.connectFd(self._fd) 288 self._gstElement.emit('connected')
289
290 291 -class FGDPServer_0_1(FGDP_0_1, FGDPBaseProtocol):
292 ''' 293 Implementation of the server-side FGDP protocol for version 0.1 294 ''' 295 296 logCategory = 'fgdp-server' 297 298 SERVER_STATE_DISCONNECTED = "disconnected" 299 SERVER_STATE_AUTHENTICATE = "authenticate" 300 SERVER_STATE_CONNECTED = "connected" 301 302 _state = SERVER_STATE_DISCONNECTED 303 _version = (0, 1) 304 _challenge = '' 305 _random = None 306
307 - def __init__(self, gstElement):
308 self._user = gstElement.username 309 self._password = gstElement.password 310 self._random = Random() 311 FGDPBaseProtocol.__init__(self, gstElement)
312
313 - def makeConnection(self, transport):
314 # The protocol must refuse new connections if a client is already 315 # connected. 316 self.factory.clients += 1 317 if self.factory.clients > 1: 318 r = Response(self.ERROR_RESPONSE, "already connected", 319 self._version) 320 self._sendMessage(r, transport) 321 self.warning("Trying to make a new connection, but a client " 322 "is already connected.") 323 transport.loseConnection() 324 return 325 FGDPBaseProtocol.makeConnection(self, transport)
326
327 - def startProtocol(self):
328 pass
329
330 - def stopProtocol(self, reason):
331 self.info("Stopping protocol session") 332 self.connected = 0 333 self.factory.clients -= 1 334 self._state = self.SERVER_STATE_DISCONNECTED 335 self._disconnectFD(reason)
336
337 - def lineReceived(self, line):
338 # Parse command and check state 339 try: 340 command = Command.parseCommand(line) 341 self._checkState(command) 342 except (ErrorResponse, MalformedCommand, UnexpectedCommand), e: 343 self._handleError(e) 344 return 345 # State DISCONNECTED 346 if self._state == self.SERVER_STATE_DISCONNECTED: 347 self._user = command.content 348 self._challengeClient() 349 # State AUTHENTICATE 350 elif self._state == self.SERVER_STATE_AUTHENTICATE: 351 try: 352 self._checkAuthentication(command) 353 except AuthenticationFailed, e: 354 self._handleError(e) 355 return 356 self._startStreaming()
357
358 - def _checkState(self, command):
359 if self._state == self.SERVER_STATE_DISCONNECTED and \ 360 command.command != self.LOGIN_COMMAND: 361 raise UnexpectedCommand(command.command) 362 if self._state == self.SERVER_STATE_AUTHENTICATE and \ 363 command.command != self.AUTH_COMMAND: 364 raise UnexpectedCommand(command.command) 365 if self._state == self.SERVER_STATE_CONNECTED: 366 # FIXME: Non fatal error 367 raise UnexpectedCommand(command.command)
368
369 - def _handleError(self, error):
370 self.warning("%s", error) 371 response = Response(ErrorResponse, "Server error: %s" % error, 372 self._version) 373 self._sendMessage(response) 374 self.loseConnection()
375
376 - def _challengeClient(self):
377 self.info("Challenging client") 378 self._state = self.SERVER_STATE_AUTHENTICATE 379 self._challenge = base64.b64encode( 380 str(self._random.getrandbits(1024))) 381 response = Response(self.CHALLENGE_RESPONSE, self._challenge, 382 self._version) 383 self._sendMessage(response)
384
385 - def _startStreaming(self):
386 self._state = self.SERVER_STATE_CONNECTED 387 response = Response(self.OK_RESPONSE, 'Authenticated', self._version) 388 self._sendMessage(response) 389 self.info("Started streaming") 390 self._delegateFD()
391
392 - def _checkAuthentication(self, command):
393 digest = self._makeHash([self._user, self._password, self._challenge]) 394 if digest != command.content: 395 raise AuthenticationFailed( 396 "could not verify the challenge response") 397 return False
398
399 400 -class FGDPClient_0_1(FGDP_0_1, FGDPBaseProtocol):
401 ''' 402 Implementation of the client-side FGDP protocol for version 0.1 403 ''' 404 405 logCategory = 'fgdp-client' 406 407 CLIENT_STATE_DISCONNECTED = "disconnected" 408 CLIENT_STATE_LOGIN = "login" 409 CLIENT_STATE_AUTHENTICATING = "authenticate" 410 CLIENT_STATE_CONNECTED = "connected" 411 412 _version = (0, 1) 413 _state = CLIENT_STATE_DISCONNECTED 414
415 - def __init__(self, gstElement):
416 self._user = gstElement.username 417 self._password = gstElement.password 418 FGDPBaseProtocol.__init__(self, gstElement)
419
420 - def startProtocol(self):
421 self.info("Starting protocol session") 422 self._login()
423
424 - def stopProtocol(self, reason):
425 self.info('Stopping protocol session') 426 self._state = self.CLIENT_STATE_DISCONNECTED 427 self._disconnectFD(reason)
428
429 - def lineReceived(self, line):
430 # Parse response and check state 431 try: 432 response = Response.parseResponse(line) 433 self._checkState(response) 434 except (MalformedResponse, ErrorResponse, UnexpectedResponse), e: 435 self.warning("%s", e) 436 self.loseConnection() 437 return 438 # State LOGIN 439 if self._state == self.CLIENT_STATE_LOGIN: 440 self._authenticate(response) 441 # State AUTHENTICATING 442 elif self._state == self.CLIENT_STATE_AUTHENTICATING: 443 self._startStreaming()
444
445 - def _checkState(self, response):
446 if response.response == self.ERROR_RESPONSE: 447 raise ErrorResponse(response.content) 448 if self._state == self.CLIENT_STATE_LOGIN and \ 449 response.response != self.CHALLENGE_RESPONSE: 450 raise UnexpectedResponse(response.content) 451 if self._state == self.CLIENT_STATE_AUTHENTICATING and \ 452 response.response != self.OK_RESPONSE: 453 raise UnexpectedResponse(response.content) 454 if self._state == self.CLIENT_STATE_CONNECTED: 455 raise UnexpectedResponse(response.content)
456
457 - def _login(self):
458 self.info('Starting client login with user=%s, password=%s', 459 self._user, self._password) 460 self._state = self.CLIENT_STATE_LOGIN 461 command = Command(self.LOGIN_COMMAND, self._user, self._version) 462 self._sendMessage(command)
463
464 - def _authenticate(self, response):
465 self.info('Authenticating user with challenge %s', response.content) 466 self._state = self.CLIENT_STATE_AUTHENTICATING 467 res = self._makeHash([self._user, self._password, response.content]) 468 command = Command(self.AUTH_COMMAND, res, self._version) 469 self._sendMessage(command)
470
471 - def _startStreaming(self):
472 self.info("Starting streaming") 473 self._state = self.CLIENT_STATE_CONNECTED 474 self._delegateFD()
475
476 477 -class FGDPClientFactory(ReconnectingClientFactory, log.Loggable):
478 logCategory = 'fgdp-client' 479 480 _supportedVersions = ['0.1'] 481
482 - def __init__(self, gstElement):
483 ReconnectingClientFactory.maxDelay = gstElement.maxDelay 484 self.gstElement = gstElement 485 self._setProtocol(gstElement.version)
486
487 - def _setProtocol(self, version):
488 if version in self._supportedVersions: 489 classname = 'FGDPClient_%s' % version.replace('.', '_') 490 self.protocol = globals()[classname]
491
492 - def buildProtocol(self, addr):
493 p = self.protocol(self.gstElement) 494 p.factory = self 495 return p
496
497 - def retry(self, connector=None):
498 self.info("Trying reconnection with FGDP peer") 499 return ReconnectingClientFactory.retry(self, connector)
500
501 502 -class FGDPServerFactory(Factory):
503 504 clients = 0 505 _supportedVersions = ['0.1'] 506
507 - def __init__(self, gstElement):
508 self.gstElement = gstElement 509 self._setProtocol(gstElement.version)
510
511 - def _setProtocol(self, version):
512 if version in self._supportedVersions: 513 classname = 'FGDPServer_%s' % version.replace('.', '_') 514 self.protocol = globals()[classname]
515
516 - def buildProtocol(self, addr):
517 p = self.protocol(self.gstElement) 518 p.factory = self 519 return p
520