1# Lint as: python2, python3
2# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""A Python library to interact with TPM module for testing.
7
8Background
9 - TPM stands for Trusted Platform Module, a piece of security device
10 - TPM specification is the work of Trusted Computing Group
11 - As of September 2011, the current TPM specification is version 1.2
12
13Dependency
14 - This library depends on a C shared library called "libtspi.so", which
15   contains a set of APIs for interacting with TPM module
16
17Notes:
18 - An exception is raised if it doesn't make logical sense to continue program
19   flow (e.g. I/O error prevents test case from executing)
20 - An exception is caught and then converted to an error code if the caller
21   expects to check for error code per API definition
22"""
23
24from __future__ import absolute_import
25from __future__ import division
26from __future__ import print_function
27
28import datetime, logging
29from six.moves import range
30
31from autotest_lib.client.common_lib import smogcheck_ttci
32# Use explicit import to make code more readable
33from ctypes import c_uint, c_uint32, cdll, c_bool, Structure, POINTER, \
34    c_ubyte, c_byte, byref, c_uint16, cast, create_string_buffer, c_uint64, \
35    c_char_p, addressof, c_char, pointer
36
37
38# TPM flags
39# TODO(tgao): possible to import from trousers/src/include/tss/tss_defines.h?
40TSS_KEY_AUTHORIZATION = c_uint32(0x00000001)
41TSS_KEY_TSP_SRK = c_uint32(0x04000000)
42TSS_POLICY_USAGE = c_uint32(0x00000001)
43TSS_OBJECT_TYPE_RSAKEY = c_uint(0x02)
44TSS_SECRET_MODE_SHA1 = c_uint32(0x00001000)
45TSS_SECRET_MODE_PLAIN = c_uint32(0x00001800)
46TSS_TPMCAP_PROP_MANUFACTURER = c_uint(0x12)
47TSS_TPMCAP_PROPERTY = c_uint(0x13)
48TSS_TPMCAP_VERSION = c_uint(0x14)
49TSS_TPMCAP_VERSION_VAL = c_uint32(0x15)
50TSS_TPMSTATUS_DISABLEOWNERCLEAR = c_uint32(0x00000001)
51TSS_TPMSTATUS_DISABLEFORCECLEAR = c_uint32(0x00000002)
52TSS_TPMSTATUS_PHYSICALSETDEACTIVATED = c_uint32(0x00000010)
53TSS_TPMSTATUS_SETTEMPDEACTIVATED = c_uint32(0x00000011)
54
55# TODO(tgao): possible to import from trousers/src/include/tss/tpm.h?
56TPM_SHA1_160_HASH_LEN = c_uint(0x14)
57
58# Path to TSPI shared library.
59TSPI_C_LIB = "/usr/lib/libtspi.so.1"
60
61# Valid operation of tpmSetActive(). Equivalent CLI commands:
62# 'status' = tpm_setactive --well-known --status
63# 'activate' = tpm_setactive --well-known --active
64# 'deactivate' = tpm_setactive --well-known --inactive
65# 'temp' = tpm_setactive --well-known --temp
66TPM_SETACTIVE_OP = ['status', 'activate', 'deactivate', 'temp']
67
68# Valid operation of tpmSetClearable(). Equivalent CLI commands:
69# 'status' = tpm_setclearable --well-known --status
70# 'owner' = tpm_setclearable --well-known --owner
71# 'force' = tpm_setclearable --well-known --force
72TPM_SETCLEARABLE_OP = ['status', 'owner', 'force']
73
74# Secret mode for setPolicySecret()
75TSS_SECRET_MODE = dict(sha1=TSS_SECRET_MODE_SHA1,
76                       plain=TSS_SECRET_MODE_PLAIN)
77
78
79class SmogcheckError(Exception):
80    """Base class for all smogcheck API errors."""
81
82
83class TpmVersion(Structure):
84    """Defines TPM version string struct.
85
86    Declared in tss/tpm.h and named TPM_VERSION.
87    """
88    _fields_ = [('major', c_ubyte),
89                ('minor', c_ubyte),
90                ('revMajor', c_ubyte),
91                ('revMinor', c_ubyte)]
92
93
94class TpmCapVersionInfo(Structure):
95    """Defines TPM version info struct.
96
97    Declared in tss/tpm.h and named TPM_CAP_VERSION_INFO.
98    """
99    _fields_ = [('tag', c_uint16),
100                ('version', TpmVersion),
101                ('specLevel', c_uint16),
102                ('errataRev', c_ubyte),
103                ('tpmVendorID', c_char*4),
104                ('vendorSpecific', POINTER(c_ubyte))]
105
106
107def InitVersionInfo(vi):
108    """Utility method to allocate memory for TPM version info.
109
110    Args:
111      vi: a TpmCapVerisonInfo object, just created.
112    """
113    vi.tpmVendorId = create_string_buffer(4)  # Allocate 4 bytes
114    vendorDetail = create_string_buffer(64)   # Allocate 64 bytes
115    vi.vendorSpecific = cast(pointer(vendorDetail), POINTER(c_ubyte))
116
117
118def PrintVersionInfo(vi):
119    """Utility method to print TPM version info.
120
121    Args:
122      vi: a TpmCapVerisonInfo object.
123    """
124    logging.info('  TPM 1.2 Version Info:\n')
125    logging.info('  Chip Version:  %d.%d.%d.%d.', vi.version.major,
126                 vi.version.minor, vi.version.revMajor, vi.version.revMinor)
127    logging.info('  Spec Level:  %d', vi.specLevel)
128    logging.info('  Errata Revision:  %d', vi.errataRev)
129    vendorId = [i for i in vi.tpmVendorID if i]
130    logging.info('  TPM Vendor ID:  %s', ''.join(vendorId))
131    # TODO(tgao): handle the case when there's no vendor specific data.
132    logging.info('  Vendor Specific data (first 4 bytes in Hex):  '
133                 '%.2x %.2x %.2x %.2x', vi.vendorSpecific[0],
134                 vi.vendorSpecific[1], vi.vendorSpecific[2],
135                 vi.vendorSpecific[3])
136
137
138def PrintSelfTestResult(str_len, pResult):
139    """Utility method to print TPM self test result.
140
141    Args:
142      str_len: an integer, length of string pointed to by pResult.
143      pResult: a c_char_p, pointer to result.
144    """
145    out = []
146    for i in range(str_len):
147        if i and not i % 32:
148            out.append('\t')
149        if not i % 4:
150            out.append(' ')
151        b = pResult.value[i]
152        out.append('%02x' % ord(b))
153    logging.info('  TPM Test Results: %s', ''.join(out))
154
155
156class TpmController(object):
157    """Object to interact with TPM module for testing."""
158
159    def __init__(self):
160        """Constructor.
161
162        Mandatory params:
163          hContext: a c_uint32, context object handle.
164          _contextSet: a boolean, True if TPM context is set.
165          hTpm: a c_uint32, TPM object handle.
166          hTpmPolicy: a c_uint32, TPM policy object handle.
167          tspi_lib: a shared library object (libtspi.so).
168
169        Raises:
170          SmogcheckError: if error initializing TpmController.
171        """
172        self.hContext = c_uint32(0)
173        self._contextSet = False
174        self.hTpm = c_uint32(0)
175        self.hTpmPolicy = c_uint32(0)
176
177        logging.info('Attempt to load shared library %s', TSPI_C_LIB)
178        try:
179            self.tspi_lib = cdll.LoadLibrary(TSPI_C_LIB)
180        except OSError as e:
181            raise SmogcheckError('Error loading C library %s: %r' %
182                                 (TSPI_C_LIB, e))
183        logging.info('Successfully loaded shared library %s', TSPI_C_LIB)
184
185    def closeContext(self):
186        """Closes TPM context and cleans up.
187
188        Returns:
189          an integer, 0 for success and -1 for error.
190        """
191        if not self._contextSet:
192            logging.debug('TPM context NOT set.')
193            return 0
194
195        ret = -1
196        # Calling the pointer type without an argument creates a NULL pointer
197        if self.tspi_lib.Tspi_Context_FreeMemory(self.hContext,
198                                                 POINTER(c_byte)()) != 0:
199            logging.error('Error freeing memory when closing TPM context')
200        else:
201            logging.debug('Tspi_Context_FreeMemory() success')
202
203        if self.tspi_lib.Tspi_Context_Close(self.hContext) != 0:
204            logging.error('Error closing TPM context')
205        else:
206            logging.debug('Tspi_Context_Close() success')
207            ret = 0
208            self._contextSet = False
209
210        return ret
211
212    def _closeContextObject(self, hObject):
213        """Closes TPM context object.
214
215        Args:
216          hObject: an integer, basic object handle.
217
218        Raises:
219          SmogcheckError: if an error is encountered.
220        """
221        if self.tspi_lib.Tspi_Context_CloseObject(self.hContext, hObject) != 0:
222            raise SmogcheckError('Error closing TPM context object')
223
224        logging.debug('Tspi_Context_CloseObject() success')
225
226    def setupContext(self):
227        """Sets up tspi context for TPM access.
228
229        TPM context cannot be reused. Therefore, each new Tspi_* command would
230        require a new context to be set up before execution and closing that
231        context after execution (or error).
232
233        Raises:
234          SmogcheckError: if an error is encountered.
235        """
236        if self._contextSet:
237            logging.debug('TPM context already set.')
238            return
239
240        if self.tspi_lib.Tspi_Context_Create(byref(self.hContext)) != 0:
241            raise SmogcheckError('Error creating tspi context')
242
243        logging.info('Created tspi context = 0x%x', self.hContext.value)
244
245        if self.tspi_lib.Tspi_Context_Connect(self.hContext,
246                                              POINTER(c_uint16)()) != 0:
247            raise SmogcheckError('Error connecting to tspi context')
248
249        logging.info('Connected to tspi context')
250
251        if self.tspi_lib.Tspi_Context_GetTpmObject(self.hContext,
252                                                   byref(self.hTpm)) != 0:
253            raise SmogcheckError('Error getting TPM object from tspi context')
254
255        logging.info('Got tpm object from tspi context = 0x%x', self.hTpm.value)
256        self._contextSet = True
257
258    def _getTpmStatus(self, flag, bValue):
259        """Wrapper function to call Tspi_TPM_GetStatus().
260
261        Args:
262          flag: a c_uint, TPM status info flag, values defined in C header file
263                "tss/tss_defines.h".
264          bValue: a c_bool, place holder for specific TPM flag bit value (0/1).
265
266        Raises:
267          SmogcheckError: if an error is encountered.
268        """
269        result = self.tspi_lib.Tspi_TPM_GetStatus(self.hTpm, flag,
270                                                  byref(bValue))
271        if result != 0:
272            msg = ('Error (0x%x) getting status for flag 0x%x' %
273                   (result, flag.value))
274            raise SmogcheckError(msg)
275
276        logging.info('Tspi_TPM_GetStatus(): success for flag 0x%x',
277                     flag.value)
278
279    def _setTpmStatus(self, flag, bValue):
280        """Wrapper function to call Tspi_TPM_GetStatus().
281
282        Args:
283          flag: a c_uint, TPM status info flag.
284          bValue: a c_bool, place holder for specific TPM flag bit value (0/1).
285
286        Raises:
287          SmogcheckError: if an error is encountered.
288        """
289        result = self.tspi_lib.Tspi_TPM_SetStatus(self.hTpm, flag, bValue)
290        if result != 0:
291            msg = ('Error (0x%x) setting status for flag 0x%x' %
292                   (result, flag.value))
293            raise SmogcheckError(msg)
294
295        logging.info('Tspi_TPM_SetStatus(): success for flag 0x%x',
296                     flag.value)
297
298    def getPolicyObject(self, hTpm=None, hPolicy=None):
299        """Get TPM policy object.
300
301        Args:
302          hTpm: a c_uint, TPM object handle.
303          hPolicy: a c_uint, TPM policy object handle.
304
305        Raises:
306          SmogcheckError: if an error is encountered.
307        """
308        if hTpm is None:
309            hTpm = self.hTpm
310
311        if hPolicy is None:
312            hPolicy = self.hTpmPolicy
313
314        logging.debug('Tspi_GetPolicyObject(): hTpm = 0x%x, hPolicy = 0x%x',
315                      hTpm.value, hPolicy.value)
316        result = self.tspi_lib.Tspi_GetPolicyObject(hTpm, TSS_POLICY_USAGE,
317                                                    byref(hPolicy))
318        if result != 0:
319            msg = 'Error (0x%x) getting TPM policy object' % result
320            raise SmogcheckError(msg)
321
322        logging.debug('Tspi_GetPolicyObject() success hTpm = 0x%x, '
323                      'hPolicy = 0x%x', hTpm.value, hPolicy.value)
324
325    def setPolicySecret(self, hPolicy=None, pSecret=None, secret_mode=None):
326        """Sets TPM policy secret.
327
328        Args:
329          hPolicy: a c_uint, TPM policy object handle.
330          pSecret: a pointer to a byte array, which holds the TSS secret.
331          secret_mode: a string, valid values are keys of TSS_SECRET_MODE.
332
333        Raises:
334          SmogcheckError: if an error is encountered.
335        """
336        if hPolicy is None:
337            hPolicy = self.hTpmPolicy
338
339        if pSecret is None:
340            raise SmogcheckError('setPolicySecret(): pSecret cannot be None')
341
342        if secret_mode is None or secret_mode not in TSS_SECRET_MODE:
343            raise SmogcheckError('setPolicySecret(): invalid secret_mode')
344
345        logging.debug('Tspi_Policy_SetSecret(): hPolicy = 0x%x, secret_mode '
346                      '(%r) = %r', hPolicy.value, secret_mode,
347                      TSS_SECRET_MODE[secret_mode])
348
349        result = self.tspi_lib.Tspi_Policy_SetSecret(
350            hPolicy, TSS_SECRET_MODE[secret_mode], TPM_SHA1_160_HASH_LEN,
351            pSecret)
352        if result != 0:
353            msg = 'Error (0x%x) setting TPM policy secret' % result
354            raise SmogcheckError(msg)
355
356        logging.debug('Tspi_Policy_SetSecret() success, hPolicy = 0x%x',
357                      hPolicy.value)
358
359    def getTpmVersion(self):
360        """Gets TPM version info.
361
362        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_version.c
363        Downloaded from:
364          http://sourceforge.net/projects/trousers/files/tpm-tools/1.3.4/\
365          tpm-tools-1.3.4.tar.gz
366
367        Raises:
368          SmogcheckError: if an error is encountered.
369        """
370        uiResultLen = c_uint32(0)
371        pResult = c_char_p()
372        offset = c_uint64(0)
373        versionInfo = TpmCapVersionInfo()
374        InitVersionInfo(versionInfo)
375
376        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
377
378        result = self.tspi_lib.Tspi_TPM_GetCapability(
379            self.hTpm, TSS_TPMCAP_VERSION_VAL, 0, POINTER(c_byte)(),
380            byref(uiResultLen), byref(pResult))
381        if result != 0:
382            msg = 'Error (0x%x) getting TPM capability, pResult = %r' % (
383                result, pResult.value)
384            raise SmogcheckError(msg)
385
386        logging.info('Successfully received TPM capability: '
387                     'uiResultLen = %d, pResult=%r', uiResultLen.value,
388                     pResult.value)
389        result = self.tspi_lib.Trspi_UnloadBlob_CAP_VERSION_INFO(
390            byref(offset), pResult, cast(byref(versionInfo),
391                                         POINTER(c_byte)))
392        if result != 0:
393            msg = 'Error (0x%x) unloading TPM CAP version info' % result
394            raise SmogcheckError(msg)
395
396        PrintVersionInfo(versionInfo)
397
398    def runTpmSelfTest(self):
399        """Executes TPM self test.
400
401        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_selftest.c
402
403        Raises:
404          SmogcheckError: if an error is encountered.
405        """
406        uiResultLen = c_uint32(0)
407        pResult = c_char_p()
408        self.setupContext()
409
410        logging.debug('Successfully set up tspi context: hTpm = 0x%x',
411                      self.hTpm.value)
412
413        result = self.tspi_lib.Tspi_TPM_SelfTestFull(self.hTpm)
414        if result != 0:
415            self.closeContext()
416            raise SmogcheckError('Error (0x%x) with TPM self test' % result)
417
418        logging.info('Successfully executed TPM self test: hTpm = 0x%x',
419                     self.hTpm.value)
420        result = self.tspi_lib.Tspi_TPM_GetTestResult(
421            self.hTpm, byref(uiResultLen), byref(pResult))
422        if result != 0:
423            self.closeContext()
424            raise SmogcheckError('Error (0x%x) getting test results' % result)
425
426        logging.info('TPM self test results: uiResultLen = %d, pResult=%r',
427                     uiResultLen.value, pResult.value)
428        PrintSelfTestResult(uiResultLen.value, pResult)
429        self.closeContext()
430
431    def takeTpmOwnership(self):
432        """Take TPM ownership.
433
434        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_takeownership.c
435
436        Raises:
437          SmogcheckError: if an error is encountered.
438        """
439        hSrk = c_uint32(0)  # TPM Storage Root Key
440        hSrkPolicy = c_uint32(0)
441        # Defaults each byte value to 0x00
442        well_known_secret = create_string_buffer(20)
443        pSecret = c_char_p(addressof(well_known_secret))
444
445        self.setupContext()
446        logging.debug('Successfully set up tspi context: hTpm = 0x%x',
447                      self.hTpm.value)
448
449        try:
450            self.getPolicyObject()
451            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
452        except SmogcheckError:
453            if hSrk != 0:
454                self._closeContextObject(hSrk)
455            self.closeContext()
456            raise  # re-raise
457
458        flag = TSS_KEY_TSP_SRK.value | TSS_KEY_AUTHORIZATION.value
459        result = self.tspi_lib.Tspi_Context_CreateObject(
460            self.hContext, TSS_OBJECT_TYPE_RSAKEY, flag, byref(hSrk))
461        if result != 0:
462            raise SmogcheckError('Error (0x%x) creating context object' %
463                                 result)
464        logging.debug('hTpm = 0x%x, flag = 0x%x, hSrk = 0x%x',
465                      self.hTpm.value, flag, hSrk.value)  # DEBUG
466
467        try:
468            self.getPolicyObject(hTpm=hSrk, hPolicy=hSrkPolicy)
469            self.setPolicySecret(hPolicy=hSrkPolicy, pSecret=pSecret,
470                                 secret_mode='sha1')
471        except SmogcheckError:
472            if hSrk != 0:
473                self._closeContextObject(hSrk)
474            self.closeContext()
475            raise  # re-raise
476
477        logging.debug('Successfully set up SRK policy: secret = %r, '
478                      'hSrk = 0x%x, hSrkPolicy = 0x%x',
479                      well_known_secret.value, hSrk.value, hSrkPolicy.value)
480
481        start_time = datetime.datetime.now()
482        result = self.tspi_lib.Tspi_TPM_TakeOwnership(self.hTpm, hSrk,
483                                                      c_uint(0))
484        end_time = datetime.datetime.now()
485        if result != 0:
486            logging.info('Tspi_TPM_TakeOwnership error')
487            self._closeContextObject(hSrk)
488            self.closeContext()
489            raise SmogcheckError('Error (0x%x) taking TPM ownership' % result)
490
491        logging.info('Successfully took TPM ownership')
492        self._closeContextObject(hSrk)
493        self.closeContext()
494        return smogcheck_ttci.computeTimeElapsed(end_time, start_time)
495
496    def clearTpm(self):
497        """Return TPM to default state.
498
499        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_clear.c
500
501        Raises:
502          SmogcheckError: if an error is encountered.
503        """
504        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
505
506        result = self.tspi_lib.Tspi_TPM_ClearOwner(self.hTpm, True)
507        if result != 0:
508            raise SmogcheckError('Error (0x%x) clearing TPM' % result)
509
510        logging.info('Successfully cleared TPM')
511
512    def setTpmActive(self, op):
513        """Change TPM active state.
514
515        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_activate.c
516
517        Args:
518          op: a string, desired operation. Valid values are defined in
519              TPM_SETACTIVE_OP.
520
521        Raises:
522          SmogcheckError: if an error is encountered.
523        """
524        bValue = c_bool()
525        # Defaults each byte value to 0x00
526        well_known_secret = create_string_buffer(20)
527        pSecret = c_char_p(addressof(well_known_secret))
528
529        if op not in TPM_SETACTIVE_OP:
530            msg = ('Invalid op (%s) for tpmSetActive(). Valid values are %r' %
531                   (op, TPM_SETACTIVE_OP))
532            raise SmogcheckError(msg)
533
534        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
535
536        if op == 'status':
537            self.getPolicyObject()
538            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
539
540            self._getTpmStatus(
541                TSS_TPMSTATUS_PHYSICALSETDEACTIVATED, bValue)
542            logging.info('Persistent Deactivated Status: %s', bValue.value)
543
544            self._getTpmStatus(
545                TSS_TPMSTATUS_SETTEMPDEACTIVATED, bValue)
546            logging.info('Volatile Deactivated Status: %s', bValue.value)
547        elif op == 'activate':
548            self._setTpmStatus(
549                TSS_TPMSTATUS_PHYSICALSETDEACTIVATED, False)
550            logging.info('Successfully activated TPM')
551        elif op == 'deactivate':
552            self._setTpmStatus(
553                TSS_TPMSTATUS_PHYSICALSETDEACTIVATED, True)
554            logging.info('Successfully deactivated TPM')
555        elif op == 'temp':
556            self._setTpmStatus(
557                TSS_TPMSTATUS_SETTEMPDEACTIVATED, True)
558            logging.info('Successfully deactivated TPM for current boot')
559
560    def setTpmClearable(self, op):
561        """Disable TPM clear operations.
562
563        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_clearable.c
564
565        Args:
566          op: a string, desired operation. Valid values are defined in
567              TPM_SETCLEARABLE_OP.
568
569        Raises:
570          SmogcheckError: if an error is encountered.
571        """
572        bValue = c_bool()
573        # Defaults each byte value to 0x00
574        well_known_secret = create_string_buffer(20)
575        pSecret = c_char_p(addressof(well_known_secret))
576
577        if op not in TPM_SETCLEARABLE_OP:
578            msg = ('Invalid op (%s) for tpmSetClearable(). Valid values are %r'
579                   % (op, TPM_SETCLEARABLE_OP))
580            raise SmogcheckError(msg)
581
582        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
583
584        if op == 'status':
585            self.getPolicyObject()
586            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
587
588            self._getTpmStatus(
589                TSS_TPMSTATUS_DISABLEOWNERCLEAR, bValue)
590            logging.info('Owner Clear Disabled: %s', bValue.value)
591
592            self._getTpmStatus(
593                TSS_TPMSTATUS_DISABLEFORCECLEAR, bValue)
594            logging.info('Force Clear Disabled: %s', bValue.value)
595        elif op == 'owner':
596            self.getPolicyObject()
597            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
598
599            self._setTpmStatus(
600                TSS_TPMSTATUS_DISABLEOWNERCLEAR, False)
601            logging.info('Successfully disabled Owner Clear')
602        elif op == 'force':
603            self._setTpmStatus(
604                TSS_TPMSTATUS_DISABLEFORCECLEAR, True)
605            logging.info('Successfully disabled Force Clear')
606