1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import io
23import os
24import posixpath
25import random
26import re
27import shlex
28import shutil
29import signal
30import socket
31import string
32import subprocess
33import sys
34import tempfile
35import threading
36import time
37import unittest
38
39import proto.devices_pb2 as proto_devices
40import proto.app_processes_pb2 as proto_track_app
41
42from datetime import datetime
43
44import adb
45
def requires_non_root(func):
    """Decorate a test method so that it runs while adbd is not root.

    If `id -un` on the device reports 'root', the device is unrooted before
    `func` is called and rooted again afterwards, even when `func` raises.

    Args:
      func: Test method to wrap. Its first argument must expose a `device`
        attribute providing `shell`, `root`, `unroot` and `wait`.

    Returns:
      A wrapper that forwards all positional and keyword arguments to
      `func` and returns whatever `func` returns.
    """
    # Imported locally so this helper does not rely on a module-level import.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Put the device back the way we found it for later tests.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
61
62
class DeviceTest(unittest.TestCase):
    """Base class for the test cases in this file that need `self.device`."""

    def setUp(self) -> None:
        """Store the object returned by adb.get_device() on `self.device`."""
        device = adb.get_device()
        self.device = device
66
67
class AbbTest(DeviceTest):
    """Compares the results of `adb abb` and `adb shell cmd`."""

    def test_smoke(self):
        """Run both commands without arguments and compare their results."""
        abb_result = subprocess.run(['adb', 'abb'], capture_output=True)
        cmd_result = subprocess.run(['adb', 'shell', 'cmd'],
                                    capture_output=True)

        # abb squashes all failures to 1, so only compare success/failure.
        abb_ok = abb_result.returncode == 0
        cmd_ok = cmd_result.returncode == 0
        self.assertEqual(abb_ok, cmd_ok)

        # The captured output streams must match exactly.
        self.assertEqual(abb_result.stdout, cmd_result.stdout)
        self.assertEqual(abb_result.stderr, cmd_result.stderr)
