1#!/usr/bin/env python
2
3from unittest import main, skipUnless, TestCase
4import distutils.version
5import os
6import subprocess
7import sys
8import tempfile
9
# Location of memleak.py, given as a relative path. It is prepended verbatim
# to the script name when setUpModule() builds the command line.
TOOLS_DIR = "../../tools/"
11
12
class cfg:
    """Settings shared by the memleak tests in this module."""

    # Number of bytes the test application is asked to leak. The application
    # also allocates memory for its own needs in libc, so this value has to
    # be large enough to remain the biggest outstanding allocation.
    leaking_amount = 30000

    # Shell command template used to start memleak.py. Empty until it is
    # assigned by setUpModule().
    cmd_format = ""
20
21
def kernel_version_ge(major, minor, release=None):
    """Return True if a kernel release is at least ``major.minor``.

    Only the leading ``X.Y`` numbers of the release string are compared;
    whatever follows them (patch level, ``-rc`` tags, distribution suffixes)
    is ignored.

    Args:
        major: Required major kernel version.
        minor: Required minor kernel version. A falsy value is treated as 0.
        release: Kernel release string to check, e.g. ``"5.4.0-42-generic"``.
            Defaults to the release of the running kernel (``os.uname()[2]``).

    Returns:
        True if the release is >= ``major.minor``, False otherwise.

    Raises:
        ValueError: If the release string does not start with a number.
    """
    # The release is parsed with a regex instead of
    # distutils.version.LooseVersion: distutils is deprecated (PEP 632) and
    # is not shipped with Python 3.12 and later. Imported locally so the
    # function does not depend on the module's import list.
    import re

    if release is None:
        release = os.uname()[2]

    match = re.match(r'(\d+)(?:\.(\d+))?', release)
    if match is None:
        raise ValueError("unrecognized kernel release: %r" % (release,))

    # A release without a minor component is treated as "X.0".
    found = (int(match.group(1)), int(match.group(2) or 0))
    return found >= (major, minor or 0)
32
33
def setUpModule():
    """Build the leaking test application and prepare the memleak command.

    Compiles ``test_tools_memleak_leaker_app.c``, looked up in the directory
    of the running script, into a fresh temporary directory. The shell
    command template used by the tests is stored in ``cfg.cmd_format`` and
    the temporary directory in ``cfg.tmp_dir`` so that tearDownModule() can
    remove it.

    Raises:
        RuntimeError: If the application cannot be compiled.
    """
    import shutil

    c_src = 'test_tools_memleak_leaker_app.c'
    tmp_dir = tempfile.mkdtemp(prefix='bcc-test-memleak-')
    # os.path.join() copes with an empty dirname (script started from its own
    # directory); plain concatenation would produce a path under "/".
    c_src_full = os.path.join(os.path.dirname(sys.argv[0]), c_src)
    exec_dst = os.path.join(tmp_dir, 'leaker_app')

    try:
        build_cmd = ['gcc', '-g', '-O0', '-o', exec_dst, c_src_full]
        if subprocess.call(build_cmd) != 0:
            raise RuntimeError("can't compile the leaking application")
    except Exception:
        # unittest does not call tearDownModule() when setUpModule() fails,
        # so the temporary directory has to be removed here.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
    cfg.tmp_dir = tmp_dir

    # Taking two snapshots with one second interval. Getting the largest
    # allocation. Since attaching to a program happens with a delay, we wait
    # for the first snapshot, then issue the command to the app. Finally,
    # second snapshot is used to extract the information.
    # Helper utilities "timeout" and "stdbuf" are used to limit overall
    # running time, and to disable buffering.
    # The doubled braces survive this format() call as a "{}" placeholder,
    # which run_leaker() later fills in with the kind of allocation.
    cfg.cmd_format = (
        'stdbuf -o 0 -i 0 timeout -s KILL 10s ' + TOOLS_DIR +
        'memleak.py -c "{} {{}} {}" -T 1 1 2'.format(exec_dst,
                                                     cfg.leaking_amount))


def tearDownModule():
    """Remove the temporary build directory created by setUpModule()."""
    import shutil

    tmp_dir = getattr(cfg, 'tmp_dir', None)
    if tmp_dir:
        shutil.rmtree(tmp_dir, ignore_errors=True)
        cfg.tmp_dir = None
55
56
@skipUnless(kernel_version_ge(4, 6), "requires kernel >= 4.6")
class MemleakToolTests(TestCase):
    """Check that memleak.py reports the leak made via each allocation call.

    Every test starts memleak.py on the leaking application, asks the
    application to leak ``cfg.leaking_amount`` bytes through one allocation
    function, and compares the amount found in the last report with the
    amount requested.
    """

    def setUp(self):
        # Make sure the attribute exists even when a test fails before
        # run_leaker() manages to start the process.
        self.p = None

    def tearDown(self):
        """Stop the process started by run_leaker(), if any, and free it."""
        proc, self.p = self.p, None
        if proc is None:
            return
        # The process is normally finished by now; kill it only if a test
        # bailed out before communicate() completed.
        if proc.poll() is None:
            proc.kill()
        for stream in (proc.stdin, proc.stdout):
            if stream is not None:
                stream.close()
        proc.wait()

    def run_leaker(self, leak_kind):
        """Run memleak.py on the leaking application and return the leak size.

        Args:
            leak_kind: Name of the allocation function the application
                should leak with; substituted into ``cfg.cmd_format``.

        Returns:
            The first word of the last output line containing "byte",
            converted to int.
        """
        # Starting memleak.py, which in turn launches the leaking application.
        self.p = subprocess.Popen(cfg.cmd_format.format(leak_kind),
                                  stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                  shell=True)

        # Waiting for the first report. An empty read means end of output, so
        # stop instead of spinning until the process is reaped.
        while self.p.poll() is None:
            line = self.p.stdout.readline()
            if not line or b"with outstanding allocations" in line:
                break

        # At this point, memleak.py has already launched the application and
        # set probes. Sending command to the leaking application to make its
        # allocations.
        out = self.p.communicate(input=b"\n")[0]

        # If there were memory leaks, they are in the output. Filter the lines
        # containing "byte" substring. The first word of every interesting
        # line is expected to be the number of bytes.
        reports = [line for line in out.split(b'\n') if b'byte' in line]
        self.assertTrue(len(reports) >= 1,
                        msg="At least one line should have 'byte' substring.")

        # Taking last report.
        words = reports[-1].split()
        self.assertTrue(len(words) >= 1,
                        msg="There should be at least one word in the line.")

        # First word is the leak amount in bytes.
        return int(words[0])

    def test_malloc(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("malloc"))

    def test_calloc(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("calloc"))

    def test_realloc(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("realloc"))

    def test_posix_memalign(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("posix_memalign"))

    def test_valloc(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("valloc"))

    def test_memalign(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("memalign"))

    def test_pvalloc(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("pvalloc"))

    def test_aligned_alloc(self):
        self.assertEqual(cfg.leaking_amount, self.run_leaker("aligned_alloc"))
121
122
# Run the tests through unittest's main() when the file is executed as a
# script.
if __name__ == "__main__":
    main()
125