1#!/usr/bin/env python
2#
3# Copyright 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7#
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17"""Helper tool for performing an authenticated AVB unlock of an Android Things device.
18
19This tool communicates with an Android Things device over fastboot to perform an
20authenticated AVB unlock. The user provides unlock credentials valid for the
21device they want to unlock, likely obtained from the Android Things Developer
22Console. The tool handles the sequence of fastboot commands to complete the
23challenge-response unlock protocol.
24
25Unlock credentials can be provided to the tool in one of two ways:
26
27  1) by providing paths to the individual credential files using the
     '--pik_cert', '--puk_cert', and '--puk' command line switches, or
29
30  2) by providing a path to a zip archive containing the three credential files,
31     named as follows:
32       - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
33       - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
34       - PUK private key: 'puk.*\.pem'
35
36     You can also provide one or more archives and/or one or more directories
37     containing such zip archives. In either scenario, the tool will search all
38     of the provided credential archives for a match against the product ID of
39     the device being unlocked and automatically use the first match.
40
41This tool also clears the factory partition persistent digest unless the
--clear_factory_digest=false option is used. Clearing this digest is harmless
even if changes to the factory partition are not planned.
44
45Dependencies:
46  - Python 2.7.x, 3.2.x, or newer (for argparse)
47  - PyCrypto 2.5 or newer (for PKCS1_v1_5 and RSA PKCS#8 PEM key import)
48  - Android SDK Platform Tools (for fastboot), in PATH
49    - https://developer.android.com/studio/releases/platform-tools
50"""
51
# One-paragraph summary shown at the top of the argparse help output.
HELP_DESCRIPTION = (
    'Performs an authenticated AVB unlock of an Android Things device over\n'
    'fastboot, given valid unlock credentials for the device.')

# Hand-written usage text covering the two supported invocation forms
# (credential bundle(s) vs. individual credential files). The leading newline
# puts both forms on their own lines below argparse's "usage:" prefix.
HELP_USAGE = (
    '\n'
    '  %(prog)s [-h] [-v] [-s SERIAL] [--clear_factory_digest=true|false] '
    'unlock_creds.zip [unlock_creds_2.zip ...]\n'
    '  %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem')

# Example invocations appended to the end of the help output.
HELP_EPILOG = (
    'examples:\n'
    '  %(prog)s unlock_creds.zip\n'
    '  %(prog)s unlock_creds.zip unlock_creds_2.zip -s SERIAL\n'
    '  %(prog)s path_to_dir_with_multiple_unlock_creds/\n'
    '  %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem')
64
65import sys
66
# Bail out early on interpreters older than 2.7 (Python 2 line) or 3.2
# (Python 3 line), before any of the imports below are attempted.
ver = sys.version_info
if ver[:2] < (2, 7) or (3, 0) <= ver[:2] < (3, 2):
  print('This script requires Python 2.7+ or 3.2+')
  sys.exit(1)
