1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15r"""
16
17Trace module which creates trace files from a list of trace events.
18
This is a work in progress; future work will look to add:
20    - Config options to customize output.
21    - A method of providing custom data formatters.
22    - Perfetto support.
23"""
24from enum import Enum
25import json
26import logging
27import struct
28from typing import Iterable, NamedTuple
29
30_LOG = logging.getLogger('pw_trace')
31
32
class TraceType(Enum):
    """Kinds of trace event that a TraceEvent can represent.

    generate_trace_json() selects the "ph" (phase) value and the other
    per-event JSON keys based on which member an event carries.
    """
    # Not handled by generate_trace_json(): logged as unknown and skipped.
    Invalid = 0
    # Emitted with "ph": "I" and "s": "p"; no "tid" is set.
    Instantaneous = 1
    # Emitted with "ph": "I" and "s": "t"; "tid" is the event's group.
    InstantaneousGroup = 2
    # Async events are emitted with "ph" "b" / "n" / "e" respectively, and
    # carry the event's trace_id as "id" plus the group as "scope" and "tid".
    AsyncStart = 3
    AsyncStep = 4
    AsyncEnd = 5
    # Emitted with "ph" "B" / "E"; "tid" is the event's label.
    DurationStart = 6
    DurationEnd = 7
    # Emitted with "ph" "B" / "E"; "tid" is the event's group.
    DurationGroupStart = 8
    DurationGroupEnd = 9
44
45
class TraceEvent(NamedTuple):
    """A single trace event, as consumed by generate_trace_json().

    The first four fields are required; generate_trace_json() skips any event
    in which one of them is None. The remaining fields have defaults.
    """
    # Kind of event; decides the "ph" value and which other keys are emitted.
    event_type: TraceType
    # Emitted as "pid" (and additionally as "cat" for async events).
    module: str
    # Emitted as "name"; also used as "tid" for non-group duration events.
    label: str
    # Emitted as "ts". The name suggests microseconds; no conversion is done.
    timestamp_us: int
    # Emitted as "tid" for group events, and as "scope"/"tid" for async events.
    group: str = ""
    # Emitted as "id" (and inside "args") for async events only.
    trace_id: int = 0
    # Not read by generate_trace_json() in this file.
    flags: int = 0
    # When true, `data` is interpreted according to `data_fmt`.
    has_data: bool = False
    # Recognized values: "@pw_arg_label", "@pw_arg_group", "@pw_arg_counter",
    # or a "@pw_py_struct_fmt:" prefix followed by a struct format string.
    # Any other value makes the data appear as a hex string.
    data_fmt: str = ""
    # Raw payload bytes; only used when has_data is true.
    data: bytes = b''
57
58
def event_has_trace_id(event_type):
    """Returns whether event_type names one of the async trace event types.

    The check is a membership test against the three
    "PW_TRACE_EVENT_TYPE_ASYNC_*" names (start, step and end). Note that these
    are strings, not TraceType members, so any other value yields False.
    """
    async_type_names = frozenset((
        "PW_TRACE_EVENT_TYPE_ASYNC_START",
        "PW_TRACE_EVENT_TYPE_ASYNC_STEP",
        "PW_TRACE_EVENT_TYPE_ASYNC_END",
    ))
    return event_type in async_type_names
65
66
_STRUCT_FMT_PREFIX = "@pw_py_struct_fmt:"


def _json_safe(value):
    """Returns value in a form json.dumps can encode (bytes become hex)."""
    return value.hex() if isinstance(value, bytes) else value


