#!/usr/bin/env python3.4
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from builtins import str

import logging
import re
import shellescape

from acts.libs.proc import job

DEFAULT_ADB_TIMEOUT = 60
DEFAULT_ADB_PULL_TIMEOUT = 180
# Uses a regex to be backwards compatible with previous versions of ADB
# (N and above add the serial to the error msg).
DEVICE_NOT_FOUND_REGEX = re.compile('^error: device (?:\'.*?\' )?not found')
DEVICE_OFFLINE_REGEX = re.compile('^error: device offline')
ROOT_USER_ID = '0'
SHELL_USER_ID = '2000'


def parsing_parcel_output(output):
    """Parsing the adb output in Parcel format.

    Parsing the adb output in format:
      Result: Parcel(
        0x00000000: 00000000 00000014 00390038 00340031 '........8.9.1.4.'
        0x00000010: 00300038 00300030 00300030 00340032 '8.0.0.0.0.0.2.4.'
        0x00000020: 00350034 00330035 00320038 00310033 '4.5.5.3.8.2.3.1.'
        0x00000030: 00000000                            '....            ')

    Args:
        output: A string that is the raw adb output containing the Parcel
            dump.

    Returns:
        The concatenation of every single-quoted section of the dump with
        all '.' and whitespace characters removed. For the example above
        this is '89148000002445538231'.
    """
    # '.' does not match newlines, so each dump row contributes exactly the
    # text between its first and last single quote.
    output = ''.join(re.findall(r"'(.*)'", output))
    return re.sub(r'[.\s]', '', output)


class AdbError(Exception):
    """Raised when there is an error in adb operations.

    Attributes:
        cmd: The command line that was executed.
        stdout: The standard output captured from the command.
        stderr: The standard error captured from the command.
        ret_code: The exit status of the command.
    """

    def __init__(self, cmd, stdout, stderr, ret_code):
        # Forward the fields to Exception so that `args` is populated; this
        # keeps the exception picklable and gives it a meaningful repr.
        super(AdbError, self).__init__(cmd, stdout, stderr, ret_code)
        self.cmd = cmd
        self.stdout = stdout
        self.stderr = stderr
        self.ret_code = ret_code

    def __str__(self):
        return ("Error executing adb cmd '%s'. ret: %d, stdout: %s, stderr: %s"
                ) % (self.cmd, self.ret_code, self.stdout, self.stderr)