71
72import argparse
73import binascii
74import os
75import re
76import shutil
77import struct
78import subprocess
79import tempfile
80import zipfile
81
82# Requires PyCrypto 2.5 (or newer) for PKCS1_v1_5 and support for importing
83# PEM-encoded RSA keys
84try:
85  from Crypto.Hash import SHA512
86  from Crypto.PublicKey import RSA
87  from Crypto.Signature import PKCS1_v1_5
88except ImportError as e:
89  print('PyCrypto 2.5 or newer required, missing or too old: ' + str(e))
90
91
class UnlockCredentials(object):
  """Container for the three credentials used in an AVB authenticated unlock.

  Holds the raw bytes of the Product Intermediate Key (PIK) certificate and of
  the Product Unlock Key (PUK) certificate, plus the imported PUK RSA private
  key. All three are read from disk and validated once, in the constructor;
  afterwards they are exposed through read-only properties.
  """

  def __init__(self,
               intermediate_cert_file,
               unlock_cert_file,
               unlock_key_file,
               source_file=None):
    """Reads and validates the credential files.

    Arguments:
      intermediate_cert_file: Path to the PIK certificate file.
      unlock_cert_file: Path to the PUK certificate file.
      unlock_key_file: Path to the PUK private key file, in a format accepted
        by RSA.importKey.
      source_file: [optional] Name of the archive the credentials came from;
        stored as-is and only used for display.

    Raises:
      ValueError: If a certificate has an unexpected length or the key is not
        an RSA private key. Errors raised by open() or RSA.importKey are not
        intercepted.
    """
    self._intermediate_cert = self._read_certificate(intermediate_cert_file,
                                                     'intermediate key')
    self._unlock_cert = self._read_certificate(unlock_cert_file,
                                               'product unlock key')

    with open(unlock_key_file, 'rb') as f:
      key_data = f.read()
    self._unlock_key = RSA.importKey(key_data)
    if not self._unlock_key.has_private():
      raise ValueError('Unlock key was not an RSA private key.')

    self._source_file = source_file

  @staticmethod
  def _read_certificate(cert_file, description):
    """Returns the bytes of cert_file after a basic length sanity check."""
    # The certificates are AvbAtxCertificate structs as defined in libavb_atx,
    # not X.509 certificates, so the fixed struct size is the only thing that
    # is checked here.
    EXPECTED_CERTIFICATE_SIZE = 1620

    with open(cert_file, 'rb') as f:
      data = f.read()
    if len(data) != EXPECTED_CERTIFICATE_SIZE:
      raise ValueError('Invalid {} certificate length.'.format(description))
    return data

  @property
  def intermediate_cert(self):
    """Raw bytes of the PIK certificate."""
    return self._intermediate_cert

  @property
  def unlock_cert(self):
    """Raw bytes of the PUK certificate."""
    return self._unlock_cert

  @property
  def unlock_key(self):
    """The PUK private key object returned by RSA.importKey."""
    return self._unlock_key

  @property
  def source_file(self):
    """Archive filename the credentials were loaded from, or None."""
    return self._source_file

  @classmethod
  def from_credential_archive(cls, archive):
    r"""Create UnlockCredentials from an unlock credential zip archive.

    The zip archive must contain the following three credential files, named as
    follows:
      - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
      - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
      - PUK private key: 'puk.*\.pem'

    The three files are unpacked into a temporary directory which is always
    removed again before this method returns or raises.

    Arguments:
      - archive: Filename of zip archive containing unlock credentials.

    Returns:
      A new UnlockCredentials whose source_file is set to archive.

    Raises:
      ValueError: If archive is either missing a required file or contains
      multiple files matching one of the filename formats, or if the
      constructor rejects one of the extracted files.
    """

    def _find_one_match(contents, regex, desc):
      # Exactly one archive member must match; anything else is ambiguous.
      matches = [name for name in contents if re.search(regex, name)]
      if not matches:
        raise ValueError(
            "Couldn't find {} file (matching regex '{}') in archive {}".format(
                desc, regex, archive))
      if len(matches) > 1:
        raise ValueError(
            "Found multiple files for {} (matching regex '{}') in archive {}"
            .format(desc, regex, archive))
      return matches[0]

    tempdir = tempfile.mkdtemp()
    try:
      with zipfile.ZipFile(archive, mode='r') as zip_file:
        contents = zip_file.namelist()

        pik_cert = _find_one_match(contents, r'^pik_certificate.*\.bin$',
                                   'intermediate key (PIK) certificate')
        puk_cert = _find_one_match(contents, r'^puk_certificate.*\.bin$',
                                   'unlock key (PUK) certificate')
        puk = _find_one_match(contents, r'^puk.*\.pem$', 'unlock key (PUK)')

        # Use the paths returned by extract() rather than joining tempdir with
        # the member name: zipfile sanitizes member names while extracting, so
        # the file actually written may live at a different path.
        pik_cert_path = zip_file.extract(pik_cert, tempdir)
        puk_cert_path = zip_file.extract(puk_cert, tempdir)
        puk_path = zip_file.extract(puk, tempdir)

      return cls(
          intermediate_cert_file=pik_cert_path,
          unlock_cert_file=puk_cert_path,
          unlock_key_file=puk_path,
          source_file=archive)
    finally:
      shutil.rmtree(tempdir)
