1// Copyright (C) 2019 The Android Open Source Project 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15import * as protobuf from 'protobufjs/minimal'; 16 17import {perfetto} from '../gen/protos'; 18 19import {AdbAuthState, AdbBaseConsumerPort} from './adb_base_controller'; 20import {Adb, AdbStream} from './adb_interfaces'; 21import { 22 isReadBuffersResponse, 23} from './consumer_port_types'; 24import {Consumer} from './record_controller_interfaces'; 25 26enum SocketState { 27 DISCONNECTED, 28 BINDING_IN_PROGRESS, 29 BOUND, 30} 31 32// See wire_protocol.proto for more details. 33const WIRE_PROTOCOL_HEADER_SIZE = 4; 34const MAX_IPC_BUFFER_SIZE = 128 * 1024; 35 36const PROTO_LEN_DELIMITED_WIRE_TYPE = 2; 37const TRACE_PACKET_PROTO_ID = 1; 38const TRACE_PACKET_PROTO_TAG = 39 (TRACE_PACKET_PROTO_ID << 3) | PROTO_LEN_DELIMITED_WIRE_TYPE; 40 41declare type Frame = perfetto.protos.IPCFrame; 42declare type IMethodInfo = 43 perfetto.protos.IPCFrame.BindServiceReply.IMethodInfo; 44declare type ISlice = perfetto.protos.ReadBuffersResponse.ISlice; 45 46interface Command { 47 method: string; 48 params: Uint8Array; 49} 50 51const TRACED_SOCKET = '/dev/socket/traced_consumer'; 52 53export class AdbSocketConsumerPort extends AdbBaseConsumerPort { 54 private socketState = SocketState.DISCONNECTED; 55 56 private socket?: AdbStream; 57 // Wire protocol request ID. After each request it is increased. It is needed 58 // to keep track of the type of request, and parse the response correctly. 59 private requestId = 1; 60 61 // Buffers received wire protocol data. 62 private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE); 63 private incomingBufferLen = 0; 64 private frameToParseLen = 0; 65 66 private availableMethods: IMethodInfo[] = []; 67 private serviceId = -1; 68 69 private resolveBindingPromise!: VoidFunction; 70 private requestMethods = new Map<number, string>(); 71 72 // Needed for ReadBufferResponse: all the trace packets are split into 73 // several slices. |partialPacket| is the buffer for them. Once we receive a 74 // slice with the flag |lastSliceForPacket|, a new packet is created. 75 private partialPacket: ISlice[] = []; 76 // Accumulates trace packets into a proto trace file.. 77 private traceProtoWriter = protobuf.Writer.create(); 78 79 private socketCommandQueue: Command[] = []; 80 81 constructor(adb: Adb, consumer: Consumer) { 82 super(adb, consumer); 83 } 84 85 async invoke(method: string, params: Uint8Array) { 86 // ADB connection & authentication is handled by the superclass. 87 console.assert(this.state === AdbAuthState.CONNECTED); 88 this.socketCommandQueue.push({method, params}); 89 90 if (this.socketState === SocketState.BINDING_IN_PROGRESS) return; 91 if (this.socketState === SocketState.DISCONNECTED) { 92 this.socketState = SocketState.BINDING_IN_PROGRESS; 93 await this.listenForMessages(); 94 await this.bind(); 95 this.traceProtoWriter = protobuf.Writer.create(); 96 this.socketState = SocketState.BOUND; 97 } 98 99 console.assert(this.socketState === SocketState.BOUND); 100 101 for (const cmd of this.socketCommandQueue) { 102 this.invokeInternal(cmd.method, cmd.params); 103 } 104 this.socketCommandQueue = []; 105 } 106 107 private invokeInternal(method: string, argsProto: Uint8Array) { 108 // Socket is bound in invoke(). 109 console.assert(this.socketState === SocketState.BOUND); 110 const requestId = this.requestId++; 111 const methodId = this.findMethodId(method); 112 if (methodId === undefined) { 113 // This can happen with 'GetTraceStats': it seems that not all the Android 114 // <= 9 devices support it. 115 console.error(`Method ${method} not supported by the target`); 116 return; 117 } 118 const frame = new perfetto.protos.IPCFrame({ 119 requestId, 120 msgInvokeMethod: new perfetto.protos.IPCFrame.InvokeMethod( 121 {serviceId: this.serviceId, methodId, argsProto}) 122 }); 123 this.requestMethods.set(requestId, method); 124 this.sendFrame(frame); 125 126 if (method === 'EnableTracing') this.setDurationStatus(argsProto); 127 } 128 129 static generateFrameBufferToSend(frame: Frame): Uint8Array { 130 const frameProto: Uint8Array = 131 perfetto.protos.IPCFrame.encode(frame).finish(); 132 const frameLen = frameProto.length; 133 const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameLen); 134 const dv = new DataView(buf.buffer); 135 dv.setUint32(0, frameProto.length, /* littleEndian */ true); 136 for (let i = 0; i < frameLen; i++) { 137 dv.setUint8(WIRE_PROTOCOL_HEADER_SIZE + i, frameProto[i]); 138 } 139 return buf; 140 } 141 142 async sendFrame(frame: Frame) { 143 console.assert(this.socket !== undefined); 144 if (!this.socket) return; 145 const buf = AdbSocketConsumerPort.generateFrameBufferToSend(frame); 146 await this.socket.write(buf); 147 } 148 149 async listenForMessages() { 150 this.socket = await this.adb.socket(TRACED_SOCKET); 151 this.socket.onData = (raw) => this.handleReceivedData(raw); 152 this.socket.onClose = () => { 153 this.socketState = SocketState.DISCONNECTED; 154 this.socketCommandQueue = []; 155 }; 156 } 157 158 private parseMessageSize(buffer: Uint8Array) { 159 const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length); 160 return dv.getUint32(0, true); 161 } 162 163 private parseMessage(frameBuffer: Uint8Array) { 164 // Copy message to new array: 165 const buf = new ArrayBuffer(frameBuffer.byteLength); 166 const arr = new Uint8Array(buf); 167 arr.set(frameBuffer); 168 const frame = perfetto.protos.IPCFrame.decode(arr); 169 this.handleIncomingFrame(frame); 170 } 171 172 private incompleteSizeHeader() { 173 if (!this.frameToParseLen) { 174 console.assert(this.incomingBufferLen < WIRE_PROTOCOL_HEADER_SIZE); 175 return true; 176 } 177 return false; 178 } 179 180 private canCompleteSizeHeader(newData: Uint8Array) { 181 return newData.length + this.incomingBufferLen > WIRE_PROTOCOL_HEADER_SIZE; 182 } 183 184 private canParseFullMessage(newData: Uint8Array) { 185 return this.frameToParseLen && 186 this.incomingBufferLen + newData.length >= this.frameToParseLen; 187 } 188 189 private appendToIncomingBuffer(array: Uint8Array) { 190 this.incomingBuffer.set(array, this.incomingBufferLen); 191 this.incomingBufferLen += array.length; 192 } 193 194 handleReceivedData(newData: Uint8Array) { 195 if (this.incompleteSizeHeader() && this.canCompleteSizeHeader(newData)) { 196 const newDataBytesToRead = 197 WIRE_PROTOCOL_HEADER_SIZE - this.incomingBufferLen; 198 // Add to the incoming buffer the remaining bytes to arrive at 199 // WIRE_PROTOCOL_HEADER_SIZE 200 this.appendToIncomingBuffer(newData.subarray(0, newDataBytesToRead)); 201 newData = newData.subarray(newDataBytesToRead); 202 203 this.frameToParseLen = this.parseMessageSize(this.incomingBuffer); 204 this.incomingBufferLen = 0; 205 } 206 207 // Parse all complete messages in incomingBuffer and newData. 208 while (this.canParseFullMessage(newData)) { 209 // All the message is in the newData buffer. 210 if (this.incomingBufferLen === 0) { 211 this.parseMessage(newData.subarray(0, this.frameToParseLen)); 212 newData = newData.subarray(this.frameToParseLen); 213 } else { // We need to complete the local buffer. 214 // Read the remaining part of this message. 215 const bytesToCompleteMessage = 216 this.frameToParseLen - this.incomingBufferLen; 217 this.appendToIncomingBuffer( 218 newData.subarray(0, bytesToCompleteMessage)); 219 this.parseMessage( 220 this.incomingBuffer.subarray(0, this.frameToParseLen)); 221 this.incomingBufferLen = 0; 222 // Remove the data just parsed. 223 newData = newData.subarray(bytesToCompleteMessage); 224 } 225 this.frameToParseLen = 0; 226 if (!this.canCompleteSizeHeader(newData)) break; 227 228 this.frameToParseLen = 229 this.parseMessageSize(newData.subarray(0, WIRE_PROTOCOL_HEADER_SIZE)); 230 newData = newData.subarray(WIRE_PROTOCOL_HEADER_SIZE); 231 } 232 // Buffer the remaining data (part of the next header + message). 233 this.appendToIncomingBuffer(newData); 234 } 235 236 decodeResponse( 237 requestId: number, responseProto: Uint8Array, hasMore = false) { 238 const method = this.requestMethods.get(requestId); 239 if (!method) { 240 console.error(`Unknown request id: ${requestId}`); 241 this.sendErrorMessage(`Wire protocol error.`); 242 return; 243 } 244 const decoder = decoders.get(method); 245 if (decoder === undefined) { 246 console.error(`Unable to decode method: ${method}`); 247 return; 248 } 249 const decodedResponse = decoder(responseProto); 250 const response = {type: `${method}Response`, ...decodedResponse}; 251 252 // TODO(nicomazz): Fix this. 253 // We assemble all the trace and then send it back to the main controller. 254 // This is a temporary solution, that will be changed in a following CL, 255 // because now both the chrome consumer port and the other adb consumer port 256 // send back the entire trace, while the correct behavior should be to send 257 // back the slices, that are assembled by the main record controller. 258 if (isReadBuffersResponse(response)) { 259 if (response.slices) this.handleSlices(response.slices); 260 if (!hasMore) this.sendReadBufferResponse(); 261 return; 262 } 263 this.sendMessage(response); 264 } 265 266 handleSlices(slices: ISlice[]) { 267 for (const slice of slices) { 268 this.partialPacket.push(slice); 269 if (slice.lastSliceForPacket) { 270 const tracePacket = this.generateTracePacket(this.partialPacket); 271 this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG); 272 this.traceProtoWriter.bytes(tracePacket); 273 this.partialPacket = []; 274 } 275 } 276 } 277 278 generateTracePacket(slices: ISlice[]): Uint8Array { 279 let bufferSize = 0; 280 for (const slice of slices) bufferSize += slice.data!.length; 281 const fullBuffer = new Uint8Array(bufferSize); 282 let written = 0; 283 for (const slice of slices) { 284 const data = slice.data!; 285 fullBuffer.set(data, written); 286 written += data.length; 287 } 288 return fullBuffer; 289 } 290 291 sendReadBufferResponse() { 292 this.sendMessage(this.generateChunkReadResponse( 293 this.traceProtoWriter.finish(), /* last */ true)); 294 this.traceProtoWriter = protobuf.Writer.create(); 295 } 296 297 bind() { 298 console.assert(this.socket !== undefined); 299 const requestId = this.requestId++; 300 const frame = new perfetto.protos.IPCFrame({ 301 requestId, 302 msgBindService: new perfetto.protos.IPCFrame.BindService( 303 {serviceName: 'ConsumerPort'}) 304 }); 305 return new Promise((resolve, _) => { 306 this.resolveBindingPromise = resolve; 307 this.sendFrame(frame); 308 }); 309 } 310 311 findMethodId(method: string): number|undefined { 312 const methodObject = this.availableMethods.find((m) => m.name === method); 313 if (methodObject && methodObject.id) return methodObject.id; 314 return undefined; 315 } 316 317 static async hasSocketAccess(device: USBDevice, adb: Adb): Promise<boolean> { 318 await adb.connect(device); 319 try { 320 const socket = await adb.socket(TRACED_SOCKET); 321 socket.close(); 322 return true; 323 } catch (e) { 324 return false; 325 } 326 } 327 328 handleIncomingFrame(frame: perfetto.protos.IPCFrame) { 329 const requestId = frame.requestId; 330 switch (frame.msg) { 331 case 'msgBindServiceReply': { 332 const msgBindServiceReply = frame.msgBindServiceReply; 333 if (msgBindServiceReply && msgBindServiceReply.methods && 334 msgBindServiceReply.serviceId) { 335 console.assert(msgBindServiceReply.success); 336 this.availableMethods = msgBindServiceReply.methods; 337 this.serviceId = msgBindServiceReply.serviceId; 338 this.resolveBindingPromise(); 339 this.resolveBindingPromise = () => {}; 340 } 341 return; 342 } 343 case 'msgInvokeMethodReply': { 344 const msgInvokeMethodReply = frame.msgInvokeMethodReply; 345 if (msgInvokeMethodReply && msgInvokeMethodReply.replyProto) { 346 if (!msgInvokeMethodReply.success) { 347 console.error( 348 'Unsuccessful method invocation: ', msgInvokeMethodReply); 349 return; 350 } 351 this.decodeResponse( 352 requestId, 353 msgInvokeMethodReply.replyProto, 354 msgInvokeMethodReply.hasMore === true); 355 } 356 return; 357 } 358 default: 359 console.error(`not recognized frame message: ${frame.msg}`); 360 } 361 } 362} 363 364const decoders = 365 new Map<string, Function>() 366 .set('EnableTracing', perfetto.protos.EnableTracingResponse.decode) 367 .set('FreeBuffers', perfetto.protos.FreeBuffersResponse.decode) 368 .set('ReadBuffers', perfetto.protos.ReadBuffersResponse.decode) 369 .set('DisableTracing', perfetto.protos.DisableTracingResponse.decode) 370 .set('GetTraceStats', perfetto.protos.GetTraceStatsResponse.decode); 371