77
78class ForwardReverseTest(DeviceTest):
79    def _test_no_rebind(self, description, direction_list, direction,
80                       direction_no_rebind, direction_remove_all):
81        msg = direction_list()
82        self.assertEqual('', msg.strip(),
83                         description + ' list must be empty to run this test.')
84
85        # Use --no-rebind with no existing binding
86        direction_no_rebind('tcp:5566', 'tcp:6655')
87        msg = direction_list()
88        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
89
90        # Use --no-rebind with existing binding
91        with self.assertRaises(subprocess.CalledProcessError):
92            direction_no_rebind('tcp:5566', 'tcp:6677')
93        msg = direction_list()
94        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
95        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
96
97        # Use the absence of --no-rebind with existing binding
98        direction('tcp:5566', 'tcp:6677')
99        msg = direction_list()
100        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
101        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
102
103        direction_remove_all()
104        msg = direction_list()
105        self.assertEqual('', msg.strip())
106
107    def test_forward_no_rebind(self):
108        self._test_no_rebind('forward', self.device.forward_list,
109                            self.device.forward, self.device.forward_no_rebind,
110                            self.device.forward_remove_all)
111
112    def test_reverse_no_rebind(self):
113        self._test_no_rebind('reverse', self.device.reverse_list,
114                            self.device.reverse, self.device.reverse_no_rebind,
115                            self.device.reverse_remove_all)
116
117    def test_forward(self):
118        msg = self.device.forward_list()
119        self.assertEqual('', msg.strip(),
120                         'Forwarding list must be empty to run this test.')
121        self.device.forward('tcp:5566', 'tcp:6655')
122        msg = self.device.forward_list()
123        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
124        self.device.forward('tcp:7788', 'tcp:8877')
125        msg = self.device.forward_list()
126        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
127        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
128        self.device.forward_remove('tcp:5566')
129        msg = self.device.forward_list()
130        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
131        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
132        self.device.forward_remove_all()
133        msg = self.device.forward_list()
134        self.assertEqual('', msg.strip())
135
136    def test_forward_old_protocol(self):
137        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
138
139        msg = self.device.forward_list()
140        self.assertEqual('', msg.strip(),
141                         'Forwarding list must be empty to run this test.')
142
143        with socket.create_connection(("localhost", 5037)) as s:
144            service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
145            cmd = b"%04x%s" % (len(service), service)
146            s.sendall(cmd)
147
148        msg = self.device.forward_list()
149        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
150
151        self.device.forward_remove_all()
152        msg = self.device.forward_list()
153        self.assertEqual('', msg.strip())
154
155    def test_forward_tcp_port_0(self):
156        self.assertEqual('', self.device.forward_list().strip(),
157                         'Forwarding list must be empty to run this test.')
158
159        try:
160            # If resolving TCP port 0 is supported, `adb forward` will print
161            # the actual port number.
162            port = self.device.forward('tcp:0', 'tcp:8888').strip()
163            if not port:
164                raise unittest.SkipTest('Forwarding tcp:0 is not available.')
165
166            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
167                                      self.device.forward_list()))
168        finally:
169            self.device.forward_remove_all()
170
171    def test_reverse(self):
172        msg = self.device.reverse_list()
173        self.assertEqual('', msg.strip(),
174                         'Reverse forwarding list must be empty to run this test.')
175        self.device.reverse('tcp:5566', 'tcp:6655')
176        msg = self.device.reverse_list()
177        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
178        self.device.reverse('tcp:7788', 'tcp:8877')
179        msg = self.device.reverse_list()
180        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
181        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
182        self.device.reverse_remove('tcp:5566')
183        msg = self.device.reverse_list()
184        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
185        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
186        self.device.reverse_remove_all()
187        msg = self.device.reverse_list()
188        self.assertEqual('', msg.strip())
189
190    def test_reverse_tcp_port_0(self):
191        self.assertEqual('', self.device.reverse_list().strip(),
192                         'Reverse list must be empty to run this test.')
193
194        try:
195            # If resolving TCP port 0 is supported, `adb reverse` will print
196            # the actual port number.
197            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
198            if not port:
199                raise unittest.SkipTest('Reversing tcp:0 is not available.')
200
201            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
202                                      self.device.reverse_list()))
203        finally:
204            self.device.reverse_remove_all()
205
206    def test_forward_reverse_echo(self):
207        """Send data through adb forward and read it back via adb reverse"""
208        forward_port = 12345
209        reverse_port = forward_port + 1
210        forward_spec = 'tcp:' + str(forward_port)
211        reverse_spec = 'tcp:' + str(reverse_port)
212        forward_setup = False
213        reverse_setup = False
214
215        try:
216            # listen on localhost:forward_port, connect to remote:forward_port
217            self.device.forward(forward_spec, forward_spec)
218            forward_setup = True
219            # listen on remote:forward_port, connect to localhost:reverse_port
220            self.device.reverse(forward_spec, reverse_spec)
221            reverse_setup = True
222
223            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
224            with contextlib.closing(listener):
225                # Use SO_REUSEADDR so that subsequent runs of the test can grab
226                # the port even if it is in TIME_WAIT.
227                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
228
229                # Listen on localhost:reverse_port before connecting to
230                # localhost:forward_port because that will cause adb to connect
231                # back to localhost:reverse_port.
232                listener.bind(('127.0.0.1', reverse_port))
233                listener.listen(4)
234
235                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
236                with contextlib.closing(client):
237                    # Connect to the listener.
238                    client.connect(('127.0.0.1', forward_port))
239
240                    # Accept the client connection.
241                    accepted_connection, addr = listener.accept()
242                    with contextlib.closing(accepted_connection) as server:
243                        data = b'hello'
244
245                        # Send data into the port setup by adb forward.
246                        client.sendall(data)
247                        # Explicitly close() so that server gets EOF.
248                        client.close()
249
250                        # Verify that the data came back via adb reverse.
251                        self.assertEqual(data, server.makefile().read().encode("utf8"))
252        finally:
253            if reverse_setup:
254                self.device.reverse_remove(forward_spec)
255            if forward_setup:
256                self.device.forward_remove(forward_spec)
257
258
259class ShellTest(DeviceTest):
260    def _interactive_shell(self, shell_args, input):
261        """Runs an interactive adb shell.
262
263        Args:
264          shell_args: List of string arguments to `adb shell`.
265          input: bytes input to send to the interactive shell.
266
267        Returns:
268          The remote exit code.
269
270        Raises:
271          unittest.SkipTest: The device doesn't support exit codes.
272        """
273        if not self.device.has_shell_protocol():
274            raise unittest.SkipTest('exit codes are unavailable on this device')
275
276        proc = subprocess.Popen(
277                self.device.adb_cmd + ['shell'] + shell_args,
278                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
279                stderr=subprocess.PIPE)
280        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
281        # to explicitly add an exit command to close the session from the device
282        # side, plus the necessary newline to complete the interactive command.
283        proc.communicate(input + b'; exit\n')
284        return proc.returncode
285
286    def test_cat(self):
287        """Check that we can at least cat a file."""
288        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
289        elements = out.split()
290        self.assertEqual(len(elements), 2)
291
292        uptime, idle = elements
293        self.assertGreater(float(uptime), 0.0)
294        self.assertGreater(float(idle), 0.0)
295
296    def test_throws_on_failure(self):
297        self.assertRaises(adb.ShellError, self.device.shell, ['false'])
298
299    def test_output_not_stripped(self):
300        out = self.device.shell(['echo', 'foo'])[0]
301        self.assertEqual(out, 'foo' + self.device.linesep)
302
303    def test_shell_command_length(self):
304        # Devices that have shell_v2 should be able to handle long commands.
305        if self.device.has_shell_protocol():
306            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
307            self.assertEqual(rc, 0)
308            self.assertTrue(out == ('x' * 16384 + '\n'))
309
310    def test_shell_nocheck_failure(self):
311        rc, out, _ = self.device.shell_nocheck(['false'])
312        self.assertNotEqual(rc, 0)
313        self.assertEqual(out, '')
314
315    def test_shell_nocheck_output_not_stripped(self):
316        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
317        self.assertEqual(rc, 0)
318        self.assertEqual(out, 'foo' + self.device.linesep)
319
320    def test_can_distinguish_tricky_results(self):
321        # If result checking on ADB shell is naively implemented as
322        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
323        # output from the result for a cmd of `echo -n 1`.
324        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
325        self.assertEqual(rc, 0)
326        self.assertEqual(out, '1')
327
328    def test_line_endings(self):
329        """Ensure that line ending translation is not happening in the pty.
330
331        Bug: http://b/19735063
332        """
333        output = self.device.shell(['uname'])[0]
334        self.assertEqual(output, 'Linux' + self.device.linesep)
335
336    def test_pty_logic(self):
337        """Tests that a PTY is allocated when it should be.
338
339        PTY allocation behavior should match ssh.
340        """
341        def check_pty(args):
342            """Checks adb shell PTY allocation.
343
344            Tests |args| for terminal and non-terminal stdin.
345
346            Args:
347                args: -Tt args in a list (e.g. ['-t', '-t']).
348
349            Returns:
350                A tuple (<terminal>, <non-terminal>). True indicates
351                the corresponding shell allocated a remote PTY.
352            """
353            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
354
355            terminal = subprocess.Popen(
356                    test_cmd, stdin=None,
357                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
358            terminal.communicate()
359
360            non_terminal = subprocess.Popen(
361                    test_cmd, stdin=subprocess.PIPE,
362                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
363            non_terminal.communicate()
364
365            return (terminal.returncode == 0, non_terminal.returncode == 0)
366
367        # -T: never allocate PTY.
368        self.assertEqual((False, False), check_pty(['-T']))
369
370        # These tests require a new device.
371        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
372            # No args: PTY only if stdin is a terminal and shell is interactive,
373            # which is difficult to reliably test from a script.
374            self.assertEqual((False, False), check_pty([]))
375
376            # -t: PTY if stdin is a terminal.
377            self.assertEqual((True, False), check_pty(['-t']))
378
379        # -t -t: always allocate PTY.
380        self.assertEqual((True, True), check_pty(['-t', '-t']))
381
382        # -tt: always allocate PTY, POSIX style (http://b/32216152).
383        self.assertEqual((True, True), check_pty(['-tt']))
384
385        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
386        # we follow the man page instead.
387        self.assertEqual((True, True), check_pty(['-ttt']))
388
389        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
390        self.assertEqual((True, True), check_pty(['-ttx']))
391
392        # -Ttt: -tt cancels out -T.
393        self.assertEqual((True, True), check_pty(['-Ttt']))
394
395        # -ttT: -T cancels out -tt.
396        self.assertEqual((False, False), check_pty(['-ttT']))
397
398    def test_shell_protocol(self):
399        """Tests the shell protocol on the device.
400
401        If the device supports shell protocol, this gives us the ability
402        to separate stdout/stderr and return the exit code directly.
403
404        Bug: http://b/19734861
405        """
406        if not self.device.has_shell_protocol():
407            raise unittest.SkipTest('shell protocol unsupported on this device')
408
409        # Shell protocol should be used by default.
410        result = self.device.shell_nocheck(
411                shlex.split('echo foo; echo bar >&2; exit 17'))
412        self.assertEqual(17, result[0])
413        self.assertEqual('foo' + self.device.linesep, result[1])
414        self.assertEqual('bar' + self.device.linesep, result[2])
415
416        self.assertEqual(17, self._interactive_shell([], b'exit 17'))
417
418        # -x flag should disable shell protocol.
419        result = self.device.shell_nocheck(
420                shlex.split('-x echo foo; echo bar >&2; exit 17'))
421        self.assertEqual(0, result[0])
422        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
423        self.assertEqual('', result[2])
424
425        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
426
427    def test_non_interactive_sigint(self):
428        """Tests that SIGINT in a non-interactive shell kills the process.
429
430        This requires the shell protocol in order to detect the broken
431        pipe; raw data transfer mode will only see the break once the
432        subprocess tries to read or write.
433
434        Bug: http://b/23825725
435        """
436        if not self.device.has_shell_protocol():
437            raise unittest.SkipTest('shell protocol unsupported on this device')
438
439        # Start a long-running process.
440        sleep_proc = subprocess.Popen(
441                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
442                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
443                stderr=subprocess.STDOUT)
444        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
445        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
446        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
447
448        # Verify that the process is running, send signal, verify it stopped.
449        self.device.shell(proc_query)
450        os.kill(sleep_proc.pid, signal.SIGINT)
451        sleep_proc.communicate()
452
453        # It can take some time for the process to receive the signal and die.
454        end_time = time.time() + 3
455        while self.device.shell_nocheck(proc_query)[0] != 1:
456            self.assertFalse(time.time() > end_time,
457                             'subprocess failed to terminate in time')
458
459    def test_non_interactive_stdin(self):
460        """Tests that non-interactive shells send stdin."""
461        if not self.device.has_shell_protocol():
462            raise unittest.SkipTest('non-interactive stdin unsupported '
463                                    'on this device')
464
465        # Test both small and large inputs.
466        small_input = b'foo'
467        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
468        large_input = b'\n'.join(characters)
469
470
471        for input in (small_input, large_input):
472            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
473                                    stdin=subprocess.PIPE,
474                                    stdout=subprocess.PIPE,
475                                    stderr=subprocess.PIPE)
476            stdout, stderr = proc.communicate(input)
477            self.assertEqual(input.splitlines(), stdout.splitlines())
478            self.assertEqual(b'', stderr)
479
480    def test_sighup(self):
481        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
482        log_path = "/data/local/tmp/adb_signal_test.log"
483
484        # Clear the output file.
485        self.device.shell_nocheck(["echo", ">", log_path])
486
487        script = """
488            trap "echo SIGINT > {path}; exit 0" SIGINT
489            trap "echo SIGHUP > {path}; exit 0" SIGHUP
490            echo Waiting
491            read
492        """.format(path=log_path)
493
494        script = ";".join([x.strip() for x in script.strip().splitlines()])
495
496        with self.device.shell_popen([script], kill_atexit=False,
497                                     stdin=subprocess.PIPE,
498                                     stdout=subprocess.PIPE) as process:
499
500            self.assertEqual(b"Waiting\n", process.stdout.readline())
501            process.send_signal(signal.SIGINT)
502            process.wait()
503
504        # Waiting for the local adb to finish is insufficient, since it hangs
505        # up immediately.
506        time.sleep(1)
507
508        stdout, _ = self.device.shell(["cat", log_path])
509        self.assertEqual(stdout.strip(), "SIGHUP")
510
511    # Temporarily disabled because it seems to cause later instability.
512    # http://b/228114748
513    def disabled_test_exit_stress(self):
514        """Hammer `adb shell exit 42` with multiple threads."""
515        thread_count = 48
516        result = dict()
517        def hammer(thread_idx, thread_count, result):
518            success = True
519            for i in range(thread_idx, 240, thread_count):
520                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
521                if ret != i % 256:
522                    success = False
523                    break
524            result[thread_idx] = success
525
526        threads = []
527        for i in range(thread_count):
528            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
529            thread.start()
530            threads.append(thread)
531        for thread in threads:
532            thread.join()
533        for i, success in result.items():
534            self.assertTrue(success)
535
536    def disabled_test_parallel(self):
537        """Spawn a bunch of `adb shell` instances in parallel.
538
539        This was broken historically due to the use of select, which only works
540        for fds that are numerically less than 1024.
541
542        Bug: http://b/141955761"""
543
544        n_procs = 2048
545        procs = dict()
546        for i in range(0, n_procs):
547            procs[i] = subprocess.Popen(
548                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
549                stdin=subprocess.PIPE,
550                stdout=subprocess.PIPE
551            )
552
553        for i in range(0, n_procs):
554            procs[i].stdin.write("%d\n" % i)
555
556        for i in range(0, n_procs):
557            response = procs[i].stdout.readline()
558            assert(response == "%d\n" % i)
559
560        for i in range(0, n_procs):
561            procs[i].stdin.write("%d\n" % (i % 256))
562
563        for i in range(0, n_procs):
564            assert(procs[i].wait() == i % 256)
565
566
567class ArgumentEscapingTest(DeviceTest):
568    def test_shell_escaping(self):
569        """Make sure that argument escaping is somewhat sane."""
570
571        # http://b/19734868
572        # Note that this actually matches ssh(1)'s behavior --- it's
573        # converted to `sh -c echo hello; echo world` which sh interprets
574        # as `sh -c echo` (with an argument to that shell of "hello"),
575        # and then `echo world` back in the first shell.
576        result = self.device.shell(
577            shlex.split("sh -c 'echo hello; echo world'"))[0]
578        result = result.splitlines()
579        self.assertEqual(['', 'world'], result)
580        # If you really wanted "hello" and "world", here's what you'd do:
581        result = self.device.shell(
582            shlex.split(r'echo hello\;echo world'))[0].splitlines()
583        self.assertEqual(['hello', 'world'], result)
584
585        # http://b/15479704
586        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
587        self.assertEqual('t', result)
588        result = self.device.shell(
589            shlex.split("sh -c 'true && echo t'"))[0].strip()
590        self.assertEqual('t', result)
591
592        # http://b/20564385
593        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
594        self.assertEqual('t', result)
595        result = self.device.shell(
596            shlex.split(r'echo -n 123\;uname'))[0].strip()
597        self.assertEqual('123Linux', result)
598
599    def test_install_argument_escaping(self):
600        """Make sure that install argument escaping works."""
601        # http://b/20323053, http://b/3090932.
602        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
603            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
604                                             delete=False)
605            tf.close()
606
607            # Installing bogus .apks fails if the device supports exit codes.
608            try:
609                output = self.device.install(tf.name.decode("utf8"))
610            except subprocess.CalledProcessError as e:
611                output = e.output
612
613            self.assertIn(file_suffix, output)
614            os.remove(tf.name)
615
616
class RootUnrootTest(DeviceTest):
    """Checks root() and unroot() via the user name reported by id(1)."""

    def _test_root(self):
        """Switch to root and check the shell user.

        Returns without checking when the device reports that adbd cannot
        run as root in production builds.
        """
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', user)

    def _test_unroot(self):
        """Switch to non-root and check that the shell user is 'shell'."""
        self.device.unroot()
        self.device.wait()
        user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        started_as_root = original_user == 'root'
        try:
            # Leave the current state first, then come back to it.
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Restore the state the device was found in.
            restore = self.device.root if started_as_root else self.device.unroot
            restore()
            self.device.wait()