199
200
class UnlockChallenge(object):
  """Parsed form of the AvbAtxUnlockChallenge struct.

  The struct is what 'fastboot oem at-get-vboot-unlock-challenge' stages on the
  device. The file handed to the constructor must hold the complete 52-byte
  struct, not only the challenge bytes.
  """

  def __init__(self, challenge_file):
    """Reads challenge_file and splits it into its three fields.

    Arguments:
      challenge_file: Path to a file holding the 52-byte struct.

    Raises:
      ValueError: If the file is not exactly 52 bytes long.
    """
    struct_size = 52
    hash_size = 32
    challenge_size = 16

    with open(challenge_file, 'rb') as f:
      raw = f.read()
    if len(raw) != struct_size:
      raise ValueError('Invalid unlock challenge length.')

    # Little-endian layout: uint32 version, product ID hash, challenge data.
    layout = '<I{}s{}s'.format(hash_size, challenge_size)
    fields = struct.unpack(layout, raw)
    self._version = fields[0]
    self._product_id_hash = fields[1]
    self._challenge_data = fields[2]

  @property
  def version(self):
    """Struct version number (first 4 bytes, little-endian)."""
    return self._version

  @property
  def product_id_hash(self):
    """The 32-byte product ID hash field."""
    return self._product_id_hash

  @property
  def challenge_data(self):
    """The 16-byte challenge field."""
    return self._challenge_data
231
232
def GetAtxCertificateSubject(cert):
  """Returns the subject field of the given AvbAtxCertificate struct.

  Arguments:
    cert: Raw bytes of the certificate.

  Returns:
    The 32 bytes of cert starting at the subject offset. As with any slice, the
    result is shorter (possibly empty) if cert is too short.
  """
  # The subject is preceded by the 4-byte format version and the 1032-byte
  # public key.
  subject_start = 4 + 1032
  subject_end = subject_start + 32
  return cert[subject_start:subject_end]
238
239
def SelectMatchingUnlockCredential(all_creds, challenge):
  """Picks the first credential set that belongs to the challenged product.

  The subject field of the Product Unlock Key (PUK) certificate holds the
  SHA256 hash of the product ID the certificate can unlock, and the unlock
  challenge carries that same hash. A credential matches when the two are
  equal.

  Arguments:
    all_creds: List of UnlockCredentials objects to be searched for a match
      against the given challenge.
    challenge: UnlockChallenge object created from challenge obtained via
      'fastboot oem at-get-vboot-unlock-challenge'.

  Returns:
    The first matching UnlockCredentials object, or None if nothing matches.
  """
  matching = (
      candidate for candidate in all_creds
      if GetAtxCertificateSubject(candidate.unlock_cert) ==
      challenge.product_id_hash)
  return next(matching, None)
256
257
def MakeAtxUnlockCredential(creds, challenge, out_file):
  """Simple reimplementation of 'avbtool make_atx_unlock_credential'.

  Generates an Android Things authenticated unlock credential to authorize
  unlocking AVB on a device.

  This is reimplemented locally for simplicity, which avoids the need to bundle
  this tool with the full avbtool. avbtool also uses openssl by default whereas
  this uses PyCrypto, which makes it easier to support Windows since there are
  no officially supported openssl binary distributions.

  The file written is the AvbAtxUnlockCredential struct: a little-endian uint32
  format version (1), the PIK certificate, the PUK certificate, and a
  PKCS1_v1_5 signature over the SHA512 hash of the challenge data made with the
  PUK private key.

  Arguments:
    creds: UnlockCredentials object wrapping the PIK certificate, PUK
      certificate, and PUK private key.
    challenge: UnlockChallenge object created from challenge obtained via
      'fastboot oem at-get-vboot-unlock-challenge'.
    out_file: Output filename to write the AvbAtxUnlockCredential struct to.
  """
  # Sign before opening the output so a signing failure leaves no file behind.
  challenge_digest = SHA512.new(challenge.challenge_data)
  signature = PKCS1_v1_5.new(creds.unlock_key).sign(challenge_digest)

  with open(out_file, 'wb') as out:
    out.write(struct.pack('<I', 1))  # Format Version
    out.write(creds.intermediate_cert)
    out.write(creds.unlock_cert)
    out.write(signature)
