1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Framework for writing automated integration tests.
20
21 This module provides a way of writing automated integration tests from
22 within Twisted's unit testing framework, trial. Test cases are
23 constructed as subclasses of the normal trial
24 L{twisted.trial.unittest.TestCase} class.
25
26 Integration tests look like normal test methods, except that they are
27 decorated with L{integration.test}, take an extra "plan" argument, and
28 do not return anything. For example::
29
30 from twisted.trial import unittest
31 from flumotion.twisted import integration
32
33 class IntegrationTestExample(unittest.TestCase):
34 @integration.test
35 def testEchoFunctionality(self, plan):
36 process = plan.spawn('echo', 'hello world')
37 plan.wait(process, 0)
38
39 This example will spawn a process, as if you typed "echo 'hello world'"
40 at the shell prompt. It then waits for the process to exit, expecting
41 the exit status to be 0.
42
43 The example illustrates two of the fundamental plan operators, spawn and
44 wait. "spawn" spawns a process. "wait" waits for a process to finish.
45 The other operators are "spawnPar", which spawns a number of processes
46 in parallel, "waitPar", which waits for a number of processes in
47 parallel, and "kill", which kills one or more processes via SIGTERM and
48 then waits for them to exit.
49
50 It is evident that this framework is most appropriate for testing the
51 integration of multiple processes, and is not suitable for in-process
52 tests. The plan that is built up is only executed after the test method
53 exits, via the L{integration.test} decorator; the writer of the
54 integration test does not have access to the plan's state.
55
56 Note that all process exits must be anticipated. If at any point the
57 integration tester receives SIGCHLD, the next operation must be a wait
58 for that process. If this is not the case, the test is interpreted as
59 having failed.
60
61 Also note that while the test is running, the stdout and stderr of each
62 spawned process are redirected into log files in a subdirectory of the
63 directory where the test is located. For the example above, the following
64 files will be created::
65
66 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout
67 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr
68
69 In the case that multiple echo commands are run in the same plan, the
70 subsequent commands will be named as echo-1, echo-2, and the like. Upon
71 successful completion of the test case, the log directory will be
72 deleted.
73 """
74
75 import os
76 import signal
77 import tempfile
78
79 from twisted.internet import reactor, protocol, defer
80 from twisted.internet import error as ierror
81 from flumotion.common import log as flog
82 from twisted.internet.defer import failure
83
84 __version__ = "$Rev$"
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Replace reactor.wakeUp with a guarded version: the waker's wakeUp() is
# only called when reactor.waker is truthy; otherwise the call simply
# evaluates to the (falsy) reactor.waker value and does nothing else.
# NOTE(review): presumably this protects against wakeUp() being invoked
# while no waker is installed on the reactor -- confirm against the
# Twisted version this module targets.
107 reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp()
108
109
110 -def log(format, *args):
112
113
114 -def debug(format, *args):
116
117
118 -def info(format, *args):
120
121
124
125
126 -def error(format, *args):
128
129
131 if os.sep in executable:
132 if os.access(os.path.abspath(executable), os.X_OK):
133 return os.path.abspath(executable)
134 elif os.getenv('PATH'):
135 for path in os.getenv('PATH').split(os.pathsep):
136 if os.access(os.path.join(path, executable), os.X_OK):
137 return os.path.join(path, executable)
138 raise CommandNotFoundException(executable)
139
140
142
def __init__(self, process, expectedCode, actualCode):
    """
    Record which process exited and how its exit code differed from
    the one that was expected.

    @param process:      the process object that exited
    @param expectedCode: the exit code the plan was waiting for
    @param actualCode:   the exit code the process actually produced
    """
    # Forward the arguments to the base class so that self.args is
    # populated. With an empty args tuple, copying or unpickling the
    # exception would re-invoke this three-argument constructor with
    # no arguments and fail with a TypeError.
    Exception.__init__(self, process, expectedCode, actualCode)
    self.process = process
    self.expected = expectedCode
    self.actual = actualCode
147 self.actual = actualCode
148
150 return ('Expected exit code %r from %r, but got %r'
151 % (self.expected, self.process, self.actual))
152
153
155
159
161 return 'The process %r exited prematurely.' % self.process
162
163
165
169
171 return 'Command %r not found in the PATH.' % self.command
172
173
175
177 Exception.__init__(self)
178 self.processes = processes
179
181 return ('Processes still running at end of test: %r'
182 % (self.processes, ))
183
184
186
190
192 return ('Timed out waiting for %r to exit with status %r'
193 % (self.process, self.status))
194
195
197
199 self.exitDeferred = defer.Deferred()
200 self.timedOut = False
201
203 return self.exitDeferred
204
205 - def timeout(self, process, status):
209
211 info('process ended with status %r, exit code %r',
212 status, status.value.exitCode)
213 if self.timedOut:
214 warning('already timed out??')
215 print 'already timed out quoi?'
216 else:
217 info('process ended with status %r, exit code %r',
218 status, status.value.exitCode)
219 self.exitDeferred.callback(status.value.exitCode)
220
221
223
225 self.exitDeferred = defer.Deferred()
226 self.timedOut = False
227
229 return self.exitDeferred
230
231 - def timeout(self, process, status):
235
237 info('process ended with status %r, exit code %r',
238 status, status.value.exitCode)
239 if self.timedOut:
240 warning('already timed out??')
241 print 'already timed out quoi?'
242 else:
243 info('process ended with status %r, exit code %r',
244 status, status.value.exitCode)
245 self.exitDeferred.callback(status.value.exitCode)
246
247
249 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
250
def __init__(self, name, argv, testDir):
    """
    Describe a process that has not been started yet.

    The first element of C{argv} is resolved through C{_which}, so any
    exception raised by that lookup propagates out of the constructor.

    @param name:    label for this process
    @param argv:    tuple whose first item is the executable and whose
                    remaining items are its arguments
    @param testDir: directory associated with this process
    """
    self.name = name
    self.testDir = testDir

    # Resolve the executable once; the arguments are kept unchanged.
    executable = _which(argv[0])
    arguments = argv[1:]
    self.argv = (executable, ) + arguments

    # Nothing is running yet: no pid, no protocol, no pending timeout.
    self.state = self.NOT_STARTED
    self.pid = None
    self.protocol = None
    self._timeoutDC = None

    log('created process object %r', self)
262
264 assert self.state == self.NOT_STARTED
265
266 self.protocol = ProcessProtocol()
267
268 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
269 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w')
270
271 childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
272
273
274
275
276
277
278
279
280
281
282
283
284 info('spawning process %r, argv=%r', self, self.argv)
285 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
286 env = dict(os.environ)
287 env['FLU_DEBUG'] = '5'
288 process = reactor.spawnProcess(self.protocol, self.argv[0],
289 env=env, args=self.argv,
290 childFDs=childFDs)
291 signal.signal(signal.SIGTERM, termHandler)
292
293 stdout.close()
294 stderr.close()
295
296
297
298
299 self.pid = process.pid
300 self.state = self.STARTED
301
302 def got_exit(res):
303 self.state = self.STOPPED
304 info('process %r has stopped', self)
305 return res
306 self.protocol.getDeferred().addCallback(got_exit)
307
def kill(self, sig=signal.SIGTERM):
    """
    Deliver a signal to the running process.

    The process must be in the STARTED state; this is enforced with an
    assertion.

    @param sig: signal number to send; SIGTERM unless specified
    """
    assert self.state == self.STARTED

    info('killing process %r, signal %d', self, sig)

    target_pid = self.pid
    os.kill(target_pid, sig)
312
313 - def wait(self, status, timeout=20):
324 d.addCallback(got_exit)
325 if self.state == self.STARTED:
326 self._timeoutDC = reactor.callLater(timeout,
327 self.protocol.timeout,
328 self,
329 status)
330
331 def cancel_timeout(res):
332 debug('cancelling timeout for %r', self)
333 if self._timeoutDC.active():
334 self._timeoutDC.cancel()
335 return res
336 d.addCallbacks(cancel_timeout, cancel_timeout)
337 return d
338
340 return '<Process %s in state %s>' % (self.name, self.state)
341
342
344 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
345
def __init__(self, name, method, argv, testDir):
    """
    Describe a callable-backed task that has not been started yet.

    @param name:    label for this task
    @param method:  the callable this object wraps
    @param argv:    argument value stored alongside the callable
    @param testDir: directory associated with this task
    """
    self.name = name
    self.method = method
    self.argv = argv
    self.testDir = testDir

    # Nothing is running yet: no pid, no protocol, no pending timeout.
    self.state = self.NOT_STARTED
    self.pid = None
    self.protocol = None
    self._timeoutDC = None

    log('created threaded method object %r', self)
358
360 self.method(self.argv)
361 info('process %r has stopped', self)
362 return self.protocol.processEnded(
363 failure.Failure(ierror.ProcessDone(0)))
364
377 self.protocol.getDeferred().addCallback(got_exit)
378
379 - def kill(self, sig=signal.SIGTERM):
383
384 - def wait(self, status, timeout=20):
395 d.addCallback(got_exit)
396 if self.state == self.STARTED:
397 self._timeoutDC = reactor.callLater(timeout,
398 self.protocol.timeout,
399 self,
400 status)
401
402 def cancel_timeout(res):
403 debug('cancelling timeout for %r', self)
404 if self._timeoutDC.active():
405 self._timeoutDC.cancel()
406 return res
407 d.addCallbacks(cancel_timeout, cancel_timeout)
408 return d
409
411 return '<Thread %s in state %s>' % (self.name, self.state)
412
413
415
416
418 self.processes = []
419 self.timeout = 20
420
421 - def spawn(self, process):
426
432
433 - def kill(self, process):
437
def wait(self, process, exitCode):
    """
    Wait for a tracked process to exit with the given exit code.

    The process must already be in C{self.processes}; this is enforced
    with an assertion. The wait uses the executor's current
    C{self.timeout}.

    @param process:  a process object tracked by this executor
    @param exitCode: the exit code passed on to C{process.wait}
    @returns: the deferred returned by C{process.wait}, with a callback
              added that drops the process from C{self.processes}
    """
    assert process in self.processes

    waiting = process.wait(exitCode, timeout=self.timeout)
    # Registered as a callback only: if the wait fails, the process
    # stays in the list. The callback itself yields None.
    waiting.addCallback(lambda _: self.processes.remove(process))
    return waiting
446
462 p.protocol.processEnded = callbacker(d)
463 p.kill(sig=signal.SIGKILL)
464 d = defer.DeferredList(dlist)
465
466 def error(_):
467 if failure:
468 return failure
469 else:
470 raise e
471 d.addCallback(error)
472 return d
473 return failure
474
def run(self, ops, timeout=20):
    """
    Execute a sequence of operations one after another.

    Each operation is a sequence whose first item is a callable and
    whose remaining items are its positional arguments. The operations
    are chained on one deferred, so each starts only after the previous
    one's result is available. Whether the chain succeeds or fails,
    C{self._checkProcesses} is called last: with C{failure=None} on
    success, or with the failure on error.

    @param ops:     iterable of operations, in execution order
    @param timeout: value stored in C{self.timeout} for this run
    @returns: the deferred at the end of the chain
    """
    self.timeout = timeout
    chain = defer.Deferred()

    def invoke(_, operation):
        # The previous result is ignored; only the operation matters.
        func = operation[0]
        arguments = operation[1:]
        return func(*arguments)

    def on_success(_):
        return self._checkProcesses(failure=None)

    def on_failure(reason):
        return self._checkProcesses(failure=reason)

    for operation in ops:
        chain.addCallback(invoke, operation)
    chain.addCallbacks(on_success, on_failure)

    # The chain is fired from a zero-delay delayed call rather than
    # synchronously, so nothing runs before control returns to the
    # reactor.
    # NOTE(review): presumably this ensures the reactor is fully up
    # before any process is spawned -- confirm.
    reactor.callLater(0, chain.callback, None)
    return chain
494
495
497
498 - def __init__(self, testCase, testName):
510
512 testDir = tempfile.mkdtemp(prefix="test_integration")
513 return testDir
514
516 tail = '%s-%s' % (self.testCaseName, self.name)
517 outputDir = os.path.join(testDir, tail)
518 os.mkdir(outputDir)
519 return outputDir
520
522 for root, dirs, files in os.walk(self.outputDir, topdown=False):
523 for name in files:
524 os.remove(os.path.join(root, name))
525 for name in dirs:
526 os.rmdir(os.path.join(root, name))
527 os.rmdir(self.outputDir)
528 os.rmdir(self.testDir)
529 self.testDir = None
530 self.outputDir = None
531
542
553
556
559
560 - def spawn(self, command, *args):
564
570
572 processes = []
573 self._appendOp(self.vm.checkExits, ())
574 for argv in argvs:
575 assert isinstance(argv, tuple), \
576 'all arguments to spawnPar must be tuples'
577 for arg in argv:
578 assert isinstance(arg, str), \
579 'all subarguments to spawnPar must be strings'
580 processes.append(self._allocProcess(argv))
581 for process in processes:
582 self._appendOp(self.vm.spawn, process)
583 return tuple(processes)
584
585 - def wait(self, process, status):
587
588 - def waitPar(self, *processStatusPairs):
593
594 - def kill(self, process, status=None):
598
603
604
606 testName = proc.__name__
607
608 def wrappedtest(self):
609 plan = Plan(self, testName)
610 proc(self, plan)
611 return plan.execute()
612 try:
613 wrappedtest.__name__ = testName
614 except TypeError:
615
616 pass
617
618
619 wrappedtest.timeout = 666
620 return wrappedtest
621