#!/usr/bin/env python
#
# Copyright 2016 The Chromium OS Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Runs latency tests (drag, tap, screen, ...) using WALT Latency Timer

Usage example (touchpad drag latency):
    $ python walt.py -t drag -i 11
    Starting drag latency test
    Input device : /dev/input/event11
    Serial device : /dev/ttyACM1
    Laser log file : /tmp/WALT_2016_06_23__1714_51_laser.log
    evtest log file: /tmp/WALT_2016_06_23__1714_51_evtest.log
    Clock zeroed at 1466716492 (rt 0.284 ms)
    ........................................
    Processing data, may take a minute or two...
    Drag latency (min method) = 15.37 ms

Note, before running this script, check that evtest can grab the device.
On some systems it requires running as root.
"""

import argparse
import contextlib
import glob
import os
import random
import re
import socket
import subprocess
import sys
import tempfile
import threading
import time

import serial
import numpy

import evparser
import minimization
import screen_stats

# Time units
MS = 1e-3  # MS = 0.001 seconds
US = 1e-6  # US = 10^-6 seconds

# Globals
# Verbose logging switch; parse_args() overwrites it with the --debug flag.
debug_mode = True


def log(msg):
    """Print msg only when the module-level debug_mode flag is set."""
    if debug_mode:
        print(msg)


class Walt(object):
    """ A class for communicating with Walt device

    Usage:
    with Walt('/dev/ttyUSB0') as walt:
        body....

    """

    # Teensy commands, always single char. Defined in WALT.ino
    # github.com/google/walt/blob/master/arduino/walt/walt.ino
    CMD_RESET = 'F'
    CMD_PING = 'P'
    CMD_SYNC_ZERO = 'Z'
    CMD_SYNC_SEND = 'I'
    CMD_SYNC_READOUT = 'R'
    CMD_TIME_NOW = 'T'
    CMD_AUTO_LASER_ON = 'L'
    CMD_AUTO_LASER_OFF = 'l'
    CMD_AUTO_SCREEN_ON = 'C'
    CMD_AUTO_SCREEN_OFF = 'c'
    CMD_GSHOCK = 'G'
    CMD_VERSION = 'V'
    CMD_SAMPLE_ALL = 'Q'
    CMD_BRIGHTNESS_CURVE = 'U'
    CMD_AUDIO = 'A'

    def __init__(self, serial_dev, timeout=None, encoding='utf-8'):
        """ Open the serial port serial_dev at 115200 baud.

        timeout is passed to serial.Serial unchanged; encoding is used to
        convert between text and the bytes sent over the serial port.
        """
        self.encoding = encoding
        self.serial_dev = serial_dev
        self.ser = serial.Serial(serial_dev, baudrate=115200, timeout=timeout)
        self.base_time = None  # local time.time() at which WALT was zeroed
        self.min_lag = None  # set by zero_clock() / estimate_lag()
        self.max_lag = None  # set by zero_clock() / estimate_lag()
        self.median_latency = None  # set by run_comm_stats(), in seconds

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Best-effort close: failures are ignored, but KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        try:
            self.ser.close()
        except Exception:
            pass

    def close(self):
        """ Close the serial port. """
        self.ser.close()

    def readline(self):
        """ Read one line from the serial port and return it decoded. """
        return self.ser.readline().decode(self.encoding)

    def sndrcv(self, data):
        """ Send a 1-char command.

        Return a tuple (dt, reply): the round trip time in seconds and the
        decoded reply line.
        """
        t0 = time.time()
        self.ser.write(data.encode(self.encoding))
        reply = self.ser.readline()
        reply = reply.decode(self.encoding)
        t1 = time.time()
        dt = (t1 - t0)
        log('sndrcv(): round trip %.3fms, reply=%s' % (dt / MS, reply.strip()))
        return dt, reply

    def read_shock_time(self):
        """ Send CMD_GSHOCK and return the reply parsed as an int.

        Callers treat the value as microseconds and 0 as "no shock".
        """
        _, s = self.sndrcv(Walt.CMD_GSHOCK)
        t_us = int(s.strip())
        return t_us

    def run_comm_stats(self, N=100):
        """
        Measure the USB serial round trip time.
        Send CMD_TIME_NOW to the Teensy N times measuring the round trip each
        time. Logs stats (min, median, max) and stores the median, in
        seconds, in self.median_latency.
        """
        log('Running USB comm stats...')
        self.ser.flushInput()
        self.sndrcv(Walt.CMD_SYNC_ZERO)
        times = numpy.zeros((N, 1))
        for i in range(N):
            dt, _ = self.sndrcv(Walt.CMD_TIME_NOW)
            times[i] = dt
        median = numpy.median(times)
        stats = (times.min() / MS, median / MS, times.max() / MS, N)
        self.median_latency = median
        log('USB comm round trip stats:')
        log('min=%.2fms, median=%.2fms, max=%.2fms N=%d' % stats)
        # NOTE(review): median is in seconds, so this only triggers above
        # 2 s; possibly 2 * MS was intended -- confirm before changing.
        if (median > 2):
            print('ERROR: the median round trip is too high: %.2f ms'
                  % (median / MS))
            sys.exit(2)

    def zero_clock(self, max_delay=0.001, retries=10):
        """
        Tell the TeensyUSB to zero its clock (CMD_SYNC_ZERO).
        Returns the time when the command was sent, or -1 if no attempt
        completed within max_delay seconds after `retries` tries.
        Verify that the response arrived within max_delay seconds.

        This is the simple zeroing used when the round trip is fast.
        It does not employ the same method as Android clock sync.
        """
        # Check that we get reasonable ping time with Teensy
        # this also 'warms up' the comms, first msg is often slower
        self.run_comm_stats(N=10)

        self.ser.flushInput()

        for i in range(retries):
            t0 = time.time()
            dt, _ = self.sndrcv(Walt.CMD_SYNC_ZERO)
            if dt < max_delay:
                print('Clock zeroed at %.0f (rt %.3f ms)' % (t0, dt / MS))
                self.base_time = t0
                self.max_lag = dt
                self.min_lag = 0
                return t0
        print('Error, failed to zero the clock after %d retries' % retries)
        return -1

    def read_remote_times(self):
        """ Helper func, see doc string in estimate_lag()
        Read out the 9 timestamps recorded by the Teensy and return them as
        a numpy array in seconds.
        """
        times = numpy.zeros(9)
        for i in range(9):
            dt, reply = self.sndrcv(Walt.CMD_SYNC_READOUT)
            num, tstamp = reply.strip().split(':')
            # TODO: verify that num is what we expect it to be
            log('read_remote_times() CMD_SYNC_READOUT > w > = %s' % reply)
            t = float(tstamp) * US  # WALT sends timestamps in microseconds
            times[i] = t
        return times

    def estimate_lag(self):
        """ Estimate the difference between local and remote clocks

        This is based on:
        github.com/google/walt/blob/master/android/WALT/app/src/main/jni/README.md

        self.base_time needs to be set using self.zero_clock() before running
        this function.

        The result is saved as self.min_lag and self.max_lag. Assume that the
        remote clock lags behind the local by `lag` That is, at a given moment
        local_time = remote_time + lag
        where local_time = time.time() - self.base_time

        Immediately after this function completes the lag is guaranteed to be
        between min_lag and max_lag. But the lag will change (drift) with
        time.
        """
        self.ser.flushInput()

        # remote -> local
        times_local_received = numpy.zeros(9)
        # Encode before writing, same as sndrcv(): the serial port takes bytes
        self.ser.write(Walt.CMD_SYNC_SEND.encode(self.encoding))
        for i in range(9):
            self.ser.readline()  # content is ignored, only arrival time used
            times_local_received[i] = time.time() - self.base_time

        times_remote_sent = self.read_remote_times()
        max_lag = (times_local_received - times_remote_sent).min()

        # local -> remote
        times_local_sent = numpy.zeros(9)
        for i in range(9):
            s = '%d' % (i + 1)
            # Sleep between the messages to combat buffering
            t_sleep = US * random.randint(70, 700)
            time.sleep(t_sleep)
            times_local_sent[i] = time.time() - self.base_time
            self.ser.write(s.encode(self.encoding))

        times_remote_received = self.read_remote_times()
        min_lag = (times_local_sent - times_remote_received).max()

        self.min_lag = min_lag
        self.max_lag = max_lag

    def parse_trigger(self, trigger_line):
        """ Parse a trigger line from WALT.

        Trigger events look like this: "G L 12902345 1 1"
        The parts:
         * G - common for all trigger events
         * L - means laser
         * 12902345 is timestamp in us since zeroed
         * 1st 1 or 0 is trigger value. 0 = changed to dark, 1 = changed to
           light,
         * 2nd 1 is counter of how many times this trigger happened since
           last readout, should always be 1 in our case

        Returns a tuple (timestamp in seconds, trigger value).
        Raises Exception if the line does not have exactly 5 parts.
        """
        parts = trigger_line.strip().split()
        if len(parts) != 5:
            raise Exception('Malformed trigger line: "%s"\n' % trigger_line)
        t_us = int(parts[2])
        val = int(parts[3])
        return (t_us / 1e6, val)


def array2str(a):
    """ Format an iterable of numbers as '[x.xx, y.yy, ...]'. """
    a_strs = ['%0.2f' % x for x in a]
    s = ', '.join(a_strs)
    return '[' + s + ']'


def parse_args(argv):
    """ Parse command line arguments and return the argparse namespace.

    Side effects: sets the module-level debug_mode from --debug, and prints
    usage and exits with status 0 when --type is missing.
    """
    temp_dir = tempfile.gettempdir()
    # Named so that it does not shadow the imported `serial` module
    default_serial = '/dev/ttyACM0'

    # Try to autodetect the WALT serial port
    ls_ttyACM = glob.glob('/dev/ttyACM*')
    if len(ls_ttyACM) > 0:
        default_serial = ls_ttyACM[0]

    description = "Run a latency test using WALT Latency Timer"
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('-i', '--input',
                        help='input device, e.g: 6 or /dev/input/event6')
    parser.add_argument('-s', '--serial', default=default_serial,
                        help='WALT serial port')
    parser.add_argument('-t', '--type',
                        help='Test type: drag|tap|screen|sanity|curve|bridge|'
                             'tapaudio|tapblink')
    parser.add_argument('-l', '--logdir', default=temp_dir,
                        help='where to store logs')
    parser.add_argument('-n', default=40, type=int,
                        help='Number of laser toggles to read')
    parser.add_argument('-p', '--port', default=50007, type=int,
                        help='port to listen on for the TCP bridge')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='talk more')
    args = parser.parse_args(argv)

    if not args.type:
        parser.print_usage()
        sys.exit(0)

    global debug_mode
    debug_mode = args.debug

    # Only a bare device number is expanded; isalnum() would also have
    # turned e.g. "event6" into "/dev/input/eventevent6".
    if args.input and args.input.isdigit():
        args.input = '/dev/input/event' + args.input

    return args


def run_drag_latency_test(args):
    """ Run the drag latency test.

    Logs evtest output and args.n WALT laser trigger events to files in
    args.logdir, then hands both files to minimization.minimize().
    Exits with status 1 if --input is missing or the clock can't be zeroed.
    """
    if not args.input:
        print('Error: --input argument is required for drag latency test')
        sys.exit(1)

    # Create names for log files
    prefix = time.strftime('WALT_%Y_%m_%d__%H%M_%S')
    laser_file_name = os.path.join(args.logdir, prefix + '_laser.log')
    evtest_file_name = os.path.join(args.logdir, prefix + '_evtest.log')

    print('Starting drag latency test')
    print('Input device : ' + args.input)
    print('Serial device : ' + args.serial)
    print('Laser log file : ' + laser_file_name)
    print('evtest log file: ' + evtest_file_name)

    with Walt(args.serial) as walt:
        walt.sndrcv(Walt.CMD_RESET)
        t_zero = walt.zero_clock()
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exiting')
            sys.exit(1)

        # Fire up the evtest process
        cmd = 'evtest %s > %s' % (args.input, evtest_file_name)
        evtest = subprocess.Popen(cmd, shell=True)

        # try/finally so the child is not left running if the loop raises
        try:
            # Turn on laser trigger auto-sending
            walt.sndrcv(Walt.CMD_AUTO_LASER_ON)
            trigger_count = 0
            while trigger_count < args.n:
                # The following line blocks until a message from WALT arrives
                trigger_line = walt.readline()
                trigger_count += 1
                log('#%d/%d - ' % (trigger_count, args.n) +
                    trigger_line.strip())

                if not debug_mode:
                    sys.stdout.write('.')
                    sys.stdout.flush()

                t, val = walt.parse_trigger(trigger_line)
                t += t_zero  # convert to the same epoch as time.time()
                with open(laser_file_name, 'at') as flaser:
                    flaser.write('%.3f %d\n' % (t, val))
            walt.sndrcv(Walt.CMD_AUTO_LASER_OFF)
        finally:
            # Send SIGTERM to evtest process
            evtest.terminate()

    print("\nProcessing data, may take a minute or two...")
    # lm.main(evtest_file_name, laser_file_name)
    minimization.minimize(evtest_file_name, laser_file_name)


def run_screen_curve(args):
    """ Start 'blink_test 1', request the brightness curve from WALT and
    print every line received until a read comes back empty.
    """
    with Walt(args.serial, timeout=1) as walt:
        walt.sndrcv(Walt.CMD_RESET)

        t_zero = walt.zero_clock()
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exiting')
            sys.exit(1)

        # Fire up the walt_blinker process
        cmd = 'blink_test 1'
        blinker = subprocess.Popen(cmd, shell=True)

        # Request screen brightness data
        walt.sndrcv(Walt.CMD_BRIGHTNESS_CURVE)
        s = 'dummy'
        while s:
            s = walt.readline()
            print(s.strip())


def run_screen_latency_test(args):
    """ Run the screen latency test.

    Starts blink_test with its output redirected to a log file, records
    WALT screen trigger events to a second log file for as long as
    blink_test is alive, then calls screen_stats.screen_stats() on both.
    """
    # Create names for log files
    prefix = time.strftime('WALT_%Y_%m_%d__%H%M_%S')
    sensor_file_name = os.path.join(args.logdir, prefix + '_screen_sensor.log')
    blinker_file_name = os.path.join(args.logdir, prefix + '_blinker.log')

    print('Starting screen latency test')
    print('Serial device : ' + args.serial)
    print('Sensor log file : ' + sensor_file_name)
    print('Blinker log file: ' + blinker_file_name)

    with Walt(args.serial, timeout=1) as walt:
        walt.sndrcv(Walt.CMD_RESET)

        t_zero = walt.zero_clock()
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exiting')
            sys.exit(1)

        # Fire up the walt_blinker process
        cmd = 'blink_test %d > %s' % (args.n, blinker_file_name, )
        blinker = subprocess.Popen(cmd, shell=True)

        # Turn on screen trigger auto-sending
        walt.sndrcv(Walt.CMD_AUTO_SCREEN_ON)
        trigger_count = 0

        # Iterate while the blinker process is alive
        # TODO: re-sync clocks every once in a while
        while blinker.poll() is None:
            # The following line blocks until a message from WALT arrives
            trigger_line = walt.readline()
            if not trigger_line:
                # This usually happens when readline timeouts on last iteration
                continue
            trigger_count += 1
            log('#%d/%d - ' % (trigger_count, args.n) +
                trigger_line.strip())

            if not debug_mode:
                sys.stdout.write('.')
                sys.stdout.flush()

            t, val = walt.parse_trigger(trigger_line)
            t += t_zero  # convert to the same epoch as time.time()
            with open(sensor_file_name, 'at') as flaser:
                flaser.write('%.3f %d\n' % (t, val))
        walt.sndrcv(Walt.CMD_AUTO_SCREEN_OFF)
    screen_stats.screen_stats(blinker_file_name, sensor_file_name)


def run_tap_audio_test(args):
    """ Run the tap-to-audio latency test.

    Collects args.n time differences between a WALT audio trigger event and
    the shock time reported by WALT, then prints their median.
    """
    print('Starting tap-to-audio latency test')
    with Walt(args.serial) as walt:
        walt.sndrcv(Walt.CMD_RESET)
        t_zero = walt.zero_clock()
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exiting')
            sys.exit(1)

        walt.sndrcv(Walt.CMD_GSHOCK)
        deltas = []
        while len(deltas) < args.n:
            sys.stdout.write('\rWAIT ')
            sys.stdout.flush()
            time.sleep(1)  # Wait for previous beep to stop playing
            while walt.read_shock_time() != 0:
                pass  # skip shocks during sleep
            sys.stdout.write('\rTAP NOW')
            sys.stdout.flush()
            walt.sndrcv(Walt.CMD_AUDIO)
            trigger_line = walt.readline()
            beep_time_seconds, val = walt.parse_trigger(trigger_line)
            beep_time_ms = beep_time_seconds * 1e3
            shock_time_ms = walt.read_shock_time() / 1e3
            if shock_time_ms == 0:
                print("\rNo shock detected, skipping this event")
                continue
            dt = beep_time_ms - shock_time_ms
            deltas.append(dt)
            print("\rdt=%0.1f ms" % dt)
        print('Median tap-to-audio latency: %0.1f ms' % numpy.median(deltas))


def run_tap_blink_test(args):
    """ Run the tap-to-blink latency test.

    Collects args.n time differences between a WALT screen trigger event
    and the shock time reported by WALT, then prints their median.
    """
    print('Starting tap-to-blink latency test')
    with Walt(args.serial) as walt:
        walt.sndrcv(Walt.CMD_RESET)
        t_zero = walt.zero_clock()
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exiting')
            sys.exit(1)

        walt.sndrcv(Walt.CMD_GSHOCK)
        walt.sndrcv(Walt.CMD_AUTO_SCREEN_ON)
        deltas = []
        while len(deltas) < args.n:
            trigger_line = walt.readline()
            blink_time_seconds, val = walt.parse_trigger(trigger_line)
            blink_time_ms = blink_time_seconds * 1e3
            shock_time_ms = walt.read_shock_time() / 1e3
            if shock_time_ms == 0:
                print("No shock detected, skipping this event")
                continue
            dt = blink_time_ms - shock_time_ms
            deltas.append(dt)
            print("dt=%0.1f ms" % dt)
        print('Median tap-to-blink latency: %0.1f ms' % numpy.median(deltas))


def run_tap_latency_test(args):
    """ Run the tap latency test.

    Reads tap events from evtest, compares each event time with the shock
    time reported by WALT and prints the per-direction (down / up) deltas
    and their medians in ms.
    Exits with status 1 if --input is missing or the clock can't be zeroed.
    """
    if not args.input:
        print('Error: --input argument is required for tap latency test')
        sys.exit(1)

    print('Starting tap latency test')

    with Walt(args.serial) as walt:
        walt.sndrcv(Walt.CMD_RESET)
        t_zero = walt.zero_clock()
        if t_zero < 0:
            print('Error: Couldn\'t zero clock, exiting')
            sys.exit(1)

        # Fire up the evtest process
        cmd = 'evtest ' + args.input
        evtest = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                  bufsize=1, universal_newlines=True)
        taps = []
        # try/finally so the child is not left running if the loop raises
        try:
            walt.sndrcv(Walt.CMD_GSHOCK)

            taps_detected = 0
            while taps_detected < args.n:
                ev_line = evtest.stdout.readline()
                if not ev_line:
                    # An empty string means EOF on the pipe; without this
                    # check the loop would spin forever.
                    print('Error: evtest output ended after %d of %d taps'
                          % (taps_detected, args.n))
                    break
                tap_info = evparser.parse_tap_line(ev_line)
                if not tap_info:
                    continue

                # Just received a tap event from evtest
                taps_detected += 1

                t_tap_epoch, direction = tap_info
                shock_time_us = walt.read_shock_time()
                dt_tap_us = 1e6 * (t_tap_epoch - t_zero) - shock_time_us

                print(ev_line.strip())
                print("shock t %d, tap t %f, tap val %d. dt=%0.1f"
                      % (shock_time_us, t_tap_epoch, direction, dt_tap_us))

                if shock_time_us == 0:
                    print("No shock detected, skipping this event")
                    continue

                taps.append((dt_tap_us, direction))
        finally:
            evtest.terminate()

    # Process data
    print("\nProcessing data...")
    dt_down = numpy.array([t[0] for t in taps if t[1] == 1]) / 1e3
    dt_up = numpy.array([t[0] for t in taps if t[1] == 0]) / 1e3

    print('dt_down = ' + array2str(dt_down))
    print('dt_up = ' + array2str(dt_up))

    median_down_ms = numpy.median(dt_down)
    median_up_ms = numpy.median(dt_up)

    print('Median latency, down: %0.1f, up: %0.1f'
          % (median_down_ms, median_up_ms))


def run_walt_sanity_test(args):
    """ Repeatedly send CMD_SAMPLE_ALL and print each reply together with
    the running min-max of its first three numbers.

    Loops forever; stop it with Ctrl-C.
    """
    print('Starting sanity test')

    with Walt(args.serial) as walt:
        walt.sndrcv(Walt.CMD_RESET)

        # Raw string: '\D' is an invalid escape in a normal string literal
        not_digit = re.compile(r'\D+')
        lows = numpy.zeros(3) + 1024
        highs = numpy.zeros(3)
        while True:
            t, s = walt.sndrcv(Walt.CMD_SAMPLE_ALL)
            nums = not_digit.sub(' ', s).strip().split()
            if not nums:
                continue
            ints = numpy.array([int(x) for x in nums])
            lows = numpy.array([lows, ints]).min(axis=0)
            highs = numpy.array([highs, ints]).max(axis=0)

            minmax = ' '.join(['%d-%d' % (lows[i], highs[i])
                               for i in range(3)])
            print(s.strip() + '\tmin-max: ' + minmax)
            time.sleep(0.1)


class TcpServer(object):
    """ TCP <-> serial bridge for a Walt device.

    Data received from the TCP client is forwarded to the WALT serial port,
    except for "bridge sync" / "bridge update" commands which are answered
    by the bridge itself with a "clock ..." line. Lines read from the
    serial port are forwarded to the connected TCP client by a background
    thread.
    """

    def __init__(self, walt, port=50007, host=''):
        self.running = threading.Event()  # set while ser2net forwarding is on
        self.paused = threading.Event()  # set by ser2net thread once paused
        self.net = None  # currently connected client socket, if any
        self.walt = walt
        self.port = port
        self.host = host
        self.last_zero = 0.

    def ser2net(self, data):
        """ Print data going from serial to net and return it unchanged. """
        print('w>: ' + repr(data))
        return data

    def net2ser(self, data):
        """ Handle a chunk of bytes received from the TCP client.

        Everything except "bridge (sync|update)" commands is written to the
        WALT serial port. A bridge command is answered on the TCP socket
        with 'clock <dt0> <min_lag> <max_lag>\\n', all values in
        microseconds.
        """
        print('w<: ' + repr(data))
        # Discard any empty data
        if not data:
            print('o<: discarded empty data')
            return

        # Get a string version of the data for checking longer commands
        s = data.decode(self.walt.encoding)
        while len(s) > 0:
            # Search again on every pass: s is trimmed below, so a match
            # object from a previous pass would carry stale offsets.
            bridge_command = re.search(r'bridge (sync|update)', s)
            # If a "bridge" command does not exist, send everything to the WALT
            if not bridge_command:
                self.walt.ser.write(s.encode(self.walt.encoding))
                break
            # If a "bridge" command is preceded by WALT commands, send those
            # first
            if bridge_command.start() > 0:
                before_command = s[:bridge_command.start()]
                log('found bridge command after "%s"' % before_command)
                s = s[bridge_command.start():]
                self.walt.ser.write(before_command.encode(self.walt.encoding))
                continue
            # Otherwise, reply directly to the command
            log('bridge command: %s, pausing ser2net thread...' %
                bridge_command.group(0))
            self.pause()

            # An 'update' before the clock was ever zeroed is treated as sync
            is_sync = (bridge_command.group(1) == 'sync' or
                       not self.walt.base_time)
            if is_sync:
                self.walt.zero_clock()

            self.walt.estimate_lag()
            if is_sync:
                # shift the base so that min_lag is 0
                self.walt.base_time += self.walt.min_lag
                self.walt.max_lag -= self.walt.min_lag
                self.walt.min_lag = 0

            min_lag = self.walt.min_lag * 1e6
            max_lag = self.walt.max_lag * 1e6
            # Send the time difference between now and when the clock was zeroed
            dt0 = (time.time() - self.walt.base_time) * 1e6
            reply = 'clock %d %d %d\n' % (dt0, min_lag, max_lag)
            # Sockets carry bytes: encode like ser2net_loop() does
            self.net.sendall(reply.encode(self.walt.encoding))
            print('|custom-reply>: ' + repr(reply))
            self.resume()
            s = s[bridge_command.end():]

    def connections_loop(self):
        """ Listen on (host, port) and serve one TCP client at a time.

        Returns when a socket error occurs while serving a client.
        """
        with contextlib.closing(socket.socket(
                socket.AF_INET, socket.SOCK_STREAM)) as sock:
            self.sock = sock
            # SO_REUSEADDR is supposed to prevent the "Address already in use" error
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            while True:
                print('Listening on port %d' % self.port)
                net, addr = sock.accept()
                self.net = net
                try:
                    print('Connected by: ' + str(addr))
                    self.net2ser_loop()
                except socket.error as e:
                    # IO errors with the socket, not sure what they are
                    print('Error: %s' % e)
                    break
                finally:
                    net.close()
                    self.net = None

    def net2ser_loop(self):
        """ Pass data from the client to net2ser() until it disconnects. """
        while True:
            data = self.net.recv(1024)
            if not data:
                break  # got disconnected
            self.net2ser(data)

    def ser2net_loop(self):
        """ Thread body: forward lines from the serial port to the client.

        Blocks on self.running while paused; sets self.paused after a read
        completes while running is cleared, which is what pause() waits for.
        """
        while True:
            self.running.wait()
            data = self.walt.readline()
            if self.net and self.running.is_set():
                data = self.ser2net(data)
                data = data.encode(self.walt.encoding)
                self.net.sendall(data)
            if not self.running.is_set():
                self.paused.set()

    def serve(self):
        """ Start the ser2net daemon thread and run connections_loop(). """
        t = self.ser2net_thread = threading.Thread(
            target=self.ser2net_loop,
            name='ser2net_thread'
        )
        t.daemon = True
        t.start()
        self.paused.clear()
        self.running.set()
        self.connections_loop()

    def pause(self):
        """ Pause serial -> net forwarding

        The ser2net_thread stays running, but won't read any incoming data
        from the serial port.
        """

        self.running.clear()
        # Send a ping to break out of the blocking read on serial port and get
        # blocked on running.wait() instead. The ping response is discarded.
        # Encoded because the serial port takes bytes, same as Walt.sndrcv().
        self.walt.ser.write(Walt.CMD_PING.encode(self.walt.encoding))
        # Wait until the ping response comes in and we are sure we are no longer
        # blocked on ser.read()
        self.paused.wait()
        print("Paused ser2net thread")

    def resume(self):
        """ Resume serial -> net forwarding after pause(). """
        self.running.set()
        self.paused.clear()
        print("Resuming ser2net thread")

    def close(self):
        """ Best-effort close of the listening socket and the Walt device. """
        # self.sock only exists once connections_loop() has run; any failure
        # here is ignored on purpose, but KeyboardInterrupt / SystemExit are
        # no longer swallowed.
        try:
            self.sock.close()
        except Exception:
            pass
        try:
            self.walt.close()
        except Exception:
            pass

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __enter__(self):
        return self


def run_tcp_bridge(args):
    """ Run the TCP <-> WALT bridge on args.port until Ctrl-C. """
    print('Starting TCP bridge')
    print('You may need to run the following to allow traffic from the android container:')
    print('iptables -A INPUT -p tcp --dport %d -j ACCEPT' % args.port)

    try:
        with Walt(args.serial) as walt:
            with TcpServer(walt, port=args.port) as srv:
                walt.sndrcv(Walt.CMD_RESET)
                srv.serve()
    except KeyboardInterrupt:
        print(' KeyboardInterrupt, exiting...')


def main(argv=sys.argv[1:]):
    """ Parse argv and run the test selected with --type. """
    args = parse_args(argv)
    # One single if/elif chain: with a separate `if` for 'drag' the drag
    # test fell through to the final else and was reported as unknown.
    if args.type == 'drag':
        run_drag_latency_test(args)
    elif args.type == 'tap':
        run_tap_latency_test(args)
    elif args.type == 'screen':
        run_screen_latency_test(args)
    elif args.type == 'sanity':
        run_walt_sanity_test(args)
    elif args.type == 'curve':
        run_screen_curve(args)
    elif args.type == 'bridge':
        run_tcp_bridge(args)
    elif args.type == 'tapaudio':
        run_tap_audio_test(args)
    elif args.type == 'tapblink':
        run_tap_blink_test(args)
    else:
        print('Unknown test type: "%s"' % args.type)


if __name__ == '__main__':
    main()