1#!/usr/bin/env python
2# Copyright (c) Sasha Goldshtein
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import os
6import subprocess
7from bcc import SymbolCache, BPF
8from unittest import main, TestCase
9
10class TestKSyms(TestCase):
11    def grab_sym(self):
12        address = ""
13        aliases = []
14
15        # Grab the first symbol in kallsyms that has type 't' or 'T'.
16        # Also, find all aliases of this symbol which are identifiable
17        # by the same address.
18        with open("/proc/kallsyms", "rb") as f:
19            for line in f:
20
21                # Extract the first 3 columns only. The 4th column
22                # containing the module name may not exist for all
23                # symbols.
24                (addr, t, name) = line.strip().split()[:3]
25                if t == b"t" or t == b"T":
26                    if not address:
27                        address = addr
28                    if addr == address:
29                        aliases.append(name)
30
31        # Return all aliases of the first symbol.
32        return (address, aliases)
33
34    def test_ksymname(self):
35        sym = BPF.ksymname(b"__kmalloc")
36        self.assertIsNotNone(sym)
37        self.assertNotEqual(sym, 0)
38
39    def test_ksym(self):
40        (addr, aliases) = self.grab_sym()
41        sym = BPF.ksym(int(addr, 16))
42        found = sym in aliases
43        self.assertTrue(found)
44
45class Harness(TestCase):
46    def setUp(self):
47        self.build_command()
48        subprocess.check_output('objcopy --only-keep-debug dummy dummy.debug'
49                                .split())
50        self.debug_command()
51        subprocess.check_output('strip dummy'.split())
52        self.process = subprocess.Popen('./dummy', stdout=subprocess.PIPE)
53        # The process prints out the address of some symbol, which we then
54        # try to resolve in the test.
55        self.addr = int(self.process.stdout.readline().strip(), 16)
56        self.syms = SymbolCache(self.process.pid)
57
58    def tearDown(self):
59        self.process.kill()
60        self.process.wait()
61        self.process.stdout.close()
62        self.process = None
63
64    def resolve_addr(self):
65        sym, offset, module = self.syms.resolve(self.addr, False)
66        self.assertEqual(sym, self.mangled_name)
67        self.assertEqual(offset, 0)
68        self.assertTrue(module[-5:] == b'dummy')
69        sym, offset, module = self.syms.resolve(self.addr, True)
70        self.assertEqual(sym, b'some_namespace::some_function(int, int)')
71        self.assertEqual(offset, 0)
72        self.assertTrue(module[-5:] == b'dummy')
73
74
75    def resolve_name(self):
76        script_dir = os.path.dirname(os.path.realpath(__file__).encode("utf8"))
77        addr = self.syms.resolve_name(os.path.join(script_dir, b'dummy'),
78                                      self.mangled_name)
79        self.assertEqual(addr, self.addr)
80        pass
81
82class TestDebuglink(Harness):
83    def build_command(self):
84        subprocess.check_output('g++ -o dummy dummy.cc'.split())
85        lines = subprocess.check_output('nm dummy'.split()).splitlines()
86        for line in lines:
87            if b"some_function" in line:
88                self.mangled_name = line.split(b' ')[2]
89                break
90        self.assertTrue(self.mangled_name)
91
92    def debug_command(self):
93        subprocess.check_output('objcopy --add-gnu-debuglink=dummy.debug dummy'
94                                .split())
95
96    def tearDown(self):
97        super(TestDebuglink, self).tearDown()
98        subprocess.check_output('rm dummy dummy.debug'.split())
99
100    def test_resolve_addr(self):
101        self.resolve_addr()
102
103    def test_resolve_name(self):
104        self.resolve_name()
105
106class TestBuildid(Harness):
107    def build_command(self):
108        subprocess.check_output(('g++ -o dummy -Xlinker ' + \
109               '--build-id=0x123456789abcdef0123456789abcdef012345678 dummy.cc')
110               .split())
111        lines = subprocess.check_output('nm dummy'.split()).splitlines()
112        for line in lines:
113            if b"some_function" in line:
114                self.mangled_name = line.split(b' ')[2]
115                break
116        self.assertTrue(self.mangled_name)
117
118
119    def debug_command(self):
120        subprocess.check_output('mkdir -p /usr/lib/debug/.build-id/12'.split())
121        subprocess.check_output(('mv dummy.debug /usr/lib/debug/.build-id' + \
122            '/12/3456789abcdef0123456789abcdef012345678.debug').split())
123
124    def tearDown(self):
125        super(TestBuildid, self).tearDown()
126        subprocess.check_output('rm dummy'.split())
127        subprocess.check_output(('rm /usr/lib/debug/.build-id/12' +
128            '/3456789abcdef0123456789abcdef012345678.debug').split())
129
130    def test_resolve_name(self):
131        self.resolve_addr()
132
133    def test_resolve_addr(self):
134        self.resolve_name()
135
136if __name__ == "__main__":
137    main()
138