1#!/usr/bin/env python
2# Copyright (c) Sasha Goldshtein
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import bcc
6import unittest
7from time import sleep
8import distutils.version
9import os
10import subprocess
11
12def kernel_version_ge(major, minor):
13    # True if running kernel is >= X.Y
14    version = distutils.version.LooseVersion(os.uname()[2]).version
15    if version[0] > major:
16        return True
17    if version[0] < major:
18        return False
19    if minor and version[1] < minor:
20        return False
21    return True
22
23@unittest.skipUnless(kernel_version_ge(4,7), "requires kernel >= 4.7")
24class TestTracepoint(unittest.TestCase):
25    def test_tracepoint(self):
26        text = """
27        BPF_HASH(switches, u32, u64);
28        TRACEPOINT_PROBE(sched, sched_switch) {
29            u64 val = 0;
30            u32 pid = args->next_pid;
31            u64 *existing = switches.lookup_or_init(&pid, &val);
32            (*existing)++;
33            return 0;
34        }
35        """
36        b = bcc.BPF(text=text)
37        sleep(1)
38        total_switches = 0
39        for k, v in b["switches"].items():
40            total_switches += v.value
41        self.assertNotEqual(0, total_switches)
42
43@unittest.skipUnless(kernel_version_ge(4,7), "requires kernel >= 4.7")
44class TestTracepointDataLoc(unittest.TestCase):
45    def test_tracepoint_data_loc(self):
46        text = """
47        struct value_t {
48            char filename[64];
49        };
50        BPF_HASH(execs, u32, struct value_t);
51        TRACEPOINT_PROBE(sched, sched_process_exec) {
52            struct value_t val = {0};
53            char fn[64];
54            u32 pid = args->pid;
55            struct value_t *existing = execs.lookup_or_init(&pid, &val);
56            TP_DATA_LOC_READ_CONST(fn, filename, 64);
57            __builtin_memcpy(existing->filename, fn, 64);
58            return 0;
59        }
60        """
61        b = bcc.BPF(text=text)
62        subprocess.check_output(["/bin/ls"])
63        sleep(1)
64        self.assertTrue("/bin/ls" in [v.filename.decode()
65                                      for v in b["execs"].values()])
66
67if __name__ == "__main__":
68    unittest.main()
69