1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3#
4# A thin wrapper on top of the KUnit Kernel
5#
6# Copyright (C) 2019, Google LLC.
7# Author: Felix Guo <felixguoxiuping@gmail.com>
8# Author: Brendan Higgins <brendanhiggins@google.com>
9
10import argparse
11import os
12import re
13import shlex
14import sys
15import time
16
17assert sys.version_info >= (3, 7), "Python version is too old"
18
19from dataclasses import dataclass
20from enum import Enum, auto
21from typing import Iterable, List, Optional, Sequence, Tuple
22
23import kunit_json
24import kunit_kernel
25import kunit_parser
26from kunit_printer import stdout
27
class KunitStatus(Enum):
	"""Overall outcome of a kunit.py stage (config, build, or test execution)."""
	SUCCESS = auto()
	# Reported by config_tests() when build_reconfig() does not succeed.
	CONFIG_FAILURE = auto()
	# Reported by build_tests() when build_kernel() does not succeed.
	BUILD_FAILURE = auto()
	# Reported when the parsed test status is not a passing one.
	TEST_FAILURE = auto()
33
@dataclass
class KunitResult:
	"""Outcome of a single stage together with how long it took."""
	status: KunitStatus
	# Duration in seconds, computed from differences of time.time() readings.
	elapsed_time: float
38
@dataclass
class KunitConfigRequest:
	"""Arguments consumed by config_tests()."""
	# Build directory, passed as the first argument to build_reconfig().
	build_dir: str
	# Extra make options passed through to build_reconfig()/build_kernel().
	make_options: Optional[List[str]]
43
@dataclass
class KunitBuildRequest(KunitConfigRequest):
	"""Config arguments plus what build_tests() additionally needs."""
	# Passed as the first argument to build_kernel(); the CLI option is --jobs.
	jobs: int
47
@dataclass
class KunitParseRequest:
	"""Controls how parse_tests() presents the kernel output."""
	# Truthy => print output instead of parsing it; 'kunit' restricts the
	# printed lines to what extract_tap_lines() yields, 'all' prints everything.
	raw_output: Optional[str]
	# Truthy => emit JSON results; 'stdout' prints them, any other value is
	# used as the path of the file to write.
	json: Optional[str]
52
@dataclass
class KunitExecRequest(KunitParseRequest):
	"""Parse arguments plus everything exec_tests() forwards to run_kernel()."""
	build_dir: str
	# Forwarded to run_kernel(); the CLI documents it as a number of seconds.
	timeout: int
	# Suite/test glob forwarded to run_kernel().
	filter_glob: str
	# Attribute filter forwarded to run_kernel().
	filter: str
	# Forwarded to run_kernel(); the CLI only accepts 'skip'.
	filter_action: Optional[str]
	# Extra kernel command-line parameters.
	kernel_args: Optional[List[str]]
	# 'suite' or 'test': boot the kernel once per suite/test in exec_tests().
	run_isolated: Optional[str]
	# Print the output of _list_tests() instead of running tests.
	list_tests: bool
	# Print the output of _list_tests_attr() instead of running tests.
	list_tests_attr: bool
64
@dataclass
class KunitRequest(KunitExecRequest, KunitBuildRequest):
	"""Combined build + exec request, as consumed by run_tests()."""
	pass
68
69
def get_kernel_root_path() -> str:
	"""Returns the kernel tree root, derived from this script's location.

	The resolved path of the script is split on `tools/testing/kunit`;
	whatever precedes that component is returned. If the component does
	not occur exactly once, an explanation is printed to stderr and the
	process exits with status 1.
	"""
	path = sys.argv[0] if not __file__ else __file__
	real_path = os.path.realpath(path)
	parts = real_path.split('tools/testing/kunit')
	if len(parts) != 2:
		# A bare exit status is hard to diagnose, so say why we are bailing out.
		print(f'kunit.py: cannot determine the kernel root from "{real_path}": '
		      'expected exactly one "tools/testing/kunit" component in the path',
		      file=sys.stderr)
		sys.exit(1)
	return parts[0]
