#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

import ctypes as ct
import os
import unittest
from bcc import BPF
import multiprocessing


class TestLru(unittest.TestCase):
    """Tests for the BPF "lru_hash" and "lru_percpu_hash" table types."""

    def test_lru_hash(self):
        """Overfill a 1024-entry LRU hash and check contents and size."""
        b = BPF(text="""BPF_TABLE("lru_hash", int, u64, lru, 1024);""")
        t = b["lru"]
        # Insert more keys (1..1031) than the table can hold so that some
        # entries have to be evicted.
        for i in range(1, 1032):
            t[ct.c_int(i)] = ct.c_ulonglong(i)
        # Every surviving entry must still map a key to the same value.
        for key, leaf in t.items():
            self.assertEqual(leaf.value, key.value)
        # BPF_MAP_TYPE_LRU_HASH eviction happens in batch and we expect fewer
        # items than the specified size.
        self.assertLess(len(t), 1024)

    def test_lru_percpu_hash(self):
        """Check eviction and per-CPU aggregation on a 1-entry LRU table."""
        test_prog1 = """
        BPF_TABLE("lru_percpu_hash", u32, u32, stats, 1);
        int hello_world(void *ctx) {
            u32 key=0;
            u32 value = 0, *val;
            val = stats.lookup_or_init(&key, &value);
            *val += 1;
            return 0;
        }
        """
        b = BPF(text=test_prog1)
        stats_map = b.get_table("stats")
        event_name = b.get_syscall_fnname("clone")
        b.attach_kprobe(event=event_name, fn_name="hello_world")
        # Detach the probe even when one of the assertions below fails.
        self.addCleanup(b.detach_kprobe, event_name)

        # Build a leaf with a zero in the slot of every CPU reported by
        # multiprocessing.cpu_count().
        ini = stats_map.Leaf()
        for cpu in range(multiprocessing.cpu_count()):
            ini[cpu] = 0
        # First initialize with key 1
        stats_map[stats_map.Key(1)] = ini
        # Then initialize with key 0
        stats_map[stats_map.Key(0)] = ini
        # Key 1 should have been evicted, since the table holds one entry.
        with self.assertRaises(KeyError):
            stats_map[stats_map.Key(1)]

        # Spawn a child process; this is expected to go through the clone
        # syscall and therefore run hello_world, bumping the key-0 counter.
        with os.popen("hostname"):
            pass

        self.assertEqual(len(stats_map), 1)
        # The lookup and average() results are not asserted on; they are
        # invoked so that any exception they raise fails the test.
        stats_map[stats_map.Key(0)]
        stats_map.average(stats_map.Key(0))
        total = stats_map.sum(stats_map.Key(0))
        peak = stats_map.max(stats_map.Key(0))
        # Plain int literals: the Python 2-only "0L" form is a syntax error
        # on Python 3.
        self.assertGreater(total.value, 0)
        self.assertGreater(peak.value, 0)


if __name__ == "__main__":
    unittest.main()