288
289
def AuthenticatedUnlock(all_creds, serial=None, verbose=False):
  """Performs an authenticated AVB unlock of a device over fastboot.

  The sequence is: ask the device for an unlock challenge, download it, pick
  the credentials matching the challenge's product ID hash, sign the challenge,
  upload the resulting unlock credential, request the unlock, and finally read
  back 'at-vboot-state' to confirm the device reports itself unlocked.

  Arguments:
    all_creds: List of UnlockCredentials objects wrapping the PIK certificate,
      PUK certificate, and PUK private key. The list will be searched to find
      matching credentials for the device being unlocked.
    serial: [optional] A device serial number or other valid value to be passed
      to fastboot's '-s' switch to select the device to unlock.
    verbose: [optional] Enable verbose output, which prints the fastboot
      commands and their output as the commands are run.

  Returns:
    True if the device reports 'avb-locked' as 0 after the unlock; False if no
    credential matches the device, a fastboot command exits with a non-zero
    status, or the device still reports itself locked.
  """

  # Scratch directory for the downloaded challenge and the generated
  # credential; removed again no matter how this function exits.
  tempdir = tempfile.mkdtemp()
  try:
    challenge_file = os.path.join(tempdir, 'challenge')
    credential_file = os.path.join(tempdir, 'credential')

    def fastboot_cmd(args):
      # Runs one fastboot command and returns its combined stdout/stderr.
      args = ['fastboot'] + (['-s', serial] if serial else []) + args
      if verbose:
        print('\n$ ' + ' '.join(args))

      out = subprocess.check_output(
          args, stderr=subprocess.STDOUT).decode('utf-8')

      if verbose:
        print(out)
      return out

    try:
      fastboot_cmd(['oem', 'at-get-vboot-unlock-challenge'])
      fastboot_cmd(['get_staged', challenge_file])

      challenge = UnlockChallenge(challenge_file)
      # hexlify() returns bytes on Python 3; decode so the hash is printed as
      # plain hex digits instead of a b'...' literal.
      print('Product ID SHA256 hash = {}'.format(
          binascii.hexlify(challenge.product_id_hash).decode('ascii')))

      selected_cred = SelectMatchingUnlockCredential(all_creds, challenge)
      if not selected_cred:
        print(
            'ERROR: None of the provided unlock credentials match this device.')
        return False
      if selected_cred.source_file:
        print('Found matching unlock credentials: {}'.format(
            selected_cred.source_file))
      MakeAtxUnlockCredential(selected_cred, challenge, credential_file)

      fastboot_cmd(['stage', credential_file])
      fastboot_cmd(['oem', 'at-unlock-vboot'])

      # The unlock command succeeding is not taken as proof; check the state
      # the device reports afterwards.
      res = fastboot_cmd(['getvar', 'at-vboot-state'])
      if re.search(r'avb-locked(:\s*|=)0', res) is not None:
        print('Device successfully AVB unlocked')
        return True
      else:
        print('ERROR: Commands succeeded but device still locked')
        return False
    except subprocess.CalledProcessError as e:
      print(e.output.decode('utf-8'))
      print("Command '{}' returned non-zero exit status {}".format(
          ' '.join(e.cmd), e.returncode))
      return False
  finally:
    shutil.rmtree(tempdir)
355
356
def FindUnlockCredentialsInDirectory(dir, verbose=False):
  """Loads every valid unlock credential zip bundle found directly in dir.

  Only regular files directly inside dir are considered; subdirectories are
  not searched. Files that cannot be loaded as a credential bundle are skipped.

  Arguments:
    dir: Path of the directory to scan.
    verbose: [optional] Print a line for every file that is accepted or
      ignored.

  Returns:
    List of UnlockCredentials objects, ordered by filename. Empty if the
    directory holds no valid bundle.

  Raises:
    ValueError: If dir is not a directory.
  """
  if not os.path.isdir(dir):
    raise ValueError('Not a directory: ' + dir)

  creds = []
  # os.listdir() returns entries in arbitrary order. Sort them so that, when
  # several bundles match a device, which one comes first is reproducible.
  for name in sorted(os.listdir(dir)):
    path = os.path.join(dir, name)
    if not os.path.isfile(path):
      continue
    try:
      creds.append(UnlockCredentials.from_credential_archive(path))
    except (IOError, ValueError, zipfile.BadZipfile):
      # Best effort: a directory may contain unrelated files.
      if verbose:
        print(
            "Ignoring file which isn't a valid unlock credential zip bundle: "
            + path)
      continue
    if verbose:
      print('Found valid unlock credential bundle: ' + path)
  return creds
