1// Copyright (C) 2019 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import * as protobuf from 'protobufjs/minimal';
16
17import {perfetto} from '../gen/protos';
18
19import {AdbAuthState, AdbBaseConsumerPort} from './adb_base_controller';
20import {Adb, AdbStream} from './adb_interfaces';
21import {
22  isReadBuffersResponse,
23} from './consumer_port_types';
24import {Consumer} from './record_controller_interfaces';
25
/** Lifecycle of the wire-protocol connection to the tracing socket. */
enum SocketState {
  /** No usable socket: initial state, and the state after the socket closes. */
  DISCONNECTED = 0,
  /** The socket is being opened and the bind request is still pending. */
  BINDING_IN_PROGRESS = 1,
  /** Binding completed; method invocations can be sent. */
  BOUND = 2,
}
31
// Framing constants, see wire_protocol.proto for more details.
// Every frame on the socket is preceded by a fixed-size length header.
const WIRE_PROTOCOL_HEADER_SIZE = 4;
// Capacity, in bytes, of the buffer used to reassemble incoming frames.
const MAX_IPC_BUFFER_SIZE = 1024 * 128;

// Protobuf field key for a length-delimited field with id 1: the key is the
// field id shifted left by three bits, combined with the wire type.
const PROTO_LEN_DELIMITED_WIRE_TYPE = 2;
const TRACE_PACKET_PROTO_ID = 1;
const TRACE_PACKET_PROTO_TAG =
    PROTO_LEN_DELIMITED_WIRE_TYPE | (TRACE_PACKET_PROTO_ID << 3);
40
// Short local names for the generated protobuf types used in this file.
// One wire-protocol frame.
type Frame = perfetto.protos.IPCFrame;
// Descriptor of a remote method, as listed in a bind-service reply.
type IMethodInfo = perfetto.protos.IPCFrame.BindServiceReply.IMethodInfo;
// One slice of a trace packet, as carried by a ReadBuffers response.
type ISlice = perfetto.protos.ReadBuffersResponse.ISlice;
45
/** A method invocation queued until the socket is bound. */
interface Command {
  /** Serialized arguments of the method. */
  params: Uint8Array;
  /** Name of the remote method to invoke. */
  method: string;
}
50
// Socket path passed to Adb.socket() to open the stream used by this port.
// NOTE(review): presumably the consumer socket exposed by traced on the
// device - confirm.
const TRACED_SOCKET = '/dev/socket/traced_consumer';
52
/**
 * Consumer port that exchanges wire-protocol frames with the tracing service
 * through an ADB stream opened on TRACED_SOCKET.
 *
 * The first invoke() opens the socket and binds to the 'ConsumerPort'
 * service; commands issued meanwhile are queued and sent once the bind reply
 * arrives. Incoming bytes are reassembled into frames, each frame being
 * preceded by a little-endian 32-bit length header.
 */
export class AdbSocketConsumerPort extends AdbBaseConsumerPort {
  private socketState = SocketState.DISCONNECTED;

  private socket?: AdbStream;
  // Wire protocol request ID. After each request it is increased. It is needed
  // to keep track of the type of request, and parse the response correctly.
  private requestId = 1;

  // Buffers received wire protocol data.
  private incomingBuffer = new Uint8Array(MAX_IPC_BUFFER_SIZE);
  // Number of valid bytes currently stored in |incomingBuffer|.
  private incomingBufferLen = 0;
  // Size of the frame being received, or 0 while its header is incomplete.
  private frameToParseLen = 0;

  // Methods and service id announced by the bind-service reply.
  private availableMethods: IMethodInfo[] = [];
  private serviceId = -1;

  // Settles the promise returned by bind(); set by bind() itself.
  private resolveBindingPromise!: VoidFunction;
  // Maps a request id to the method it invoked, to pick the reply decoder.
  private requestMethods = new Map<number, string>();