76
def config_tests(linux: kunit_kernel.LinuxSourceTree,
		 request: KunitConfigRequest) -> KunitResult:
	"""Runs build_reconfig() for the request and times it.

	Returns a KunitResult whose status is SUCCESS when build_reconfig()
	reports success and CONFIG_FAILURE otherwise.
	"""
	stdout.print_with_timestamp('Configuring KUnit Kernel ...')

	started = time.time()
	configured = linux.build_reconfig(request.build_dir, request.make_options)
	elapsed = time.time() - started

	if configured:
		return KunitResult(KunitStatus.SUCCESS, elapsed)
	return KunitResult(KunitStatus.CONFIG_FAILURE, elapsed)
86
def build_tests(linux: kunit_kernel.LinuxSourceTree,
		request: KunitBuildRequest) -> KunitResult:
	"""Runs build_kernel() for the request and times it.

	Returns a KunitResult whose status is SUCCESS when build_kernel()
	reports success and BUILD_FAILURE otherwise.
	"""
	stdout.print_with_timestamp('Building KUnit Kernel ...')

	started = time.time()
	built = linux.build_kernel(request.jobs, request.build_dir, request.make_options)
	elapsed = time.time() - started

	if built:
		return KunitResult(KunitStatus.SUCCESS, elapsed)
	return KunitResult(KunitStatus.BUILD_FAILURE, elapsed)
98
def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree,
			   request: KunitBuildRequest) -> KunitResult:
	"""Configures and then builds; the build is skipped if configuring fails.

	Returns the config result on config failure, otherwise the build result.
	"""
	result = config_tests(linux, request)
	if result.status == KunitStatus.SUCCESS:
		result = build_tests(linux, request)
	return result
106
def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]:
	"""Runs the kernel with `kunit.action=list` and returns the test names printed.

	The request's kernel_args, timeout, filters and build_dir are forwarded
	to run_kernel(). Only lines made of two dot-separated, whitespace-free
	parts (i.e. "<suite>.<test>") are returned.
	"""
	args = ['kunit.action=list']

	# User-supplied kernel args are appended after the list action.
	if request.kernel_args:
		args.extend(request.kernel_args)

	output = linux.run_kernel(args=args,
			   timeout=request.timeout,
			   filter_glob=request.filter_glob,
			   filter=request.filter,
			   filter_action=request.filter_action,
			   build_dir=request.build_dir)
	lines = kunit_parser.extract_tap_lines(output)
	# Hack! Drop the dummy TAP version header that the executor prints out.
	lines.pop()

	# Filter out any extraneous non-test output that might have gotten mixed in.
	# NOTE(review): this iterates `output`, not `lines`; presumably it relies on
	# the pop() above having advanced the underlying stream -- confirm before
	# touching the statement order here.
	return [l for l in output if re.match(r'^[^\s.]+\.[^\s.]+$', l)]