def _apply_event_type(line: dict, event) -> bool:
    """Adds the keys that depend on the event type; False if type is unknown.

    Keys are inserted in a fixed order because json.dumps preserves insertion
    order, so the order is part of the emitted text.
    """
    kind = event.event_type
    if kind in (TraceType.DurationStart, TraceType.DurationEnd):
        line["ph"] = "B" if kind == TraceType.DurationStart else "E"
        line["tid"] = event.label
    elif kind in (TraceType.DurationGroupStart, TraceType.DurationGroupEnd):
        line["ph"] = "B" if kind == TraceType.DurationGroupStart else "E"
        line["tid"] = event.group
    elif kind == TraceType.Instantaneous:
        line["ph"] = "I"
        line["s"] = "p"
    elif kind == TraceType.InstantaneousGroup:
        line["ph"] = "I"
        line["s"] = "t"
        line["tid"] = event.group
    elif kind in (TraceType.AsyncStart, TraceType.AsyncStep,
                  TraceType.AsyncEnd):
        if kind == TraceType.AsyncStart:
            line["ph"] = "b"
        elif kind == TraceType.AsyncStep:
            line["ph"] = "n"
        else:
            line["ph"] = "e"
        line["scope"] = event.group
        line["tid"] = event.group
        line["cat"] = event.module
        line["id"] = event.trace_id
        line["args"] = {"id": event.trace_id}
    else:
        return False
    return True


def _apply_event_data(line: dict, event) -> None:
    """Folds the event's data payload into line according to data_fmt."""
    data_fmt = event.data_fmt
    if data_fmt == "@pw_arg_label":
        line["name"] = event.data.decode("utf-8")
    elif data_fmt == "@pw_arg_group":
        line["tid"] = event.data.decode("utf-8")
    elif data_fmt == "@pw_arg_counter":
        line["ph"] = "C"
        line["args"] = {line["name"]: int.from_bytes(event.data, "little")}
    elif data_fmt.startswith(_STRUCT_FMT_PREFIX):
        struct_fmt = data_fmt[len(_STRUCT_FMT_PREFIX):]
        try:
            items = struct.unpack_from(struct_fmt, event.data)
        except struct.error as err:
            # A bad format or short payload should not abort the whole
            # conversion; keep the event and expose the raw bytes instead.
            _LOG.error("Failed to unpack data with format %r: %s", struct_fmt,
                       err)
            line["args"] = {"data": event.data.hex()}
        else:
            # The 's', 'p' and 'c' struct codes yield bytes, which json.dumps
            # rejects, so those items are emitted as hex strings.
            line["args"] = {
                "data_" + str(i): _json_safe(item)
                for i, item in enumerate(items)
            }
    else:
        line["args"] = {"data": event.data.hex()}


def generate_trace_json(events: Iterable[TraceEvent]):
    """Generates a list of JSON lines from provided trace events.

    Each event becomes one JSON object with "pid", "name", "ts" and "ph" keys
    plus type-specific keys ("tid", "s", "scope", "cat", "id", "args"). The
    key names look like the Chrome Trace Event Format, but nothing in this
    module states that explicitly.

    Events with a None module, timestamp, type or label, and events whose
    type is not handled, are logged as errors and omitted from the result.

    When an event has data, data_fmt selects how it is used:
      - "@pw_arg_label": the data (UTF-8) replaces "name".
      - "@pw_arg_group": the data (UTF-8) replaces "tid".
      - "@pw_arg_counter": "ph" becomes "C" and the data, read as a
        little-endian unsigned integer, is stored in "args" under the name.
      - "@pw_py_struct_fmt:<fmt>": the data is unpacked with struct using
        <fmt> and stored in "args" as "data_0", "data_1", ... Bytes items are
        stored as hex strings. If unpacking fails, the error is logged and the
        data is stored as a hex string under "data".
      - anything else: the data is stored as a hex string under "data".
    In every case that writes "args", any "args" set for an async event is
    replaced.

    Args:
      events: The trace events to convert, in output order.

    Returns:
      A list of strings, each a JSON-encoded object for one event.
    """
    json_lines = []
    for event in events:
        required = (event.module, event.timestamp_us, event.event_type,
                    event.label)
        if any(field is None for field in required):
            _LOG.error("Invalid sample")
            continue

        line = {
            "pid": event.module,
            "name": event.label,
            "ts": event.timestamp_us,
        }
        if not _apply_event_type(line, event):
            _LOG.error("Unknown event type, skipping")
            continue

        if event.has_data:
            _apply_event_data(line, event)

        json_lines.append(json.dumps(line))

    return json_lines
150