1#!/usr/bin/env python
2#
3# USAGE: test_usdt3.py
4#
5# Copyright 2018 Facebook, Inc
6# Licensed under the Apache License, Version 2.0 (the "License")
7
8from __future__ import print_function
9from bcc import BPF, USDT
10from unittest import main, TestCase
11from subprocess import Popen, PIPE
12import ctypes as ct
13import inspect, os, tempfile
14
class TestUDST(TestCase):
    """Trace one USDT probe that is present in an executable and two libraries.

    ``common.h`` defines ``record_val`` as ``static inline``, so the
    ``test:probe`` tracepoint it contains is compiled separately into
    ``liba.so`` (via ``record_a``), ``libb.so`` (via ``record_b``) and
    ``a.out`` (called directly from ``main``).  The application fires the
    probe with the argument 1, 2 and 3 respectively once per second, and the
    test checks that all three values, and nothing else, are reported.
    """

    def setUp(self):
        """Write the C sources, build them with gcc and start the application.

        Sets ``self.bpf_text`` (the BPF program), ``self.tmp_dir`` (the build
        directory) and ``self.app`` (the running ``a.out`` process).
        """
        # Function-scope import so this class does not depend on a
        # module-level import that the file does not currently have.
        import shutil

        common_h = b"""
#include "folly/tracing/StaticTracepoint.h"

static inline void record_val(int val)
{
  FOLLY_SDT(test, probe, val);
}

extern void record_a(int val);
extern void record_b(int val);
"""

        a_c = b"""
#include <stdio.h>
#include "common.h"

void record_a(int val)
{
    record_val(val);
}
"""

        b_c = b"""
#include <stdio.h>
#include "common.h"

void record_b(int val)
{
    record_val(val);
}
"""

        m_c = b"""
#include <stdio.h>
#include <unistd.h>
#include "common.h"

int main() {
   while (1) {
     record_a(1);
     record_b(2);
     record_val(3);
     sleep(1);
   }
   return 0;
}
"""
        # BPF program: forward the probe's first argument to user space.
        self.bpf_text = """
BPF_PERF_OUTPUT(event);
int do_trace(struct pt_regs *ctx) {
    int result = 0;
    bpf_usdt_readarg(1, ctx, &result);
    event.perf_submit(ctx, &result, sizeof(result));
    return 0;
};
"""

        # Create the build directory.  unittest does not call tearDown() when
        # setUp() raises, so the removal is registered as a cleanup right
        # away; that way a failed compilation does not leak the directory.
        # Cleanups run after tearDown(), i.e. after the application is killed.
        self.tmp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp_dir, ignore_errors=True)
        print("temp directory: " + self.tmp_dir)

        def tmp_path(name):
            return os.path.join(self.tmp_dir, name)

        # Create source files
        for name, text in (("common.h", common_h),
                           ("a.c", a_c),
                           ("b.c", b_c),
                           ("m.c", m_c)):
            with open(tmp_path(name), "wb") as text_file:
                text_file.write(text)

        # Compilation
        # the usdt test:probe exists in liba.so, libb.so and a.out
        include_path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) + "/include"
        include_opt = "-I" + include_path
        a_src = tmp_path("a.c")
        a_obj = tmp_path("a.o")
        a_lib = tmp_path("liba.so")
        b_src = tmp_path("b.c")
        b_obj = tmp_path("b.o")
        b_lib = tmp_path("libb.so")
        m_src = tmp_path("m.c")
        m_bin = tmp_path("a.out")
        self._run_gcc(include_opt, "-fpic", "-c", "-o", a_obj, a_src)
        self._run_gcc(include_opt, "-fpic", "-c", "-o", b_obj, b_src)
        self._run_gcc("-shared", "-o", a_lib, a_obj)
        self._run_gcc("-shared", "-o", b_lib, b_obj)
        self._run_gcc(include_opt, m_src, "-o", m_bin,
                      "-L" + self.tmp_dir, "-la", "-lb")

        # Run the application; LD_LIBRARY_PATH lets it find liba.so/libb.so
        # in the build directory.
        self.app = Popen([m_bin], env=dict(os.environ, LD_LIBRARY_PATH=self.tmp_dir))
        # Debugging aid:
        # os.system("tplist.py -vvv -p " + str(self.app.pid))

    def _run_gcc(self, *args):
        """Run ``gcc`` with ``args`` and fail the test unless it exits with 0.

        The command is passed as an argument list, not a shell string, so
        paths containing spaces or shell metacharacters are handled correctly.
        """
        cmd = ["gcc"] + list(args)
        self.assertEqual(Popen(cmd).wait(), 0,
                         "command failed: " + " ".join(cmd))

    def test_attach1(self):
        """Attach to the probe by PID and expect exactly the values 1, 2 and 3."""
        # Enable the USDT probe for the given PID and load the BPF program
        # that handles it.
        u = USDT(pid=int(self.app.pid))
        u.enable_probe(probe="probe", fn_name="do_trace")
        b = BPF(text=self.bpf_text, usdt_contexts=[u])

        # processing events
        expected = {1, 2, 3}
        seen = set()

        def print_event(cpu, data, size):
            # Each event carries the single int submitted by do_trace().
            seen.add(ct.cast(data, ct.POINTER(ct.c_int)).contents.value)

        b["event"].open_perf_buffer(print_event)
        for _ in range(100):
            all_seen = expected <= seen
            unexpected_seen = not seen <= expected
            # Stop once the outcome is decided: either every expected value
            # has arrived, or an unexpected one has (the test will fail).
            if all_seen or unexpected_seen:
                break
            # Bounded wait (milliseconds) so the test fails instead of
            # hanging forever when the probe never fires.
            b.perf_buffer_poll(timeout=1000)

        self.assertEqual(seen, expected)

    def tearDown(self):
        """Kill and reap the traced application.

        The build directory is removed afterwards by the cleanup registered
        in ``setUp``.
        """
        self.app.kill()
        self.app.wait()
153
# When executed as a script, hand control to unittest's ``main`` (imported at
# the top of the file).
if __name__ == "__main__":
    main()
156