125
def _list_tests_attr(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> Iterable[str]:
	"""Runs the kernel with `kunit.action=list_attr` and returns the lines printed.

	The request's kernel_args, timeout, filters and build_dir are forwarded
	to run_kernel(). The result is whatever extract_tap_lines() yields after
	its first line has been dropped.
	"""
	args = ['kunit.action=list_attr']

	# User-supplied kernel args are appended after the list_attr action.
	if request.kernel_args:
		args.extend(request.kernel_args)

	output = linux.run_kernel(args=args,
			   timeout=request.timeout,
			   filter_glob=request.filter_glob,
			   filter=request.filter,
			   filter_action=request.filter_action,
			   build_dir=request.build_dir)
	lines = kunit_parser.extract_tap_lines(output)
	# Hack! Drop the dummy TAP version header that the executor prints out.
	lines.pop()

	# Unlike _list_tests(), no per-line filtering is applied here: the
	# remaining extracted lines are returned as they are.
	return lines
144
145def _suites_from_test_list(tests: List[str]) -> List[str]:
146	"""Extracts all the suites from an ordered list of tests."""
147	suites = []  # type: List[str]
148	for t in tests:
149		parts = t.split('.', maxsplit=2)
150		if len(parts) != 2:
151			raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"')
152		suite, _ = parts
153		if not suites or suites[-1] != suite:
154			suites.append(suite)
155	return suites
156
def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult:
	"""Boots the kernel (possibly several times) and aggregates the parsed results.

	With list_tests / list_tests_attr set, only the listing is printed and a
	SUCCESS result with zero elapsed time is returned. With run_isolated set,
	the kernel is booted once per listed test or suite; otherwise it is
	booted once with the request's filter_glob. The returned elapsed time
	is the sum of the per-boot run + parse durations.
	"""
	if request.list_tests or request.list_tests_attr:
		# list_tests takes precedence when both flags are set.
		lister = _list_tests if request.list_tests else _list_tests_attr
		for line in lister(linux, request):
			print(line.rstrip())
		return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0)

	filter_globs = [request.filter_glob]
	if request.run_isolated:
		tests = _list_tests(linux, request)
		if request.run_isolated == 'test':
			filter_globs = tests
		elif request.run_isolated == 'suite':
			filter_globs = _suites_from_test_list(tests)
			# Apply the test-part of the user's glob, if present.
			if '.' in request.filter_glob:
				test_glob = request.filter_glob.split('.', maxsplit=2)[1]
				filter_globs = [f'{suite}.{test_glob}' for suite in filter_globs]

	metadata = kunit_json.Metadata(
		arch=linux.arch(),
		build_dir=request.build_dir,
		def_config='kunit_defconfig')

	total_boots = len(filter_globs)
	test_counts = kunit_parser.TestCounts()
	exec_time = 0.0
	for boot_num, glob in enumerate(filter_globs, start=1):
		stdout.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(boot_num, total_boots))

		boot_start = time.time()
		run_result = linux.run_kernel(
			args=request.kernel_args,
			timeout=request.timeout,
			filter_glob=glob,
			filter=request.filter,
			filter_action=request.filter_action,
			build_dir=request.build_dir)

		_, test_result = parse_tests(request, metadata, run_result)
		# run_kernel() doesn't block on the kernel exiting.
		# That only happens after we get the last line of output from `run_result`.
		# So the time added here covers parsing + execution, which is fine.
		exec_time += time.time() - boot_start

		test_counts.add_subtest_counts(test_result.counts)

	if total_boots == 1 and test_counts.crashed > 0:
		build_dir = request.build_dir
		print('The kernel seems to have crashed; you can decode the stack traces with:')
		print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format(
				build_dir, build_dir, kunit_kernel.get_outfile_path(build_dir), build_dir, sys.argv[0]))

	return KunitResult(status=_map_to_overall_status(test_counts.get_status()),
			   elapsed_time=exec_time)
213
def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus:
	"""Collapses a parser TestStatus into SUCCESS (passed or skipped) or TEST_FAILURE."""
	passing = (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED)
	return KunitStatus.SUCCESS if test_status in passing else KunitStatus.TEST_FAILURE
218
def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]:
	"""Parses (or just echoes) kernel output and returns (result, test).

	When request.raw_output is set, the lines are printed unparsed and a
	placeholder Test with one pass is returned alongside a SUCCESS result.
	Otherwise the output is parsed, optionally dumped as JSON to stdout or
	to the file named by request.json, and the result is SUCCESS only if
	the parsed status is TestStatus.SUCCESS.
	"""
	started = time.time()

	if request.raw_output:
		# Treat unparsed results as one passing test.
		placeholder = kunit_parser.Test()
		placeholder.status = kunit_parser.TestStatus.SUCCESS
		placeholder.counts.passed = 1

		# 'all' prints every line; 'kunit' narrows to the extracted TAP lines.
		to_print: Iterable[str] = input_data
		if request.raw_output == 'kunit':
			to_print = kunit_parser.extract_tap_lines(input_data)
		for raw_line in to_print:
			print(raw_line.rstrip())
		return KunitResult(KunitStatus.SUCCESS, time.time() - started), placeholder

	# Actually parse the test results.
	test = kunit_parser.parse_run_tests(input_data)
	elapsed = time.time() - started

	if request.json:
		json_str = kunit_json.get_json_result(test=test, metadata=metadata)
		if request.json != 'stdout':
			with open(request.json, 'w') as json_file:
				json_file.write(json_str)
			stdout.print_with_timestamp("Test results stored in %s" %
				os.path.abspath(request.json))
		else:
			print(json_str)

	if test.status == kunit_parser.TestStatus.SUCCESS:
		overall = KunitStatus.SUCCESS
	else:
		overall = KunitStatus.TEST_FAILURE
	return KunitResult(overall, elapsed), test
