1#
2# Copyright (C) 2016 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#            http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Provides functionality to interact with a device via `fastboot`."""
17
18import os
19import re
20import subprocess
21
22
class FastbootError(Exception):
    """Raised when the fastboot tool or the attached device is unusable."""
25
26
class FastbootDevice(object):
    """Drives a single device in fastboot mode through the `fastboot` tool.

    Every method builds a command line starting with `self.path` and runs it
    as a subprocess.
    """

    # Prefix for INFO-type messages when printed by fastboot. If we want
    # to parse the output from an INFO message we need to strip this off.
    INFO_PREFIX = '(bootloader) '

    # Line that can precede the real `getvar` output (presumably printed
    # while fastboot waits for a device); it is skipped when parsing.
    _WAITING_BANNER = '< waiting for any device >'

    def __init__(self, path='fastboot'):
        """Initialization.

        Args:
            path: path to the fastboot executable to test with.

        Raises:
            FastbootError: the executable could not be run, or there is not
                exactly one usable device in fastboot mode.
        """
        self.path = path

        # Make sure the fastboot executable is available.
        try:
            _subprocess_check_output([self.path, '--version'])
        except OSError:
            raise FastbootError('Could not execute `{}`'.format(self.path))

        # Make sure exactly 1 fastboot device is available if <specific device>
        # was not given as an argument. Do not try to find an adb device and
        # put it in fastboot mode, it would be too easy to accidentally
        # download to the wrong device.
        if not self._check_single_device():
            raise FastbootError('Requires exactly 1 device in fastboot mode')

    def _check_single_device(self):
        """Returns True if there is exactly one fastboot device attached.

        When ANDROID_SERIAL is set in the environment, the check is instead
        that the `product` variable can be read from the device.
        """
        if 'ANDROID_SERIAL' in os.environ:
            # getvar() never raises CalledProcessError; it reports failure
            # by returning None, so that is what has to be tested here.
            return self.getvar('product') is not None

        devices = _subprocess_check_output([self.path, 'devices']).splitlines()
        if len(devices) != 1:
            return False
        # Expected line format is <serial><whitespace>fastboot. Guard against
        # a line with fewer than two fields instead of raising IndexError.
        fields = devices[0].split()
        return len(fields) > 1 and fields[1] == 'fastboot'

    def getvar(self, name):
        """Calls `fastboot getvar`.

        To query all variables (fastboot getvar all) use getvar_all()
        instead.

        Args:
            name: variable name to access.

        Returns:
            String value of variable |name| or None if not found (fastboot
            failed, printed nothing usable, or printed an unexpected line).
        """
        try:
            output = _subprocess_check_output(
                [self.path, 'getvar', name],
                stderr=subprocess.STDOUT).splitlines()
        except subprocess.CalledProcessError:
            return None

        # Skip the waiting banner if it is the first line of output.
        if output and output[0] == self._WAITING_BANNER:
            output = output[1:]
        if not output:
            return None

        # Output format is <name>:<whitespace><value>. The name is escaped so
        # that characters such as '.' in it are matched literally.
        result = re.search(r'{}:\s*(.*)'.format(re.escape(name)), output[0])
        if result is None:
            return None
        return result.group(1)

    def getvar_all(self):
        """Calls `fastboot getvar all`.

        Returns:
            A {name, value} dictionary of variables.

        Raises:
            subprocess.CalledProcessError: fastboot exited with an error.
        """
        output = _subprocess_check_output([self.path, 'getvar', 'all'],
                                          stderr=subprocess.STDOUT).splitlines()
        all_vars = {}
        for line in output:
            # Greedy match on the name: everything up to the last colon, so
            # names that themselves contain a colon stay in one piece.
            result = re.search(r'(.*):\s*(.*)', line)
            if not result:
                continue
            var_name = result.group(1)

            # `getvar all` works by sending one INFO message per variable
            # so we need to strip out the info prefix string.
            if var_name.startswith(self.INFO_PREFIX):
                var_name = var_name[len(self.INFO_PREFIX):]

            # In addition to returning all variables the bootloader may
            # also think it's supposed to query and return a variable named
            # "all", so ignore this line if so. Fastboot also prints a
            # summary line that we want to ignore; compare in lower case so
            # the capitalization of that line does not matter.
            if var_name != 'all' and 'total time' not in var_name.lower():
                all_vars[var_name] = result.group(2)
        return all_vars

    def flashall(self, wipe_user=True, slot=None, skip_secondary=False, quiet=True):
        """Calls `fastboot flashall [--slot <slot>] [--skip-secondary] [-w]`.

        Args:
            wipe_user: whether to set the -w flag or not.
            slot: slot to flash if device supports A/B, otherwise default will be used.
            skip_secondary: on A/B devices, flashes only the primary images if true.
            quiet: True to hide output, false to send it to stdout.

        Raises:
            subprocess.CalledProcessError: fastboot exited with an error.
        """
        func = (_subprocess_check_output if quiet else subprocess.check_call)
        command = [self.path, 'flashall']
        if slot:
            command.extend(['--slot', slot])
        if skip_secondary:
            command.append("--skip-secondary")
        if wipe_user:
            command.append('-w')
        func(command, stderr=subprocess.STDOUT)

    def flash(self, partition='cache', img=None, slot=None, quiet=True,
              skip_secondary=False):
        """Calls `fastboot flash <partition> [<img>] [--slot <slot>]`.

        Args:
            partition: which partition to flash.
            img: path to .img file, otherwise the default will be used.
            slot: slot to flash if device supports A/B, otherwise default will be used.
            quiet: True to hide output, false to send it to stdout.
            skip_secondary: True to append --skip-secondary to the command.
                Added as a trailing keyword so existing callers are
                unaffected; the body used this name without defining it.

        Raises:
            subprocess.CalledProcessError: fastboot exited with an error.
        """
        func = (_subprocess_check_output if quiet else subprocess.check_call)
        command = [self.path, 'flash', partition]
        if img:
            command.append(img)
        if slot:
            command.extend(['--slot', slot])
        if skip_secondary:
            command.append("--skip-secondary")
        func(command, stderr=subprocess.STDOUT)

    def reboot(self, bootloader=False):
        """Calls `fastboot reboot [bootloader]`.

        Args:
            bootloader: True to reboot back to the bootloader.

        Raises:
            subprocess.CalledProcessError: fastboot exited with an error.
        """
        command = [self.path, 'reboot']
        if bootloader:
            command.append('bootloader')
        _subprocess_check_output(command, stderr=subprocess.STDOUT)

    def set_active(self, slot):
        """Calls `fastboot set_active <slot>`.

        Args:
            slot: The slot to set as the current slot.

        Raises:
            subprocess.CalledProcessError: fastboot exited with an error.
        """
        command = [self.path, 'set_active', slot]
        _subprocess_check_output(command, stderr=subprocess.STDOUT)
