1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import io 23import os 24import posixpath 25import random 26import re 27import shlex 28import shutil 29import signal 30import socket 31import string 32import subprocess 33import sys 34import tempfile 35import threading 36import time 37import unittest 38 39import proto.devices_pb2 as proto_devices 40import proto.app_processes_pb2 as proto_track_app 41 42from datetime import datetime 43 44import adb 45 46def requires_non_root(func): 47 def wrapper(self, *args): 48 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 49 if was_root: 50 self.device.unroot() 51 self.device.wait() 52 53 try: 54 func(self, *args) 55 finally: 56 if was_root: 57 self.device.root() 58 self.device.wait() 59 60 return wrapper 61 62 63class DeviceTest(unittest.TestCase): 64 def setUp(self) -> None: 65 self.device = adb.get_device() 66 67 68class AbbTest(DeviceTest): 69 def test_smoke(self): 70 abb = subprocess.run(['adb', 'abb'], capture_output=True) 71 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 72 73 # abb squashes all failures to 1. 74 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 75 self.assertEqual(abb.stdout, cmd.stdout) 76 self.assertEqual(abb.stderr, cmd.stderr) 77 78class ForwardReverseTest(DeviceTest): 79 def _test_no_rebind(self, description, direction_list, direction, 80 direction_no_rebind, direction_remove_all): 81 msg = direction_list() 82 self.assertEqual('', msg.strip(), 83 description + ' list must be empty to run this test.') 84 85 # Use --no-rebind with no existing binding 86 direction_no_rebind('tcp:5566', 'tcp:6655') 87 msg = direction_list() 88 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 89 90 # Use --no-rebind with existing binding 91 with self.assertRaises(subprocess.CalledProcessError): 92 direction_no_rebind('tcp:5566', 'tcp:6677') 93 msg = direction_list() 94 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 95 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 96 97 # Use the absence of --no-rebind with existing binding 98 direction('tcp:5566', 'tcp:6677') 99 msg = direction_list() 100 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 101 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 102 103 direction_remove_all() 104 msg = direction_list() 105 self.assertEqual('', msg.strip()) 106 107 def test_forward_no_rebind(self): 108 self._test_no_rebind('forward', self.device.forward_list, 109 self.device.forward, self.device.forward_no_rebind, 110 self.device.forward_remove_all) 111 112 def test_reverse_no_rebind(self): 113 self._test_no_rebind('reverse', self.device.reverse_list, 114 self.device.reverse, self.device.reverse_no_rebind, 115 self.device.reverse_remove_all) 116 117 def test_forward(self): 118 msg = self.device.forward_list() 119 self.assertEqual('', msg.strip(), 120 'Forwarding list must be empty to run this test.') 121 self.device.forward('tcp:5566', 'tcp:6655') 122 msg = self.device.forward_list() 123 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 124 self.device.forward('tcp:7788', 'tcp:8877') 125 msg = self.device.forward_list() 126 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 127 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 128 self.device.forward_remove('tcp:5566') 129 msg = self.device.forward_list() 130 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 131 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 132 self.device.forward_remove_all() 133 msg = self.device.forward_list() 134 self.assertEqual('', msg.strip()) 135 136 def test_forward_old_protocol(self): 137 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 138 139 msg = self.device.forward_list() 140 self.assertEqual('', msg.strip(), 141 'Forwarding list must be empty to run this test.') 142 143 with socket.create_connection(("localhost", 5037)) as s: 144 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 145 cmd = b"%04x%s" % (len(service), service) 146 s.sendall(cmd) 147 148 msg = self.device.forward_list() 149 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 150 151 self.device.forward_remove_all() 152 msg = self.device.forward_list() 153 self.assertEqual('', msg.strip()) 154 155 def test_forward_tcp_port_0(self): 156 self.assertEqual('', self.device.forward_list().strip(), 157 'Forwarding list must be empty to run this test.') 158 159 try: 160 # If resolving TCP port 0 is supported, `adb forward` will print 161 # the actual port number. 162 port = self.device.forward('tcp:0', 'tcp:8888').strip() 163 if not port: 164 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 165 166 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 167 self.device.forward_list())) 168 finally: 169 self.device.forward_remove_all() 170 171 def test_reverse(self): 172 msg = self.device.reverse_list() 173 self.assertEqual('', msg.strip(), 174 'Reverse forwarding list must be empty to run this test.') 175 self.device.reverse('tcp:5566', 'tcp:6655') 176 msg = self.device.reverse_list() 177 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 178 self.device.reverse('tcp:7788', 'tcp:8877') 179 msg = self.device.reverse_list() 180 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 181 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 182 self.device.reverse_remove('tcp:5566') 183 msg = self.device.reverse_list() 184 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 185 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 186 self.device.reverse_remove_all() 187 msg = self.device.reverse_list() 188 self.assertEqual('', msg.strip()) 189 190 def test_reverse_tcp_port_0(self): 191 self.assertEqual('', self.device.reverse_list().strip(), 192 'Reverse list must be empty to run this test.') 193 194 try: 195 # If resolving TCP port 0 is supported, `adb reverse` will print 196 # the actual port number. 197 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 198 if not port: 199 raise unittest.SkipTest('Reversing tcp:0 is not available.') 200 201 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 202 self.device.reverse_list())) 203 finally: 204 self.device.reverse_remove_all() 205 206 def test_forward_reverse_echo(self): 207 """Send data through adb forward and read it back via adb reverse""" 208 forward_port = 12345 209 reverse_port = forward_port + 1 210 forward_spec = 'tcp:' + str(forward_port) 211 reverse_spec = 'tcp:' + str(reverse_port) 212 forward_setup = False 213 reverse_setup = False 214 215 try: 216 # listen on localhost:forward_port, connect to remote:forward_port 217 self.device.forward(forward_spec, forward_spec) 218 forward_setup = True 219 # listen on remote:forward_port, connect to localhost:reverse_port 220 self.device.reverse(forward_spec, reverse_spec) 221 reverse_setup = True 222 223 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 224 with contextlib.closing(listener): 225 # Use SO_REUSEADDR so that subsequent runs of the test can grab 226 # the port even if it is in TIME_WAIT. 227 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 228 229 # Listen on localhost:reverse_port before connecting to 230 # localhost:forward_port because that will cause adb to connect 231 # back to localhost:reverse_port. 232 listener.bind(('127.0.0.1', reverse_port)) 233 listener.listen(4) 234 235 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 236 with contextlib.closing(client): 237 # Connect to the listener. 238 client.connect(('127.0.0.1', forward_port)) 239 240 # Accept the client connection. 241 accepted_connection, addr = listener.accept() 242 with contextlib.closing(accepted_connection) as server: 243 data = b'hello' 244 245 # Send data into the port setup by adb forward. 246 client.sendall(data) 247 # Explicitly close() so that server gets EOF. 248 client.close() 249 250 # Verify that the data came back via adb reverse. 251 self.assertEqual(data, server.makefile().read().encode("utf8")) 252 finally: 253 if reverse_setup: 254 self.device.reverse_remove(forward_spec) 255 if forward_setup: 256 self.device.forward_remove(forward_spec) 257 258 259class ShellTest(DeviceTest): 260 def _interactive_shell(self, shell_args, input): 261 """Runs an interactive adb shell. 262 263 Args: 264 shell_args: List of string arguments to `adb shell`. 265 input: bytes input to send to the interactive shell. 266 267 Returns: 268 The remote exit code. 269 270 Raises: 271 unittest.SkipTest: The device doesn't support exit codes. 272 """ 273 if not self.device.has_shell_protocol(): 274 raise unittest.SkipTest('exit codes are unavailable on this device') 275 276 proc = subprocess.Popen( 277 self.device.adb_cmd + ['shell'] + shell_args, 278 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 279 stderr=subprocess.PIPE) 280 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 281 # to explicitly add an exit command to close the session from the device 282 # side, plus the necessary newline to complete the interactive command. 283 proc.communicate(input + b'; exit\n') 284 return proc.returncode 285 286 def test_cat(self): 287 """Check that we can at least cat a file.""" 288 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 289 elements = out.split() 290 self.assertEqual(len(elements), 2) 291 292 uptime, idle = elements 293 self.assertGreater(float(uptime), 0.0) 294 self.assertGreater(float(idle), 0.0) 295 296 def test_throws_on_failure(self): 297 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 298 299 def test_output_not_stripped(self): 300 out = self.device.shell(['echo', 'foo'])[0] 301 self.assertEqual(out, 'foo' + self.device.linesep) 302 303 def test_shell_command_length(self): 304 # Devices that have shell_v2 should be able to handle long commands. 305 if self.device.has_shell_protocol(): 306 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 307 self.assertEqual(rc, 0) 308 self.assertTrue(out == ('x' * 16384 + '\n')) 309 310 def test_shell_nocheck_failure(self): 311 rc, out, _ = self.device.shell_nocheck(['false']) 312 self.assertNotEqual(rc, 0) 313 self.assertEqual(out, '') 314 315 def test_shell_nocheck_output_not_stripped(self): 316 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 317 self.assertEqual(rc, 0) 318 self.assertEqual(out, 'foo' + self.device.linesep) 319 320 def test_can_distinguish_tricky_results(self): 321 # If result checking on ADB shell is naively implemented as 322 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 323 # output from the result for a cmd of `echo -n 1`. 324 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 325 self.assertEqual(rc, 0) 326 self.assertEqual(out, '1') 327 328 def test_line_endings(self): 329 """Ensure that line ending translation is not happening in the pty. 330 331 Bug: http://b/19735063 332 """ 333 output = self.device.shell(['uname'])[0] 334 self.assertEqual(output, 'Linux' + self.device.linesep) 335 336 def test_pty_logic(self): 337 """Tests that a PTY is allocated when it should be. 338 339 PTY allocation behavior should match ssh. 340 """ 341 def check_pty(args): 342 """Checks adb shell PTY allocation. 343 344 Tests |args| for terminal and non-terminal stdin. 345 346 Args: 347 args: -Tt args in a list (e.g. ['-t', '-t']). 348 349 Returns: 350 A tuple (<terminal>, <non-terminal>). True indicates 351 the corresponding shell allocated a remote PTY. 352 """ 353 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 354 355 terminal = subprocess.Popen( 356 test_cmd, stdin=None, 357 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 358 terminal.communicate() 359 360 non_terminal = subprocess.Popen( 361 test_cmd, stdin=subprocess.PIPE, 362 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 363 non_terminal.communicate() 364 365 return (terminal.returncode == 0, non_terminal.returncode == 0) 366 367 # -T: never allocate PTY. 368 self.assertEqual((False, False), check_pty(['-T'])) 369 370 # These tests require a new device. 371 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 372 # No args: PTY only if stdin is a terminal and shell is interactive, 373 # which is difficult to reliably test from a script. 374 self.assertEqual((False, False), check_pty([])) 375 376 # -t: PTY if stdin is a terminal. 377 self.assertEqual((True, False), check_pty(['-t'])) 378 379 # -t -t: always allocate PTY. 380 self.assertEqual((True, True), check_pty(['-t', '-t'])) 381 382 # -tt: always allocate PTY, POSIX style (http://b/32216152). 383 self.assertEqual((True, True), check_pty(['-tt'])) 384 385 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 386 # we follow the man page instead. 387 self.assertEqual((True, True), check_pty(['-ttt'])) 388 389 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 390 self.assertEqual((True, True), check_pty(['-ttx'])) 391 392 # -Ttt: -tt cancels out -T. 393 self.assertEqual((True, True), check_pty(['-Ttt'])) 394 395 # -ttT: -T cancels out -tt. 396 self.assertEqual((False, False), check_pty(['-ttT'])) 397 398 def test_shell_protocol(self): 399 """Tests the shell protocol on the device. 400 401 If the device supports shell protocol, this gives us the ability 402 to separate stdout/stderr and return the exit code directly. 403 404 Bug: http://b/19734861 405 """ 406 if not self.device.has_shell_protocol(): 407 raise unittest.SkipTest('shell protocol unsupported on this device') 408 409 # Shell protocol should be used by default. 410 result = self.device.shell_nocheck( 411 shlex.split('echo foo; echo bar >&2; exit 17')) 412 self.assertEqual(17, result[0]) 413 self.assertEqual('foo' + self.device.linesep, result[1]) 414 self.assertEqual('bar' + self.device.linesep, result[2]) 415 416 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 417 418 # -x flag should disable shell protocol. 419 result = self.device.shell_nocheck( 420 shlex.split('-x echo foo; echo bar >&2; exit 17')) 421 self.assertEqual(0, result[0]) 422 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 423 self.assertEqual('', result[2]) 424 425 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 426 427 def test_non_interactive_sigint(self): 428 """Tests that SIGINT in a non-interactive shell kills the process. 429 430 This requires the shell protocol in order to detect the broken 431 pipe; raw data transfer mode will only see the break once the 432 subprocess tries to read or write. 433 434 Bug: http://b/23825725 435 """ 436 if not self.device.has_shell_protocol(): 437 raise unittest.SkipTest('shell protocol unsupported on this device') 438 439 # Start a long-running process. 440 sleep_proc = subprocess.Popen( 441 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 442 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 443 stderr=subprocess.STDOUT) 444 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 445 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 446 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 447 448 # Verify that the process is running, send signal, verify it stopped. 449 self.device.shell(proc_query) 450 os.kill(sleep_proc.pid, signal.SIGINT) 451 sleep_proc.communicate() 452 453 # It can take some time for the process to receive the signal and die. 454 end_time = time.time() + 3 455 while self.device.shell_nocheck(proc_query)[0] != 1: 456 self.assertFalse(time.time() > end_time, 457 'subprocess failed to terminate in time') 458 459 def test_non_interactive_stdin(self): 460 """Tests that non-interactive shells send stdin.""" 461 if not self.device.has_shell_protocol(): 462 raise unittest.SkipTest('non-interactive stdin unsupported ' 463 'on this device') 464 465 # Test both small and large inputs. 466 small_input = b'foo' 467 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 468 large_input = b'\n'.join(characters) 469 470 471 for input in (small_input, large_input): 472 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 473 stdin=subprocess.PIPE, 474 stdout=subprocess.PIPE, 475 stderr=subprocess.PIPE) 476 stdout, stderr = proc.communicate(input) 477 self.assertEqual(input.splitlines(), stdout.splitlines()) 478 self.assertEqual(b'', stderr) 479 480 def test_sighup(self): 481 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 482 log_path = "/data/local/tmp/adb_signal_test.log" 483 484 # Clear the output file. 485 self.device.shell_nocheck(["echo", ">", log_path]) 486 487 script = """ 488 trap "echo SIGINT > {path}; exit 0" SIGINT 489 trap "echo SIGHUP > {path}; exit 0" SIGHUP 490 echo Waiting 491 read 492 """.format(path=log_path) 493 494 script = ";".join([x.strip() for x in script.strip().splitlines()]) 495 496 with self.device.shell_popen([script], kill_atexit=False, 497 stdin=subprocess.PIPE, 498 stdout=subprocess.PIPE) as process: 499 500 self.assertEqual(b"Waiting\n", process.stdout.readline()) 501 process.send_signal(signal.SIGINT) 502 process.wait() 503 504 # Waiting for the local adb to finish is insufficient, since it hangs 505 # up immediately. 506 time.sleep(1) 507 508 stdout, _ = self.device.shell(["cat", log_path]) 509 self.assertEqual(stdout.strip(), "SIGHUP") 510 511 # Temporarily disabled because it seems to cause later instability. 512 # http://b/228114748 513 def disabled_test_exit_stress(self): 514 """Hammer `adb shell exit 42` with multiple threads.""" 515 thread_count = 48 516 result = dict() 517 def hammer(thread_idx, thread_count, result): 518 success = True 519 for i in range(thread_idx, 240, thread_count): 520 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 521 if ret != i % 256: 522 success = False 523 break 524 result[thread_idx] = success 525 526 threads = [] 527 for i in range(thread_count): 528 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 529 thread.start() 530 threads.append(thread) 531 for thread in threads: 532 thread.join() 533 for i, success in result.items(): 534 self.assertTrue(success) 535 536 def disabled_test_parallel(self): 537 """Spawn a bunch of `adb shell` instances in parallel. 538 539 This was broken historically due to the use of select, which only works 540 for fds that are numerically less than 1024. 541 542 Bug: http://b/141955761""" 543 544 n_procs = 2048 545 procs = dict() 546 for i in range(0, n_procs): 547 procs[i] = subprocess.Popen( 548 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 549 stdin=subprocess.PIPE, 550 stdout=subprocess.PIPE 551 ) 552 553 for i in range(0, n_procs): 554 procs[i].stdin.write("%d\n" % i) 555 556 for i in range(0, n_procs): 557 response = procs[i].stdout.readline() 558 assert(response == "%d\n" % i) 559 560 for i in range(0, n_procs): 561 procs[i].stdin.write("%d\n" % (i % 256)) 562 563 for i in range(0, n_procs): 564 assert(procs[i].wait() == i % 256) 565 566 567class ArgumentEscapingTest(DeviceTest): 568 def test_shell_escaping(self): 569 """Make sure that argument escaping is somewhat sane.""" 570 571 # http://b/19734868 572 # Note that this actually matches ssh(1)'s behavior --- it's 573 # converted to `sh -c echo hello; echo world` which sh interprets 574 # as `sh -c echo` (with an argument to that shell of "hello"), 575 # and then `echo world` back in the first shell. 576 result = self.device.shell( 577 shlex.split("sh -c 'echo hello; echo world'"))[0] 578 result = result.splitlines() 579 self.assertEqual(['', 'world'], result) 580 # If you really wanted "hello" and "world", here's what you'd do: 581 result = self.device.shell( 582 shlex.split(r'echo hello\;echo world'))[0].splitlines() 583 self.assertEqual(['hello', 'world'], result) 584 585 # http://b/15479704 586 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 587 self.assertEqual('t', result) 588 result = self.device.shell( 589 shlex.split("sh -c 'true && echo t'"))[0].strip() 590 self.assertEqual('t', result) 591 592 # http://b/20564385 593 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 594 self.assertEqual('t', result) 595 result = self.device.shell( 596 shlex.split(r'echo -n 123\;uname'))[0].strip() 597 self.assertEqual('123Linux', result) 598 599 def test_install_argument_escaping(self): 600 """Make sure that install argument escaping works.""" 601 # http://b/20323053, http://b/3090932. 602 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 603 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 604 delete=False) 605 tf.close() 606 607 # Installing bogus .apks fails if the device supports exit codes. 608 try: 609 output = self.device.install(tf.name.decode("utf8")) 610 except subprocess.CalledProcessError as e: 611 output = e.output 612 613 self.assertIn(file_suffix, output) 614 os.remove(tf.name) 615 616 617class RootUnrootTest(DeviceTest): 618 def _test_root(self): 619 message = self.device.root() 620 if 'adbd cannot run as root in production builds' in message: 621 return 622 self.device.wait() 623 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 624 625 def _test_unroot(self): 626 self.device.unroot() 627 self.device.wait() 628 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 629 630 def test_root_unroot(self): 631 """Make sure that adb root and adb unroot work, using id(1).""" 632 if self.device.get_prop('ro.debuggable') != '1': 633 raise unittest.SkipTest('requires rootable build') 634 635 original_user = self.device.shell(['id', '-un'])[0].strip() 636 try: 637 if original_user == 'root': 638 self._test_unroot() 639 self._test_root() 640 elif original_user == 'shell': 641 self._test_root() 642 self._test_unroot() 643 finally: 644 if original_user == 'root': 645 self.device.root() 646 else: 647 self.device.unroot() 648 self.device.wait() 649 650 651class TcpIpTest(DeviceTest): 652 def test_tcpip_failure_raises(self): 653 """adb tcpip requires a port. 654 655 Bug: http://b/22636927 656 """ 657 self.assertRaises( 658 subprocess.CalledProcessError, self.device.tcpip, '') 659 self.assertRaises( 660 subprocess.CalledProcessError, self.device.tcpip, 'foo') 661 662 663class SystemPropertiesTest(DeviceTest): 664 def test_get_prop(self): 665 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 666 667 def test_set_prop(self): 668 # debug.* prop does not require root privileges 669 prop_name = 'debug.foo' 670 self.device.shell(['setprop', prop_name, '""']) 671 672 val = random.random() 673 self.device.set_prop(prop_name, str(val)) 674 self.assertEqual( 675 self.device.shell(['getprop', prop_name])[0].strip(), str(val)) 676 677 678def compute_md5(string): 679 hsh = hashlib.md5() 680 hsh.update(string) 681 return hsh.hexdigest() 682 683 684class HostFile(object): 685 def __init__(self, handle, checksum): 686 self.handle = handle 687 self.checksum = checksum 688 self.full_path = handle.name 689 self.base_name = os.path.basename(self.full_path) 690 691 692class DeviceFile(object): 693 def __init__(self, checksum, full_path): 694 self.checksum = checksum 695 self.full_path = full_path 696 self.base_name = posixpath.basename(self.full_path) 697 698 699def make_random_host_files(in_dir, num_files): 700 min_size = 1 * (1 << 10) 701 max_size = 16 * (1 << 10) 702 703 files = [] 704 for _ in range(num_files): 705 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 706 707 size = random.randrange(min_size, max_size, 1024) 708 rand_str = os.urandom(size) 709 file_handle.write(rand_str) 710 file_handle.flush() 711 file_handle.close() 712 713 md5 = compute_md5(rand_str) 714 files.append(HostFile(file_handle, md5)) 715 return files 716 717 718def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 719 min_size = 1 * (1 << 10) 720 max_size = 16 * (1 << 10) 721 722 files = [] 723 for file_num in range(num_files): 724 size = random.randrange(min_size, max_size, 1024) 725 726 base_name = prefix + str(file_num) 727 full_path = posixpath.join(in_dir, base_name) 728 729 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 730 'bs={}'.format(size), 'count=1']) 731 dev_md5, _ = device.shell(['md5sum', full_path])[0].split() 732 733 files.append(DeviceFile(dev_md5, full_path)) 734 return files 735 736 737class FileOperationsTest: 738 class Base(DeviceTest): 739 SCRATCH_DIR = '/data/local/tmp' 740 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 741 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 742 743 def setUp(self): 744 super().setUp() 745 self.previous_env = os.environ.get("ADB_COMPRESSION") 746 os.environ["ADB_COMPRESSION"] = self.compression 747 748 def tearDown(self): 749 if self.previous_env is None: 750 del os.environ["ADB_COMPRESSION"] 751 else: 752 os.environ["ADB_COMPRESSION"] = self.previous_env 753 754 def _verify_remote(self, checksum, remote_path): 755 dev_md5, _ = self.device.shell(['md5sum', remote_path])[0].split() 756 self.assertEqual(checksum, dev_md5) 757 758 def _verify_local(self, checksum, local_path): 759 with open(local_path, 'rb') as host_file: 760 host_md5 = compute_md5(host_file.read()) 761 self.assertEqual(host_md5, checksum) 762 763 def test_push(self): 764 """Push a randomly generated file to specified device.""" 765 kbytes = 512 766 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 767 rand_str = os.urandom(1024 * kbytes) 768 tmp.write(rand_str) 769 tmp.close() 770 771 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 772 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 773 774 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 775 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 776 777 os.remove(tmp.name) 778 779 def test_push_dir(self): 780 """Push a randomly generated directory of files to the device.""" 781 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 782 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 783 784 try: 785 host_dir = tempfile.mkdtemp() 786 787 # Make sure the temp directory isn't setuid, or else adb will complain. 788 os.chmod(host_dir, 0o700) 789 790 # Create 32 random files. 791 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 792 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 793 794 for temp_file in temp_files: 795 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 796 os.path.basename(host_dir), 797 temp_file.base_name) 798 self._verify_remote(temp_file.checksum, remote_path) 799 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 800 finally: 801 if host_dir is not None: 802 shutil.rmtree(host_dir) 803 804 def disabled_test_push_empty(self): 805 """Push an empty directory to the device.""" 806 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 807 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 808 809 try: 810 host_dir = tempfile.mkdtemp() 811 812 # Make sure the temp directory isn't setuid, or else adb will complain. 813 os.chmod(host_dir, 0o700) 814 815 # Create an empty directory. 816 empty_dir_path = os.path.join(host_dir, 'empty') 817 os.mkdir(empty_dir_path); 818 819 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 820 821 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 822 test_empty_cmd = ["[", "-d", remote_path, "]"] 823 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 824 825 self.assertEqual(rc, 0) 826 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 827 finally: 828 if host_dir is not None: 829 shutil.rmtree(host_dir) 830 831 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 832 def test_push_symlink(self): 833 """Push a symlink. 834 835 Bug: http://b/31491920 836 """ 837 try: 838 host_dir = tempfile.mkdtemp() 839 840 # Make sure the temp directory isn't setuid, or else adb will 841 # complain. 842 os.chmod(host_dir, 0o700) 843 844 with open(os.path.join(host_dir, 'foo'), 'w') as f: 845 f.write('foo') 846 847 symlink_path = os.path.join(host_dir, 'symlink') 848 os.symlink('foo', symlink_path) 849 850 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 851 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 852 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 853 rc, out, _ = self.device.shell_nocheck( 854 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 855 self.assertEqual(0, rc) 856 self.assertEqual(out.strip(), 'foo') 857 finally: 858 if host_dir is not None: 859 shutil.rmtree(host_dir) 860 861 def test_multiple_push(self): 862 """Push multiple files to the device in one adb push command. 863 864 Bug: http://b/25324823 865 """ 866 867 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 868 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 869 870 try: 871 host_dir = tempfile.mkdtemp() 872 873 # Create some random files and a subdirectory containing more files. 874 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 875 876 subdir = os.path.join(host_dir, 'subdir') 877 os.mkdir(subdir) 878 subdir_temp_files = make_random_host_files(in_dir=subdir, 879 num_files=4) 880 881 paths = [x.full_path for x in temp_files] 882 paths.append(subdir) 883 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 884 885 for temp_file in temp_files: 886 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 887 temp_file.base_name) 888 self._verify_remote(temp_file.checksum, remote_path) 889 890 for subdir_temp_file in subdir_temp_files: 891 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 892 # BROKEN: http://b/25394682 893 # 'subdir'; 894 temp_file.base_name) 895 self._verify_remote(temp_file.checksum, remote_path) 896 897 898 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 899 finally: 900 if host_dir is not None: 901 shutil.rmtree(host_dir) 902 903 @requires_non_root 904 def test_push_error_reporting(self): 905 """Make sure that errors that occur while pushing a file get reported 906 907 Bug: http://b/26816782 908 """ 909 with tempfile.NamedTemporaryFile() as tmp_file: 910 tmp_file.write(b'\0' * 1024 * 1024) 911 tmp_file.flush() 912 try: 913 self.device.push(local=tmp_file.name, remote='/system/') 914 self.fail('push should not have succeeded') 915 except subprocess.CalledProcessError as e: 916 output = e.output 917 918 self.assertTrue(b'Permission denied' in output or 919 b'Read-only file system' in output) 920 921 @requires_non_root 922 def test_push_directory_creation(self): 923 """Regression test for directory creation. 924 925 Bug: http://b/110953234 926 """ 927 with tempfile.NamedTemporaryFile() as tmp_file: 928 tmp_file.write(b'\0' * 1024 * 1024) 929 tmp_file.flush() 930 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 931 self.device.shell(['rm', '-rf', remote_path]) 932 933 remote_path += '/filename' 934 self.device.push(local=tmp_file.name, remote=remote_path) 935 936 def disabled_test_push_multiple_slash_root(self): 937 """Regression test for pushing to //data/local/tmp. 938 939 Bug: http://b/141311284 940 941 Disabled because this broken on the adbd side as well: b/141943968 942 """ 943 with tempfile.NamedTemporaryFile() as tmp_file: 944 tmp_file.write(b'\0' * 1024 * 1024) 945 tmp_file.flush() 946 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 947 self.device.shell(['rm', '-rf', remote_path]) 948 self.device.push(local=tmp_file.name, remote=remote_path) 949 950 def _test_pull(self, remote_file, checksum): 951 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 952 tmp_write.close() 953 self.device.pull(remote=remote_file, local=tmp_write.name) 954 with open(tmp_write.name, 'rb') as tmp_read: 955 host_contents = tmp_read.read() 956 host_md5 = compute_md5(host_contents) 957 self.assertEqual(checksum, host_md5) 958 os.remove(tmp_write.name) 959 960 @requires_non_root 961 def test_pull_error_reporting(self): 962 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 963 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 964 965 try: 966 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 967 except subprocess.CalledProcessError as e: 968 output = e.output 969 970 self.assertIn(b'Permission denied', output) 971 972 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 973 974 def test_pull(self): 975 """Pull a randomly generated file from specified device.""" 976 kbytes = 512 977 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 978 cmd = ['dd', 'if=/dev/urandom', 979 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 980 'count={}'.format(kbytes)] 981 self.device.shell(cmd) 982 dev_md5, _ = self.device.shell(['md5sum', self.DEVICE_TEMP_FILE])[0].split() 983 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 984 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 985 986 def test_pull_dir(self): 987 """Pull a randomly generated directory of files from the device.""" 988 try: 989 host_dir = tempfile.mkdtemp() 990 991 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 992 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 993 994 # Populate device directory with random files. 995 temp_files = make_random_device_files( 996 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 997 998 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 999 1000 for temp_file in temp_files: 1001 host_path = os.path.join( 1002 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1003 temp_file.base_name) 1004 self._verify_local(temp_file.checksum, host_path) 1005 1006 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1007 finally: 1008 if host_dir is not None: 1009 shutil.rmtree(host_dir) 1010 1011 def test_pull_dir_symlink(self): 1012 """Pull a directory into a symlink to a directory. 1013 1014 Bug: http://b/27362811 1015 """ 1016 if os.name != 'posix': 1017 raise unittest.SkipTest('requires POSIX') 1018 1019 try: 1020 host_dir = tempfile.mkdtemp() 1021 real_dir = os.path.join(host_dir, 'dir') 1022 symlink = os.path.join(host_dir, 'symlink') 1023 os.mkdir(real_dir) 1024 os.symlink(real_dir, symlink) 1025 1026 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1027 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1028 1029 # Populate device directory with random files. 1030 temp_files = make_random_device_files( 1031 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1032 1033 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1034 1035 for temp_file in temp_files: 1036 host_path = os.path.join( 1037 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1038 temp_file.base_name) 1039 self._verify_local(temp_file.checksum, host_path) 1040 1041 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1042 finally: 1043 if host_dir is not None: 1044 shutil.rmtree(host_dir) 1045 1046 def test_pull_dir_symlink_collision(self): 1047 """Pull a directory into a colliding symlink to directory.""" 1048 if os.name != 'posix': 1049 raise unittest.SkipTest('requires POSIX') 1050 1051 try: 1052 host_dir = tempfile.mkdtemp() 1053 real_dir = os.path.join(host_dir, 'real') 1054 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1055 symlink = os.path.join(host_dir, tmp_dirname) 1056 os.mkdir(real_dir) 1057 os.symlink(real_dir, symlink) 1058 1059 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1060 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1061 1062 # Populate device directory with random files. 1063 temp_files = make_random_device_files( 1064 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1065 1066 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1067 1068 for temp_file in temp_files: 1069 host_path = os.path.join(real_dir, temp_file.base_name) 1070 self._verify_local(temp_file.checksum, host_path) 1071 1072 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1073 finally: 1074 if host_dir is not None: 1075 shutil.rmtree(host_dir) 1076 1077 def test_pull_dir_nonexistent(self): 1078 """Pull a directory of files from the device to a nonexistent path.""" 1079 try: 1080 host_dir = tempfile.mkdtemp() 1081 dest_dir = os.path.join(host_dir, 'dest') 1082 1083 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1084 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1085 1086 # Populate device directory with random files. 1087 temp_files = make_random_device_files( 1088 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1089 1090 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1091 1092 for temp_file in temp_files: 1093 host_path = os.path.join(dest_dir, temp_file.base_name) 1094 self._verify_local(temp_file.checksum, host_path) 1095 1096 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1097 finally: 1098 if host_dir is not None: 1099 shutil.rmtree(host_dir) 1100 1101 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1102 def disabled_test_pull_symlink_dir(self): 1103 """Pull a symlink to a directory of symlinks to files.""" 1104 try: 1105 host_dir = tempfile.mkdtemp() 1106 1107 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1108 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1109 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1110 1111 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1112 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1113 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1114 1115 # Populate device directory with random files. 1116 temp_files = make_random_device_files( 1117 self.device, in_dir=remote_dir, num_files=32) 1118 1119 for temp_file in temp_files: 1120 self.device.shell( 1121 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1122 posixpath.join(remote_links, temp_file.base_name)]) 1123 1124 self.device.pull(remote=remote_symlink, local=host_dir) 1125 1126 for temp_file in temp_files: 1127 host_path = os.path.join( 1128 host_dir, 'symlink', temp_file.base_name) 1129 self._verify_local(temp_file.checksum, host_path) 1130 1131 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1132 finally: 1133 if host_dir is not None: 1134 shutil.rmtree(host_dir) 1135 1136 def test_pull_empty(self): 1137 """Pull a directory containing an empty directory from the device.""" 1138 try: 1139 host_dir = tempfile.mkdtemp() 1140 1141 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1142 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1143 self.device.shell(['mkdir', '-p', remote_empty_path]) 1144 1145 self.device.pull(remote=remote_empty_path, local=host_dir) 1146 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1147 finally: 1148 if host_dir is not None: 1149 shutil.rmtree(host_dir) 1150 1151 def test_multiple_pull(self): 1152 """Pull a randomly generated directory of files from the device.""" 1153 1154 try: 1155 host_dir = tempfile.mkdtemp() 1156 1157 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1158 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1159 self.device.shell(['mkdir', '-p', subdir]) 1160 1161 # Create some random files and a subdirectory containing more files. 1162 temp_files = make_random_device_files( 1163 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1164 1165 subdir_temp_files = make_random_device_files( 1166 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1167 1168 paths = [x.full_path for x in temp_files] 1169 paths.append(subdir) 1170 self.device._simple_call(['pull'] + paths + [host_dir]) 1171 1172 for temp_file in temp_files: 1173 local_path = os.path.join(host_dir, temp_file.base_name) 1174 self._verify_local(temp_file.checksum, local_path) 1175 1176 for subdir_temp_file in subdir_temp_files: 1177 local_path = os.path.join(host_dir, 1178 'subdir', 1179 subdir_temp_file.base_name) 1180 self._verify_local(subdir_temp_file.checksum, local_path) 1181 1182 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1183 finally: 1184 if host_dir is not None: 1185 shutil.rmtree(host_dir) 1186 1187 def verify_sync(self, device, temp_files, device_dir): 1188 """Verifies that a list of temp files was synced to the device.""" 1189 # Confirm that every file on the device mirrors that on the host. 1190 for temp_file in temp_files: 1191 device_full_path = posixpath.join( 1192 device_dir, temp_file.base_name) 1193 dev_md5, _ = device.shell(['md5sum', device_full_path])[0].split() 1194 self.assertEqual(temp_file.checksum, dev_md5) 1195 1196 def test_sync(self): 1197 """Sync a host directory to the data partition.""" 1198 1199 try: 1200 base_dir = tempfile.mkdtemp() 1201 1202 # Create mirror device directory hierarchy within base_dir. 1203 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1204 os.makedirs(full_dir_path) 1205 1206 # Create 32 random files within the host mirror. 1207 temp_files = make_random_host_files( 1208 in_dir=full_dir_path, num_files=32) 1209 1210 # Clean up any stale files on the device. 1211 device = adb.get_device() # pylint: disable=no-member 1212 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1213 1214 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1215 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1216 device.sync('data') 1217 if old_product_out is None: 1218 del os.environ['ANDROID_PRODUCT_OUT'] 1219 else: 1220 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1221 1222 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1223 1224 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1225 finally: 1226 if base_dir is not None: 1227 shutil.rmtree(base_dir) 1228 1229 def test_push_sync(self): 1230 """Sync a host directory to a specific path.""" 1231 1232 try: 1233 temp_dir = tempfile.mkdtemp() 1234 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1235 1236 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1237 1238 # Clean up any stale files on the device. 1239 device = adb.get_device() # pylint: disable=no-member 1240 device.shell(['rm', '-rf', device_dir]) 1241 1242 device.push(temp_dir, device_dir, sync=True) 1243 1244 self.verify_sync(device, temp_files, device_dir) 1245 1246 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1247 finally: 1248 if temp_dir is not None: 1249 shutil.rmtree(temp_dir) 1250 1251 def test_push_sync_multiple(self): 1252 """Sync multiple host directories to a specific path.""" 1253 1254 try: 1255 temp_dir = tempfile.mkdtemp() 1256 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1257 1258 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1259 1260 # Clean up any stale files on the device. 1261 device = adb.get_device() # pylint: disable=no-member 1262 device.shell(['rm', '-rf', device_dir]) 1263 device.shell(['mkdir', '-p', device_dir]) 1264 1265 host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files] 1266 device.push(host_paths, device_dir, sync=True) 1267 1268 self.verify_sync(device, temp_files, device_dir) 1269 1270 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1271 finally: 1272 if temp_dir is not None: 1273 shutil.rmtree(temp_dir) 1274 1275 1276 def test_push_dry_run_nonexistent_file(self): 1277 """Push with dry run (non-existent file).""" 1278 1279 for file_size in [8, 1024 * 1024]: 1280 try: 1281 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1282 device_file = posixpath.join(device_dir, 'file') 1283 1284 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1285 self.device.shell(['mkdir', '-p', device_dir]) 1286 1287 host_dir = tempfile.mkdtemp() 1288 host_file = posixpath.join(host_dir, 'file') 1289 1290 with open(host_file, "w") as f: 1291 f.write('x' * file_size) 1292 1293 self.device._simple_call(['push', '-n', host_file, device_file]) 1294 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1295 self.assertNotEqual(0, rc) 1296 1297 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1298 finally: 1299 if host_dir is not None: 1300 shutil.rmtree(host_dir) 1301 1302 def test_push_dry_run_existent_file(self): 1303 """Push with dry run.""" 1304 1305 for file_size in [8, 1024 * 1024]: 1306 try: 1307 host_dir = tempfile.mkdtemp() 1308 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1309 device_file = posixpath.join(device_dir, 'file') 1310 1311 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1312 self.device.shell(['mkdir', '-p', device_dir]) 1313 self.device.shell(['echo', 'foo', '>', device_file]) 1314 1315 host_file = posixpath.join(host_dir, 'file') 1316 1317 with open(host_file, "w") as f: 1318 f.write('x' * file_size) 1319 1320 self.device._simple_call(['push', '-n', host_file, device_file]) 1321 stdout, stderr = self.device.shell(['cat', device_file]) 1322 self.assertEqual(stdout.strip(), "foo") 1323 1324 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1325 finally: 1326 if host_dir is not None: 1327 shutil.rmtree(host_dir) 1328 1329 def test_unicode_paths(self): 1330 """Ensure that we can support non-ASCII paths, even on Windows.""" 1331 name = u'로보카 폴리' 1332 1333 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1334 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1335 1336 ## push. 1337 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1338 tf.close() 1339 self.device.push(tf.name, remote_path) 1340 os.remove(tf.name) 1341 self.assertFalse(os.path.exists(tf.name)) 1342 1343 # Verify that the device ended up with the expected UTF-8 path 1344 output = self.device.shell( 1345 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1346 self.assertEqual(remote_path, output) 1347 1348 # pull. 1349 self.device.pull(remote_path, tf.name) 1350 self.assertTrue(os.path.exists(tf.name)) 1351 os.remove(tf.name) 1352 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1353 1354 1355class FileOperationsTestUncompressed(FileOperationsTest.Base): 1356 compression = "none" 1357 1358 1359class FileOperationsTestBrotli(FileOperationsTest.Base): 1360 compression = "brotli" 1361 1362 1363class FileOperationsTestLZ4(FileOperationsTest.Base): 1364 compression = "lz4" 1365 1366 1367class FileOperationsTestZstd(FileOperationsTest.Base): 1368 compression = "zstd" 1369 1370 1371class DeviceOfflineTest(DeviceTest): 1372 def _get_device_state(self, serialno): 1373 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1374 for line in output.split('\n'): 1375 m = re.match('(\S+)\s+(\S+)', line) 1376 if m and m.group(1) == serialno: 1377 return m.group(2) 1378 return None 1379 1380 def disabled_test_killed_when_pushing_a_large_file(self): 1381 """ 1382 While running adb push with a large file, kill adb server. 1383 Occasionally the device becomes offline. Because the device is still 1384 reading data without realizing that the adb server has been restarted. 1385 Test if we can bring the device online automatically now. 1386 http://b/32952319 1387 """ 1388 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1389 # 1. Push a large file 1390 file_path = 'tmp_large_file' 1391 try: 1392 fh = open(file_path, 'w') 1393 fh.write('\0' * (100 * 1024 * 1024)) 1394 fh.close() 1395 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1396 time.sleep(0.1) 1397 # 2. Kill the adb server 1398 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1399 subproc.terminate() 1400 finally: 1401 try: 1402 os.unlink(file_path) 1403 except: 1404 pass 1405 # 3. See if the device still exist. 1406 # Sleep to wait for the adb server exit. 1407 time.sleep(0.5) 1408 # 4. The device should be online 1409 self.assertEqual(self._get_device_state(serialno), 'device') 1410 1411 def disabled_test_killed_when_pulling_a_large_file(self): 1412 """ 1413 While running adb pull with a large file, kill adb server. 1414 Occasionally the device can't be connected. Because the device is trying to 1415 send a message larger than what is expected by the adb server. 1416 Test if we can bring the device online automatically now. 1417 """ 1418 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1419 file_path = 'tmp_large_file' 1420 try: 1421 # 1. Create a large file on device. 1422 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1423 'bs=1000000', 'count=100']) 1424 # 2. Pull the large file on host. 1425 subproc = subprocess.Popen(self.device.adb_cmd + 1426 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1427 time.sleep(0.1) 1428 # 3. Kill the adb server 1429 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1430 subproc.terminate() 1431 finally: 1432 try: 1433 os.unlink(file_path) 1434 except: 1435 pass 1436 # 4. See if the device still exist. 1437 # Sleep to wait for the adb server exit. 1438 time.sleep(0.5) 1439 self.assertEqual(self._get_device_state(serialno), 'device') 1440 1441 1442 def test_packet_size_regression(self): 1443 """Test for http://b/37783561 1444 1445 Receiving packets of a length divisible by 512 but not 1024 resulted in 1446 the adb client waiting indefinitely for more input. 1447 """ 1448 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1449 # Probe some surrounding values as well, for the hell of it. 1450 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1451 for offset in [-6, -5, -4]: 1452 length = base + offset 1453 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1454 'echo', 'foo'] 1455 rc, stdout, _ = self.device.shell_nocheck(cmd) 1456 1457 self.assertEqual(0, rc) 1458 1459 # Output should be '\0' * length, followed by "foo\n" 1460 self.assertEqual(length, len(stdout) - 4) 1461 self.assertEqual(stdout, "\0" * length + "foo\n") 1462 1463 def test_zero_packet(self): 1464 """Test for http://b/113070258 1465 1466 Make sure that we don't blow up when sending USB transfers that line up 1467 exactly with the USB packet size. 1468 """ 1469 1470 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1471 try: 1472 for size in [512, 1024]: 1473 def listener(): 1474 cmd = ["echo foo | nc -l -p 12345; echo done"] 1475 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1476 1477 thread = threading.Thread(target=listener) 1478 thread.start() 1479 1480 # Wait a bit to let the shell command start. 1481 time.sleep(0.25) 1482 1483 sock = socket.create_connection(("localhost", local_port)) 1484 with contextlib.closing(sock): 1485 bytesWritten = sock.send(b"a" * size) 1486 self.assertEqual(size, bytesWritten) 1487 readBytes = sock.recv(4096) 1488 self.assertEqual(b"foo\n", readBytes) 1489 1490 thread.join() 1491 finally: 1492 self.device.forward_remove("tcp:{}".format(local_port)) 1493 1494 1495class SocketTest(DeviceTest): 1496 def test_socket_flush(self): 1497 """Test that we handle socket closure properly. 1498 1499 If we're done writing to a socket, closing before the other end has 1500 closed will send a TCP_RST if we have incoming data queued up, which 1501 may result in data that we've written being discarded. 1502 1503 Bug: http://b/74616284 1504 """ 1505 def adb_length_prefixed(string): 1506 encoded = string.encode("utf8") 1507 result = b"%04x%s" % (len(encoded), encoded) 1508 return result 1509 1510 if "ANDROID_SERIAL" in os.environ: 1511 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1512 else: 1513 transport_string = "host:transport-any" 1514 1515 with socket.create_connection(("localhost", 5037)) as s: 1516 1517 s.sendall(adb_length_prefixed(transport_string)) 1518 response = s.recv(4) 1519 self.assertEqual(b"OKAY", response) 1520 1521 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1522 s.sendall(adb_length_prefixed(shell_string)) 1523 1524 response = s.recv(4) 1525 self.assertEqual(b"OKAY", response) 1526 1527 # Spawn a thread that dumps garbage into the socket until failure. 1528 def spam(): 1529 buf = b"\0" * 16384 1530 try: 1531 while True: 1532 s.sendall(buf) 1533 except Exception as ex: 1534 print(ex) 1535 1536 thread = threading.Thread(target=spam) 1537 thread.start() 1538 1539 time.sleep(1) 1540 1541 received = b"" 1542 while True: 1543 read = s.recv(512) 1544 if len(read) == 0: 1545 break 1546 received += read 1547 1548 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1549 thread.join() 1550 1551 1552class FramebufferTest(DeviceTest): 1553 def test_framebuffer(self): 1554 """Test that we get something from the framebuffer service.""" 1555 output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"]) 1556 self.assertFalse(len(output) == 0) 1557 1558 1559if sys.platform == "win32": 1560 # From https://stackoverflow.com/a/38749458 1561 import os 1562 import contextlib 1563 import msvcrt 1564 import ctypes 1565 from ctypes import wintypes 1566 1567 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1568 1569 GENERIC_READ = 0x80000000 1570 GENERIC_WRITE = 0x40000000 1571 FILE_SHARE_READ = 1 1572 FILE_SHARE_WRITE = 2 1573 CONSOLE_TEXTMODE_BUFFER = 1 1574 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1575 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1576 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1577 1578 def _check_zero(result, func, args): 1579 if not result: 1580 raise ctypes.WinError(ctypes.get_last_error()) 1581 return args 1582 1583 def _check_invalid(result, func, args): 1584 if result == INVALID_HANDLE_VALUE: 1585 raise ctypes.WinError(ctypes.get_last_error()) 1586 return args 1587 1588 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1589 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1590 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1591 1592 class COORD(ctypes.Structure): 1593 _fields_ = (('X', wintypes.SHORT), 1594 ('Y', wintypes.SHORT)) 1595 1596 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1597 _fields_ = (('cbSize', wintypes.ULONG), 1598 ('dwSize', COORD), 1599 ('dwCursorPosition', COORD), 1600 ('wAttributes', wintypes.WORD), 1601 ('srWindow', wintypes.SMALL_RECT), 1602 ('dwMaximumWindowSize', COORD), 1603 ('wPopupAttributes', wintypes.WORD), 1604 ('bFullscreenSupported', wintypes.BOOL), 1605 ('ColorTable', wintypes.DWORD * 16)) 1606 def __init__(self, *args, **kwds): 1607 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1608 *args, **kwds) 1609 self.cbSize = ctypes.sizeof(self) 1610 1611 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1612 CONSOLE_SCREEN_BUFFER_INFOEX) 1613 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1614 1615 kernel32.GetStdHandle.errcheck = _check_invalid 1616 kernel32.GetStdHandle.restype = wintypes.HANDLE 1617 kernel32.GetStdHandle.argtypes = ( 1618 wintypes.DWORD,) # _In_ nStdHandle 1619 1620 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1621 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1622 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1623 wintypes.DWORD, # _In_ dwDesiredAccess 1624 wintypes.DWORD, # _In_ dwShareMode 1625 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1626 wintypes.DWORD, # _In_ dwFlags 1627 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1628 1629 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1630 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1631 wintypes.HANDLE, # _In_ hConsoleOutput 1632 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1633 1634 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1635 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1636 wintypes.HANDLE, # _In_ hConsoleOutput 1637 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1638 1639 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1640 kernel32.SetConsoleWindowInfo.argtypes = ( 1641 wintypes.HANDLE, # _In_ hConsoleOutput 1642 wintypes.BOOL, # _In_ bAbsolute 1643 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1644 1645 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1646 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1647 wintypes.HANDLE, # _In_ hConsoleOutput 1648 wintypes.WCHAR, # _In_ cCharacter 1649 wintypes.DWORD, # _In_ nLength 1650 COORD, # _In_ dwWriteCoord 1651 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1652 1653 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1654 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1655 wintypes.HANDLE, # _In_ hConsoleOutput 1656 wintypes.LPWSTR, # _Out_ lpCharacter 1657 wintypes.DWORD, # _In_ nLength 1658 COORD, # _In_ dwReadCoord 1659 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1660 1661 @contextlib.contextmanager 1662 def allocate_console(): 1663 allocated = kernel32.AllocConsole() 1664 try: 1665 yield allocated 1666 finally: 1667 if allocated: 1668 kernel32.FreeConsole() 1669 1670 @contextlib.contextmanager 1671 def console_screen(ncols=None, nrows=None): 1672 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1673 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1674 nwritten = (wintypes.DWORD * 1)() 1675 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1676 kernel32.GetConsoleScreenBufferInfoEx( 1677 hStdOut, ctypes.byref(info)) 1678 if ncols is None: 1679 ncols = info.dwSize.X 1680 if nrows is None: 1681 nrows = info.dwSize.Y 1682 elif nrows > 9999: 1683 raise ValueError('nrows must be 9999 or less') 1684 fd_screen = None 1685 hScreen = kernel32.CreateConsoleScreenBuffer( 1686 GENERIC_READ | GENERIC_WRITE, 1687 FILE_SHARE_READ | FILE_SHARE_WRITE, 1688 None, CONSOLE_TEXTMODE_BUFFER, None) 1689 try: 1690 fd_screen = msvcrt.open_osfhandle( 1691 hScreen, os.O_RDWR | os.O_BINARY) 1692 kernel32.GetConsoleScreenBufferInfoEx( 1693 hScreen, ctypes.byref(new_info)) 1694 new_info.dwSize = COORD(ncols, nrows) 1695 new_info.srWindow = wintypes.SMALL_RECT( 1696 Left=0, Top=0, Right=(ncols - 1), 1697 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1698 kernel32.SetConsoleScreenBufferInfoEx( 1699 hScreen, ctypes.byref(new_info)) 1700 kernel32.SetConsoleWindowInfo(hScreen, True, 1701 ctypes.byref(new_info.srWindow)) 1702 kernel32.FillConsoleOutputCharacterW( 1703 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1704 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1705 try: 1706 yield fd_screen 1707 finally: 1708 kernel32.SetConsoleScreenBufferInfoEx( 1709 hStdOut, ctypes.byref(info)) 1710 kernel32.SetConsoleWindowInfo(hStdOut, True, 1711 ctypes.byref(info.srWindow)) 1712 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1713 finally: 1714 if fd_screen is not None: 1715 os.close(fd_screen) 1716 else: 1717 kernel32.CloseHandle(hScreen) 1718 1719 def read_screen(fd): 1720 hScreen = msvcrt.get_osfhandle(fd) 1721 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1722 kernel32.GetConsoleScreenBufferInfoEx( 1723 hScreen, ctypes.byref(csbi)) 1724 ncols = csbi.dwSize.X 1725 pos = csbi.dwCursorPosition 1726 length = ncols * pos.Y + pos.X + 1 1727 buf = (ctypes.c_wchar * length)() 1728 n = (wintypes.DWORD * 1)() 1729 kernel32.ReadConsoleOutputCharacterW( 1730 hScreen, buf, length, COORD(0,0), n) 1731 lines = [buf[i:i+ncols].rstrip(u'\0') 1732 for i in range(0, n[0], ncols)] 1733 return u'\n'.join(lines) 1734 1735@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1736class WindowsConsoleTest(DeviceTest): 1737 def test_unicode_output(self): 1738 """Test Unicode command line parameters and Unicode console window output. 1739 1740 Bug: https://issuetracker.google.com/issues/111972753 1741 """ 1742 # If we don't have a console window, allocate one. This isn't necessary if we're already 1743 # being run from a console window, which is typical. 1744 with allocate_console() as allocated_console: 1745 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1746 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1747 # likely unnecessary given the typical console window size. 1748 with console_screen(nrows=1000) as screen: 1749 unicode_string = u'로보카 폴리' 1750 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1751 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1752 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1753 process.wait() 1754 # Read what was written by adb to the temporary console buffer. 1755 console_output = read_screen(screen) 1756 self.assertEqual(unicode_string, console_output) 1757 1758class DevicesListing(DeviceTest): 1759 1760 serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8") 1761 # def get_serial(self): 1762 # return subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip().decode("utf-8") 1763 1764 def test_devices(self): 1765 with subprocess.Popen(['adb', 'devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1766 lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines())) 1767 self.assertEqual(len(lines), 3) 1768 line = lines[1] 1769 self.assertTrue(self.serial in line) 1770 self.assertFalse("{" in line) 1771 self.assertFalse("}" in line) 1772 self.assertTrue("device" in line) 1773 self.assertFalse("product" in line) 1774 self.assertFalse("transport" in line) 1775 1776 def test_devices_l(self): 1777 with subprocess.Popen(['adb', 'devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1778 lines = list(map(lambda b: b.decode("utf-8"), proc.stdout.readlines())) 1779 self.assertEqual(len(lines), 3) 1780 line = lines[1] 1781 self.assertTrue(self.serial in line) 1782 self.assertFalse("{" in line) 1783 self.assertFalse("}" in line) 1784 self.assertTrue("device" in line) 1785 self.assertTrue("product" in line) 1786 self.assertTrue("transport" in line) 1787 1788 def test_track_devices(self): 1789 with subprocess.Popen(['adb', 'track-devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1790 with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader: 1791 output_size = int(reader.read(4), 16) 1792 output = reader.read(output_size) 1793 self.assertFalse("{" in output) 1794 self.assertFalse("}" in output) 1795 self.assertTrue(self.serial in output) 1796 self.assertTrue("device" in output) 1797 self.assertFalse("product" in output) 1798 self.assertFalse("transport" in output) 1799 proc.terminate() 1800 1801 def test_track_devices_l(self): 1802 with subprocess.Popen(['adb', 'track-devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1803 with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader: 1804 output_size = int(reader.read(4), 16) 1805 output = reader.read(output_size) 1806 self.assertFalse("{" in output) 1807 self.assertFalse("}" in output) 1808 self.assertTrue(self.serial in output) 1809 self.assertTrue("device" in output) 1810 self.assertTrue("product" in output) 1811 self.assertTrue("transport" in output) 1812 proc.terminate() 1813 1814 def test_track_devices_proto_text(self): 1815 with subprocess.Popen(['adb', 'track-devices', '--proto-text'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1816 with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader: 1817 output_size = int(reader.read(4), 16) 1818 output = reader.read(output_size) 1819 self.assertTrue("{" in output) 1820 self.assertTrue("}" in output) 1821 self.assertTrue(self.serial in output) 1822 self.assertTrue("device" in output) 1823 self.assertTrue("product" in output) 1824 self.assertTrue("connection_type" in output) 1825 proc.terminate() 1826 1827 def test_track_devices_proto_binary(self): 1828 with subprocess.Popen(['adb', 'track-devices', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1829 1830 output_size = int(proc.stdout.read(4).decode("utf-8"), 16) 1831 proto = proc.stdout.read(output_size) 1832 1833 devices = proto_devices.Devices() 1834 devices.ParseFromString(proto) 1835 1836 device = devices.device[0] 1837 self.assertTrue(device.serial == self.serial) 1838 self.assertFalse(device.bus_address == "") 1839 self.assertFalse(device.product == "") 1840 self.assertFalse(device.model == "") 1841 self.assertFalse(device.device == "") 1842 self.assertTrue(device.negotiated_speed == int(device.negotiated_speed)) 1843 self.assertTrue(device.max_speed == int(device.max_speed)) 1844 self.assertTrue(device.transport_id == int(device.transport_id)) 1845 1846 proc.terminate() 1847 1848class DevicesListing(DeviceTest): 1849 1850 serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8") 1851 1852 def test_track_app_appinfo(self): 1853 return # Disabled until b/301491148 is fixed. 1854 # (Exported FeatureFlags cannot be read-only) 1855 subprocess.check_output(['adb', 'install', '-t', 'adb1.apk']).strip().decode("utf-8") 1856 subprocess.check_output(['adb', 'install', '-t', 'adb2.apk']).strip().decode("utf-8") 1857 subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.MainActivity']).strip().decode("utf-8") 1858 subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app2/.MainActivity']).strip().decode("utf-8") 1859 subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.OwnProcessActivity']).strip().decode("utf-8") 1860 with subprocess.Popen(['adb', 'track-app', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 1861 output_size = int(proc.stdout.read(4).decode("utf-8"), 16) 1862 proto = proc.stdout.read(output_size) 1863 1864 apps = proto_track_app.AppProcesses() 1865 apps.ParseFromString(proto) 1866 1867 foundAdbAppDefProc = False 1868 foundAdbAppOwnProc = False 1869 for app in apps.process: 1870 if (app.process_name == "adb.test.process.name"): 1871 foundAdbAppDefProc = True 1872 self.assertTrue(app.debuggable) 1873 self.assertTrue("adb.test.app1" in app.package_names) 1874 self.assertTrue("adb.test.app2" in app.package_names) 1875 1876 if (app.process_name == "adb.test.own.process"): 1877 foundAdbAppOwnProc = True 1878 self.assertTrue(app.debuggable) 1879 self.assertTrue("adb.test.app1" in app.package_names) 1880 1881 self.assertTrue(foundAdbAppDefProc) 1882 self.assertTrue(foundAdbAppOwnProc) 1883 proc.terminate() 1884 1885if __name__ == '__main__': 1886 random.seed(0) 1887 unittest.main() 1888