259
def run_tests(linux: kunit_kernel.LinuxSourceTree,
	      request: KunitRequest) -> KunitResult:
	"""Configures, builds and executes in sequence, stopping at the first failure.

	Returns the failing stage's result if config or build fails; otherwise
	prints a per-stage timing summary and returns the exec result.
	"""
	started = time.time()

	config_result = config_tests(linux, request)
	if config_result.status != KunitStatus.SUCCESS:
		return config_result

	build_result = build_tests(linux, request)
	if build_result.status != KunitStatus.SUCCESS:
		return build_result

	exec_result = exec_tests(linux, request)

	finished = time.time()

	summary = ('Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' +
		   'building, %.3fs running\n') % (
			finished - started,
			config_result.elapsed_time,
			build_result.elapsed_time,
			exec_result.elapsed_time)
	stdout.print_with_timestamp(summary)
	return exec_result
284
285# Problem:
286# $ kunit.py run --json
287# works as one would expect and prints the parsed test results as JSON.
288# $ kunit.py run --json suite_name
289# would *not* pass suite_name as the filter_glob and print as json.
290# argparse will consider it to be another way of writing
291# $ kunit.py run --json=suite_name
292# i.e. it would run all tests, and dump the json to a `suite_name` file.
293# So we hackily automatically rewrite --json => --json=stdout
# Value substituted for each of these flags when it is given without `=...`.
pseudo_bool_flag_defaults = {
	'--json': 'stdout',
	'--raw_output': 'kunit',
}
def massage_argv(argv: Sequence[str]) -> Sequence[str]:
	"""Returns a list copy of argv with bare pseudo-bool flags rewritten to `flag=default`.

	Only arguments that are exactly one of the keys of
	pseudo_bool_flag_defaults are rewritten; everything else (including
	`--json=foo`) is passed through untouched.
	"""
	return [
		f'{arg}={pseudo_bool_flag_defaults[arg]}' if arg in pseudo_bool_flag_defaults else arg
		for arg in argv
	]
304
def get_default_jobs() -> int:
	"""Returns the default for --jobs: the number of CPUs this process may use.

	Uses the scheduler affinity mask where the platform provides it. On
	platforms without os.sched_getaffinity(), falls back to os.cpu_count(),
	or to 1 if that cannot be determined either.
	"""
	try:
		return len(os.sched_getaffinity(0))
	except AttributeError:
		# os.sched_getaffinity() only exists on some Unix platforms; this
		# function runs while the argument parser is built, so failing here
		# would break every subcommand.
		return os.cpu_count() or 1
