1#!/usr/bin/env python
2#
3# Copyright 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7#
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17"""Helper tool for performing an authenticated AVB unlock of an Android Things device.
18
19This tool communicates with an Android Things device over fastboot to perform an
20authenticated AVB unlock. The user provides unlock credentials valid for the
21device they want to unlock, likely obtained from the Android Things Developer
22Console. The tool handles the sequence of fastboot commands to complete the
23challenge-response unlock protocol.
24
25Unlock credentials can be provided to the tool in one of two ways:
26
27  1) by providing paths to the individual credential files using the
28     '--pik_cert', '--puk_cert', and '--puk' command line swtiches, or
29
30  2) by providing a path to a zip archive containing the three credential files,
31     named as follows:
32       - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
33       - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
34       - PUK private key: 'puk.*\.pem'
35
36     You can also provide one or more archives and/or one or more directories
37     containing such zip archives. In either scenario, the tool will search all
38     of the provided credential archives for a match against the product ID of
39     the device being unlocked and automatically use the first match.
40
41Dependencies:
42  - Python 2.7.x, 3.2.x, or newer (for argparse)
43  - PyCrypto 2.5 or newer (for PKCS1_v1_5 and RSA PKCS#8 PEM key import)
44  - Android SDK Platform Tools (for fastboot), in PATH
45    - https://developer.android.com/studio/releases/platform-tools
46"""
47
48HELP_DESCRIPTION = """Performs an authenticated AVB unlock of an Android Things device over
49fastboot, given valid unlock credentials for the device."""
50
51HELP_USAGE = """
52  %(prog)s [-h] [-v] [-s SERIAL] unlock_creds.zip [unlock_creds_2.zip ...]
53  %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem"""
54
55HELP_EPILOG = """examples:
56  %(prog)s unlock_creds.zip
57  %(prog)s unlock_creds.zip unlock_creds_2.zip -s SERIAL
58  %(prog)s path_to_dir_with_multiple_unlock_creds/
59  %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem"""
60
61import sys
62
63ver = sys.version_info
64if (ver[0] < 2) or (ver[0] == 2 and ver[1] < 7) or (ver[0] == 3 and ver[1] < 2):
65  print('This script requires Python 2.7+ or 3.2+')
66  sys.exit(1)
67
68import argparse
69import binascii
70import os
71import re
72import shutil
73import struct
74import subprocess
75import tempfile
76import zipfile
77
78# Requires PyCrypto 2.5 (or newer) for PKCS1_v1_5 and support for importing
79# PEM-encoded RSA keys
80try:
81  from Crypto.Hash import SHA512
82  from Crypto.PublicKey import RSA
83  from Crypto.Signature import PKCS1_v1_5
84except ImportError as e:
85  print('PyCrypto 2.5 or newer required, missing or too old: ' + str(e))
86
87
88class UnlockCredentials(object):
89  """Helper data container class for the 3 unlock credentials involved in an AVB authenticated unlock operation.
90
91  """
92
93  def __init__(self,
94               intermediate_cert_file,
95               unlock_cert_file,
96               unlock_key_file,
97               source_file=None):
98    # The certificates are AvbAtxCertificate structs as defined in libavb_atx,
99    # not an X.509 certificate. Do a basic length sanity check when reading
100    # them.
101    EXPECTED_CERTIFICATE_SIZE = 1620
102
103    with open(intermediate_cert_file, 'rb') as f:
104      self._intermediate_cert = f.read()
105    if len(self._intermediate_cert) != EXPECTED_CERTIFICATE_SIZE:
106      raise ValueError('Invalid intermediate key certificate length.')
107
108    with open(unlock_cert_file, 'rb') as f:
109      self._unlock_cert = f.read()
110    if len(self._unlock_cert) != EXPECTED_CERTIFICATE_SIZE:
111      raise ValueError('Invalid product unlock key certificate length.')
112
113    with open(unlock_key_file, 'rb') as f:
114      self._unlock_key = RSA.importKey(f.read())
115      if not self._unlock_key.has_private():
116        raise ValueError('Unlock key was not an RSA private key.')
117
118    self._source_file = source_file
119
120  @property
121  def intermediate_cert(self):
122    return self._intermediate_cert
123
124  @property
125  def unlock_cert(self):
126    return self._unlock_cert
127
128  @property
129  def unlock_key(self):
130    return self._unlock_key
131
132  @property
133  def source_file(self):
134    return self._source_file
135
136  @classmethod
137  def from_credential_archive(cls, archive):
138    """Create UnlockCredentials from an unlock credential zip archive.
139
140    The zip archive must contain the following three credential files, named as
141    follows:
142      - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
143      - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
144      - PUK private key: 'puk.*\.pem'
145
146    This uses @contextlib.contextmanager so we can clean up the tempdir created
147    to unpack the zip contents into.
148
149    Arguments:
150      - archive: Filename of zip archive containing unlock credentials.
151
152    Raises:
153      ValueError: If archive is either missing a required file or contains
154      multiple files matching one of the filename formats.
155    """
156
157    def _find_one_match(contents, regex, desc):
158      r = re.compile(regex)
159      matches = list(filter(r.search, contents))
160      if not matches:
161        raise ValueError(
162            "Couldn't find {} file (matching regex '{}') in archive {}".format(
163                desc, regex, archive))
164      elif len(matches) > 1:
165        raise ValueError(
166            "Found multiple files for {} (matching regex '{}') in archive {}"
167            .format(desc, regex, archive))
168      return matches[0]
169
170    tempdir = tempfile.mkdtemp()
171    try:
172      with zipfile.ZipFile(archive, mode='r') as zip:
173        contents = zip.namelist()
174
175        pik_cert_re = r'^pik_certificate.*\.bin$'
176        pik_cert = _find_one_match(contents, pik_cert_re,
177                                   'intermediate key (PIK) certificate')
178
179        puk_cert_re = r'^puk_certificate.*\.bin$'
180        puk_cert = _find_one_match(contents, puk_cert_re,
181                                   'unlock key (PUK) certificate')
182
183        puk_re = r'^puk.*\.pem$'
184        puk = _find_one_match(contents, puk_re, 'unlock key (PUK)')
185
186        zip.extractall(path=tempdir, members=[pik_cert, puk_cert, puk])
187
188        return cls(
189            intermediate_cert_file=os.path.join(tempdir, pik_cert),
190            unlock_cert_file=os.path.join(tempdir, puk_cert),
191            unlock_key_file=os.path.join(tempdir, puk),
192            source_file=archive)
193    finally:
194      shutil.rmtree(tempdir)
195
196
197class UnlockChallenge(object):
198  """Helper class for parsing the AvbAtxUnlockChallenge struct returned from 'fastboot oem at-get-vboot-unlock-challenge'.
199
200     The file provided to the constructor should be the full 52-byte
201     AvbAtxUnlockChallenge struct, not just the challenge itself.
202  """
203
204  def __init__(self, challenge_file):
205    CHALLENGE_STRUCT_SIZE = 52
206    PRODUCT_ID_HASH_SIZE = 32
207    CHALLENGE_DATA_SIZE = 16
208    with open(challenge_file, 'rb') as f:
209      data = f.read()
210      if len(data) != CHALLENGE_STRUCT_SIZE:
211        raise ValueError('Invalid unlock challenge length.')
212
213      self._version, self._product_id_hash, self._challenge_data = struct.unpack(
214          '<I{}s{}s'.format(PRODUCT_ID_HASH_SIZE, CHALLENGE_DATA_SIZE), data)
215
216  @property
217  def version(self):
218    return self._version
219
220  @property
221  def product_id_hash(self):
222    return self._product_id_hash
223
224  @property
225  def challenge_data(self):
226    return self._challenge_data
227
228
229def GetAtxCertificateSubject(cert):
230  """Parses and returns the subject field from the given AvbAtxCertificate struct."""
231  CERT_SUBJECT_OFFSET = 4 + 1032  # Format version and public key come before subject
232  CERT_SUBJECT_LENGTH = 32
233  return cert[CERT_SUBJECT_OFFSET:CERT_SUBJECT_OFFSET + CERT_SUBJECT_LENGTH]
234
235
236def SelectMatchingUnlockCredential(all_creds, challenge):
237  """Find and return the first UnlockCredentials object whose product ID matches that of the unlock challenge.
238
239  The Product Unlock Key (PUK) certificate's subject field contains the
240  SHA256 hash of the product ID that it can be used to unlock. This same
241  value (SHA256 hash of the product ID) is contained in the unlock challenge.
242
243  Arguments:
244    all_creds: List of UnlockCredentials objects to be searched for a match
245      against the given challenge.
246    challenge: UnlockChallenge object created from challenge obtained via
247      'fastboot oem at-get-vboot-unlock-challenge'.
248  """
249  for creds in all_creds:
250    if GetAtxCertificateSubject(creds.unlock_cert) == challenge.product_id_hash:
251      return creds
252
253
254def MakeAtxUnlockCredential(creds, challenge, out_file):
255  """Simple reimplementation of 'avbtool make_atx_unlock_credential'.
256
257  Generates an Android Things authenticated unlock credential to authorize
258  unlocking AVB on a device.
259
260  This is reimplemented locally for simplicity, which avoids the need to bundle
261  this tool with the full avbtool. avbtool also uses openssl by default whereas
262  this uses PyCrypto, which makes it easier to support Windows since there are
263  no officially supported openssl binary distributions.
264
265  Arguments:
266    creds: UnlockCredentials object wrapping the PIK certificate, PUK
267      certificate, and PUK private key.
268    challenge: UnlockChallenge object created from challenge obtained via
269      'fastboot oem at-get-vboot-unlock-challenge'.
270    out_file: Output filename to write the AvbAtxUnlockCredential struct to.
271
272  Raises:
273    ValueError: If challenge has wrong length.
274  """
275  hash = SHA512.new(challenge.challenge_data)
276  signer = PKCS1_v1_5.new(creds.unlock_key)
277  signature = signer.sign(hash)
278
279  with open(out_file, 'wb') as out:
280    out.write(struct.pack('<I', 1))  # Format Version
281    out.write(creds.intermediate_cert)
282    out.write(creds.unlock_cert)
283    out.write(signature)
284
285
286def AuthenticatedUnlock(all_creds, serial=None, verbose=False):
287  """Performs an authenticated AVB unlock of a device over fastboot.
288
289  Arguments:
290    all_creds: List of UnlockCredentials objects wrapping the PIK certificate,
291      PUK certificate, and PUK private key. The list will be searched to find
292      matching credentials for the device being unlocked.
293    serial: [optional] A device serial number or other valid value to be passed
294      to fastboot's '-s' switch to select the device to unlock.
295    verbose: [optional] Enable verbose output, which prints the fastboot
296      commands and their output as the commands are run.
297  """
298
299  tempdir = tempfile.mkdtemp()
300  try:
301    challenge_file = os.path.join(tempdir, 'challenge')
302    credential_file = os.path.join(tempdir, 'credential')
303
304    def fastboot_cmd(args):
305      args = ['fastboot'] + (['-s', serial] if serial else []) + args
306      if verbose:
307        print('\n$ ' + ' '.join(args))
308
309      out = subprocess.check_output(
310          args, stderr=subprocess.STDOUT).decode('utf-8')
311
312      if verbose:
313        print(out)
314      return out
315
316    try:
317      fastboot_cmd(['oem', 'at-get-vboot-unlock-challenge'])
318      fastboot_cmd(['get_staged', challenge_file])
319
320      challenge = UnlockChallenge(challenge_file)
321      print('Product ID SHA256 hash = {}'.format(
322          binascii.hexlify(challenge.product_id_hash)))
323
324      selected_cred = SelectMatchingUnlockCredential(all_creds, challenge)
325      if not selected_cred:
326        print(
327            'ERROR: None of the provided unlock credentials match this device.')
328        return False
329      if selected_cred.source_file:
330        print('Found matching unlock credentials: {}'.format(
331            selected_cred.source_file))
332      MakeAtxUnlockCredential(selected_cred, challenge, credential_file)
333
334      fastboot_cmd(['stage', credential_file])
335      fastboot_cmd(['oem', 'at-unlock-vboot'])
336
337      res = fastboot_cmd(['getvar', 'at-vboot-state'])
338      if re.search(r'avb-locked(:\s*|=)0', res) is not None:
339        print('Device successfully AVB unlocked')
340        return True
341      else:
342        print('ERROR: Commands succeeded but device still locked')
343        return False
344    except subprocess.CalledProcessError as e:
345      print(e.output.decode('utf-8'))
346      print("Command '{}' returned non-zero exit status {}".format(
347          ' '.join(e.cmd), e.returncode))
348      return False
349  finally:
350    shutil.rmtree(tempdir)
351
352
353def FindUnlockCredentialsInDirectory(dir, verbose=False):
354  if not os.path.isdir(dir):
355    raise ValueError('Not a directory: ' + dir)
356
357  creds = []
358  for file in os.listdir(dir):
359    path = os.path.join(dir, file)
360    if os.path.isfile(path):
361      try:
362        creds.append(UnlockCredentials.from_credential_archive(path))
363        if verbose:
364          print('Found valid unlock credential bundle: ' + path)
365      except (IOError, ValueError, zipfile.BadZipfile) as e:
366        if verbose:
367          print(
368              "Ignoring file which isn't a valid unlock credential zip bundle: "
369              + path)
370  return creds
371
372
373def main(in_args):
374  parser = argparse.ArgumentParser(
375      description=HELP_DESCRIPTION,
376      usage=HELP_USAGE,
377      epilog=HELP_EPILOG,
378      formatter_class=argparse.RawDescriptionHelpFormatter)
379
380  # General optional arguments.
381  parser.add_argument(
382      '-v',
383      '--verbose',
384      action='store_true',
385      help=
386      'enable verbose output, e.g. prints fastboot commands and their output')
387  parser.add_argument(
388      '-s',
389      '--serial',
390      help=
391      "specify device to unlock, either by serial or any other valid value for fastboot's -s arg"
392  )
393
394  # User must provide either a unlock credential bundle, or the individual files
395  # normally contained in such a bundle.
396  # argparse doesn't support specifying this argument format - two groups of
397  # mutually exclusive arguments, where one group requires all arguments in that
398  # group to be specified - so we define them as optional arguments and do the
399  # validation ourselves below.
400
401  # Argument group #1 - Unlock credential zip archive(s) (or directory
402  # containing multiple such archives)
403  parser.add_argument(
404      'bundle',
405      metavar='unlock_creds.zip',
406      nargs='*',
407      help=
408      'Unlock using a zip bundle/archive of credentials (e.g. from Developer '
409      'Console). You can optionally provide multiple archives and/or a  '
410      'directory of such bundles and the tool will automatically select the '
411      'correct one to use based on matching the product ID against the device '
412      'being unlocked.')
413
414  # Argument group #2 - Individual credential files
415  parser.add_argument(
416      '--pik_cert',
417      metavar='pik_cert.bin',
418      help='Path to product intermediate key (PIK) certificate file')
419  parser.add_argument(
420      '--puk_cert',
421      metavar='puk_cert.bin',
422      help='Path to product unlock key (PUK) certificate file')
423  parser.add_argument(
424      '--puk',
425      metavar='puk.pem',
426      help='Path to product unlock key in PEM format')
427
428  # Print help if no args given
429  args = parser.parse_args(in_args if in_args else ['-h'])
430
431  # Do the custom validation described above.
432  if args.pik_cert is not None or args.puk_cert is not None or args.puk is not None:
433    # Check mutual exclusion with bundle positional argument
434    if len(args.bundle):
435      parser.error(
436          'bundle argument is mutually exclusive with --pik_cert, --puk_cert, and --puk'
437      )
438
439    # Check for 'mutual inclusion' of individual file options
440    if args.pik_cert is None:
441      parser.error("--pik_cert is required if --puk_cert or --puk' is given")
442    if args.puk_cert is None:
443      parser.error("--puk_cert is required if --pik_cert or --puk' is given")
444    if args.puk is None:
445      parser.error("--puk is required if --pik_cert or --puk_cert' is given")
446  elif not len(args.bundle):
447    parser.error(
448        'must provide either credentials bundle or individual credential files')
449
450  # Parse arguments into UnlockCredentials objects
451  if len(args.bundle):
452    creds = []
453    for path in args.bundle:
454      if os.path.isfile(path):
455        creds.append(UnlockCredentials.from_credential_archive(path))
456      elif os.path.isdir(path):
457        creds.extend(
458            FindUnlockCredentialsInDirectory(path, verbose=args.verbose))
459      else:
460        parser.error("path argument '{}' does not exist".format(path))
461
462    if len(creds) == 0:
463      parser.error('No unlock credentials were found in any of the given paths')
464  else:
465    creds = [UnlockCredentials(args.pik_cert, args.puk_cert, args.puk)]
466
467  ret = AuthenticatedUnlock(creds, serial=args.serial, verbose=args.verbose)
468  return 0 if ret else 1
469
470
471if __name__ == '__main__':
472  sys.exit(main(sys.argv[1:]))
473