Package flumotion :: Package twisted :: Module integration
[hide private]

Source Code for Module flumotion.twisted.integration

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  Framework for writing automated integration tests. 
 20   
 21  This module provides a way of writing automated integration tests from 
 22  within Twisted's unit testing framework, trial. Test cases are 
 23  constructed as subclasses of the normal trial 
 24  L{twisted.trial.unittest.TestCase} class. 
 25   
 26  Integration tests look like normal test methods, except that they are 
 27  decorated with L{integration.test}, take an extra "plan" argument, and 
 28  do not return anything. For example:: 
 29   
 30    from twisted.trial import unittest 
 31    from flumotion.twisted import integration 
 32   
 33    class IntegrationTestExample(unittest.TestCase): 
 34        @integration.test 
 35        def testEchoFunctionality(self, plan): 
 36            process = plan.spawn('echo', 'hello world') 
 37            plan.wait(process, 0) 
 38   
 39  This example will spawn a process, as if you typed "echo 'hello world'" 
 40  at the shell prompt. It then waits for the process to exit, expecting 
 41  the exit status to be 0. 
 42   
 43  The example illustrates two of the fundamental plan operators, spawn and 
 44  wait. "spawn" spawns a process. "wait" waits for a process to finish. 
 45  The other operators are "spawnPar", which spawns a number of processes 
 46  in parallel, "waitPar", which waits for a number of processes in 
 47  parallel, and "kill", which kills one or more processes via SIGTERM and 
 48  then waits for them to exit. 
 49   
 50  It is evident that this framework is most appropriate for testing the 
 51  integration of multiple processes, and is not suitable for in-process 
 52  tests. The plan that is built up is only executed after the test method 
 53  exits, via the L{integration.test} decorator; the writer of the 
 54  integration test does not have access to the plan's state. 
 55   
 56  Note that all process exits must be anticipated. If at any point the 
 57  integration tester receives SIGCHLD, the next operation must be a wait 
 58  for that process. If this is not the case, the test is interpreted as 
 59  having failed. 
 60   
 61  Also note that while the test is running, the stdout and stderr of each 
 62  spawned process is redirected into log files in a subdirectory of where 
 63  the test is located. For example, in the previous example, the following 
 64  files will be created:: 
 65   
 66    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout 
 67    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr 
 68   
 69  In the case that multiple echo commands are run in the same plan, the 
 70  subsequent commands will be named as echo-1, echo-2, and the like. Upon 
 71  successful completion of the test case, the log directory will be 
 72  deleted. 
 73  """ 
 74   
 75  import os 
 76  import signal 
 77  import tempfile 
 78   
 79  from twisted.internet import reactor, protocol, defer 
 80  from twisted.internet import error as ierror 
 81  from flumotion.common import log as flog 
 82  from twisted.internet.defer import failure 
 83   
 84  __version__ = "$Rev$" 
 85   
 86   
 87  # Twisted's reactor.iterate() is defined like this: 
 88  # 
 89  #     def iterate(self, delay=0): 
 90  #        """See twisted.internet.interfaces.IReactorCore.iterate. 
 91  #        """ 
 92  #        self.runUntilCurrent() 
 93  #        self.doIteration(delay) 
 94  # 
 95  # runUntilCurrent runs all the procs on the threadCallQueue. So if 
 96  # something is added to the threadCallQueue between runUntilCurrent() 
 97  # and doIteration(), the reactor needs to have an fd ready for reading 
 98  # to shortcut the select(). This is done by callFromThread() calling 
 99  # reactor.wakeUp(), which will write on the wakeup FD. 