307
def add_common_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the kernel-tree options (build dir, kconfig, arch, QEMU, ...) on parser."""
	# (flag, add_argument() keyword arguments), registered in this order.
	common_options = [
		('--build_dir', {
			'help': 'As in the make command, it specifies the build '
				'directory.',
			'type': str, 'default': '.kunit', 'metavar': 'DIR',
		}),
		('--make_options', {
			'help': 'X=Y make option, can be repeated.',
			'action': 'append', 'metavar': 'X=Y',
		}),
		('--alltests', {
			'help': 'Run all KUnit tests via tools/testing/kunit/configs/all_tests.config',
			'action': 'store_true',
		}),
		('--kunitconfig', {
			'help': 'Path to Kconfig fragment that enables KUnit tests.'
				' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
				'will get  automatically appended. If repeated, the files '
				'blindly concatenated, which might not work in all cases.',
			'action': 'append', 'metavar': 'PATHS',
		}),
		('--kconfig_add', {
			'help': 'Additional Kconfig options to append to the '
				'.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
			'action': 'append', 'metavar': 'CONFIG_X=Y',
		}),
		('--arch', {
			'help': ('Specifies the architecture to run tests under. '
				 'The architecture specified here must match the '
				 'string passed to the ARCH make param, '
				 'e.g. i386, x86_64, arm, um, etc. Non-UML '
				 'architectures run on QEMU.'),
			'type': str, 'default': 'um', 'metavar': 'ARCH',
		}),
		('--cross_compile', {
			'help': ('Sets make\'s CROSS_COMPILE variable; it should '
				 'be set to a toolchain path prefix (the prefix '
				 'of gcc and other tools in your toolchain, for '
				 'example `sparc64-linux-gnu-` if you have the '
				 'sparc toolchain installed on your system, or '
				 '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
				 'if you have downloaded the microblaze toolchain '
				 'from the 0-day website to a directory in your '
				 'home directory called `toolchains`).'),
			'metavar': 'PREFIX',
		}),
		('--qemu_config', {
			'help': ('Takes a path to a path to a file containing '
				 'a QemuArchParams object.'),
			'type': str, 'metavar': 'FILE',
		}),
		('--qemu_args', {
			'help': 'Additional QEMU arguments, e.g. "-smp 8"',
			'action': 'append', 'metavar': '',
		}),
	]
	for flag, settings in common_options:
		parser.add_argument(flag, **settings)
358
def add_build_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers --jobs on parser, defaulting to get_default_jobs()."""
	jobs_help = ('As in the make command, "Specifies  the number of '
		     'jobs (commands) to run simultaneously."')
	parser.add_argument('--jobs', metavar='N', type=int,
			    default=get_default_jobs(), help=jobs_help)
364
def add_exec_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options that control how the kernel is booted and which tests run."""
	add = parser.add_argument

	add('--timeout', type=int, default=300, metavar='SECONDS',
	    help='maximum number of seconds to allow for all tests '
	    'to run. This does not include time taken to build the '
	    'tests.')
	# Optional positional: absent means an empty glob.
	add('filter_glob', type=str, nargs='?', default='', metavar='filter_glob',
	    help='Filter which KUnit test suites/tests run at '
	    'boot-time, e.g. list* or list*.*del_test')
	add('--filter', type=str, default='',
	    help='Filter KUnit tests with attributes, '
	    'e.g. module=example or speed>slow')
	add('--filter_action', type=str, choices=['skip'],
	    help='If set to skip, filtered tests will be skipped, '
	    'e.g. --filter_action=skip. Otherwise they will not run.')
	add('--kernel_args', action='append', metavar='',
	    help='Kernel command-line parameters. Maybe be repeated')
	add('--run_isolated', type=str, choices=['suite', 'test'],
	    help='If set, boot the kernel for each '
	    'individual suite/test. This is can be useful for debugging '
	    'a non-hermetic test, one that might pass/fail based on '
	    'what ran before it.')
	add('--list_tests', action='store_true',
	    help='If set, list all tests that will be '
	    'run.')
	add('--list_tests_attr', action='store_true',
	    help='If set, list all tests and test '
	    'attributes.')
405
def add_parse_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers --raw_output and --json, both of which take an optional value."""
	raw_output_help = ('If set don\'t parse output from kernel. '
			   'By default, filters to just KUnit output. Use '
			   '--raw_output=all to show everything')
	json_help = ('Prints parsed test results as JSON to stdout or a file if '
		     'a filename is specified. Does nothing if --raw_output is set.')

	# nargs='?' with a const: the flag may appear with or without a value.
	parser.add_argument('--raw_output', nargs='?', type=str, const='all',
			    default=None, choices=['all', 'kunit'],
			    help=raw_output_help)
	parser.add_argument('--json', nargs='?', type=str, const='stdout',
			    default=None, metavar='FILE',
			    help=json_help)