649
650
class TcpIpTest(DeviceTest):
    """Checks argument validation of tcpip()."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Neither an empty string nor a non-numeric value is accepted.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
661
662
class SystemPropertiesTest(DeviceTest):
    """Checks reading and writing system properties."""

    def test_get_prop(self):
        """get_prop() must report init.svc.adbd as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    def test_set_prop(self):
        """A value written with set_prop() must be readable via getprop."""
        # debug.* prop does not require root privileges
        prop_name = 'debug.foo'
        self.device.shell(['setprop', prop_name, '""'])

        # Use a random value so that a stale property cannot match.
        expected = str(random.random())
        self.device.set_prop(prop_name, expected)
        actual = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(actual, expected)
676
677
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the bytes-like `string`."""
    return hashlib.md5(string).hexdigest()
682
683
class HostFile(object):
    """Record of a host-side file: its handle, checksum, path and base name."""

    def __init__(self, handle, checksum):
        """Derive the path fields from `handle.name`.

        Args:
          handle: Object with a `name` attribute holding the file's path.
          checksum: Checksum to associate with the file.
        """
        path = handle.name
        self.handle = handle
        self.checksum = checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
690
691
class DeviceFile(object):
    """Record of a device-side file: its checksum, path and base name."""

    def __init__(self, checksum, full_path):
        """Store the checksum and path, and derive the POSIX base name.

        Args:
          checksum: Checksum to associate with the file.
          full_path: POSIX-style path of the file.
        """
        self.checksum, self.full_path = checksum, full_path
        self.base_name = posixpath.basename(full_path)
697
698
def make_random_host_files(in_dir, num_files):
    """Create `num_files` files filled with random bytes inside `in_dir`.

    Each file holds between 1 KiB and 15 KiB of data, in 1 KiB steps. The
    files are not deleted; cleaning up is left to the caller.

    Args:
      in_dir: Directory in which the files are created.
      num_files: Number of files to create.

    Returns:
      A list of HostFile objects, one per created file, each carrying the
      MD5 hex digest of the file's contents.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # delete=False keeps the file on disk after closing; the with-block
        # closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        md5 = compute_md5(rand_str)
        files.append(HostFile(file_handle, md5))
    return files