  // Needed for ReadBufferResponse: all the trace packets are split into
  // several slices. |partialPacket| is the buffer for them. Once we receive a
  // slice with the flag |lastSliceForPacket|, a new packet is created.
  private partialPacket: ISlice[] = [];
  // Accumulates trace packets into a proto trace file.
  private traceProtoWriter = protobuf.Writer.create();

  // Commands waiting for the socket to be bound.
  private socketCommandQueue: Command[] = [];

  constructor(adb: Adb, consumer: Consumer) {
    super(adb, consumer);
  }

  /**
   * Queues |method| and sends every queued command once the socket is bound,
   * opening and binding the socket first if it is disconnected.
   *
   * While a bind is in flight the command is only queued: the call that
   * started the bind flushes the queue. If opening or binding fails, the port
   * goes back to DISCONNECTED (so that a later call can retry), the queue is
   * dropped and the error is rethrown.
   *
   * @param method Name of the remote method.
   * @param params Serialized arguments of the method.
   */
  async invoke(method: string, params: Uint8Array): Promise<void> {
    // ADB connection & authentication is handled by the superclass.
    console.assert(this.state === AdbAuthState.CONNECTED);
    this.socketCommandQueue.push({method, params});

    if (this.socketState === SocketState.BINDING_IN_PROGRESS) return;
    if (this.socketState === SocketState.DISCONNECTED) {
      this.socketState = SocketState.BINDING_IN_PROGRESS;
      try {
        await this.listenForMessages();
        await this.bind();
      } catch (e) {
        // Without this the port would stay in BINDING_IN_PROGRESS forever and
        // every later invoke() would silently queue its command.
        this.socketState = SocketState.DISCONNECTED;
        this.socketCommandQueue = [];
        throw e;
      }
      this.traceProtoWriter = protobuf.Writer.create();
      this.socketState = SocketState.BOUND;
    }

    console.assert(this.socketState === SocketState.BOUND);

    for (const cmd of this.socketCommandQueue) {
      this.invokeInternal(cmd.method, cmd.params);
    }
    this.socketCommandQueue = [];
  }

  /**
   * Sends one InvokeMethod frame and records which method the request id
   * refers to. Methods not announced by the service are logged and skipped.
   */
  private invokeInternal(method: string, argsProto: Uint8Array) {
    // Socket is bound in invoke().
    console.assert(this.socketState === SocketState.BOUND);
    const requestId = this.requestId++;
    const methodId = this.findMethodId(method);
    if (methodId === undefined) {
      // This can happen with 'GetTraceStats': it seems that not all the Android
      // <= 9 devices support it.
      console.error(`Method ${method} not supported by the target`);
      return;
    }
    const frame = new perfetto.protos.IPCFrame({
      requestId,
      msgInvokeMethod: new perfetto.protos.IPCFrame.InvokeMethod(
          {serviceId: this.serviceId, methodId, argsProto})
    });
    this.requestMethods.set(requestId, method);
    this.sendFrame(frame);

    if (method === 'EnableTracing') this.setDurationStatus(argsProto);
  }

  /**
   * Serializes |frame| and prepends the length header.
   *
   * @returns A buffer made of the encoded frame length as a little-endian
   *     32-bit integer, followed by the encoded frame.
   */
  static generateFrameBufferToSend(frame: Frame): Uint8Array {
    const frameProto: Uint8Array =
        perfetto.protos.IPCFrame.encode(frame).finish();
    const buf = new Uint8Array(WIRE_PROTOCOL_HEADER_SIZE + frameProto.length);
    new DataView(buf.buffer)
        .setUint32(0, frameProto.length, /* littleEndian */ true);
    buf.set(frameProto, WIRE_PROTOCOL_HEADER_SIZE);
    return buf;
  }

  /** Writes |frame| to the socket. Does nothing if there is no socket. */
  async sendFrame(frame: Frame): Promise<void> {
    console.assert(this.socket !== undefined);
    if (!this.socket) return;
    const buf = AdbSocketConsumerPort.generateFrameBufferToSend(frame);
    await this.socket.write(buf);
  }

