1#!/usr/bin/env python
2# @lint-avoid-python-3-compatibility-imports
3#
4# ustat  Activity stats from high-level languages, including exceptions,
5#        method calls, class loads, garbage collections, and more.
6#        For Linux, uses BCC, eBPF.
7#
8# USAGE: ustat [-l {java,node,perl,php,python,ruby,tcl}] [-C]
9#        [-S {cload,excp,gc,method,objnew,thread}] [-r MAXROWS] [-d]
10#        [interval [count]]
11#
12# This uses in-kernel eBPF maps to store per process summaries for efficiency.
13# Newly-created processes might only be traced at the next interval, if the
14# relevant USDT probe requires enabling through a semaphore.
15#
16# Copyright 2016 Sasha Goldshtein
17# Licensed under the Apache License, Version 2.0 (the "License")
18#
19# 26-Oct-2016   Sasha Goldshtein    Created this.
20
21from __future__ import print_function
22import argparse
23from bcc import BPF, USDT, USDTException
24import os
25import sys
26from subprocess import call
27from time import sleep, strftime
28
class Category(object):
    """
    Names of the event categories that this tool reports.

    Each attribute's value is the attribute name itself. The upper-case
    attribute names are also used (lower-cased) as the choices for the
    -S/--sort command-line option.
    """
    CLOAD = "CLOAD"     # class loads
    EXCP = "EXCP"       # exceptions
    GC = "GC"           # garbage collections
    METHOD = "METHOD"   # method calls
    OBJNEW = "OBJNEW"   # object allocations
    THREAD = "THREAD"   # thread starts