416
417
def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree:
	"""Builds the LinuxSourceTree described by the parsed command-line arguments."""
	# Each --qemu_args value may hold several arguments, e.g. '-smp 8',
	# so split every one of them shell-style and flatten the result.
	qemu_args: List[str] = [
		token
		for raw in (cli_args.qemu_args or [])
		for token in shlex.split(raw)
	]

	kunitconfigs = cli_args.kunitconfig or []
	if cli_args.alltests:
		# Prepend so user-specified options take prio if we ever allow
		# --kunitconfig options to have differing options.
		kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH, *kunitconfigs]

	return kunit_kernel.LinuxSourceTree(
		cli_args.build_dir,
		kunitconfig_paths=kunitconfigs,
		kconfig_add=cli_args.kconfig_add,
		arch=cli_args.arch,
		cross_compile=cli_args.cross_compile,
		qemu_config_path=cli_args.qemu_config,
		extra_qemu_args=qemu_args)
439
440
def run_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py run`: configure, build and execute in one go.

	Creates the build directory (including missing parents) when it does
	not exist yet, then exits with status 1 unless run_tests() reports
	SUCCESS.
	"""
	if not os.path.exists(cli_args.build_dir):
		# makedirs() also creates missing parent directories (e.g. for
		# --build_dir=out/kunit), and exist_ok tolerates the directory
		# being created by someone else right after the check above.
		os.makedirs(cli_args.build_dir, exist_ok=True)

	linux = tree_from_args(cli_args)
	request = KunitRequest(build_dir=cli_args.build_dir,
					make_options=cli_args.make_options,
					jobs=cli_args.jobs,
					raw_output=cli_args.raw_output,
					json=cli_args.json,
					timeout=cli_args.timeout,
					filter_glob=cli_args.filter_glob,
					filter=cli_args.filter,
					filter_action=cli_args.filter_action,
					kernel_args=cli_args.kernel_args,
					run_isolated=cli_args.run_isolated,
					list_tests=cli_args.list_tests,
					list_tests_attr=cli_args.list_tests_attr)
	result = run_tests(linux, request)
	if result.status != KunitStatus.SUCCESS:
		sys.exit(1)
462
463
def config_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py config`: only (re)generates the kernel configuration.

	Creates the build directory (including missing parents) when one is
	given and does not exist yet, prints the elapsed time, and exits with
	status 1 unless config_tests() reports SUCCESS.
	"""
	if cli_args.build_dir and not os.path.exists(cli_args.build_dir):
		# makedirs() also creates missing parent directories (e.g. for
		# --build_dir=out/kunit), and exist_ok tolerates the directory
		# being created by someone else right after the check above.
		os.makedirs(cli_args.build_dir, exist_ok=True)

	linux = tree_from_args(cli_args)
	request = KunitConfigRequest(build_dir=cli_args.build_dir,
						make_options=cli_args.make_options)
	result = config_tests(linux, request)
	stdout.print_with_timestamp((
		'Elapsed time: %.3fs\n') % (
			result.elapsed_time))
	if result.status != KunitStatus.SUCCESS:
		sys.exit(1)
478
479
def build_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py build`: configures and builds, then reports the elapsed time.

	Exits with status 1 unless config_and_build_tests() reports SUCCESS.
	"""
	source_tree = tree_from_args(cli_args)
	build_request = KunitBuildRequest(
		build_dir=cli_args.build_dir,
		make_options=cli_args.make_options,
		jobs=cli_args.jobs)

	outcome = config_and_build_tests(source_tree, build_request)
	elapsed_msg = 'Elapsed time: %.3fs\n' % outcome.elapsed_time
	stdout.print_with_timestamp(elapsed_msg)

	if outcome.status == KunitStatus.SUCCESS:
		return
	sys.exit(1)