  /** Opens the socket and installs the data and close handlers. */
  async listenForMessages(): Promise<void> {
    this.socket = await this.adb.socket(TRACED_SOCKET);
    this.socket.onData = (raw) => this.handleReceivedData(raw);
    this.socket.onClose = () => {
      this.socketState = SocketState.DISCONNECTED;
      this.socketCommandQueue = [];
      // Drop whatever was half-received on this stream: leftover bytes or
      // slices would otherwise be mixed with the data of the next connection.
      this.incomingBufferLen = 0;
      this.frameToParseLen = 0;
      this.partialPacket = [];
    };
  }

  /** Reads the little-endian 32-bit frame length at the start of |buffer|. */
  private parseMessageSize(buffer: Uint8Array) {
    const dv = new DataView(buffer.buffer, buffer.byteOffset, buffer.length);
    return dv.getUint32(0, true);
  }

  /** Decodes one complete frame and dispatches it. */
  private parseMessage(frameBuffer: Uint8Array) {
    // Decode from a private copy: |frameBuffer| may be a view on
    // |incomingBuffer|, which is overwritten by the following frames.
    const arr = new Uint8Array(frameBuffer);
    const frame = perfetto.protos.IPCFrame.decode(arr);
    this.handleIncomingFrame(frame);
  }

  /** True while the length header of the next frame has not been parsed. */
  private incompleteSizeHeader() {
    if (!this.frameToParseLen) {
      console.assert(this.incomingBufferLen < WIRE_PROTOCOL_HEADER_SIZE);
      return true;
    }
    return false;
  }

  /**
   * True if the buffered bytes plus |newData| hold a whole length header.
   * The comparison is inclusive: exactly WIRE_PROTOCOL_HEADER_SIZE bytes are
   * enough to parse the header.
   */
  private canCompleteSizeHeader(newData: Uint8Array) {
    return newData.length + this.incomingBufferLen >= WIRE_PROTOCOL_HEADER_SIZE;
  }

  /**
   * True if the frame length is known and the buffered bytes plus |newData|
   * hold the whole frame.
   */
  private canParseFullMessage(newData: Uint8Array): boolean {
    return this.frameToParseLen > 0 &&
        this.incomingBufferLen + newData.length >= this.frameToParseLen;
  }

  /** Copies |array| after the bytes already stored in |incomingBuffer|. */
  private appendToIncomingBuffer(array: Uint8Array) {
    this.incomingBuffer.set(array, this.incomingBufferLen);
    this.incomingBufferLen += array.length;
  }

  /**
   * Consumes a chunk of bytes received from the socket. Every frame completed
   * by this chunk is parsed and dispatched; trailing bytes (part of the next
   * header or frame) are kept in |incomingBuffer| for the next call.
   */
  handleReceivedData(newData: Uint8Array): void {
    if (this.incompleteSizeHeader() && this.canCompleteSizeHeader(newData)) {
      const newDataBytesToRead =
          WIRE_PROTOCOL_HEADER_SIZE - this.incomingBufferLen;
      // Add to the incoming buffer the remaining bytes to arrive at
      // WIRE_PROTOCOL_HEADER_SIZE
      this.appendToIncomingBuffer(newData.subarray(0, newDataBytesToRead));
      newData = newData.subarray(newDataBytesToRead);

      this.frameToParseLen = this.parseMessageSize(this.incomingBuffer);
      // The header bytes are consumed: the buffer now collects the frame body.
      this.incomingBufferLen = 0;
    }

    // Parse all complete messages in incomingBuffer and newData.
    while (this.canParseFullMessage(newData)) {
      // All the message is in the newData buffer.
      if (this.incomingBufferLen === 0) {
        this.parseMessage(newData.subarray(0, this.frameToParseLen));
        newData = newData.subarray(this.frameToParseLen);
      } else {  // We need to complete the local buffer.
        // Read the remaining part of this message.
        const bytesToCompleteMessage =
            this.frameToParseLen - this.incomingBufferLen;
        this.appendToIncomingBuffer(
            newData.subarray(0, bytesToCompleteMessage));
        this.parseMessage(
            this.incomingBuffer.subarray(0, this.frameToParseLen));
        this.incomingBufferLen = 0;
        // Remove the data just parsed.
        newData = newData.subarray(bytesToCompleteMessage);
      }
      this.frameToParseLen = 0;
      if (!this.canCompleteSizeHeader(newData)) break;

      this.frameToParseLen =
          this.parseMessageSize(newData.subarray(0, WIRE_PROTOCOL_HEADER_SIZE));
      newData = newData.subarray(WIRE_PROTOCOL_HEADER_SIZE);
    }
    // Buffer the remaining data (part of the next header + message).
    this.appendToIncomingBuffer(newData);
  }