375
376
def ClearFactoryPersistentDigest(serial=None, verbose=False):
  """Clears the factory partition persistent digest using fastboot.

  Most of the time this should be cleared when unlocking a device because
  otherwise any attempts to update the factory partition will be rejected once
  the device is again locked, causing confusion. There is no harm to clear this
  digest even if factory partition updates are not planned.

  The request staged on the device consists of a little-endian uint32 name
  length, the digest name, and a little-endian uint32 digest length of zero.

  Arguments:
    serial: [optional] A device serial number or other valid value to be passed
      to fastboot's '-s' switch to select the device to unlock.
    verbose: [optional] Enable verbose output, which prints the fastboot
      commands and their output as the commands are run.

  Returns:
    True if both fastboot commands succeeded, False if either exited with a
    non-zero status.
  """
  # Bytes, not str: the name is written to a file opened in binary mode, which
  # rejects str on Python 3.
  FACTORY_PERSISTENT_DIGEST_NAME = b'avb.persistent_digest.factory'

  tempdir = tempfile.mkdtemp()
  try:
    digest_data = os.path.join(tempdir, 'digest_data')

    with open(digest_data, 'wb') as out:
      out.write(struct.pack('<I', len(FACTORY_PERSISTENT_DIGEST_NAME)))
      out.write(FACTORY_PERSISTENT_DIGEST_NAME)
      # Sending a zero length digest will clear the existing digest.
      out.write(struct.pack('<I', 0))

    def fastboot_cmd(args):
      # Runs one fastboot command, echoing it and its output when verbose.
      args = ['fastboot'] + (['-s', serial] if serial else []) + args
      if verbose:
        print('$ ' + ' '.join(args))

      out = subprocess.check_output(
          args, stderr=subprocess.STDOUT).decode('utf-8')

      if verbose:
        print(out)

    try:
      fastboot_cmd(['stage', digest_data])
      fastboot_cmd(['oem', 'at-write-persistent-digest'])
      print("Successfully cleared the factory partition persistent digest.")
      return True
    except subprocess.CalledProcessError as e:
      print(e.output.decode('utf-8'))
      print("Command '{}' returned non-zero exit status {}".format(
          ' '.join(e.cmd), e.returncode))
      print("Warning: Failed to clear factory partition persistent digest.")
      return False

  finally:
    shutil.rmtree(tempdir)
428
429
def parse_boolean(value):
  """Converts a command line string into a bool, for use as an argparse type.

  Matching ignores case and surrounding whitespace.

  Arguments:
    value: The string to interpret.

  Returns:
    True for 'true', 't', 'yes', 'y', 'on' or '1'; False for 'false', 'f',
    'no', 'n', 'off' or '0'.

  Raises:
    argparse.ArgumentTypeError: If value is none of the accepted spellings.
  """
  normalized = value.strip().lower()
  truthy = ('true', 't', 'yes', 'y', 'on', '1')
  falsy = ('false', 'f', 'no', 'n', 'off', '0')

  if normalized in truthy:
    return True
  if normalized in falsy:
    return False
  raise argparse.ArgumentTypeError('Unexpected boolean value: %s' % value)
