#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
from datetime import datetime
import sys
import textwrap
from typing import Dict, Optional
from . import vulkan_printer
from . import opcodes
import struct


class CommandPrinter:
    """
    Decodes a single command found in the minidump file and prints it to an output stream.

    The command payload is wrapped in a BytesIO so that the `read_*` helpers consume it
    sequentially; the `write_*` helpers read a value and immediately print it.
    """

    def __init__(self, opcode: int, original_size: int, data: bytes, timestamp: int,
                 stream_idx: int, cmd_idx: int, out=sys.stdout):
        """
        opcode: numeric opcode of the command, looked up in `opcodes.opcodes` for its name
        original_size: size of the command as recorded; the printed payload size is this
                       value minus 8
        data: the payload bytes that are available (may be shorter than the original payload,
              in which case the raw dump is marked as truncated)
        timestamp: time the command was recorded, in microseconds since the epoch. 0 means
                   that no timestamp is printed.
        stream_idx: index of the stream, printed in the command header
        cmd_idx: index of the command, printed in the command header
        out: file-like object that everything is printed to
        """
        self.opcode = opcode
        self.original_size = original_size
        self.data = io.BytesIO(data)
        self.timestamp = timestamp
        self.stream_idx = stream_idx
        self.cmd_idx = cmd_idx
        self.out = out

    def print_cmd(self):
        """
        Tries to decode and pretty print the command to the terminal.
        Falls back to printing hex data if the command doesn't have a printer.

        If the pretty printer fails, the error and the raw data are printed and the original
        exception is re-raised.
        """

        # Print out the command name
        print("\n{}.{} - {}: ({} bytes)".format(self.stream_idx, self.cmd_idx, self.cmd_name(),
                                                self.original_size - 8), file=self.out)
        self.write_timestamp(4)

        # Nothing to decode
        if len(self.data.getbuffer()) == 0:
            return

        # Pretty printers are functions of vulkan_printer named after the command
        pretty_printer = getattr(vulkan_printer, self.cmd_name(), None)
        if not pretty_printer:
            self.print_raw()
            return

        try:
            pretty_printer(self, indent=4)
            self.check_no_more_bytes()
        except Exception as ex:
            print("Error while processing {}: {}".format(self.cmd_name(), repr(ex)), file=self.out)
            print("Command raw data:", file=self.out)
            self.print_raw()
            # Bare raise: propagate the original exception with its traceback untouched
            raise

    def check_no_more_bytes(self):
        """
        Checks that we processed all the bytes, otherwise there's probably a bug in the decoding
        logic.

        Raises BufferError if the read position is not at the end of the data.
        """
        decoded = self.data.tell()
        total = len(self.data.getbuffer())
        if decoded != total:
            raise BufferError(
                "Not all data was decoded. Decoded {} bytes but command had {}".format(
                    decoded, total))

    def cmd_name(self) -> str:
        """Returns the command name (e.g.: "OP_vkBeginCommandBuffer"), or the opcode as a string if unknown"""
        return opcodes.opcodes.get(self.opcode, str(self.opcode))

    def print_raw(self):
        """Prints the command data as hex bytes, as a fallback if we don't know how to decode it"""
        # The data is truncated if the recorded size (minus 8) exceeds what we actually hold
        truncated = self.original_size > len(self.data.getbuffer()) + 8
        indent = 8
        hex_dump = ' '.join("{:02x}".format(byte) for byte in self.data.getbuffer())
        if truncated:
            hex_dump += " [...]"
        # 16 bytes per line, each one taking 3 characters ("xx ")
        lines = textwrap.wrap(hex_dump, width=16 * 3 + indent, initial_indent=' ' * indent,
                              subsequent_indent=' ' * indent)
        for line in lines:
            print(line, file=self.out)

    def read_bytes(self, size: int):
        """
        Reads exactly `size` bytes from the data stream.

        Raises EOFError if fewer than `size` bytes are left.
        """
        buf = self.data.read(size)
        if len(buf) != size:
            raise EOFError("Unexpectedly reached the end of the buffer")
        return buf

    def read_int(self, size: int, signed: bool = False, big_endian: bool = False) -> int:
        """
        Reads an integer of `size` bytes (1, 2, 4 or 8) from the data stream.

        signed: whether to treat it as a signed or unsigned int
        big_endian: whether to treat it as little endian (the default) or big endian
        """
        assert size in [1, 2, 4, 8], "Invalid size to read: " + str(size)
        buf = self.read_bytes(size)
        byte_order = 'big' if big_endian else 'little'
        return int.from_bytes(buf, byteorder=byte_order, signed=signed)

    def read_float(self) -> float:
        """Reads a 4-byte float from the data stream"""
        buf = self.read_bytes(4)
        # Explicitly little endian, to match the default byte order of read_int regardless of
        # the byte order of the machine running this script
        return struct.unpack('<f', buf)[0]

    def write(self, msg: str, indent: int):
        """Prints a string at a given indentation level, without a trailing newline"""
        assert isinstance(msg, str)
        assert isinstance(indent, int) and indent >= 0
        print(" " * indent + msg, end='', file=self.out)

    def write_int(self,
                  field_name: str,
                  size: int,
                  indent: int,
                  signed: bool = False,
                  big_endian: bool = False,
                  optional: bool = False,
                  count: Optional[int] = None) -> Optional[int]:
        """
        Reads and prints integers from the data stream.

        When reading a single int (ie: when count=None), returns the int that was read, otherwise
        returns None.

        size: size of the integer in bytes
        indent: indentation level that we should write at
        signed: whether to treat it as a signed or unsigned int
        big_endian: whether to treat it as little endian or big endian
        optional: if True, we will first read an 8 byte boolean value. If that value is false, we
                  will return without further reading.
        count: how many integers to read, for repeated values.
        """
        if optional and self.check_null(field_name, indent):
            return None

        # Write the field name
        self.write("{name}: ".format(name=field_name), indent)

        if count is not None:
            # "{:#x}" keeps the sign in front of the prefix for negative values ("-0x5")
            values = ["{:#x}".format(self.read_int(size, signed, big_endian))
                      for _ in range(count)]
            self.write("[{}]\n".format(", ".join(values)), indent=0)
            return None

        value = self.read_int(size, signed, big_endian)
        # Print small values as decimal only, otherwise hex + decimal
        format_str = ("{val}\n" if value < 10 else "0x{val:x} ({val})\n")
        self.write(format_str.format(val=value), indent=0)
        return value

    def write_float(self, field_name: str, indent: int, count: Optional[int] = None):
        """
        Reads and prints floats from the data stream.

        count: how many floats to read, for repeated values. Pass None to read a single float.
        """
        if count is not None:
            values = [str(self.read_float()) for _ in range(count)]
            self.write("{}: [{}]\n".format(field_name, ", ".join(values)), indent)
        else:
            self.write("{}: {}\n".format(field_name, self.read_float()), indent)

    def write_enum(self, field_name: str, enum: Dict[int, str], indent: int) -> int:
        """Reads the next 32-bit int from the data stream, prints it as an enum, and returns it"""
        value = self.read_int(4)
        self.write("{}: {} ({})\n".format(field_name, enum.get(value, ""), value), indent)
        return value

    def write_flags(self, field_name: str, enum: Dict[int, str], indent: int):
        """
        Reads and prints Vulkan flags (bit masks).

        enum: maps flag values to their names. Any bits not covered by a named flag are printed
              as a leading hex value.
        """
        remaining_flags = flags = self.read_int(4)
        flags_list = []
        if remaining_flags == 0xffffffff:
            # When the value is set to all flags, don't bother listing them all
            flags_list.append("(all flags)")
        else:
            for (value, flag) in enum.items():
                # Only report a flag if all of its bits are set, and clear exactly those bits.
                # (Toggling with xor on a partial match of a multi-bit value would turn on bits
                # that were never set and report them as unknown.)
                if value and (remaining_flags & value) == value:
                    remaining_flags &= ~value
                    flags_list.append(flag)
            if remaining_flags != 0:
                flags_list.insert(0, "0x{:x}".format(remaining_flags))
        self.write("{}: {} (0x{:x})\n".format(field_name, " | ".join(flags_list), flags), indent)

    def write_stype_and_pnext(self, expected_stype: str, indent: int):
        """
        Reads and prints the sType and pNext fields found in many Vulkan structs, while also
        sanity checking them.

        Raises ValueError if the sType is not `expected_stype`, and NotImplementedError if the
        struct has extensions (ie: a non-zero pNext size).
        """
        stype = self.write_enum("sType", vulkan_printer.VkStructureType, indent)
        stype_str = vulkan_printer.VkStructureType.get(stype)
        if stype_str != expected_stype:
            raise ValueError("Wrong structure type. Expected: {}, got {} ({}) instead".format(
                expected_stype, stype, stype_str))

        pnext_size = self.write_int("pNext_size", 4, indent, big_endian=True)
        if pnext_size != 0:
            # Print the type of the extension before bailing out, to help with debugging
            self.write_enum("ext type", vulkan_printer.VkStructureType, indent + 1)
            raise NotImplementedError("Decoding structs with extensions is not supported")

    def write_timestamp(self, indent):
        """Prints the time the command was recorded at, unless the timestamp is 0"""
        if self.timestamp != 0:
            # The timestamp is converted from microseconds to seconds
            self.write(
                "Recorded at: {}\n".format(datetime.fromtimestamp(self.timestamp / 1000000.0)),
                indent)

    def check_null(self, field_name: str, indent: int) -> bool:
        """
        Reads an 8-byte big endian value and returns whether it is zero (null).
        When it is null, also prints the field as "(null)".
        """
        is_null = self.read_int(8, big_endian=True) == 0
        if is_null:
            self.write("{}: (null)\n".format(field_name), indent)
        return is_null

    def write_struct(self, field_name: str, struct_fn, optional: bool, count: Optional[int],
                     indent: int):
        """
        Reads and prints a struct, calling `struct_fn` to pretty-print it
        optional: whether this is an optional structure. In this case, we will read an int64 first
                  and skip the struct if the result is null.
        count: how many times this is repeated. Pass None for non-repeated fields.
        """
        if optional and self.check_null(field_name, indent):
            return

        is_repeated = count is not None
        for i in range(count if is_repeated else 1):
            suffix = " #{}".format(i) if is_repeated else ""
            self.write("{}{}:\n".format(field_name, suffix), indent)
            struct_fn(self, indent + 1)

    def write_string(self, field_name: str, size: Optional[int], indent: int):
        """
        Reads a null-terminated string from the stream and prints it.
        size: if specified, reads exactly this many bytes and uses the ones before the first
              null as the string. Otherwise, reads until a null is found.
        """
        if size is not None:
            # The string ends at the first null: whatever follows it in a fixed-size field is
            # padding (or garbage) and not part of the string
            buf = self.read_bytes(size).split(b'\x00', 1)[0]
        else:
            buf = bytearray()
            # Reads from the string one char at a time, until we find a null
            # Not the most efficient way of doing this, but whatever
            while True:
                c = self.read_int(1)
                if c == 0:
                    break
                buf.append(c)

        self.write("{}: \"{}\"\n".format(field_name, buf.decode('utf-8')), indent)