  /**
   * Decodes the reply to request |requestId| and forwards it.
   *
   * ReadBuffers replies are not forwarded one by one: their slices are
   * accumulated and a single response is sent once the last reply arrives.
   *
   * @param requestId Id of the request this reply answers.
   * @param responseProto Serialized response message.
   * @param hasMore Whether more replies will follow for the same request.
   */
  decodeResponse(
      requestId: number, responseProto: Uint8Array, hasMore = false): void {
    const method = this.requestMethods.get(requestId);
    if (!method) {
      console.error(`Unknown request id: ${requestId}`);
      this.sendErrorMessage(`Wire protocol error.`);
      return;
    }
    const decoder = decoders.get(method);
    if (decoder === undefined) {
      console.error(`Unable to decode method: ${method}`);
      return;
    }
    const decodedResponse = decoder(responseProto);
    const response = {type: `${method}Response`, ...decodedResponse};

    // TODO(nicomazz): Fix this.
    // We assemble all the trace and then send it back to the main controller.
    // This is a temporary solution, that will be changed in a following CL,
    // because now both the chrome consumer port and the other adb consumer port
    // send back the entire trace, while the correct behavior should be to send
    // back the slices, that are assembled by the main record controller.
    if (isReadBuffersResponse(response)) {
      if (response.slices) this.handleSlices(response.slices);
      if (!hasMore) this.sendReadBufferResponse();
      return;
    }
    this.sendMessage(response);
  }

  /**
   * Accumulates |slices|; each time a slice flagged |lastSliceForPacket| is
   * seen, the pending slices are joined and appended to the trace as one
   * length-delimited field with tag TRACE_PACKET_PROTO_TAG.
   */
  handleSlices(slices: ISlice[]): void {
    for (const slice of slices) {
      this.partialPacket.push(slice);
      if (slice.lastSliceForPacket) {
        const tracePacket = this.generateTracePacket(this.partialPacket);
        this.traceProtoWriter.uint32(TRACE_PACKET_PROTO_TAG);
        this.traceProtoWriter.bytes(tracePacket);
        this.partialPacket = [];
      }
    }
  }

  /**
   * Concatenates the payloads of |slices|, in order, into one buffer.
   * Slices that carry no data contribute nothing.
   */
  generateTracePacket(slices: ISlice[]): Uint8Array {
    const chunks: Uint8Array[] = [];
    let bufferSize = 0;
    for (const slice of slices) {
      const data = slice.data;
      if (!data) continue;
      chunks.push(data);
      bufferSize += data.length;
    }
    const fullBuffer = new Uint8Array(bufferSize);
    let written = 0;
    for (const chunk of chunks) {
      fullBuffer.set(chunk, written);
      written += chunk.length;
    }
    return fullBuffer;
  }