100  # 
101  # HOWEVER. For some reason reactor.wakeUp() only writes on the fd if it 
102  # is being called from another thread. This is obviously borked in the 
103  # signal-handling case, when a signal arrives between runUntilCurrent() 
104  # and doIteration(), and is processed via reactor.callFromThread(), as 
105  # is the case with SIGCHLD. So we monkeypatch the reactor to always wake 
106  # the waker. This is twisted bug #1997. 
107  reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp() 
108   
109   
110 -def log(format, *args):
111 flog.doLog(flog.LOG, None, 'integration', format, args, -2)
112 113
114 -def debug(format, *args):
115 flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
116 117
118 -def info(format, *args):
119 flog.doLog(flog.INFO, None, 'integration', format, args, -2)
120 121
122 -def warning(format, *args):
123 flog.doLog(flog.WARN, None, 'integration', format, args, -2)
124 125
126 -def error(format, *args):
127 flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
128 129
130 -def _which(executable):
131 if os.sep in executable: 132 if os.access(os.path.abspath(executable), os.X_OK): 133 return os.path.abspath(executable) 134 elif os.getenv('PATH'): 135 for path in os.getenv('PATH').split(os.pathsep): 136 if os.access(os.path.join(path, executable), os.X_OK): 137 return os.path.join(path, executable) 138 raise CommandNotFoundException(executable)
139 140
141 -class UnexpectedExitCodeException(Exception):
142
143 - def __init__(self, process, expectedCode, actualCode):
144 Exception.__init__(self) 145 self.process = process 146 self.expected = expectedCode 147 self.actual = actualCode
148
149 - def __str__(self):
150 return ('Expected exit code %r from %r, but got %r' 151 % (self.expected, self.process, self.actual))
152 153
154 -class UnexpectedExitException(Exception):
155
156 - def __init__(self, process):
157 Exception.__init__(self) 158 self.process = process
159
160 - def __str__(self):
161 return 'The process %r exited prematurely.' % self.process
162 163
164 -class CommandNotFoundException(Exception):
165
166 - def __init__(self, command):
167 Exception.__init__(self) 168 self.command = command
169
170 - def __str__(self):
171 return 'Command %r not found in the PATH.' % self.command
172 173
174 -class ProcessesStillRunningException(Exception):
175
176 - def __init__(self, processes):
177 Exception.__init__(self) 178 self.processes = processes
179
180 - def __str__(self):
181 return ('Processes still running at end of test: %r' 182 % (self.processes, ))
183 184
185 -class TimeoutException(Exception):
186
187 - def __init__(self, process, status):
188 self.process = process 189 self.status = status
190
191 - def __str__(self):
192 return ('Timed out waiting for %r to exit with status %r' 193 % (self.process, self.status))
194 195
196 -class ProcessProtocol(protocol.ProcessProtocol):
197
198 - def __init__(self):
199 self.exitDeferred = defer.Deferred() 200 self.timedOut = False
201
202 - def getDeferred(self):
203 return self.exitDeferred
204
205 - def timeout(self, process, status):
206 info('forcing timeout for process protocol %r', self) 207 self.timedOut = True 208 self.exitDeferred.errback(TimeoutException(process, status))
209
210 - def processEnded(self, status):
211 info('process ended with status %r, exit code %r', 212 status, status.value.exitCode) 213 if self.timedOut: 214 warning('already timed out??') 215 print 'already timed out quoi?' 216 else: 217 info('process ended with status %r, exit code %r', 218 status, status.value.exitCode) 219 self.exitDeferred.callback(status.value.exitCode)
220 221
222 -class ThreadProtocol(object):
223
224 - def __init__(self):
225 self.exitDeferred = defer.Deferred() 226 self.timedOut = False
227
228 - def getDeferred(self):
229 return self.exitDeferred
230
231 - def timeout(self, process, status):
232 info('forcing timeout for process protocol %r', self) 233 self.timedOut = True 234 self.exitDeferred.errback(TimeoutException(process, status))
235
236 - def processEnded(self, status):
237 info('process ended with status %r, exit code %r', 238 status, status.value.exitCode) 239 if self.timedOut: 240 warning('already timed out??') 241 print 'already timed out quoi?' 242 else: 243 info('process ended with status %r, exit code %r', 244 status, status.value.exitCode) 245 self.exitDeferred.callback(status.value.exitCode)
246 247
248 -class Process:
249 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED' 250
251 - def __init__(self, name, argv, testDir):
252 self.name = name 253 self.argv = (_which(argv[0]), ) + argv[1:] 254 self.testDir = testDir 255 256 self.pid = None 257 self.protocol = None 258 self.state = self.NOT_STARTED 259 self._timeoutDC = None 260 261 log('created process object %r', self)
262
263 - def start(self):
264 assert self.state == self.NOT_STARTED 265 266 self.protocol = ProcessProtocol() 267 268 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w') 269 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w') 270 # don't give it a stdin, output to log files 271 childFDs = {1: stdout.fileno(), 2: stderr.fileno()} 272 # There's a race condition in twisted.internet.process, whereby 273 # signals received between the fork() and exec() in the child 274 # are handled with the twisted handlers, i.e. postponed, but 275 # they never get called because of the exec(). The end is they 276 # are ignored. 277 # 278 # So, work around that by resetting the sigterm handler to the 279 # default so if we self.kill() immediately after self.start(), 280 # that the subprocess won't ignore the signal. This is a window 281 # in the parent in which SIGTERM will cause immediate 282 # termination instead of the twisted nice termination, but 283 # that's better than the kid missing the signal. 284 info('spawning process %r, argv=%r', self, self.argv) 285 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL) 286 env = dict(os.environ) 287 env['FLU_DEBUG'] = '5' 288 process = reactor.spawnProcess(self.protocol, self.argv[0], 289 env=env, args=self.argv, 290 childFDs=childFDs) 291 signal.signal(signal.SIGTERM, termHandler) 292 # close our handles on the log files 293 stdout.close() 294 stderr.close() 295 296 # it's possible the process *already* exited, from within the 297 # spawnProcess itself. So set our state to STARTED, *then* 298 # attach the callback. 299 self.pid = process.pid 300 self.state = self.STARTED 301 302 def got_exit(res): 303 self.state = self.STOPPED 304 info('process %r has stopped', self) 305 return res
306 self.protocol.getDeferred().addCallback(got_exit)
307
308 - def kill(self, sig=signal.SIGTERM):
309 assert self.state == self.STARTED 310 info('killing process %r, signal %d', self, sig) 311 os.kill(self.pid, sig)
312
313 - def wait(self, status, timeout=20):
314 assert self.state != self.NOT_STARTED 315 info('waiting for process %r to exit', self) 316 d = self.protocol.getDeferred() 317 318 def got_exit(res): 319 debug('process %r exited with status %r', self, res) 320 if res != status: 321 warning('expected exit code %r for process %r, but got %r', 322 status, self, res) 323 raise UnexpectedExitCodeException(self, status, res)
324 d.addCallback(got_exit) 325 if self.state == self.STARTED: 326 self._timeoutDC = reactor.callLater(timeout, 327 self.protocol.timeout, 328 self, 329 status) 330 331 def cancel_timeout(res): 332 debug('cancelling timeout for %r', self) 333 if self._timeoutDC.active(): 334 self._timeoutDC.cancel() 335 return res 336 d.addCallbacks(cancel_timeout, cancel_timeout) 337 return d 338
339 - def __repr__(self):
340 return '<Process %s in state %s>' % (self.name, self.state)
341 342
343 -class ThreadedMethod:
344 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED' 345
346 - def __init__(self, name, method, argv, testDir):
347 self.name = name 348 self.argv = argv 349 self.testDir = testDir 350 351 self.pid = None 352 self.method = method 353 self.protocol = None 354 self.state = self.NOT_STARTED 355 self._timeoutDC = None 356 357 log('created threaded method object %r', self)
358
359 - def method_wrapper(self):
360 self.method(self.argv) 361 info('process %r has stopped', self) 362 return self.protocol.processEnded( 363 failure.Failure(ierror.ProcessDone(0)))
364
365 - def start(self):
366 assert self.state == self.NOT_STARTED 367 info('spawning thread %r, argv=%r', self, self.argv) 368 self.protocol = ProcessProtocol() 369 reactor.callFromThread(self.method_wrapper) 370 371 self.state = self.STARTED 372 373 def got_exit(res): 374 self.state = self.STOPPED 375 info('process %r has stopped', self) 376 return res
377 self.protocol.getDeferred().addCallback(got_exit)
378
379 - def kill(self, sig=signal.SIGTERM):
380 assert self.state == self.STARTED 381 self.state = self.STOPPED 382 info('killing process %r, signal %d', self, sig)
383
384 - def wait(self, status, timeout=20):
385 assert self.state != self.NOT_STARTED 386 info('waiting for thread %r to exit', self) 387 d = self.protocol.getDeferred() 388 389 def got_exit(res): 390 debug('process %r exited with status %r', self, res) 391 if res != status: 392 warning('expected exit code %r for process %r, but got %r', 393 status, self, res) 394 raise UnexpectedExitCodeException(self, status, res)
395 d.addCallback(got_exit) 396 if self.state == self.STARTED: 397 self._timeoutDC = reactor.callLater(timeout, 398 self.protocol.timeout, 399 self, 400 status) 401 402 def cancel_timeout(res): 403 debug('cancelling timeout for %r', self) 404 if self._timeoutDC.active(): 405 self._timeoutDC.cancel() 406 return res 407 d.addCallbacks(cancel_timeout, cancel_timeout) 408 return d 409
410 - def __repr__(self):
411 return '<Thread %s in state %s>' % (self.name, self.state)
412 413
414 -class PlanExecutor:
415 # both the vm and its ops 416
417 - def __init__(self):
418 self.processes = [] 419 self.timeout = 20
420
421 - def spawn(self, process):
422 assert process not in self.processes 423 self.processes.append(process) 424 process.start() 425 return defer.succeed(True)
426
427 - def checkExits(self, expectedExits):
428 for process in self.processes: 429 if (process.state != process.STARTED 430 and process not in expectedExits): 431 raise UnexpectedExitException(process)
432
433 - def kill(self, process):
434 assert process in self.processes 435 process.kill() 436 return defer.succeed(True)
437
438 - def wait(self, process, exitCode):
439 assert process in self.processes 440 441 def remove_from_processes_list(_): 442 self.processes.remove(process)
443 d = process.wait(exitCode, timeout=self.timeout) 444 d.addCallback(remove_from_processes_list) 445 return d
446
447 - def _checkProcesses(self, failure=None):
448 if self.processes: 449 warning('processes still running at end of test: %r', 450 self.processes) 451 e = ProcessesStillRunningException(self.processes) 452 dlist = [] 453 # reap all processes, and once we have them reaped, errback 454 for p in self.processes: 455 if p.state != p.STARTED: 456 continue 457 d = defer.Deferred() 458 dlist.append(d) 459 460 def callbacker(d): 461 return lambda status: d.callback(status.value.exitCode)
462 p.protocol.processEnded = callbacker(d) 463 p.kill(sig=signal.SIGKILL) 464 d = defer.DeferredList(dlist) 465 466 def error(_): 467 if failure: 468 return failure 469 else: 470 raise e 471 d.addCallback(error) 472 return d 473 return failure 474
475 - def run(self, ops, timeout=20):
476 self.timeout = timeout 477 d = defer.Deferred() 478 479 def run_op(_, op): 480 # print 'Last result: %r' % (_, ) 481 # print 'Now running: %s(%r)' % (op[0].__name__, op[1:]) 482 return op[0](*op[1:])
483 for op in ops: 484 d.addCallback(run_op, op) 485 d.addCallbacks(lambda _: self._checkProcesses(failure=None), 486 lambda failure: self._checkProcesses(failure=failure)) 487 488 # We should only spawn processes when twisted has set up its 489 # sighandlers. It does that *after* firing the reactor startup 490 # event and before entering the reactor loop. So, make sure 491 # twisted is ready for us by firing the plan in a callLater. 492 reactor.callLater(0, d.callback, None) 493 return d 494 495
496 -class Plan:
497
498 - def __init__(self, testCase, testName):
499 self.name = testName 500 self.testCaseName = testCase.__class__.__name__ 501 self.processes = {} 502 self.testDir = self._makeTestDir() 503 self.outputDir = self._makeOutputDir(self.testDir) 504 505 # put your boots on monterey jacks, cause this gravy just made a 506 # virtual machine whose instructions are python methods 507 self.vm = PlanExecutor() 508 self.ops = [] 509 self.timeout = 20
510
511 - def _makeTestDir(self):
512 testDir = tempfile.mkdtemp(prefix="test_integration") 513 return testDir
514
515 - def _makeOutputDir(self, testDir):
516 tail = '%s-%s' % (self.testCaseName, self.name) 517 outputDir = os.path.join(testDir, tail) 518 os.mkdir(outputDir) 519 return outputDir
520
521 - def _cleanOutputDir(self):
522 for root, dirs, files in os.walk(self.outputDir, topdown=False): 523 for name in files: 524 os.remove(os.path.join(root, name)) 525 for name in dirs: 526 os.rmdir(os.path.join(root, name)) 527 os.rmdir(self.outputDir) 528 os.rmdir(self.testDir) 529 self.testDir = None 530 self.outputDir = None
531
532 - def _allocProcess(self, args):
533 command = args[0] 534 name = command 535 i = 0 536 while name in self.processes: 537 i += 1 538 name = '%s-%d' % (command, i) 539 process = Process(name, args, self.outputDir) 540 self.processes[name] = process 541 return process
542
543 - def _allocThread(self, args):
544 method = args[0] 545 name = method.__name__ 546 i = 0 547 while name in self.processes: 548 i += 1 549 name = '%s-%d' % (name, i) 550 process = ThreadedMethod(name, method, args[1:], self.outputDir) 551 self.processes[name] = process 552 return process
553
554 - def _appendOp(self, *args):
555 self.ops.append(args)
556
557 - def setTimeout(self, timeout):
558 self.timeout = timeout
559
560 - def spawn(self, command, *args):
561 allArgs = (command, ) + args 562 process, = self.spawnPar(allArgs) 563 return process
564
565 - def spawnThread(self, method, *args):
566 self._appendOp(self.vm.checkExits, ()) 567 thread = self._allocThread((method, ) + args) 568 self._appendOp(self.vm.spawn, thread) 569 return thread
570
571 - def spawnPar(self, *argvs):
572 processes = [] 573 self._appendOp(self.vm.checkExits, ()) 574 for argv in argvs: 575 assert isinstance(argv, tuple), \ 576 'all arguments to spawnPar must be tuples' 577 for arg in argv: 578 assert isinstance(arg, str), \ 579 'all subarguments to spawnPar must be strings' 580 processes.append(self._allocProcess(argv)) 581 for process in processes: 582 self._appendOp(self.vm.spawn, process) 583 return tuple(processes)
584
585 - def wait(self, process, status):
586 self.waitPar((process, status))
587
588 - def waitPar(self, *processStatusPairs):
589 processes = tuple([p for p, s in processStatusPairs]) 590 self._appendOp(self.vm.checkExits, processes) 591 for process, status in processStatusPairs: 592 self._appendOp(self.vm.wait, process, status)
593
594 - def kill(self, process, status=None):
595 self._appendOp(self.vm.checkExits, ()) 596 self._appendOp(self.vm.kill, process) 597 self._appendOp(self.vm.wait, process, status)
598
599 - def execute(self):
600 d = self.vm.run(self.ops, timeout=self.timeout) 601 d.addCallback(lambda _: self._cleanOutputDir()) 602 return d
603 604
605 -def test(proc):
606 testName = proc.__name__ 607 608 def wrappedtest(self): 609 plan = Plan(self, testName) 610 proc(self, plan) 611 return plan.execute()
612 try: 613 wrappedtest.__name__ = testName 614 except TypeError: 615 # can only set procedure names in python >= 2.4 616 pass 617 # trial seems to require a timeout, at least in twisted 2.4, so give 618 # it a nice one 619 wrappedtest.timeout = 666 620 return wrappedtest 621