36
37class Probe(object):
38    def __init__(self, language, procnames, events):
39        """
40        Initialize a new probe object with a specific language, set of process
41        names to monitor for that language, and a dictionary of events and
42        categories. The dictionary is a mapping of USDT probe names (such as
43        'gc__start') to event categories supported by this tool -- from the
44        Category class.
45        """
46        self.language = language
47        self.procnames = procnames
48        self.events = events
49
50    def _find_targets(self):
51        """Find pids where the comm is one of the specified list"""
52        self.targets = {}
53        all_pids = [int(pid) for pid in os.listdir('/proc') if pid.isdigit()]
54        for pid in all_pids:
55            try:
56                comm = open('/proc/%d/comm' % pid).read().strip()
57                if comm in self.procnames:
58                    cmdline = open('/proc/%d/cmdline' % pid).read()
59                    self.targets[pid] = cmdline.replace('\0', ' ')
60            except IOError:
61                continue    # process may already have terminated
62
63    def _enable_probes(self):
64        self.usdts = []
65        for pid in self.targets:
66            try:
67                usdt = USDT(pid=pid)
68            except USDTException:
69                # avoid race condition on pid going away.
70                print("failed to instrument %d" % pid, file=sys.stderr)
71                continue
72            for event in self.events:
73                try:
74                    usdt.enable_probe(event, "%s_%s" % (self.language, event))
75                except Exception:
76                    # This process might not have a recent version of the USDT
77                    # probes enabled, or might have been compiled without USDT
78                    # probes at all. The process could even have been shut down
79                    # and the pid been recycled. We have to gracefully handle
80                    # the possibility that we can't attach probes to it at all.
81                    pass
82            self.usdts.append(usdt)
83
84    def _generate_tables(self):
85        text = """
86BPF_HASH(%s_%s_counts, u32, u64);   // pid to event count
87        """
88        return str.join('', [text % (self.language, event)
89                             for event in self.events])
90
91    def _generate_functions(self):
92        text = """
93int %s_%s(void *ctx) {
94    u64 *valp, zero = 0;
95    u32 tgid = bpf_get_current_pid_tgid() >> 32;
96    valp = %s_%s_counts.lookup_or_init(&tgid, &zero);
97    ++(*valp);
98    return 0;
99}
100        """
101        lang = self.language
102        return str.join('', [text % (lang, event, lang, event)
103                             for event in self.events])
104
105    def get_program(self):
106        self._find_targets()
107        self._enable_probes()
108        return self._generate_tables() + self._generate_functions()
109
110    def get_usdts(self):
111        return self.usdts
112
113    def get_counts(self, bpf):
114        """Return a map of event counts per process"""
115        event_dict = dict([(category, 0) for category in self.events.values()])
116        result = dict([(pid, event_dict.copy()) for pid in self.targets])
117        for event, category in self.events.items():
118            counts = bpf["%s_%s_counts" % (self.language, event)]
119            for pid, count in counts.items():
120                if pid.value not in result:
121                    print("result was not found for %d" % pid.value, file=sys.stderr)
122                    continue
123                result[pid.value][category] = count.value
124            counts.clear()
125        return result
126
127    def cleanup(self):
128        self.usdts = None
129
class Tool(object):
    """
    Command-line driver for ustat.

    Parses the arguments, creates the per-language Probe objects, and then
    repeatedly attaches the probes, waits one interval, prints a per-process
    table of event rates, and detaches again.
    """

    def _parse_args(self):
        """
        Parse the command line into self.args.

        A non-positive interval is rejected through the parser's error path,
        because the interval is both slept on and used as a divisor when the
        rates are printed.
        """
        examples = """examples:
  ./ustat              # stats for all languages, 1 second refresh
  ./ustat -C           # don't clear the screen
  ./ustat -l java      # Java processes only
  ./ustat 5            # 5 second summaries
  ./ustat 5 10         # 5 second summaries, 10 times only
        """
        parser = argparse.ArgumentParser(
            description="Activity stats from high-level languages.",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog=examples)
        parser.add_argument("-l", "--language",
            choices=["java", "node", "perl", "php", "python", "ruby", "tcl"],
            help="language to trace (default: all languages)")
        parser.add_argument("-C", "--noclear", action="store_true",
            help="don't clear the screen")
        parser.add_argument("-S", "--sort",
            choices=[cat.lower() for cat in dir(Category) if cat.isupper()],
            help="sort by this field (descending order)")
        parser.add_argument("-r", "--maxrows", default=20, type=int,
            help="maximum rows to print, default 20")
        parser.add_argument("-d", "--debug", action="store_true",
            help="Print the resulting BPF program (for debugging purposes)")
        parser.add_argument("interval", nargs="?", default=1, type=int,
            help="output interval, in seconds")
        parser.add_argument("count", nargs="?", default=99999999, type=int,
            help="number of outputs")
        parser.add_argument("--ebpf", action="store_true",
            help=argparse.SUPPRESS)
        self.args = parser.parse_args()
        if self.args.interval <= 0:
            parser.error("interval must be a positive number of seconds")

    def _create_probes(self):
        """
        Build self.probes: the Probe for the language chosen with -l, or the
        probes of all supported languages when none was chosen.
        """
        probes_by_lang = {
                "java": Probe("java", ["java"], {
                    "gc__begin": Category.GC,
                    "mem__pool__gc__begin": Category.GC,
                    "thread__start": Category.THREAD,
                    "class__loaded": Category.CLOAD,
                    "object__alloc": Category.OBJNEW,
                    "method__entry": Category.METHOD,
                    "ExceptionOccurred__entry": Category.EXCP
                    }),
                "node": Probe("node", ["node"], {
                    "gc__start": Category.GC
                    }),
                "perl": Probe("perl", ["perl"], {
                    "sub__entry": Category.METHOD
                    }),
                "php": Probe("php", ["php"], {
                    "function__entry": Category.METHOD,
                    "compile__file__entry": Category.CLOAD,
                    "exception__thrown": Category.EXCP
                    }),
                "python": Probe("python", ["python"], {
                    "function__entry": Category.METHOD,
                    "gc__start": Category.GC
                    }),
                "ruby": Probe("ruby", ["ruby", "irb"], {
                    "method__entry": Category.METHOD,
                    "cmethod__entry": Category.METHOD,
                    "gc__mark__begin": Category.GC,
                    "gc__sweep__begin": Category.GC,
                    "object__create": Category.OBJNEW,
                    "hash__create": Category.OBJNEW,
                    "string__create": Category.OBJNEW,
                    "array__create": Category.OBJNEW,
                    "require__entry": Category.CLOAD,
                    "load__entry": Category.CLOAD,
                    "raise": Category.EXCP
                    }),
                "tcl": Probe("tcl", ["tclsh", "wish"], {
                    "proc__entry": Category.METHOD,
                    "obj__create": Category.OBJNEW
                    }),
                }

        if self.args.language:
            self.probes = [probes_by_lang[self.args.language]]
        else:
            # Materialize the view: self.probes is iterated many times.
            self.probes = list(probes_by_lang.values())

    def _attach_probes(self):
        """
        Generate the BPF program for all probes, load it into self.bpf, and
        attach every enabled USDT location as a global uprobe.

        With --debug or --ebpf the program text is printed first; with --ebpf
        the tool exits right after printing it.
        """
        program = '\n'.join([p.get_program() for p in self.probes])
        if self.args.debug or self.args.ebpf:
            print(program)
            if self.args.ebpf:
                sys.exit()
            for probe in self.probes:
                print("Attached to %s processes:" % probe.language,
                      ', '.join(map(str, probe.targets)))
        self.bpf = BPF(text=program)
        usdts = [usdt for probe in self.probes for usdt in probe.get_usdts()]
        # Filter out duplicates when we have multiple processes with the same
        # uprobe. We are attaching to these probes manually instead of using
        # the USDT support from the bcc module, because the USDT class attaches
        # to each uprobe with a specific pid. When there is more than one
        # process from some language, we end up attaching more than once to the
        # same uprobe (albeit with different pids), which is not allowed.
        # Instead, we use a global attach (with pid=-1).
        uprobes = set((path, func, addr) for usdt in usdts
                      for (path, func, addr, _)
                      in usdt.enumerate_active_probes())
        for (path, func, addr) in uprobes:
            self.bpf.attach_uprobe(name=path, fn_name=func, addr=addr, pid=-1)

    def _detach_probes(self):
        """Release the USDT contexts and the BPF object with its probes."""
        for probe in self.probes:
            probe.cleanup()     # Cleans up USDT contexts
        self.bpf.cleanup()      # Cleans up all attached probes
        self.bpf = None

    def _print_summary(self):
        """Print the load average line, the header and the per-pid rates."""
        if not self.args.noclear:
            call("clear")
        else:
            print()
        with open("/proc/loadavg") as loadavg:
            print("%-8s loadavg: %s" % (strftime("%H:%M:%S"), loadavg.read()))
        print("%-6s %-20s %-10s %-6s %-10s %-8s %-6s %-6s" % (
            "PID", "CMDLINE", "METHOD/s", "GC/s", "OBJNEW/s",
            "CLOAD/s", "EXC/s", "THR/s"))

        counts = {}
        targets = {}
        for probe in self.probes:
            counts.update(probe.get_counts(self.bpf))
            targets.update(probe.targets)
        if self.args.sort:
            sort_field = self.args.sort.upper()
            rows = sorted(counts.items(),
                          key=lambda kv: -kv[1].get(sort_field, 0))
        else:
            rows = sorted(counts.items(), key=lambda kv: kv[0])

        interval = self.args.interval
        for line, (pid, stats) in enumerate(rows, 1):
            # Integer division keeps the rates exact for the %d fields.
            print("%-6d %-20s %-10d %-6d %-10d %-8d %-6d %-6d" % (
                  pid, targets[pid][:20],
                  stats.get(Category.METHOD, 0) // interval,
                  stats.get(Category.GC, 0) // interval,
                  stats.get(Category.OBJNEW, 0) // interval,
                  stats.get(Category.CLOAD, 0) // interval,
                  stats.get(Category.EXCP, 0) // interval,
                  stats.get(Category.THREAD, 0) // interval
                  ))
            if line >= self.args.maxrows:
                break

    def _loop_iter(self):
        """
        Run one interval: attach the probes, sleep, print the summary, and
        detach. A Ctrl-C during the sleep sets self.exiting; the summary for
        the partial interval is still printed.
        """
        self._attach_probes()
        try:
            try:
                sleep(self.args.interval)
            except KeyboardInterrupt:
                self.exiting = True
            self._print_summary()
        finally:
            # Detach even when printing fails, so that the probes are not
            # left attached while the error propagates.
            self._detach_probes()

    def run(self):
        """
        Entry point: parse arguments, create the probes, and print one
        summary per interval until the requested count is reached or the
        user interrupts the tool, then exit.
        """
        self._parse_args()
        self._create_probes()
        print('Tracing... Output every %d secs. Hit Ctrl-C to end' %
              self.args.interval)
        countdown = self.args.count
        self.exiting = False
        while True:
            self._loop_iter()
            countdown -= 1
            if self.exiting or countdown == 0:
                print("Detaching...")
                sys.exit()
300
if __name__ == "__main__":
    try:
        tool = Tool()
        tool.run()
    except KeyboardInterrupt:
        # Ctrl-C is the documented way to end the tool; exit quietly
        # instead of showing a traceback.
        pass
306