1#!/usr/bin/env python
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import bcc
6import ctypes
7import errno
8import os
9import subprocess
10import shutil
11import time
12import unittest
13
14class TestUprobes(unittest.TestCase):
15    def test_simple_library(self):
16        text = """
17#include <uapi/linux/ptrace.h>
18BPF_ARRAY(stats, u64, 1);
19static void incr(int idx) {
20    u64 *ptr = stats.lookup(&idx);
21    if (ptr)
22        ++(*ptr);
23}
24int count(struct pt_regs *ctx) {
25    bpf_trace_printk("count() uprobe fired");
26    u32 pid = bpf_get_current_pid_tgid();
27    if (pid == PID)
28        incr(0);
29    return 0;
30}"""
31        test_pid = os.getpid()
32        text = text.replace("PID", "%d" % test_pid)
33        b = bcc.BPF(text=text)
34        b.attach_uprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
35        b.attach_uretprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
36        libc = ctypes.CDLL("libc.so.6")
37        libc.malloc_stats.restype = None
38        libc.malloc_stats.argtypes = []
39        libc.malloc_stats()
40        self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
41        b.detach_uretprobe(name="c", sym="malloc_stats", pid=test_pid)
42        b.detach_uprobe(name="c", sym="malloc_stats", pid=test_pid)
43
44    def test_simple_binary(self):
45        text = """
46#include <uapi/linux/ptrace.h>
47BPF_ARRAY(stats, u64, 1);
48static void incr(int idx) {
49    u64 *ptr = stats.lookup(&idx);
50    if (ptr)
51        ++(*ptr);
52}
53int count(struct pt_regs *ctx) {
54    u32 pid = bpf_get_current_pid_tgid();
55    incr(0);
56    return 0;
57}"""
58        b = bcc.BPF(text=text)
59        b.attach_uprobe(name="/usr/bin/python", sym="main", fn_name="count")
60        b.attach_uretprobe(name="/usr/bin/python", sym="main", fn_name="count")
61        with os.popen("/usr/bin/python -V") as f:
62            pass
63        self.assertGreater(b["stats"][ctypes.c_int(0)].value, 0)
64        b.detach_uretprobe(name="/usr/bin/python", sym="main")
65        b.detach_uprobe(name="/usr/bin/python", sym="main")
66
67    def test_mount_namespace(self):
68        text = """
69#include <uapi/linux/ptrace.h>
70BPF_TABLE("array", int, u64, stats, 1);
71static void incr(int idx) {
72    u64 *ptr = stats.lookup(&idx);
73    if (ptr)
74        ++(*ptr);
75}
76int count(struct pt_regs *ctx) {
77    bpf_trace_printk("count() uprobe fired");
78    u32 pid = bpf_get_current_pid_tgid();
79    if (pid == PID)
80        incr(0);
81    return 0;
82}"""
83        # Need to import libc from ctypes to access unshare(2)
84        libc = ctypes.CDLL("libc.so.6", use_errno=True)
85
86        # Need to find path to libz.so.1
87        libz_path = None
88        p = subprocess.Popen(["ldconfig", "-p"], stdout=subprocess.PIPE)
89        for l in p.stdout:
90            n = l.split()
91            if n[0] == b"libz.so.1":
92                # if libz was already found, override only if new lib is more
93                # specific (e.g. libc6,x86-64 vs libc6)
94                if not libz_path or len(n[1].split(b",")) > 1:
95                    libz_path = n[-1]
96        p.wait()
97        p.stdout.close()
98        p = None
99
100        self.assertIsNotNone(libz_path)
101
102        # fork a child that we'll place in a separate mount namespace
103        child_pid = os.fork()
104        if child_pid == 0:
105            # Unshare CLONE_NEWNS
106            if libc.unshare(0x00020000) == -1:
107                e = ctypes.get_errno()
108                raise OSError(e, errno.errorcode[e])
109
110            # Remount root MS_REC|MS_PRIVATE
111            if libc.mount(None, b"/", None, (1<<14)|(1<<18) , None) == -1:
112                e = ctypes.get_errno()
113                raise OSError(e, errno.errorcode[e])
114
115            if libc.mount(b"tmpfs", b"/tmp", b"tmpfs", 0, None) == -1:
116                e = ctypes.get_errno()
117                raise OSError(e, errno.errorcode[e])
118
119            shutil.copy(libz_path, b"/tmp")
120
121            libz = ctypes.CDLL("/tmp/libz.so.1")
122            time.sleep(1)
123            libz.zlibVersion()
124            time.sleep(5)
125            os._exit(0)
126
127        libname = "/tmp/libz.so.1"
128        symname = "zlibVersion"
129        text = text.replace("PID", "%d" % child_pid)
130        b = bcc.BPF(text=text)
131        b.attach_uprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
132        b.attach_uretprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
133        time.sleep(1)
134        self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
135        b.detach_uretprobe(name=libname, sym=symname, pid=child_pid)
136        b.detach_uprobe(name=libname, sym=symname, pid=child_pid)
137        os.wait()
138
139if __name__ == "__main__":
140    unittest.main()
141