182
183# If necessary, modifies subprocess.check_output() or subprocess.Popen() args
184# to run the subprocess via Windows PowerShell to work-around an issue in
185# Python 2's subprocess class on Windows where it doesn't support Unicode.
186def _get_subprocess_args(args):
187    # Only do this slow work-around if Unicode is in the cmd line on Windows.
188    # PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is
189    # very slow.
190    if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]):
191        return args
192
193    def escape_arg(arg):
194        # Escape for the parsing that the C Runtime does in Windows apps. In
195        # particular, this will take care of double-quotes.
196        arg = subprocess.list2cmdline([arg])
197        # Escape single-quote with another single-quote because we're about
198        # to...
199        arg = arg.replace(u"'", u"''")
200        # ...put the arg in a single-quoted string for PowerShell to parse.
201        arg = u"'" + arg + u"'"
202        return arg
203
204    # Escape command line args.
205    argv = map(escape_arg, args[0])
206    # Cause script errors (such as adb not found) to stop script immediately
207    # with an error.
208    ps_code = u'$ErrorActionPreference = "Stop"\r\n'
209    # Add current directory to the PATH var, to match cmd.exe/CreateProcess()
210    # behavior.
211    ps_code += u'$env:Path = ".;" + $env:Path\r\n'
212    # Precede by &, the PowerShell call operator, and separate args by space.
213    ps_code += u'& ' + u' '.join(argv)
214    # Make the PowerShell exit code the exit code of the subprocess.
215    ps_code += u'\r\nExit $LastExitCode'
216    # Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively
217    # understands.
218    ps_code = ps_code.encode('utf-16le')
219
220    # Encode the PowerShell command as base64 and use the special
221    # -EncodedCommand option that base64 decodes. Base64 is just plain ASCII,
222    # so it should have no problem passing through Win32 CreateProcessA()
223    # (which python erroneously calls instead of CreateProcessW()).
224    return (['powershell.exe', '-NoProfile', '-NonInteractive',
225             '-EncodedCommand', base64.b64encode(ps_code)],) + args[1:]
226
def _subprocess_check_output(*args, **kwargs):
    """Drop-in replacement for subprocess.check_output().

    Use this instead of subprocess.check_output() to work around the issue
    in Python 2's subprocess class on Windows where it doesn't support
    Unicode. The arguments are first passed through _get_subprocess_args().

    Raises:
        subprocess.CalledProcessError: the command failed. The exception
            carries the caller's command line (args[0]), not the rewritten
            one that was actually executed.
    """
    effective_args = _get_subprocess_args(args)
    try:
        return subprocess.check_output(*effective_args, **kwargs)
    except subprocess.CalledProcessError as err:
        # Show real command line instead of the powershell.exe command line.
        requested_cmd = args[0]
        raise subprocess.CalledProcessError(err.returncode, requested_cmd,
                                            output=err.output)
236