437
def main(in_args):
  """Parses the command line, loads the credentials, and unlocks the device.

  Arguments:
    in_args: List of command line argument strings, without the program name.
      An empty list is treated as '-h', which prints the help text.

  Returns:
    0 if the device was unlocked and, unless disabled with
    --clear_factory_digest=false, the factory partition persistent digest was
    cleared; 1 otherwise. Invalid arguments and credentials that cannot be
    loaded are reported through parser.error().
  """
  parser = argparse.ArgumentParser(
      description=HELP_DESCRIPTION,
      usage=HELP_USAGE,
      epilog=HELP_EPILOG,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  # General optional arguments.
  parser.add_argument(
      '-v',
      '--verbose',
      action='store_true',
      help=
      'enable verbose output, e.g. prints fastboot commands and their output')
  parser.add_argument(
      '-s',
      '--serial',
      help=
      "specify device to unlock, either by serial or any other valid value for fastboot's -s arg"
  )
  # Absent and given without a value both mean "clear the digest"; only an
  # explicit value goes through parse_boolean.
  parser.add_argument(
      '--clear_factory_digest',
      nargs='?',
      type=parse_boolean,
      default=True,
      const=True,
      help='Defaults to true. Set to false to prevent clearing the factory persistent digest')

  # User must provide either an unlock credential bundle, or the individual
  # files normally contained in such a bundle.
  # argparse doesn't support specifying this argument format - two groups of
  # mutually exclusive arguments, where one group requires all arguments in that
  # group to be specified - so we define them as optional arguments and do the
  # validation ourselves below.

  # Argument group #1 - Unlock credential zip archive(s) (or directory
  # containing multiple such archives)
  parser.add_argument(
      'bundle',
      metavar='unlock_creds.zip',
      nargs='*',
      help=
      'Unlock using a zip bundle/archive of credentials (e.g. from Developer '
      'Console). You can optionally provide multiple archives and/or a  '
      'directory of such bundles and the tool will automatically select the '
      'correct one to use based on matching the product ID against the device '
      'being unlocked.')

  # Argument group #2 - Individual credential files
  parser.add_argument(
      '--pik_cert',
      metavar='pik_cert.bin',
      help='Path to product intermediate key (PIK) certificate file')
  parser.add_argument(
      '--puk_cert',
      metavar='puk_cert.bin',
      help='Path to product unlock key (PUK) certificate file')
  parser.add_argument(
      '--puk',
      metavar='puk.pem',
      help='Path to product unlock key in PEM format')

  # Print help if no args given
  args = parser.parse_args(in_args if in_args else ['-h'])

  # Do the custom validation described above.
  if args.pik_cert is not None or args.puk_cert is not None or args.puk is not None:
    # Check mutual exclusion with bundle positional argument
    if args.bundle:
      parser.error(
          'bundle argument is mutually exclusive with --pik_cert, --puk_cert, and --puk'
      )

    # Check for 'mutual inclusion' of individual file options
    if args.pik_cert is None:
      parser.error('--pik_cert is required if --puk_cert or --puk is given')
    if args.puk_cert is None:
      parser.error('--puk_cert is required if --pik_cert or --puk is given')
    if args.puk is None:
      parser.error('--puk is required if --pik_cert or --puk_cert is given')
  elif not args.bundle:
    parser.error(
        'must provide either credentials bundle or individual credential files')

  # Parse arguments into UnlockCredentials objects. Credentials named
  # explicitly on the command line must load; report a failure as a usage
  # error instead of letting the exception escape as a traceback.
  if args.bundle:
    creds = []
    for path in args.bundle:
      if os.path.isfile(path):
        try:
          creds.append(UnlockCredentials.from_credential_archive(path))
        except (IOError, ValueError, zipfile.BadZipfile) as e:
          parser.error(
              "'{}' is not a valid unlock credential archive: {}".format(
                  path, e))
      elif os.path.isdir(path):
        creds.extend(
            FindUnlockCredentialsInDirectory(path, verbose=args.verbose))
      else:
        parser.error("path argument '{}' does not exist".format(path))

    if not creds:
      parser.error('No unlock credentials were found in any of the given paths')
  else:
    try:
      creds = [UnlockCredentials(args.pik_cert, args.puk_cert, args.puk)]
    except (IOError, ValueError) as e:
      parser.error('could not load unlock credential files: {}'.format(e))

  ret = AuthenticatedUnlock(creds, serial=args.serial, verbose=args.verbose)
  # Only touch the digest once the device is actually unlocked.
  if ret and args.clear_factory_digest:
    ret = ClearFactoryPersistentDigest(serial=args.serial, verbose=args.verbose)
  return 0 if ret else 1
543
544
545if __name__ == '__main__':
546  sys.exit(main(sys.argv[1:]))
547