1#!/usr/bin/python 2 3# 4# Copyright 2017 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18"""Test that implements the Android Things Attestation Provisioning protocol. 19 20Enables testing of the device side of the Android Things Attestation 21Provisioning (ATAP) Protocol without access to a CA or Android Things Factory 22Appliance (ATFA). 23""" 24 25import argparse 26from collections import namedtuple 27import os 28import struct 29 30from aesgcm import AESGCM 31import cryptography.exceptions 32from cryptography.hazmat.backends import default_backend 33from cryptography.hazmat.primitives import hashes 34from cryptography.hazmat.primitives.kdf.hkdf import HKDF 35import curve25519 36import ec_helper 37 38_ATAPSessionParameters = namedtuple('_AtapSessionParameters', [ 39 'algorithm', 'operation', 'private_key', 'public_key' 40]) 41 42_MESSAGE_VERSION = 1 43_OPERATIONS = {'ISSUE': 2, 'ISSUE_ENC': 3} 44_ALGORITHMS = {'p256': 1, 'x25519': 2} 45_ECDH_KEY_LEN = 33 46 47_session_params = _ATAPSessionParameters(0, 0, bytes(), bytes()) 48 49 50def _write_operation_start(algorithm, operation): 51 """Writes a fresh Operation Start message to tmp/operation_start.bin. 52 53 Generates an ECDHE key specified by <algorithm> and writes an Operation 54 Start message for executing <operation> on the device. 55 56 Args: 57 algorithm: Integer specifying the curve to use for the session key. 58 1: P256, 2: X25519 59 operation: Specifies the operation. 1: Certify, 2: Issue, 3: Issue Encrypted 60 61 Raises: 62 ValueError: algorithm or operation is is invalid. 63 """ 64 65 global _session_params 66 67 if algorithm > 2 or algorithm < 1: 68 raise ValueError('Invalid algorithm value.') 69 70 if operation > 3 or operation < 1: 71 raise ValueError('Invalid operation value.') 72 73 # Generate new key for each provisioning session 74 if algorithm == _ALGORITHMS['x25519']: 75 private_key = curve25519.genkey() 76 # Make 33 bytes to match P256 77 public_key = curve25519.public(private_key) + '\0' 78 elif algorithm == _ALGORITHMS['p256']: 79 [private_key, public_key] = ec_helper.generate_p256_key() 80 81 _session_params = _ATAPSessionParameters(algorithm, operation, private_key, 82 public_key) 83 84 # "Operation Start" Header 85 # +2 for algo and operation bytes 86 header = (_MESSAGE_VERSION, 0, 0, 0, _ECDH_KEY_LEN + 2) 87 operation_start = bytearray(struct.pack('<4B I', *header)) 88 89 # "Operation Start" Message 90 op_start = (algorithm, operation, public_key) 91 operation_start.extend(struct.pack('<2B 33s', *op_start)) 92 93 with open('tmp/operation_start.bin', 'wb') as f: 94 f.write(operation_start) 95 96 97def _get_ca_response(ca_request): 98 """Writes a CA Response message to tmp/ca_response.bin. 99 100 Parses the CA Request message at ca_request. Computes the session key from 101 the ca_request, decrypts the inner request, verifies the SOM key signature, 102 and issues or certifies attestation keys as applicable. The CA Response 103 message containing test keys is written to ca_response.bin. 104 105 Args: 106 ca_request: The CA Request message from the device. 107 108 Raises: 109 ValueError: ca_request is malformed. 110 111 CA Request message format for reference, sizes in bytes 112 113 cleartext header 8 114 cleartext device ephemeral public key 33 115 cleartext GCM IV 12 116 encrypted header 8 117 encrypted SOM key certificate chain variable 118 encrypted SOM key authentication signature variable 119 encrypted product ID SHA256 hash 32 120 encrypted RSA public key variable 121 encrypted ECDSA public key variable 122 encrypted edDSA public key variable 123 cleartext GCM tag 16 124 """ 125 126 var_len = 4 127 header_len = 8 128 pub_key_len = _ECDH_KEY_LEN 129 gcm_iv_len = 12 130 prod_id_hash_len = 32 131 gcm_tag_len = 16 132 133 min_message_length = ( 134 header_len + pub_key_len + gcm_iv_len + header_len + var_len + var_len + 135 prod_id_hash_len + var_len + var_len + var_len + gcm_tag_len) 136 137 if len(ca_request) < min_message_length: 138 raise ValueError('Malformed message: Length invalid') 139 140 # Unpack Request header 141 end = header_len 142 ca_req_start = ca_request[:end] 143 (device_message_version, res1, res2, res3, 144 device_message_len) = struct.unpack('<4B I', ca_req_start) 145 146 if device_message_version != _MESSAGE_VERSION: 147 raise ValueError('Malformed message: Incorrect message version') 148 149 if res1 or res2 or res3: 150 raise ValueError('Malformed message: Reserved values set') 151 152 if device_message_len > len(ca_request) - header_len: 153 raise ValueError('Malformed message: Incorrect device message length') 154 155 # Extract AT device ephemeral public key 156 start = header_len 157 end = start + pub_key_len 158 device_pub_key = bytes(ca_request[start:end]) 159 160 # Generate shared_key 161 salt = _session_params.public_key + device_pub_key 162 shared_key = _get_shared_key(_session_params.algorithm, device_pub_key, salt) 163 164 # Decrypt AES-128-GCM message using the shared_key 165 # Extract the GCM IV 166 start = header_len + pub_key_len 167 end = start + gcm_iv_len 168 gcm_iv = bytes(ca_request[start:end]) 169 170 # Extract the encrypted message 171 start = header_len + pub_key_len + gcm_iv_len 172 enc_message_len = _get_var_len(ca_request, start) 173 174 if enc_message_len > len(ca_request) - gcm_tag_len - start - var_len: 175 raise ValueError('Encrypted message size %d too large' % enc_message_len) 176 177 start += var_len 178 end = start + enc_message_len 179 enc_message = bytes(ca_request[start:end]) 180 181 # Extract the GCM Tag 182 gcm_tag = bytes(ca_request[-gcm_tag_len:]) 183 184 # Decrypt message 185 try: 186 data = AESGCM.decrypt(enc_message, shared_key, gcm_iv, gcm_tag) 187 except cryptography.exceptions.InvalidTag: 188 raise ValueError('Malformed message: GCM decrypt failed') 189 190 # Unpack Inner header 191 end = header_len 192 ca_req_inner_header = data[:end] 193 (device_message_version, res1, res2, res3, inner_message_len) = struct.unpack( 194 '<4B I', ca_req_inner_header) 195 196 if device_message_version != _MESSAGE_VERSION: 197 raise ValueError('Malformed message: Incorrect inner message version') 198 199 if res1 or res2 or res3: 200 raise ValueError('Malformed message: Reserved values set') 201 202 remaining_bytes = len(ca_request) - header_len - pub_key_len 203 remaining_bytes = remaining_bytes - gcm_iv_len - gcm_tag_len 204 if inner_message_len > remaining_bytes: 205 raise ValueError('Malformed message: Incorrect device inner message length') 206 207 # SOM key certificate chain 208 som_chain_start = header_len 209 som_chain_len = _get_var_len(data, som_chain_start) 210 if som_chain_len > 0: 211 raise ValueError( 212 'SOM authentication not yet supported, set cert chain length to zero') 213 214 # SOM key authentication signature 215 som_key_start = som_chain_start + var_len + som_chain_len 216 som_len = _get_var_len(data, som_key_start) 217 if som_len > 0: 218 raise ValueError( 219 'SOM authentication not yet supported, set signature length to zero') 220 221 # Product ID SHA-256 hash 222 prod_id_start = som_key_start + var_len + som_len 223 prod_id_end = prod_id_start + prod_id_hash_len 224 prod_id_hash = data[prod_id_start:prod_id_end] 225 print 'product_id hash:' + prod_id_hash.encode('hex') 226 227 # RSA public key to certify 228 rsa_start = prod_id_start + prod_id_hash_len 229 rsa_len = _get_var_len(data, rsa_start) 230 if rsa_len > 0: 231 raise ValueError( 232 'Certify operation not supported, set RSA public key length to zero') 233 234 # ECDSA public key to certify 235 ecdsa_start = rsa_start + var_len + rsa_len 236 ecdsa_len = _get_var_len(data, ecdsa_start) 237 if ecdsa_len > 0: 238 raise ValueError( 239 'Certify operation not supported, set ECDSA public key length to zero') 240 241 # edDSA public key to certify 242 eddsa_start = prod_id_start + var_len + prod_id_hash_len 243 eddsa_len = _get_var_len(data, eddsa_start) 244 if eddsa_len > 0: 245 raise ValueError( 246 'Certify operation not supported, set edDSA public key length to zero') 247 248 # ATFA treats ISSUE and ISSUE_ENCRYPTED operations the same 249 if _session_params.operation == _OPERATIONS['ISSUE']: 250 with open('keysets/unencrypted.keyset', 'rb') as infile: 251 inner_ca_response = bytes(infile.read()) 252 elif _session_params.operation == _OPERATIONS['ISSUE_ENC']: 253 with open('keysets/encrypted.keyset', 'rb') as infile: 254 inner_ca_response = bytes(infile.read()) 255 256 (gcm_iv, encrypted_keyset, gcm_tag) = AESGCM.encrypt(inner_ca_response, 257 shared_key) 258 259 # "CA Response" Header 260 # +2 for algo and operation bytes 261 header = (_MESSAGE_VERSION, 0, 0, 0, 12 + 4 + len(encrypted_keyset) + 16) 262 ca_response = bytearray(struct.pack('<4B I', *header)) 263 264 struct_fmt = '12s I %ds 16s' % len(inner_ca_response) 265 message = (gcm_iv, len(encrypted_keyset), encrypted_keyset, gcm_tag) 266 ca_response.extend(struct.pack(struct_fmt, *message)) 267 268 with open('tmp/ca_response.bin', 'wb') as f: 269 f.write(ca_response) 270 271 272def _get_shared_key(algorithm, 273 device_pub_key, 274 hkdf_salt, 275 hkdf_info='KEY', 276 hkdf_hash_len=16): 277 """Generates the shared key based on ECDH and HKDF. 278 279 Uses a particular ECDH algorithm and HKDF-SHA256 to create a shared key 280 281 Args: 282 algorithm: p256 or curve25519 283 device_pub_key: ephemeral public key from the AT device 284 hkdf_salt: salt to use in the HKDF operation 285 hkdf_info: info value to use in the HKDF operation 286 hkdf_hash_len: length of the outputted hash value for use as a shared key 287 288 Raises: 289 RuntimeError: Computing the shared secret fails. 290 291 Returns: 292 The shared key. 293 """ 294 295 if algorithm == _ALGORITHMS['p256']: 296 ecdhe_shared_secret = ec_helper.compute_p256_shared_secret( 297 _session_params.private_key, device_pub_key) 298 299 elif algorithm == _ALGORITHMS['x25519']: 300 device_pub_key = device_pub_key[:-1] 301 ecdhe_shared_secret = curve25519.shared(_session_params.private_key, 302 device_pub_key) 303 304 hkdf = HKDF( 305 algorithm=hashes.SHA256(), 306 length=hkdf_hash_len, 307 salt=hkdf_salt, 308 info=hkdf_info, 309 backend=default_backend()) 310 shared_key = hkdf.derive(ecdhe_shared_secret) 311 312 return shared_key 313 314 315def _get_var_len(data, index): 316 """Reads the 4 byte little endian unsigned integer at data[index]. 317 318 Args: 319 data: Start of bytearray 320 index: Offset that indicates where the integer begins 321 322 Returns: 323 Little endian unsigned integer at data[index] 324 """ 325 return struct.unpack('<I', data[index:index + 4])[0] 326 327 328def main(): 329 parser = argparse.ArgumentParser( 330 description='Test for Android Things key provisioning.') 331 parser.add_argument( 332 '-a', 333 '--algorithm', 334 type=str, 335 choices=['p256', 'x25519'], 336 required=True, 337 dest='algorithm', 338 help='Algorithm for deriving the ECDHE shared secret') 339 parser.add_argument( 340 '-s', 341 '--serial', 342 type=str, 343 required=True, 344 dest='serial', 345 help='Fastboot serial device', 346 metavar='FASTBOOT_SERIAL_NUMBER') 347 parser.add_argument( 348 '-o', 349 '--operation', 350 type=str, 351 default='ISSUE', 352 choices=['ISSUE', 'ISSUE_ENC'], 353 dest='operation', 354 help='Operation for provisioning the device') 355 356 results = parser.parse_args() 357 fastboot_device = results.serial 358 algorithm = _ALGORITHMS[results.algorithm] 359 operation = _OPERATIONS[results.operation] 360 _write_operation_start(algorithm, operation) 361 print 'Wrote Operation Start message to tmp/operation_start.bin' 362 os.system('fastboot -s %s stage tmp/operation_start.bin' % fastboot_device) 363 os.system('fastboot -s %s oem at-get-ca-request' % fastboot_device) 364 os.system('fastboot -s %s get_staged tmp/ca_request.bin' % fastboot_device) 365 with open('tmp/ca_request.bin', 'rb') as f: 366 ca_request = bytearray(f.read()) 367 _get_ca_response(ca_request) 368 print 'Wrote CA Response message to tmp/ca_response.bin' 369 os.system('fastboot -s %s stage tmp/ca_response.bin' % fastboot_device) 370 os.system('fastboot -s %s oem at-set-ca-response' % fastboot_device) 371 os.system('fastboot -s %s getvar at-attest-uuid' % fastboot_device) 372 373 374if __name__ == '__main__': 375 main() 376