1#!/usr/bin/env python 2# Copyright (c) PLUMgrid, Inc. 3# Licensed under the Apache License, Version 2.0 (the "License") 4 5from bcc import BPF 6import os 7import sys 8from unittest import main, TestCase 9 10class TestKprobeRgx(TestCase): 11 def setUp(self): 12 self.b = BPF(text=b""" 13 typedef struct { int idx; } Key; 14 typedef struct { u64 val; } Val; 15 BPF_HASH(stats, Key, Val, 3); 16 int hello(void *ctx) { 17 stats.lookup_or_init(&(Key){1}, &(Val){0})->val++; 18 return 0; 19 } 20 int goodbye(void *ctx) { 21 stats.lookup_or_init(&(Key){2}, &(Val){0})->val++; 22 return 0; 23 } 24 """) 25 self.b.attach_kprobe(event_re=b"^" + self.b.get_syscall_prefix() + b"bp.*", 26 fn_name=b"hello") 27 self.b.attach_kretprobe(event_re=b"^" + self.b.get_syscall_prefix() + b"bp.*", 28 fn_name=b"goodbye") 29 30 def test_send1(self): 31 k1 = self.b[b"stats"].Key(1) 32 k2 = self.b[b"stats"].Key(2) 33 self.assertTrue(self.b[b"stats"][k1].val >= 2) 34 self.assertTrue(self.b[b"stats"][k2].val == 1) 35 36class TestKprobeReplace(TestCase): 37 def setUp(self): 38 self.b = BPF(text=b"int empty(void *ctx) { return 0; }") 39 40 def test_periods(self): 41 self.b.attach_kprobe(event_re=b"^tcp_enter_cwr.*", fn_name=b"empty") 42 43if __name__ == "__main__": 44 main() 45