716
717
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create `num_files` random files under `in_dir` on the device.

    Each file is written by a single `dd` block read from /dev/urandom with
    a block size between 1 KiB and 15 KiB, in 1 KiB steps, and is named
    `prefix` followed by its index.

    Args:
      device: Object whose `shell(cmd)` runs a command and returns a
        sequence with stdout first.
      in_dir: Device directory in which the files are created.
      num_files: Number of files to create.
      prefix: File name prefix.

    Returns:
      A list of DeviceFile objects carrying the digest printed by md5sum
      on the device.
    """
    min_size = 1 << 10
    max_size = 16 << 10

    created = []
    for index in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        remote_path = posixpath.join(in_dir, prefix + str(index))

        dd_command = [
            'dd',
            'if=/dev/urandom',
            'of={}'.format(remote_path),
            'bs={}'.format(size),
            'count=1',
        ]
        device.shell(dd_command)

        # The md5sum output must split into exactly two fields, digest first.
        md5sum_stdout = device.shell(['md5sum', remote_path])[0]
        digest, _ = md5sum_stdout.split()

        created.append(DeviceFile(digest, remote_path))
    return created
735
736
class FileOperationsTest:
    """Namespace wrapping the shared push/pull/sync test suite.

    The suite itself lives in the nested Base class, which reads a
    `compression` attribute that only its subclasses define.
    NOTE(review): Base is presumably nested here so that test loaders scanning
    module-level names do not collect it directly -- confirm.
    """

    class Base(DeviceTest):
        """File transfer tests, run with a given ADB_COMPRESSION setting.

        Subclasses must define `compression`; setUp() exports its value as the
        ADB_COMPRESSION environment variable for the duration of each test.
        """

        SCRATCH_DIR = '/data/local/tmp'
        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'

        def setUp(self):
            """Export the subclass's compression setting as ADB_COMPRESSION."""
            super().setUp()
            self.previous_env = os.environ.get("ADB_COMPRESSION")
            os.environ["ADB_COMPRESSION"] = self.compression

        def tearDown(self):
            """Restore ADB_COMPRESSION to the value it had before setUp()."""
            if self.previous_env is None:
                # pop() instead of del: don't raise if it is already unset.
                os.environ.pop("ADB_COMPRESSION", None)
            else:
                os.environ["ADB_COMPRESSION"] = self.previous_env

        def _verify_remote(self, checksum, remote_path):
            """Assert that md5sum of remote_path on the device equals checksum."""
            dev_md5, _ = self.device.shell(['md5sum', remote_path])[0].split()
            self.assertEqual(checksum, dev_md5)

        def _verify_local(self, checksum, local_path):
            """Assert that compute_md5() of the host file equals checksum."""
            with open(local_path, 'rb') as host_file:
                host_md5 = compute_md5(host_file.read())
                self.assertEqual(host_md5, checksum)

        def test_push(self):
            """Push a randomly generated file to specified device."""
            kbytes = 512
            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
            try:
                rand_str = os.urandom(1024 * kbytes)
                tmp.write(rand_str)
                tmp.close()

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
                self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

                self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
                self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
            finally:
                # Always remove the host file, even when the push or the
                # verification fails. close() is a no-op if already closed.
                tmp.close()
                os.remove(tmp.name)

        def test_push_dir(self):
            """Push a randomly generated directory of files to the device."""
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

            # Created outside the try block so the finally clause never sees
            # an unbound name if mkdtemp() itself fails.
            host_dir = tempfile.mkdtemp()
            try:
                # Make sure the temp directory isn't setuid, or else adb will complain.
                os.chmod(host_dir, 0o700)

                # Create 32 random files.
                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
                self.device.push(host_dir, self.DEVICE_TEMP_DIR)

                for temp_file in temp_files:
                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                                 os.path.basename(host_dir),
                                                 temp_file.base_name)
                    self._verify_remote(temp_file.checksum, remote_path)
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        def disabled_test_push_empty(self):
            """Push an empty directory to the device."""
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

            host_dir = tempfile.mkdtemp()
            try:
                # Make sure the temp directory isn't setuid, or else adb will complain.
                os.chmod(host_dir, 0o700)

                # Create an empty directory.
                empty_dir_path = os.path.join(host_dir, 'empty')
                os.mkdir(empty_dir_path)

                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)

                # Device paths always use '/', whatever the host OS is.
                remote_path = posixpath.join(self.DEVICE_TEMP_DIR, "empty")
                test_empty_cmd = ["[", "-d", remote_path, "]"]
                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)

                self.assertEqual(rc, 0)
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
        def test_push_symlink(self):
            """Push a symlink.

            Bug: http://b/31491920
            """
            host_dir = tempfile.mkdtemp()
            try:
                # Make sure the temp directory isn't setuid, or else adb will
                # complain.
                os.chmod(host_dir, 0o700)

                with open(os.path.join(host_dir, 'foo'), 'w') as f:
                    f.write('foo')

                symlink_path = os.path.join(host_dir, 'symlink')
                os.symlink('foo', symlink_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
                rc, out, _ = self.device.shell_nocheck(
                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
                self.assertEqual(0, rc)
                self.assertEqual(out.strip(), 'foo')
            finally:
                shutil.rmtree(host_dir)

        def test_multiple_push(self):
            """Push multiple files to the device in one adb push command.

            Bug: http://b/25324823
            """

            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

            host_dir = tempfile.mkdtemp()
            try:
                # Create some random files and a subdirectory containing more files.
                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

                subdir = os.path.join(host_dir, 'subdir')
                os.mkdir(subdir)
                subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                           num_files=4)

                paths = [x.full_path for x in temp_files]
                paths.append(subdir)
                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

                for temp_file in temp_files:
                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                                 temp_file.base_name)
                    self._verify_remote(temp_file.checksum, remote_path)

                # Verify the files of the pushed subdirectory themselves. This
                # loop used to re-check the last top-level file instead, a
                # workaround dating from http://b/25394682. A directory pushed
                # into an existing directory lands under its own basename,
                # which is the same layout test_push_dir checks.
                for subdir_temp_file in subdir_temp_files:
                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                                 'subdir',
                                                 subdir_temp_file.base_name)
                    self._verify_remote(subdir_temp_file.checksum, remote_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        @requires_non_root
        def test_push_error_reporting(self):
            """Make sure that errors that occur while pushing a file get reported

            Bug: http://b/26816782
            """
            with tempfile.NamedTemporaryFile() as tmp_file:
                tmp_file.write(b'\0' * 1024 * 1024)
                tmp_file.flush()
                try:
                    self.device.push(local=tmp_file.name, remote='/system/')
                    self.fail('push should not have succeeded')
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertTrue(b'Permission denied' in output or
                                b'Read-only file system' in output)

        @requires_non_root
        def test_push_directory_creation(self):
            """Regression test for directory creation.

            Bug: http://b/110953234
            """
            with tempfile.NamedTemporaryFile() as tmp_file:
                tmp_file.write(b'\0' * 1024 * 1024)
                tmp_file.flush()
                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
                self.device.shell(['rm', '-rf', remote_path])

                # Push to a file inside the directory that was just removed.
                remote_path += '/filename'
                self.device.push(local=tmp_file.name, remote=remote_path)

        def disabled_test_push_multiple_slash_root(self):
            """Regression test for pushing to //data/local/tmp.

            Bug: http://b/141311284

            Disabled because this is broken on the adbd side as well: b/141943968
            """
            with tempfile.NamedTemporaryFile() as tmp_file:
                tmp_file.write(b'\0' * 1024 * 1024)
                tmp_file.flush()
                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
                self.device.shell(['rm', '-rf', remote_path])
                self.device.push(local=tmp_file.name, remote=remote_path)

        def _test_pull(self, remote_file, checksum):
            """Pull remote_file to a host temp file and compare its checksum."""
            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
            tmp_write.close()
            try:
                self.device.pull(remote=remote_file, local=tmp_write.name)
                with open(tmp_write.name, 'rb') as tmp_read:
                    host_contents = tmp_read.read()
                    host_md5 = compute_md5(host_contents)
                self.assertEqual(checksum, host_md5)
            finally:
                # Remove the host file even when the pull or the check fails.
                os.remove(tmp_write.name)

        @requires_non_root
        def test_pull_error_reporting(self):
            """Make sure that errors that occur while pulling a file get reported."""
            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

            # Pull into a scratch directory so that an unexpectedly successful
            # pull cannot leave a stray file in the current working directory.
            host_dir = tempfile.mkdtemp()
            try:
                try:
                    output = self.device.pull(remote=self.DEVICE_TEMP_FILE,
                                              local=os.path.join(host_dir, 'x'))
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(b'Permission denied', output)
            finally:
                shutil.rmtree(host_dir)
                self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])

        def test_pull(self):
            """Pull a randomly generated file from specified device."""
            kbytes = 512
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
            cmd = ['dd', 'if=/dev/urandom',
                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
                   'count={}'.format(kbytes)]
            self.device.shell(cmd)
            dev_md5, _ = self.device.shell(['md5sum', self.DEVICE_TEMP_FILE])[0].split()
            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])

        def test_pull_dir(self):
            """Pull a randomly generated directory of files from the device."""
            host_dir = tempfile.mkdtemp()
            try:
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

                # Populate device directory with random files.
                temp_files = make_random_device_files(
                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

                for temp_file in temp_files:
                    host_path = os.path.join(
                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                        temp_file.base_name)
                    self._verify_local(temp_file.checksum, host_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        def test_pull_dir_symlink(self):
            """Pull a directory into a symlink to a directory.

            Bug: http://b/27362811
            """
            if os.name != 'posix':
                raise unittest.SkipTest('requires POSIX')

            host_dir = tempfile.mkdtemp()
            try:
                real_dir = os.path.join(host_dir, 'dir')
                symlink = os.path.join(host_dir, 'symlink')
                os.mkdir(real_dir)
                os.symlink(real_dir, symlink)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

                # Populate device directory with random files.
                temp_files = make_random_device_files(
                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

                # The files are expected behind the symlink, in the real directory.
                for temp_file in temp_files:
                    host_path = os.path.join(
                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                        temp_file.base_name)
                    self._verify_local(temp_file.checksum, host_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        def test_pull_dir_symlink_collision(self):
            """Pull a directory into a colliding symlink to directory."""
            if os.name != 'posix':
                raise unittest.SkipTest('requires POSIX')

            host_dir = tempfile.mkdtemp()
            try:
                real_dir = os.path.join(host_dir, 'real')
                # The symlink gets the same name the pulled directory would use.
                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
                symlink = os.path.join(host_dir, tmp_dirname)
                os.mkdir(real_dir)
                os.symlink(real_dir, symlink)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

                # Populate device directory with random files.
                temp_files = make_random_device_files(
                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

                for temp_file in temp_files:
                    host_path = os.path.join(real_dir, temp_file.base_name)
                    self._verify_local(temp_file.checksum, host_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        def test_pull_dir_nonexistent(self):
            """Pull a directory of files from the device to a nonexistent path."""
            host_dir = tempfile.mkdtemp()
            try:
                dest_dir = os.path.join(host_dir, 'dest')

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

                # Populate device directory with random files.
                temp_files = make_random_device_files(
                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

                for temp_file in temp_files:
                    host_path = os.path.join(dest_dir, temp_file.base_name)
                    self._verify_local(temp_file.checksum, host_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
        def disabled_test_pull_symlink_dir(self):
            """Pull a symlink to a directory of symlinks to files."""
            host_dir = tempfile.mkdtemp()
            try:
                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
                self.device.shell(['ln', '-s', remote_links, remote_symlink])

                # Populate device directory with random files.
                temp_files = make_random_device_files(
                    self.device, in_dir=remote_dir, num_files=32)

                for temp_file in temp_files:
                    self.device.shell(
                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                         posixpath.join(remote_links, temp_file.base_name)])

                self.device.pull(remote=remote_symlink, local=host_dir)

                for temp_file in temp_files:
                    host_path = os.path.join(
                        host_dir, 'symlink', temp_file.base_name)
                    self._verify_local(temp_file.checksum, host_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        def test_pull_empty(self):
            """Pull a directory containing an empty directory from the device."""
            host_dir = tempfile.mkdtemp()
            try:
                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', remote_empty_path])

                self.device.pull(remote=remote_empty_path, local=host_dir)
                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
            finally:
                shutil.rmtree(host_dir)

        def test_multiple_pull(self):
            """Pull multiple files and a directory in one adb pull command."""

            host_dir = tempfile.mkdtemp()
            try:
                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                self.device.shell(['mkdir', '-p', subdir])

                # Create some random files and a subdirectory containing more files.
                temp_files = make_random_device_files(
                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

                subdir_temp_files = make_random_device_files(
                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')

                paths = [x.full_path for x in temp_files]
                paths.append(subdir)
                self.device._simple_call(['pull'] + paths + [host_dir])

                for temp_file in temp_files:
                    local_path = os.path.join(host_dir, temp_file.base_name)
                    self._verify_local(temp_file.checksum, local_path)

                for subdir_temp_file in subdir_temp_files:
                    local_path = os.path.join(host_dir,
                                              'subdir',
                                              subdir_temp_file.base_name)
                    self._verify_local(subdir_temp_file.checksum, local_path)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

        def verify_sync(self, device, temp_files, device_dir):
            """Verifies that a list of temp files was synced to the device.

            Args:
                device: Device object whose shell() is used to run md5sum.
                temp_files: Host file records providing base_name and checksum.
                device_dir: Device directory expected to contain the files.
            """
            # Confirm that every file on the device mirrors that on the host.
            for temp_file in temp_files:
                device_full_path = posixpath.join(
                    device_dir, temp_file.base_name)
                dev_md5, _ = device.shell(['md5sum', device_full_path])[0].split()
                self.assertEqual(temp_file.checksum, dev_md5)

        def test_sync(self):
            """Sync a host directory to the data partition."""

            base_dir = tempfile.mkdtemp()
            try:
                # Create mirror device directory hierarchy within base_dir.
                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
                os.makedirs(full_dir_path)

                # Create 32 random files within the host mirror.
                temp_files = make_random_host_files(
                    in_dir=full_dir_path, num_files=32)

                # Clean up any stale files on the device.
                device = adb.get_device()  # pylint: disable=no-member
                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
                try:
                    device.sync('data')
                finally:
                    # Restore the caller's environment even if the sync fails,
                    # so later tests don't inherit a deleted product dir.
                    if old_product_out is None:
                        del os.environ['ANDROID_PRODUCT_OUT']
                    else:
                        os.environ['ANDROID_PRODUCT_OUT'] = old_product_out

                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)

                # NOTE(review): device-side cleanup is disabled here; the
                # reason is not recorded.
                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(base_dir)

        def test_push_sync(self):
            """Sync a host directory to a specific path."""

            temp_dir = tempfile.mkdtemp()
            try:
                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)

                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')

                # Clean up any stale files on the device.
                device = adb.get_device()  # pylint: disable=no-member
                device.shell(['rm', '-rf', device_dir])

                device.push(temp_dir, device_dir, sync=True)

                self.verify_sync(device, temp_files, device_dir)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(temp_dir)

        def test_push_sync_multiple(self):
            """Sync multiple host directories to a specific path."""

            temp_dir = tempfile.mkdtemp()
            try:
                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)

                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')

                # Clean up any stale files on the device.
                device = adb.get_device()  # pylint: disable=no-member
                device.shell(['rm', '-rf', device_dir])
                device.shell(['mkdir', '-p', device_dir])

                host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files]
                device.push(host_paths, device_dir, sync=True)

                self.verify_sync(device, temp_files, device_dir)

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(temp_dir)

        def test_push_dry_run_nonexistent_file(self):
            """Push with dry run (non-existent file)."""

            for file_size in [8, 1024 * 1024]:
                # A fresh host directory per iteration, created before the try
                # block so the finally clause never sees an unbound name or
                # the already-deleted directory of the previous iteration.
                host_dir = tempfile.mkdtemp()
                try:
                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
                    device_file = posixpath.join(device_dir, 'file')

                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                    self.device.shell(['mkdir', '-p', device_dir])

                    # Host path: use the host's separator, not posixpath.
                    host_file = os.path.join(host_dir, 'file')

                    with open(host_file, "w") as f:
                        f.write('x' * file_size)

                    self.device._simple_call(['push', '-n', host_file, device_file])
                    # A dry run must not have created the file on the device.
                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
                    self.assertNotEqual(0, rc)

                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                finally:
                    shutil.rmtree(host_dir)

        def test_push_dry_run_existent_file(self):
            """Push with dry run (file already exists on the device)."""

            for file_size in [8, 1024 * 1024]:
                host_dir = tempfile.mkdtemp()
                try:
                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
                    device_file = posixpath.join(device_dir, 'file')

                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                    self.device.shell(['mkdir', '-p', device_dir])
                    self.device.shell(['echo', 'foo', '>', device_file])

                    # Host path: use the host's separator, not posixpath.
                    host_file = os.path.join(host_dir, 'file')

                    with open(host_file, "w") as f:
                        f.write('x' * file_size)

                    self.device._simple_call(['push', '-n', host_file, device_file])
                    # A dry run must leave the existing device file untouched.
                    stdout, _ = self.device.shell(['cat', device_file])
                    self.assertEqual(stdout.strip(), "foo")

                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
                finally:
                    shutil.rmtree(host_dir)

        def test_unicode_paths(self):
            """Ensure that we can support non-ASCII paths, even on Windows."""
            name = u'로보카 폴리'

            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)

            # push.
            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
            tf.close()
            try:
                self.device.push(tf.name, remote_path)
            finally:
                # Remove the host file even if the push fails.
                os.remove(tf.name)
            self.assertFalse(os.path.exists(tf.name))

            # Verify that the device ended up with the expected UTF-8 path
            output = self.device.shell(
                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
            self.assertEqual(remote_path, output)

            # pull.
            self.device.pull(remote_path, tf.name)
            self.assertTrue(os.path.exists(tf.name))
            os.remove(tf.name)
            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1354
class FileOperationsTestUncompressed(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base suite with ADB_COMPRESSION set to "none"."""

    compression = "none"
1357
1358
class FileOperationsTestBrotli(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base suite with ADB_COMPRESSION set to "brotli"."""

    compression = "brotli"
1361
1362
class FileOperationsTestLZ4(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base suite with ADB_COMPRESSION set to "lz4"."""

    compression = "lz4"
1365
1366
class FileOperationsTestZstd(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base suite with ADB_COMPRESSION set to "zstd"."""

    compression = "zstd"
1369
1370
class DeviceOfflineTest(DeviceTest):
    """Regression tests for transfers that used to hang or drop the device."""

    def _get_device_state(self, serialno):
        """Return the state that `adb devices` reports for serialno.

        Args:
            serialno: Device serial, either as str or as the raw bytes read
                from an adb subprocess.

        Returns:
            The second whitespace-separated field of the line whose first
            field equals serialno (e.g. 'device'), or None if there is no
            such line.
        """
        if isinstance(serialno, bytes):
            serialno = serialno.decode('utf-8')
        # check_output() returns bytes; decode before using str operations.
        output = subprocess.check_output(
            self.device.adb_cmd + ['devices']).decode('utf-8')
        for line in output.splitlines():
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill the adb server.
           Occasionally the device goes offline, because it is still reading
           data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            with open(file_path, 'w') as fh:
                fh.write('\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            try:
                time.sleep(0.1)
                # 2. Kill the adb server
                subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            finally:
                # Stop and reap the push even if kill-server failed.
                subproc.terminate()
                subproc.wait()
        finally:
            try:
                os.unlink(file_path)
            except OSError:
                # Best effort: the file may never have been created.
                pass
        # 3. See if the device still exists.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill the adb server.
           Occasionally the device can't be connected, because it is trying to
           send a message larger than what the adb server expects.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
            try:
                time.sleep(0.1)
                # 3. Kill the adb server
                subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            finally:
                # Stop and reap the pull even if kill-server failed.
                subproc.terminate()
                subproc.wait()
        finally:
            try:
                os.unlink(file_path)
            except OSError:
                # Best effort: the pull may not have created the file yet.
                pass
        # 4. See if the device still exists.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')


    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;echo' is one argument on purpose: it is the
                # exact token the original adjacent string literals formed.
                # The ';' ends the dd command and 'echo foo' runs after it.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")

    def test_zero_packet(self):
        """Test for http://b/113070258

        Make sure that we don't blow up when sending USB transfers that line up
        exactly with the USB packet size.
        """

        def listener():
            # Serve "foo" to a single client on the forwarded device port.
            cmd = ["echo foo | nc -l -p 12345; echo done"]
            self.device.shell_nocheck(cmd)

        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
        try:
            for size in [512, 1024]:
                thread = threading.Thread(target=listener)
                thread.start()

                # Wait a bit to let the shell command start.
                time.sleep(0.25)

                sock = socket.create_connection(("localhost", local_port))
                with contextlib.closing(sock):
                    bytesWritten = sock.send(b"a" * size)
                    self.assertEqual(size, bytesWritten)
                    readBytes = sock.recv(4096)
                    self.assertEqual(b"foo\n", readBytes)

                thread.join()
        finally:
            self.device.forward_remove("tcp:{}".format(local_port))
1493
1494
class SocketTest(DeviceTest):
    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        The test talks to the adb server on localhost:5037 directly, starts a
        shell command that emits 1 MiB of zeros followed by "foo", floods the
        socket with garbage from a second thread and checks that every byte of
        the command's output still arrives.

        Bug: http://b/74616284
        """
        def adb_length_prefixed(message):
            """Frame `message`: 4 hex digits of byte length, then the UTF-8 bytes."""
            encoded = message.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        with socket.create_connection(("localhost", 5037)) as s:

            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    # Expected once the connection goes away; report and stop.
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Collect the chunks and join them once at the end: appending to a
            # bytes object copies everything received so far on each of the
            # ~2000 iterations.
            chunks = []
            while True:
                read = s.recv(512)
                if not read:
                    break
                chunks.append(read)

        received = b"".join(chunks)
        try:
            self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
        finally:
            # Join the spammer even when the assertion fails.
            thread.join()
1550
1551
class FramebufferTest(DeviceTest):
    def test_framebuffer(self):
        """Check that the framebuffer service returns a non-empty payload."""
        raw_command = self.device.adb_cmd + ["raw", "framebuffer:"]
        payload = subprocess.check_output(raw_command)
        payload_is_empty = len(payload) == 0
        self.assertFalse(payload_is_empty)
1557
1558
if sys.platform == "win32":
    # Windows-only helpers for creating, activating and reading back a console
    # screen buffer through kernel32. Everything in this branch is only defined
    # when running on Windows.
    # From https://stackoverflow.com/a/38749458
    import os
    import contextlib
    import msvcrt
    import ctypes
    from ctypes import wintypes

    # use_last_error=True makes the error code of a failed call available
    # through ctypes.get_last_error(), which the errcheck hooks below rely on.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # Access, share-mode and flag values passed to CreateConsoleScreenBuffer
    # by console_screen() below.
    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    # Sentinel that _check_invalid() compares HANDLE results against.
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    # Identifiers accepted by GetStdHandle. Only STD_OUTPUT_HANDLE is used in
    # this section of the file.
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1577
1578    def _check_zero(result, func, args):
1579        if not result:
1580            raise ctypes.WinError(ctypes.get_last_error())
1581        return args
1582
1583    def _check_invalid(result, func, args):
1584        if result == INVALID_HANDLE_VALUE:
1585            raise ctypes.WinError(ctypes.get_last_error())
1586        return args
1587
    # Backfill the pointer aliases used in the argtypes declarations below when
    # ctypes.wintypes does not provide them (per the original note, that is
    # the Python 2 case).
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1591
1592    class COORD(ctypes.Structure):
1593        _fields_ = (('X', wintypes.SHORT),
1594                    ('Y', wintypes.SHORT))
1595
1596    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1597        _fields_ = (('cbSize',               wintypes.ULONG),
1598                    ('dwSize',               COORD),
1599                    ('dwCursorPosition',     COORD),
1600                    ('wAttributes',          wintypes.WORD),
1601                    ('srWindow',             wintypes.SMALL_RECT),
1602                    ('dwMaximumWindowSize',  COORD),
1603                    ('wPopupAttributes',     wintypes.WORD),
1604                    ('bFullscreenSupported', wintypes.BOOL),
1605                    ('ColorTable',           wintypes.DWORD * 16))
1606        def __init__(self, *args, **kwds):
1607            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1608                    *args, **kwds)
1609            self.cbSize = ctypes.sizeof(self)
1610
    # Pointer/alias types used in the prototypes below.
    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
                                        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # ctypes prototypes for the kernel32 console functions used by
    # console_screen() and read_screen(). Functions whose errcheck is
    # _check_invalid raise when they return INVALID_HANDLE_VALUE; those using
    # _check_zero raise when they return zero.
    # NOTE(review): AllocConsole, FreeConsole, SetConsoleActiveScreenBuffer and
    # CloseHandle are called elsewhere in this file without a prototype, so
    # they use ctypes' default conversions and their failures are not raised.

    kernel32.GetStdHandle.errcheck = _check_invalid
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,) # _In_ nStdHandle

    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,        # _In_       dwDesiredAccess
        wintypes.DWORD,        # _In_       dwShareMode
        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
        wintypes.DWORD,        # _In_       dwFlags
        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData

    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo

    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo

    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,      # _In_ hConsoleOutput
        wintypes.BOOL,        # _In_ bAbsolute
        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow

    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.WCHAR,   # _In_  cCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwWriteCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten

    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwReadCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1660
1661    @contextlib.contextmanager
1662    def allocate_console():
1663        allocated = kernel32.AllocConsole()
1664        try:
1665            yield allocated
1666        finally:
1667            if allocated:
1668                kernel32.FreeConsole()
1669
    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Context manager that swaps in a temporary console screen buffer.

        A new screen buffer is created, resized to `ncols` x `nrows`, filled
        with NUL characters and made the active buffer. The C runtime file
        descriptor wrapping the new buffer is yielded. On exit the saved
        settings of the standard output buffer are reapplied, that buffer is
        made active again and the temporary buffer is closed.

        Args:
            ncols: Buffer width in character cells. Defaults to the width of
                the current standard output buffer.
            nrows: Buffer height in character cells. Defaults to the height
                of the current standard output buffer.

        Raises:
            ValueError: if `nrows` is given and is greater than 9999.
            The exception built by ctypes.WinError when one of the kernel32
            calls that has an errcheck hook fails.
        """
        info = CONSOLE_SCREEN_BUFFER_INFOEX()
        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        nwritten = (wintypes.DWORD * 1)()
        # Snapshot the current stdout buffer settings; they supply the default
        # size and are what gets restored on exit.
        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        kernel32.GetConsoleScreenBufferInfoEx(
               hStdOut, ctypes.byref(info))
        if ncols is None:
            ncols = info.dwSize.X
        if nrows is None:
            nrows = info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        fd_screen = None
        hScreen = kernel32.CreateConsoleScreenBuffer(
                    GENERIC_READ | GENERIC_WRITE,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            # Wrap the handle in a file descriptor; this is what callers get.
            fd_screen = msvcrt.open_osfhandle(
                            hScreen, os.O_RDWR | os.O_BINARY)
            # Start from the new buffer's own settings and override only the
            # buffer size and the window rectangle.
            kernel32.GetConsoleScreenBufferInfoEx(
                   hScreen, ctypes.byref(new_info))
            new_info.dwSize = COORD(ncols, nrows)
            # Window anchored at the top-left, as wide as the buffer and as
            # tall as the original window.
            new_info.srWindow = wintypes.SMALL_RECT(
                    Left=0, Top=0, Right=(ncols - 1),
                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
            kernel32.SetConsoleScreenBufferInfoEx(
                    hScreen, ctypes.byref(new_info))
            kernel32.SetConsoleWindowInfo(hScreen, True,
                    ctypes.byref(new_info.srWindow))
            # Fill every cell with NUL; read_screen() strips trailing NULs
            # from each row it returns.
            kernel32.FillConsoleOutputCharacterW(
                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
            kernel32.SetConsoleActiveScreenBuffer(hScreen)
            try:
                yield fd_screen
            finally:
                # Put the original stdout buffer back exactly as it was found.
                kernel32.SetConsoleScreenBufferInfoEx(
                    hStdOut, ctypes.byref(info))
                kernel32.SetConsoleWindowInfo(hStdOut, True,
                        ctypes.byref(info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
        finally:
            # If the handle was wrapped in a file descriptor, close it through
            # the descriptor; otherwise close the raw handle directly.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(hScreen)
1718
1719    def read_screen(fd):
1720        hScreen = msvcrt.get_osfhandle(fd)
1721        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1722        kernel32.GetConsoleScreenBufferInfoEx(
1723            hScreen, ctypes.byref(csbi))
1724        ncols = csbi.dwSize.X
1725        pos = csbi.dwCursorPosition
1726        length = ncols * pos.Y + pos.X + 1
1727        buf = (ctypes.c_wchar * length)()
1728        n = (wintypes.DWORD * 1)()
1729        kernel32.ReadConsoleOutputCharacterW(
1730            hScreen, buf, length, COORD(0,0), n)
1731        lines = [buf[i:i+ncols].rstrip(u'\0')
1732                    for i in range(0, n[0], ncols)]
1733        return u'\n'.join(lines)
1734
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Check Unicode command-line arguments and Unicode console output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        expected_text = u'로보카 폴리'
        echo_command = ['echo', '"' + expected_text + '"']

        # Allocate a console window if there is none yet. When the tests are
        # started from a console window, which is the usual case, one already
        # exists.
        with allocate_console():
            # Switch to a temporary console buffer. Passing
            # ncols=len(unicode_string) would also work, but resizing makes
            # the window flash and is probably unnecessary at typical console
            # window sizes.
            with console_screen(nrows=1000) as scratch_fd:
                # device.shell_popen() does not use a pipe (unlike
                # device.shell()), so adb can detect that stdout is a console.
                adb_process = self.device.shell_popen(echo_command)
                adb_process.wait()
                # Fetch what adb wrote into the temporary console buffer.
                shown_text = read_screen(scratch_fd)
                self.assertEqual(expected_text, shown_text)
1757
class DevicesListing(DeviceTest):
    """Checks the output formats of `adb devices` and `adb track-devices`.

    Each test looks for the serial reported by `adb get-serialno` and checks
    which extra fields ("product", "transport", ...) the selected output
    format contains.
    """

    # Serial of the device under test; populated once by setUpClass().
    serial = None

    @classmethod
    def setUpClass(cls):
        """Look up the device serial once for the whole class.

        This runs when the class's tests start rather than in the class body,
        so importing the module does not spawn adb.
        """
        super().setUpClass()
        cls.serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8")

    @staticmethod
    def _devices_lines(*flags):
        """Return the decoded output lines of `adb devices [flags...]`."""
        cmd = ['adb', 'devices'] + list(flags)
        with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            return [raw.decode("utf-8") for raw in proc.stdout.readlines()]

    @staticmethod
    def _first_track_devices_update(*flags):
        """Return the first payload of `adb track-devices [flags...]` as bytes.

        The output starts with four hex digits giving the payload length in
        bytes, so the payload is read as bytes and decoded by the caller.
        """
        cmd = ['adb', 'track-devices'] + list(flags)
        with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                payload_size = int(proc.stdout.read(4).decode("utf-8"), 16)
                return proc.stdout.read(payload_size)
            finally:
                # Leaving the `with` block waits for the process to exit, so
                # it must be terminated first even when reading fails.
                proc.terminate()

    def test_devices(self):
        """`adb devices` lists the serial and state without long-format fields."""
        lines = self._devices_lines()
        self.assertEqual(len(lines), 3)
        line = lines[1]
        self.assertIn(self.serial, line)
        self.assertNotIn("{", line)
        self.assertNotIn("}", line)
        self.assertIn("device", line)
        self.assertNotIn("product", line)
        self.assertNotIn("transport", line)

    def test_devices_l(self):
        """`adb devices -l` additionally shows product and transport."""
        lines = self._devices_lines('-l')
        self.assertEqual(len(lines), 3)
        line = lines[1]
        self.assertIn(self.serial, line)
        self.assertNotIn("{", line)
        self.assertNotIn("}", line)
        self.assertIn("device", line)
        self.assertIn("product", line)
        self.assertIn("transport", line)

    def test_track_devices(self):
        """`adb track-devices` reports the serial in the short text format."""
        output = self._first_track_devices_update().decode("utf-8")
        self.assertNotIn("{", output)
        self.assertNotIn("}", output)
        self.assertIn(self.serial, output)
        self.assertIn("device", output)
        self.assertNotIn("product", output)
        self.assertNotIn("transport", output)

    def test_track_devices_l(self):
        """`adb track-devices -l` reports the long text format."""
        output = self._first_track_devices_update('-l').decode("utf-8")
        self.assertNotIn("{", output)
        self.assertNotIn("}", output)
        self.assertIn(self.serial, output)
        self.assertIn("device", output)
        self.assertIn("product", output)
        self.assertIn("transport", output)

    def test_track_devices_proto_text(self):
        """`adb track-devices --proto-text` reports a text-format protobuf."""
        output = self._first_track_devices_update('--proto-text').decode("utf-8")
        self.assertIn("{", output)
        self.assertIn("}", output)
        self.assertIn(self.serial, output)
        self.assertIn("device", output)
        self.assertIn("product", output)
        self.assertIn("connection_type", output)

    def test_track_devices_proto_binary(self):
        """`adb track-devices --proto-binary` reports a parseable Devices proto."""
        payload = self._first_track_devices_update('--proto-binary')

        devices = proto_devices.Devices()
        devices.ParseFromString(payload)

        device = devices.device[0]
        self.assertEqual(device.serial, self.serial)
        self.assertNotEqual(device.bus_address, "")
        self.assertNotEqual(device.product, "")
        self.assertNotEqual(device.model, "")
        self.assertNotEqual(device.device, "")
        # These fields must hold integral values.
        self.assertEqual(device.negotiated_speed, int(device.negotiated_speed))
        self.assertEqual(device.max_speed, int(device.max_speed))
        self.assertEqual(device.transport_id, int(device.transport_id))
1847
class TrackAppTest(DeviceTest):
    """Checks the process list reported by `adb track-app`.

    This class was previously a second `DevicesListing`. Defining that name
    twice replaced the earlier class at import time, so the earlier class's
    tests were never collected. It now has its own name so that unittest sees
    both classes.
    """

    # Serial of the device under test; populated once by setUpClass().
    serial = None

    @classmethod
    def setUpClass(cls):
        """Look up the device serial once, when the class's tests start."""
        super().setUpClass()
        cls.serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8")

    @unittest.skip("Disabled until b/301491148 is fixed "
                   "(Exported FeatureFlags cannot be read-only)")
    def test_track_app_appinfo(self):
        """`adb track-app --proto-binary` reports both test app processes.

        Installs the two test APKs, starts their activities and then checks
        the first AppProcesses message for the shared process and for the
        activity that runs in its own process.
        """
        for apk in ('adb1.apk', 'adb2.apk'):
            subprocess.check_output(['adb', 'install', '-t', apk])
        for activity in ('adb.test.app1/.MainActivity',
                         'adb.test.app2/.MainActivity',
                         'adb.test.app1/.OwnProcessActivity'):
            subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', activity])

        with subprocess.Popen(['adb', 'track-app', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                # Four hex digits of payload length, then the payload itself.
                output_size = int(proc.stdout.read(4).decode("utf-8"), 16)
                proto = proc.stdout.read(output_size)
            finally:
                # Leaving the `with` block waits for the process to exit, so
                # it must be terminated first even when reading fails.
                proc.terminate()

        apps = proto_track_app.AppProcesses()
        apps.ParseFromString(proto)

        found_default_process = False
        found_own_process = False
        for app in apps.process:
            if app.process_name == "adb.test.process.name":
                found_default_process = True
                self.assertTrue(app.debuggable)
                self.assertIn("adb.test.app1", app.package_names)
                self.assertIn("adb.test.app2", app.package_names)

            if app.process_name == "adb.test.own.process":
                found_own_process = True
                self.assertTrue(app.debuggable)
                self.assertIn("adb.test.app1", app.package_names)

        self.assertTrue(found_default_process)
        self.assertTrue(found_own_process)
1884
if __name__ == '__main__':
    # Seed the random module with a fixed value before running the tests, so
    # that values drawn from it are the same on every run.
    random.seed(0)
    unittest.main()
1888