1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
# File names that get joined onto the build directory (see the get_*_path()
# helpers below).
KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
OUTFILE_PATH = 'test.log'

# Kconfig fragments shipped with the tool. These are relative paths and are
# used as-is, so they resolve against the current working directory.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'

# Absolute path of the directory holding this file, and the directory next to
# it that is searched for per-architecture QEMU config modules.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
class ConfigError(Exception):
	"""Raised when the Linux kernel could not be configured.

	The message carries either a description of the problem or the
	captured output of the failed make invocation.
	"""
36
37
class BuildError(Exception):
	"""Raised when the Linux kernel could not be built.

	The message carries either a description of the problem or the
	stderr output of the failed make invocation.
	"""
40
41
class LinuxSourceTreeOperations:
	"""An abstraction over command line operations performed on a source tree.

	This base class knows how to invoke make for a given ARCH and optional
	CROSS_COMPILE prefix. Subclasses override make_arch_config() to add
	architecture-specific Kconfig entries and start() to boot the kernel.
	"""

	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
		self._linux_arch = linux_arch
		self._cross_compile = cross_compile

	def make_mrproper(self) -> None:
		"""Runs `make mrproper`.

		Raises:
			ConfigError: if make could not be executed, or exited with a
				non-zero status (the message is make's combined output).
		"""
		try:
			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			# backslashreplace: undecodable bytes in make's output must not
			# hide the real failure behind a UnicodeDecodeError.
			raise ConfigError(e.output.decode(errors='backslashreplace')) from e

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the Kconfig to use for this architecture.

		The base implementation adds nothing and returns base_kunitconfig
		itself.
		"""
		return base_kunitconfig

	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
		"""Runs `make olddefconfig` with O=build_dir.

		Args:
			build_dir: output directory passed to make as O=.
			make_options: extra arguments appended to the make command line.

		Raises:
			ConfigError: if make could not be executed, or exited with a
				non-zero status (the message is make's combined output).
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		if make_options:
			command.extend(make_options)
		print('Populating config with:\n$', ' '.join(command))
		try:
			subprocess.check_output(command, stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode(errors='backslashreplace')) from e

	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
		"""Builds the kernel with O=build_dir.

		make's stdout is discarded. Its stderr is captured and printed if
		the build succeeds but still wrote something there.

		Args:
			jobs: value passed to make as --jobs=.
			build_dir: output directory passed to make as O=.
			make_options: extra arguments appended to the make command line.

		Raises:
			BuildError: if make could not be executed, or exited with a
				non-zero status (the message is make's stderr).
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
		if make_options:
			command.extend(make_options)
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		print('Building with:\n$', ' '.join(command))
		# Popen() only reports failures to launch (OSError); the exit status
		# is checked explicitly below.
		try:
			proc = subprocess.Popen(command,
						stderr=subprocess.PIPE,
						stdout=subprocess.DEVNULL)
		except OSError as e:
			raise BuildError('Could not call execute make: ' + str(e)) from e
		_, stderr = proc.communicate()
		if proc.returncode != 0:
			raise BuildError(stderr.decode(errors='backslashreplace'))
		if stderr:  # likely only due to build warnings
			print(stderr.decode(errors='backslashreplace'))

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
		"""Boots the built kernel; must be overridden by subclasses.

		Raises:
			NotImplementedError: always. This is a RuntimeError subclass,
				so handlers written for RuntimeError still apply.
		"""
		raise NotImplementedError('not implemented!')
97
98
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
	"""Source tree operations for a kernel that is booted with qemu-system-*."""

	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
		super().__init__(linux_arch=qemu_arch_params.linux_arch,
				 cross_compile=cross_compile)
		self._qemu_arch = qemu_arch_params.qemu_arch
		self._kconfig = qemu_arch_params.kconfig
		self._kernel_path = qemu_arch_params.kernel_path
		self._serial = qemu_arch_params.serial
		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
		# kunit_shutdown=reboot is always appended to the configured
		# kernel command line.
		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the architecture's Kconfig with base_kunitconfig merged in."""
		arch_kconfig = kunit_config.parse_from_string(self._kconfig)
		arch_kconfig.merge_in_entries(base_kunitconfig)
		return arch_kconfig

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
		"""Launches QEMU on the kernel image found under build_dir.

		params are joined with the configured kernel command line and
		passed to the kernel via -append. The returned process has stdin
		and stdout as text pipes, with stderr folded into stdout.
		"""
		kernel_path = os.path.join(build_dir, self._kernel_path)
		qemu_binary = 'qemu-system-' + self._qemu_arch
		kernel_cmdline = ' '.join(params + [self._kernel_command_line])
		fixed_args = [qemu_binary,
			      '-nodefaults',
			      '-m', '1024',
			      '-kernel', kernel_path,
			      '-append', kernel_cmdline,
			      '-no-reboot',
			      '-nographic',
			      '-serial', self._serial]
		qemu_command = fixed_args + self._extra_qemu_params
		# Note: shlex.join() does what we want, but requires python 3.8+.
		quoted_command = ' '.join(map(shlex.quote, qemu_command))
		print('Running tests with:\n$', quoted_command)
		return subprocess.Popen(qemu_command,
					stdin=subprocess.PIPE,
					stdout=subprocess.PIPE,
					stderr=subprocess.STDOUT,
					text=True, errors='backslashreplace')
133
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
	"""Source tree operations for a User Mode Linux (ARCH=um) kernel."""

	def __init__(self, cross_compile: Optional[str]=None):
		super().__init__(linux_arch='um', cross_compile=cross_compile)

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the UML Kconfig fragment with base_kunitconfig merged in."""
		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
		kconfig.merge_in_entries(base_kunitconfig)
		return kconfig

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
		"""Runs the Linux UML binary. Must be named 'linux'.

		The binary is looked up as build_dir/linux. The returned process
		has stdin and stdout as text pipes, with stderr folded into stdout.
		"""
		linux_bin = os.path.join(build_dir, 'linux')
		# Build a new list instead of extending `params` in place, so the
		# caller's list does not pick up the UML-only arguments (and does
		# not accumulate them when it is reused for another run).
		uml_params = list(params) + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
		print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
		return subprocess.Popen([linux_bin] + uml_params,
					   stdin=subprocess.PIPE,
					   stdout=subprocess.PIPE,
					   stderr=subprocess.STDOUT,
					   text=True, errors='backslashreplace')
155
def get_kconfig_path(build_dir: str) -> str:
	"""Returns the path of the .config file inside build_dir."""
	kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
	return kconfig_path
158
def get_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the .kunitconfig file inside build_dir."""
	kunitconfig_path = os.path.join(build_dir, KUNITCONFIG_PATH)
	return kunitconfig_path
161
def get_old_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the last_used_kunitconfig file inside build_dir."""
	old_kunitconfig_path = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
	return old_kunitconfig_path
164
def get_parsed_kunitconfig(build_dir: str,
			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
	"""Parses the kunitconfig(s) to use and returns them as one Kconfig.

	With no kunitconfig_paths, build_dir's .kunitconfig is parsed; if that
	file does not exist yet it is first created as a copy of the default
	kunitconfig.

	Otherwise every given path is parsed and merged in order. A path that
	is a directory stands for the .kunitconfig file inside it.

	Raises:
		ConfigError: if a given path does not exist, or if two of the
			given files specify different values for the same option.
	"""
	if not kunitconfig_paths:
		build_dir_config = get_kunitconfig_path(build_dir)
		if not os.path.exists(build_dir_config):
			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, build_dir_config)
		return kunit_config.parse_file(build_dir_config)

	merged = kunit_config.Kconfig()
	for given_path in kunitconfig_paths:
		# A directory is shorthand for the .kunitconfig it contains.
		path = os.path.join(given_path, KUNITCONFIG_PATH) if os.path.isdir(given_path) else given_path
		if not os.path.exists(path):
			raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

		partial = kunit_config.parse_file(path)
		diff = merged.conflicting_options(partial)
		if not diff:
			merged.merge_in_entries(partial)
			continue

		diff_str = '\n\n'.join(f'{a}\n  vs from {path}\n{b}' for a, b in diff)
		raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
	return merged
188
def get_outfile_path(build_dir: str) -> str:
	"""Returns the path of the test.log file inside build_dir."""
	outfile_path = os.path.join(build_dir, OUTFILE_PATH)
	return outfile_path
191
def _default_qemu_config_path(arch: str) -> str:
	"""Returns the path of the bundled QEMU config module for arch.

	Raises:
		ConfigError: if QEMU_CONFIGS_DIR has no <arch>.py file; the message
			lists the architectures that do have one.
	"""
	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
	if not os.path.isfile(config_path):
		# Strip the '.py' suffix to turn file names back into arch names.
		known_archs = [name[:-3] for name in os.listdir(QEMU_CONFIGS_DIR) if name.endswith('.py')]
		raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(known_archs)))
	return config_path
199
def _get_qemu_ops(config_path: str,
		  extra_qemu_args: Optional[List[str]],
		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
	"""Loads a QEMU config module and builds the operations object for it.

	Args:
		config_path: path of a Python file defining QEMU_ARCH.
		extra_qemu_args: extra QEMU arguments, appended to the config's
			extra_qemu_params.
		cross_compile: CROSS_COMPILE prefix handed to the operations object.

	Returns:
		A (linux_arch, operations) tuple.

	Raises:
		ValueError: if config_path cannot be loaded as a Python module or
			the module does not define QEMU_ARCH.
	"""
	# The module name/path has very little to do with where the actual file
	# exists (I learned this through experimentation and could not find it
	# anywhere in the Python documentation).
	#
	# Basically, we completely ignore the actual file location of the config
	# we are loading and just tell Python that the module lives in the
	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
	# exists as a file.
	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
	spec = importlib.util.spec_from_file_location(module_path, config_path)
	# spec_from_file_location() returns None when no loader matches the file
	# (e.g. it does not end in '.py'). config_path comes from the user, so
	# report that as an error instead of relying on an assert.
	# The isinstance() check also narrows spec.loader for mypy; see
	# https://github.com/python/typeshed/pull/2626 for context.
	if spec is None or not isinstance(spec.loader, importlib.abc.Loader):
		raise ValueError('could not load qemu_config module: ' + config_path)
	config = importlib.util.module_from_spec(spec)
	spec.loader.exec_module(config)

	if not hasattr(config, 'QEMU_ARCH'):
		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
	params: qemu_config.QemuArchParams = config.QEMU_ARCH
	if extra_qemu_args:
		params.extra_qemu_params.extend(extra_qemu_args)
	return params.linux_arch, LinuxSourceTreeOperationsQemu(
			params, cross_compile=cross_compile)
226
class LinuxSourceTree:
	"""Represents a Linux kernel source tree with KUnit tests."""

	def __init__(
	      self,
	      build_dir: str,
	      kunitconfig_paths: Optional[List[str]]=None,
	      kconfig_add: Optional[List[str]]=None,
	      arch: Optional[str]=None,
	      cross_compile: Optional[str]=None,
	      qemu_config_path: Optional[str]=None,
	      extra_qemu_args: Optional[List[str]]=None) -> None:
		"""Selects the operations backend and loads the kunitconfig.

		Note: this installs self.signal_handler as the SIGINT handler.

		Args:
			build_dir: build directory whose .kunitconfig is used when
				kunitconfig_paths is empty.
			kunitconfig_paths: kunitconfig files (or directories holding a
				.kunitconfig) to merge.
			kconfig_add: extra Kconfig lines merged on top of the kunitconfig.
			arch: architecture name; None means 'um'. Ignored when
				qemu_config_path is given.
			cross_compile: CROSS_COMPILE prefix passed to make.
			qemu_config_path: QEMU config module to load; it then decides
				the architecture.
			extra_qemu_args: extra arguments for QEMU (unused for UML).
		"""
		signal.signal(signal.SIGINT, self.signal_handler)
		if qemu_config_path:
			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
		else:
			self._arch = 'um' if arch is None else arch
			if self._arch == 'um':
				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
			else:
				qemu_config_path = _default_qemu_config_path(self._arch)
				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
		if kconfig_add:
			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
			self._kconfig.merge_in_entries(kconfig)

	def arch(self) -> str:
		"""Returns the architecture name chosen at construction time."""
		return self._arch

	def clean(self) -> bool:
		"""Runs `make mrproper`; logs the error and returns False on failure."""
		try:
			self._ops.make_mrproper()
		except ConfigError as e:
			logging.error(e)
			return False
		return True

	def validate_config(self, build_dir: str) -> bool:
		"""Checks that build_dir's .config contains every requested option.

		Returns True if the requested Kconfig is a subset of the generated
		.config; otherwise logs the missing entries and returns False.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		validated_kconfig = kunit_config.parse_file(kconfig_path)
		if self._kconfig.is_subset_of(validated_kconfig):
			return True
		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
		# Sorted so the message does not depend on set iteration order.
		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
			  'This is probably due to unsatisfied dependencies.\n' \
			  'Missing: ' + ', '.join(sorted(str(e) for e in missing))
		if self._arch == 'um':
			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
				   'on a different architecture with something like "--arch=x86_64".'
		logging.error(message)
		return False

	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Writes a fresh .config into build_dir and validates it.

		The requested Kconfig (plus the architecture's additions) is written
		to .config, expanded with `make olddefconfig`, and validated. On
		success the requested Kconfig is also saved as last_used_kunitconfig.

		Returns:
			False if configuring failed or validation found missing
			options (the reason is logged), True otherwise.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if build_dir and not os.path.exists(build_dir):
			os.mkdir(build_dir)
		try:
			self._kconfig = self._ops.make_arch_config(self._kconfig)
			self._kconfig.write_to_file(kconfig_path)
			self._ops.make_olddefconfig(build_dir, make_options)
		except ConfigError as e:
			logging.error(e)
			return False
		if not self.validate_config(build_dir):
			return False

		old_path = get_old_kunitconfig_path(build_dir)
		if os.path.exists(old_path):
			os.remove(old_path)  # write_to_file appends to the file
		self._kconfig.write_to_file(old_path)
		return True

	def _kunitconfig_changed(self, build_dir: str) -> bool:
		"""Returns True if the requested Kconfig differs from the last one used.

		A missing last_used_kunitconfig file counts as changed.
		"""
		old_path = get_old_kunitconfig_path(build_dir)
		if not os.path.exists(old_path):
			return True

		old_kconfig = kunit_config.parse_file(old_path)
		return old_kconfig != self._kconfig

	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Regenerates the .config only when the existing one is out of date.

		The existing .config is kept if the requested Kconfig is a subset of
		it and equals the Kconfig saved by the last build_config(). Otherwise
		(or if there is no .config yet) it is rebuilt via build_config().
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if not os.path.exists(kconfig_path):
			print('Generating .config ...')
			return self.build_config(build_dir, make_options)

		existing_kconfig = kunit_config.parse_file(kconfig_path)
		self._kconfig = self._ops.make_arch_config(self._kconfig)

		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
			return True
		print('Regenerating .config ...')
		os.remove(kconfig_path)
		return self.build_config(build_dir, make_options)

	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Runs `make olddefconfig`, builds the kernel and validates the .config.

		Returns False (after logging the error) if either make step fails,
		otherwise the result of validate_config().
		"""
		try:
			self._ops.make_olddefconfig(build_dir, make_options)
			self._ops.make(jobs, build_dir, make_options)
		except (ConfigError, BuildError) as e:
			logging.error(e)
			return False
		return self.validate_config(build_dir)

	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
		"""Boots the kernel and yields its output line by line.

		Every line is also written to build_dir's test.log. This is a
		generator, so nothing is started until iteration begins.

		Args:
			args: extra kernel command line arguments; not modified.
			build_dir: directory holding the built kernel and the log file.
			filter_glob: if set, passed as kunit.filter_glob=.
			filter: if set, passed (double-quoted) as kunit.filter=.
			filter_action: if set, passed as kunit.filter_action=.
			timeout: seconds to wait before terminating the kernel; None
				waits indefinitely.
		"""
		# Work on a copy: appending to the caller's list would leak the
		# kunit.* arguments back out and accumulate them across runs.
		args = list(args) if args else []
		if filter_glob:
			args.append('kunit.filter_glob=' + filter_glob)
		if filter:
			args.append('kunit.filter="' + filter + '"')
		if filter_action:
			args.append('kunit.filter_action=' + filter_action)
		args.append('kunit.enable=1')

		process = self._ops.start(args, build_dir)
		assert process.stdout is not None  # tell mypy it's set

		# Enforce the timeout in a background thread.
		def _wait_proc() -> None:
			try:
				process.wait(timeout=timeout)
			except Exception as e:
				# Best effort: whatever went wrong while waiting (normally
				# the timeout expiring), make sure the kernel is stopped.
				print(e)
				process.terminate()
				process.wait()
		waiter = threading.Thread(target=_wait_proc)
		waiter.start()

		output = open(get_outfile_path(build_dir), 'w')
		try:
			# Tee the output to the file and to our caller in real time.
			for line in process.stdout:
				output.write(line)
				yield line
		# This runs even if our caller doesn't consume every line.
		finally:
			# Flush any leftover output to the file
			output.write(process.stdout.read())
			output.close()
			process.stdout.close()

			waiter.join()
			subprocess.call(['stty', 'sane'])

	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
		"""SIGINT handler: logs the interruption and runs `stty sane`."""
		logging.error('Build interruption occurred. Cleaning console.')
		subprocess.call(['stty', 'sane'])
378