  /** Sends the accumulated trace as the last chunk and starts a new one. */
  sendReadBufferResponse(): void {
    this.sendMessage(this.generateChunkReadResponse(
        this.traceProtoWriter.finish(), /* last */ true));
    this.traceProtoWriter = protobuf.Writer.create();
  }

  /**
   * Sends the bind request for the 'ConsumerPort' service.
   *
   * @returns A promise resolved by handleIncomingFrame() when the bind reply
   *     arrives, and rejected if the request cannot be written to the socket.
   */
  bind(): Promise<void> {
    console.assert(this.socket !== undefined);
    const requestId = this.requestId++;
    const frame = new perfetto.protos.IPCFrame({
      requestId,
      msgBindService: new perfetto.protos.IPCFrame.BindService(
          {serviceName: 'ConsumerPort'})
    });
    return new Promise<void>((resolve, reject) => {
      this.resolveBindingPromise = () => resolve();
      // A failed write means that no reply will ever arrive: fail the bind
      // instead of leaving the promise pending forever.
      this.sendFrame(frame).catch(reject);
    });
  }

  /**
   * Looks up the id of |method| among the methods announced by the service.
   *
   * @returns The method id, or undefined if the method is unknown. An id of 0
   *     is treated as missing.
   */
  findMethodId(method: string): number|undefined {
    const methodObject = this.availableMethods.find((m) => m.name === method);
    if (methodObject && methodObject.id) return methodObject.id;
    return undefined;
  }

  /**
   * Checks whether the socket at TRACED_SOCKET can be opened on |device|.
   *
   * @returns true if the socket was opened (it is closed again right away),
   *     false if opening it failed. Errors raised while connecting to the
   *     device are not caught here.
   */
  static async hasSocketAccess(device: USBDevice, adb: Adb): Promise<boolean> {
    await adb.connect(device);
    try {
      const socket = await adb.socket(TRACED_SOCKET);
      socket.close();
      return true;
    } catch (e) {
      return false;
    }
  }

  /**
   * Dispatches a decoded frame: a bind reply stores the service description
   * and resolves the pending bind(), a successful method reply is passed to
   * decodeResponse(). Anything else is logged.
   */
  handleIncomingFrame(frame: perfetto.protos.IPCFrame): void {
    const requestId = frame.requestId;
    switch (frame.msg) {
      case 'msgBindServiceReply': {
        const msgBindServiceReply = frame.msgBindServiceReply;
        if (msgBindServiceReply && msgBindServiceReply.methods &&
            msgBindServiceReply.serviceId) {
          console.assert(msgBindServiceReply.success);
          this.availableMethods = msgBindServiceReply.methods;
          this.serviceId = msgBindServiceReply.serviceId;
          this.resolveBindingPromise();
          // Replace the resolver with a no-op once it has been used.
          this.resolveBindingPromise = () => {};
        }
        return;
      }
      case 'msgInvokeMethodReply': {
        const msgInvokeMethodReply = frame.msgInvokeMethodReply;
        if (msgInvokeMethodReply && msgInvokeMethodReply.replyProto) {
          if (!msgInvokeMethodReply.success) {
            console.error(
                'Unsuccessful method invocation: ', msgInvokeMethodReply);
            return;
          }
          this.decodeResponse(
              requestId,
              msgInvokeMethodReply.replyProto,
              msgInvokeMethodReply.hasMore === true);
        }
        return;
      }
      default:
        console.error(`not recognized frame message: ${frame.msg}`);
    }
  }
}
363
// Response decoder for each supported method, keyed by method name.
const decoders = new Map<string, Function>([
  ['EnableTracing', perfetto.protos.EnableTracingResponse.decode],
  ['FreeBuffers', perfetto.protos.FreeBuffersResponse.decode],
  ['ReadBuffers', perfetto.protos.ReadBuffersResponse.decode],
  ['DisableTracing', perfetto.protos.DisableTracingResponse.decode],
  ['GetTraceStats', perfetto.protos.GetTraceStatsResponse.decode],
]);
371