class AdbProxy(object):
    """Proxy class for ADB.

    For syntactic reasons, the '-' in adb commands need to be replaced with
    '_'. Can directly execute adb commands on an object:
    >> adb = AdbProxy(<serial>)
    >> adb.start_server()
    >> adb.devices() # will return the console output of "adb devices".
    """

    # Local port tunnelled to the adb server on the remote (SSH) host. It is
    # stored on the class, so it is set up once and shared by every instance.
    _SERVER_LOCAL_PORT = None

    def __init__(self, serial="", ssh_connection=None):
        """Construct an instance of AdbProxy.

        Args:
            serial: str serial number of Android device from `adb devices`
            ssh_connection: SshConnection instance if the Android device is
                            connected to a remote host that we can reach via
                            SSH.
        """
        self.serial = serial
        adb_path = self._exec_cmd("which adb")
        adb_cmd = [adb_path]
        if serial:
            adb_cmd.append("-s %s" % serial)
        if ssh_connection is not None and not AdbProxy._SERVER_LOCAL_PORT:
            # Kill all existing adb processes on the remote host (if any)
            # Note that if there are none, then pkill exits with non-zero
            # status
            ssh_connection.run("pkill adb", ignore_status=True)
            # Copy over the adb binary to a temp dir
            temp_dir = ssh_connection.run("mktemp -d").stdout.strip()
            ssh_connection.send_file(adb_path, temp_dir)
            # Start up a new adb server running as root from the copied
            # binary.
            remote_adb_cmd = "%s/adb %s root" % (temp_dir, "-s %s" % serial
                                                 if serial else "")
            ssh_connection.run(remote_adb_cmd)
            # Proxy a local port to the adb server port
            local_port = ssh_connection.create_ssh_tunnel(5037)
            AdbProxy._SERVER_LOCAL_PORT = local_port

        if AdbProxy._SERVER_LOCAL_PORT:
            # Read the port from the class attribute rather than the local
            # variable: the tunnel is only created by the first instance, so
            # `local_port` is not bound for any instance constructed later.
            adb_cmd.append("-P %d" % AdbProxy._SERVER_LOCAL_PORT)
        self.adb_str = " ".join(adb_cmd)
        self._ssh_connection = ssh_connection

    def get_user_id(self):
        """Returns the adb user. Either 2000 (shell) or 0 (root)."""
        return self.shell('id -u')

    def is_root(self, user_id=None):
        """Checks if the user is root.

        Args:
            user_id: if supplied, the id to check against.
        Returns:
            True if the user is root. False otherwise.
        """
        if not user_id:
            user_id = self.get_user_id()
        return user_id == ROOT_USER_ID

    def ensure_root(self):
        """Ensures the user is root after making this call.

        Note that this will still fail if the device is a user build, as root
        is not accessible from a user build.

        Returns:
            False if the device is a user build. True otherwise.
        """
        self.ensure_user(ROOT_USER_ID)
        return self.is_root()

    def ensure_user(self, user_id=SHELL_USER_ID):
        """Ensures the user is set to the given user.

        Args:
            user_id: The id of the user.

        Returns:
            True if the adb user id equals user_id after the switch, False
            otherwise.
        """
        if self.is_root(user_id):
            self.root()
        else:
            self.unroot()
        self.wait_for_device()
        return self.get_user_id() == user_id

    def _exec_cmd(self, cmd, ignore_status=False, timeout=DEFAULT_ADB_TIMEOUT):
        """Executes adb commands in a new shell.

        This is specific to executing adb commands.

        Args:
            cmd: A string that is the adb command to execute.
            ignore_status: If True, return stdout (or stderr when stdout is
                empty) without checking for a "device not found" error.
            timeout: Timeout passed through to job.run.

        Returns:
            The stdout of the adb command, or the parsed Parcel payload when
            the output contains "Result: Parcel".

        Raises:
            AdbError is raised if adb reports the device as offline (always),
            or if adb cannot find the device and ignore_status is False.
        """
        # The job itself never raises on a bad exit status; the status is
        # interpreted below instead.
        result = job.run(cmd, ignore_status=True, timeout=timeout)
        ret, out, err = result.exit_status, result.stdout, result.stderr

        if DEVICE_OFFLINE_REGEX.match(err):
            raise AdbError(cmd=cmd, stdout=out, stderr=err, ret_code=ret)
        if "Result: Parcel" in out:
            return parsing_parcel_output(out)
        if ignore_status:
            return out or err
        if ret == 1 and DEVICE_NOT_FOUND_REGEX.match(err):
            raise AdbError(cmd=cmd, stdout=out, stderr=err, ret_code=ret)
        else:
            return out

    def _exec_adb_cmd(self, name, arg_str, **kwargs):
        """Runs `<adb prefix> <name> <arg_str>` through _exec_cmd.

        Args:
            name: The adb sub-command, e.g. 'shell'.
            arg_str: A string holding the arguments of the sub-command.
            **kwargs: Forwarded to _exec_cmd.

        Returns:
            Whatever _exec_cmd returns for the assembled command line.
        """
        return self._exec_cmd(' '.join((self.adb_str, name, arg_str)),
                              **kwargs)

    def _exec_cmd_nb(self, cmd, **kwargs):
        """Executes adb commands in a new shell, non blocking.

        Args:
            cmd: A string that is the adb command to execute.
            **kwargs: Forwarded to job.run_async.

        Returns:
            The value returned by job.run_async.
        """
        return job.run_async(cmd, **kwargs)

    def _exec_adb_cmd_nb(self, name, arg_str, **kwargs):
        """Non blocking counterpart of _exec_adb_cmd.

        Args:
            name: The adb sub-command, e.g. 'shell'.
            arg_str: A string holding the arguments of the sub-command.
            **kwargs: Forwarded to _exec_cmd_nb.

        Returns:
            Whatever _exec_cmd_nb returns for the assembled command line.
        """
        return self._exec_cmd_nb(' '.join((self.adb_str, name, arg_str)),
                                 **kwargs)

    def tcp_forward(self, host_port, device_port):
        """Starts tcp forwarding from localhost to this android device.

        Args:
            host_port: Port number to use on localhost
            device_port: Port number to use on the android device.

        Returns:
            The command output for the forward command.
        """
        if self._ssh_connection:
            # We have to hop through a remote host first.
            #  1) Find some free port on the remote host's localhost
            #  2) Setup forwarding between that remote port and the requested
            #     device port
            remote_port = self._ssh_connection.find_free_port()
            self._ssh_connection.create_ssh_tunnel(
                remote_port, local_port=host_port)
            host_port = remote_port
        return self.forward("tcp:%d tcp:%d" % (host_port, device_port))

    def remove_tcp_forward(self, host_port):
        """Stop tcp forwarding a port from localhost to this android device.

        Args:
            host_port: Port number to use on localhost
        """
        if self._ssh_connection:
            remote_port = self._ssh_connection.close_ssh_tunnel(host_port)
            if remote_port is None:
                logging.warning("Cannot close unknown forwarded tcp port: %d",
                                host_port)
                return
            # The actual port we need to disable via adb is on the remote host.
            host_port = remote_port
        self.forward("--remove tcp:%d" % host_port)

    def getprop(self, prop_name):
        """Get a property of the device.

        This is a convenience wrapper for "adb shell getprop xxx".

        Args:
            prop_name: A string that is the name of the property to get.

        Returns:
            A string that is the output of the getprop shell command. This
            method never returns None itself; presumably the output is empty
            when the property doesn't exist -- verify against the device.
        """
        return self.shell("getprop %s" % prop_name)

    # TODO: This should be abstracted out into an object like the other shell
    # command.
    def shell(self, command, ignore_status=False, timeout=DEFAULT_ADB_TIMEOUT):
        """Runs a command through "adb shell".

        The command is quoted with shellescape.quote so that it reaches adb
        as a single argument.

        Args:
            command: A string that is the shell command to run.
            ignore_status: Forwarded to _exec_cmd.
            timeout: Forwarded to _exec_cmd.

        Returns:
            The output of the command as returned by _exec_cmd.
        """
        return self._exec_adb_cmd(
            'shell',
            shellescape.quote(command),
            ignore_status=ignore_status,
            timeout=timeout)

    def shell_nb(self, command):
        """Runs a command through "adb shell" without blocking.

        Args:
            command: A string that is the shell command to run.

        Returns:
            The value returned by job.run_async for the adb command.
        """
        return self._exec_adb_cmd_nb('shell', shellescape.quote(command))

    def pull(self,
             command,
             ignore_status=False,
             timeout=DEFAULT_ADB_PULL_TIMEOUT):
        """Runs "adb pull" with the longer pull timeout as the default.

        Args:
            command: A string holding the arguments of "adb pull".
            ignore_status: Forwarded to _exec_cmd.
            timeout: Forwarded to _exec_cmd.

        Returns:
            The output of the command as returned by _exec_cmd.
        """
        return self._exec_adb_cmd(
            'pull', command, ignore_status=ignore_status, timeout=timeout)

    def __getattr__(self, name):
        """Maps unknown attributes onto adb sub-commands.

        `adb.wait_for_device()` runs "adb wait-for-device": every '_' in the
        attribute name is replaced with '-', and the positional arguments are
        joined with spaces to form the argument string.

        Args:
            name: The attribute that was not found through normal lookup.

        Returns:
            A callable that executes the corresponding adb command via
            _exec_adb_cmd and returns its output.

        Raises:
            AttributeError: If name starts with an underscore. Such names are
                never adb commands; answering them with a callable would
                hide missing private attributes and break protocols that
                probe for special methods (copy, pickle, hasattr).
        """
        if name.startswith('_'):
            raise AttributeError("'%s' object has no attribute '%s'" %
                                 (type(self).__name__, name))

        def adb_call(*args, **kwargs):
            clean_name = name.replace('_', '-')
            arg_str = ' '.join(str(elem) for elem in args)
            return self._exec_adb_cmd(clean_name, arg_str, **kwargs)

        return adb_call