491
492
def exec_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py exec`: runs the tests via exec_tests() and reports the elapsed time.

	Exits with status 1 unless exec_tests() reports SUCCESS.
	"""
	source_tree = tree_from_args(cli_args)
	request = KunitExecRequest(
		raw_output=cli_args.raw_output,
		build_dir=cli_args.build_dir,
		json=cli_args.json,
		timeout=cli_args.timeout,
		filter_glob=cli_args.filter_glob,
		filter=cli_args.filter,
		filter_action=cli_args.filter_action,
		kernel_args=cli_args.kernel_args,
		run_isolated=cli_args.run_isolated,
		list_tests=cli_args.list_tests,
		list_tests_attr=cli_args.list_tests_attr)

	outcome = exec_tests(source_tree, request)
	elapsed_msg = 'Elapsed time: %.3fs\n' % outcome.elapsed_time
	stdout.print_with_timestamp(elapsed_msg)

	if outcome.status == KunitStatus.SUCCESS:
		return
	sys.exit(1)
511
512
def parse_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py parse`: parses results read from a file or from stdin.

	Undecodable bytes are rendered with backslash escapes rather than
	raising. Exits with status 1 unless parse_tests() reports SUCCESS.
	"""
	kunit_output: Iterable[str]
	if cli_args.file is not None:
		with open(cli_args.file, 'r', errors='backslashreplace') as results_file:
			kunit_output = results_file.read().splitlines()
	else:
		sys.stdin.reconfigure(errors='backslashreplace')  # type: ignore
		kunit_output = sys.stdin

	# We know nothing about how the result was created!
	metadata = kunit_json.Metadata()
	parse_request = KunitParseRequest(raw_output=cli_args.raw_output,
					  json=cli_args.json)
	outcome, _ = parse_tests(parse_request, metadata, kunit_output)
	if outcome.status == KunitStatus.SUCCESS:
		return
	sys.exit(1)
527
528
# Maps each subcommand name to the function that handles it; main() looks up
# the parsed subcommand here and prints the help text when there is no match.
subcommand_handlers_map = {
	'run': run_handler,
	'config': config_handler,
	'build': build_handler,
	'exec': exec_handler,
	'parse': parse_handler
}
536
537
def main(argv: Sequence[str]) -> None:
	"""Parses argv, changes into the kernel root and dispatches to the subcommand handler.

	Prints the help text and returns when no known subcommand was given.
	"""
	parser = argparse.ArgumentParser(
			description='Helps writing and running KUnit tests.')
	subparser = parser.add_subparsers(dest='subcommand')

	# The 'run' command will config, build, exec, and parse in one go.
	run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
	for add_opts in (add_common_opts, add_build_opts, add_exec_opts, add_parse_opts):
		add_opts(run_parser)

	config_parser = subparser.add_parser('config',
						help='Ensures that .config contains all of '
						'the options in .kunitconfig')
	add_common_opts(config_parser)

	build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
	for add_opts in (add_common_opts, add_build_opts):
		add_opts(build_parser)

	exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
	for add_opts in (add_common_opts, add_exec_opts, add_parse_opts):
		add_opts(exec_parser)

	# The 'parse' option is special, as it doesn't need the kernel source
	# (therefore there is no need for a build_dir, hence no add_common_opts)
	# and the '--file' argument is not relevant to 'run', so isn't in
	# add_parse_opts()
	parse_parser = subparser.add_parser('parse',
					    help='Parses KUnit results from a file, '
					    'and parses formatted results.')
	add_parse_opts(parse_parser)
	parse_parser.add_argument('file', metavar='input_file', type=str, nargs='?',
				  help='Specifies the file to read results from.')

	cli_args = parser.parse_args(massage_argv(argv))

	# An empty root (script sits directly at the path start) means no chdir.
	kernel_root = get_kernel_root_path()
	if kernel_root:
		os.chdir(kernel_root)

	handler = subcommand_handlers_map.get(cli_args.subcommand)
	if handler is None:
		parser.print_help()
		return

	handler(cli_args)
588
589
# Run the CLI only when executed as a script; the program name (argv[0]) is
# not passed on to main().
if __name__ == '__main__':
	main(sys.argv[1:])
592