1from lldbsuite.test.lldbtest import *
2import os
3import time
4import json
5
# Regular-expression fragment for a "0x"-prefixed hexadecimal number.
# The hex-digit class is followed by `*`, so a bare "0x" with no digits
# also matches.
ADDRESS_REGEX = '0x[0-9a-fA-F]*'
7
8# Decorator that runs a test with both modes of USE_SB_API.
9# It assumes that no tests can be executed in parallel.
10def testSBAPIAndCommands(func):
11    def wrapper(*args, **kwargs):
12        TraceIntelPTTestCaseBase.USE_SB_API = True
13        func(*args, **kwargs)
14        TraceIntelPTTestCaseBase.USE_SB_API = False
15        func(*args, **kwargs)
16    return wrapper
17
# Class that should be used by all python Intel PT tests.
#
# It has a handy check that skips the test if the intel-pt plugin is not enabled.
#
# It also contains many functions that can exercise either the SB API or the
# command line version of the most important tracing actions.
class TraceIntelPTTestCaseBase(TestBase):

    NO_DEBUG_INFO_TESTCASE = True

    # If True, the trace test methods will use the SB API, otherwise they'll use raw commands.
    USE_SB_API = False

    def setUp(self):
        """Run TestBase.setUp and skip the test if 'intel-pt' is not an enabled plugin."""
        TestBase.setUp(self)
        if 'intel-pt' not in configuration.enabled_plugins:
            self.skipTest("The intel-pt test plugin is not enabled")

    def skipIfPerCpuTracingIsNotSupported(self):
        """Skip the test unless /proc/sys/kernel/perf_event_paranoid is <= 0."""
        def is_supported():
            # A missing/unreadable file (OSError) or unparsable content
            # (ValueError, including an empty file) means "not supported".
            try:
                with open("/proc/sys/kernel/perf_event_paranoid", "r") as permissions:
                    return int(permissions.readline()) <= 0
            except (OSError, ValueError):
                return False

        if not is_supported():
            self.skipTest("Per cpu tracing is not supported. You need "
                "/proc/sys/kernel/perf_event_paranoid to be 0 or -1. "
                "You can use `sudo sysctl -w kernel.perf_event_paranoid=-1` for that.")

    def getTraceOrCreate(self):
        """Return the target's trace, asking the target to create one if it is not valid.

        The SBError filled in by CreateTrace is not inspected here, so the
        returned trace may still be invalid.
        NOTE(review): this looks deliberate so that callers expecting a failure
        can observe it on the following trace operation -- confirm.
        """
        if not self.target().GetTrace().IsValid():
            error = lldb.SBError()
            self.target().CreateTrace(error)
        return self.target().GetTrace()

    def assertSBError(self, sberror, error=False):
        """Assert that `sberror` failed when `error` is True, or succeeded otherwise."""
        if error:
            self.assertTrue(sberror.Fail())
        else:
            self.assertSuccess(sberror)

    def createConfiguration(self, iptTraceSize=None,
                            processBufferSizeLimit=None, enableTsc=False,
                            psbPeriod=None, perCpuTracing=False):
        """Build an SBStructuredData holding the given tracing options as JSON.

        "enableTsc" and "perCpuTracing" are always present; the other keys
        ("processBufferSizeLimit", "iptTraceSize", "psbPeriod") are only
        included when the corresponding argument is not None.
        """
        obj = {}
        if processBufferSizeLimit is not None:
            obj["processBufferSizeLimit"] = processBufferSizeLimit
        if iptTraceSize is not None:
            obj["iptTraceSize"] = iptTraceSize
        if psbPeriod is not None:
            obj["psbPeriod"] = psbPeriod
        obj["enableTsc"] = enableTsc
        obj["perCpuTracing"] = perCpuTracing

        # Named trace_config so it does not shadow the module-level
        # `configuration` object that setUp reads.
        trace_config = lldb.SBStructuredData()
        trace_config.SetFromJSON(json.dumps(obj))
        return trace_config

    def traceStartThread(self, thread=None, error=False, substrs=None,
                         iptTraceSize=None, enableTsc=False, psbPeriod=None):
        """Start tracing a thread through the SB API or `thread trace start`.

        In SB API mode `thread` defaults to self.thread() and only the
        success/failure of the returned SBError is checked against `error`;
        `substrs` is not used in that mode. In command mode the options are
        appended to the command line and `error`/`substrs` are forwarded to
        self.expect.
        """
        if self.USE_SB_API:
            trace = self.getTraceOrCreate()
            thread = thread if thread is not None else self.thread()
            trace_config = self.createConfiguration(
                iptTraceSize=iptTraceSize, enableTsc=enableTsc,
                psbPeriod=psbPeriod)
            self.assertSBError(trace.Start(thread, trace_config), error)
        else:
            command = "thread trace start"
            if thread is not None:
                command += " " + str(thread.GetIndexID())
            if iptTraceSize is not None:
                command += " -s " + str(iptTraceSize)
            if enableTsc:
                command += " --tsc"
            if psbPeriod is not None:
                command += " --psb-period " + str(psbPeriod)
            self.expect(command, error=error, substrs=substrs)

    def traceStartProcess(self, processBufferSizeLimit=None, error=False,
                          substrs=None, enableTsc=False, psbPeriod=None,
                          perCpuTracing=False):
        """Start tracing the process through the SB API or `process trace start`.

        In SB API mode only the success/failure of the returned SBError is
        checked against `error`; `substrs` is not used in that mode. In command
        mode the options are appended to the command line and
        `error`/`substrs` are forwarded to self.expect.
        """
        if self.USE_SB_API:
            trace = self.getTraceOrCreate()
            trace_config = self.createConfiguration(
                processBufferSizeLimit=processBufferSizeLimit, enableTsc=enableTsc,
                psbPeriod=psbPeriod, perCpuTracing=perCpuTracing)
            self.assertSBError(trace.Start(trace_config), error=error)
        else:
            command = "process trace start"
            if processBufferSizeLimit is not None:
                command += " -l " + str(processBufferSizeLimit)
            if enableTsc:
                command += " --tsc"
            if psbPeriod is not None:
                command += " --psb-period " + str(psbPeriod)
            if perCpuTracing:
                command += " --per-cpu-tracing"
            self.expect(command, error=error, substrs=substrs)

    def traceStopProcess(self):
        """Stop process tracing and require that the operation succeeds."""
        if self.USE_SB_API:
            self.assertSuccess(self.target().GetTrace().Stop())
        else:
            self.expect("process trace stop")

    def traceStopThread(self, thread=None, error=False, substrs=None):
        """Stop tracing a thread through the SB API or `thread trace stop`.

        In SB API mode `thread` defaults to self.thread() and `substrs` is not
        used; in command mode `error`/`substrs` are forwarded to self.expect.
        """
        if self.USE_SB_API:
            thread = thread if thread is not None else self.thread()
            self.assertSBError(self.target().GetTrace().Stop(thread), error)
        else:
            command = "thread trace stop"
            if thread is not None:
                command += " " + str(thread.GetIndexID())
            self.expect(command, error=error, substrs=substrs)

    def traceLoad(self, traceDescriptionFilePath, error=False, substrs=None):
        """Load a trace from a description file via the SB API or `trace load -v`.

        `substrs` is only used in command mode.
        """
        if self.USE_SB_API:
            traceDescriptionFile = lldb.SBFileSpec(traceDescriptionFilePath, True)
            loadTraceError = lldb.SBError()
            self.dbg.LoadTraceFromFile(loadTraceError, traceDescriptionFile)
            self.assertSBError(loadTraceError, error)
        else:
            command = f"trace load -v {traceDescriptionFilePath}"
            self.expect(command, error=error, substrs=substrs)

    def traceSave(self, traceBundleDir, compact=False, error=False, substrs=None):
        """Save the target's trace to `traceBundleDir` via the SB API or `trace save`.

        `compact` is passed to SaveToDisk in SB API mode and adds " -c" to the
        command in command mode. `substrs` is only used in command mode.
        """
        if self.USE_SB_API:
            save_error = lldb.SBError()
            self.target().GetTrace().SaveToDisk(
                save_error, lldb.SBFileSpec(traceBundleDir), compact)
            self.assertSBError(save_error, error)
        else:
            command = f"trace save {traceBundleDir}"
            if compact:
                command += " -c"
            self.expect(command, error=error, substrs=substrs)