1#!/usr/bin/python
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import itertools
7import mock
8import unittest
9
10import common
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.common_lib import hosts
13from autotest_lib.client.common_lib.cros import retry
14from autotest_lib.server.hosts import cros_firmware
15from autotest_lib.server.hosts import cros_repair
16from autotest_lib.server.hosts import repair_utils
17
18
19CROS_VERIFY_DAG = (
20    (repair_utils.SshVerifier, 'ssh', ()),
21    (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
22    (cros_repair.HWIDVerifier,    'hwid',    ('ssh',)),
23    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
24    (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
25    (cros_repair.WritableVerifier, 'writable', ('ssh',)),
26    (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
27    (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
28    (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
29    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
30    (cros_repair.PythonVerifier, 'python', ('ssh',)),
31    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
32    (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
33)
34
35CROS_REPAIR_ACTIONS = (
36    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
37    (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
38    (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
39    (cros_firmware.FirmwareRepair,
40     'firmware', (), ('ssh', 'fwstatus', 'good_au')),
41    (cros_repair.CrosRebootRepair,
42     'reboot', ('ssh',), ('devmode', 'writable',)),
43    (cros_repair.ColdRebootRepair,
44     'coldboot', ('ssh',), ('ec_reset',)),
45    (cros_repair.AutoUpdateRepair,
46     'au',
47     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
48     ('power', 'rwfw', 'python', 'cros')),
49    (cros_repair.PowerWashRepair,
50     'powerwash',
51     ('ssh', 'writable'),
52     ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros')),
53    (cros_repair.ServoInstallRepair,
54     'usb',
55     (),
56     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw',
57      'python', 'cros')),
58)
59
60MOBLAB_VERIFY_DAG = (
61    (repair_utils.SshVerifier, 'ssh', ()),
62    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
63    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
64    (cros_repair.PythonVerifier, 'python', ('ssh',)),
65    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
66)
67
68MOBLAB_REPAIR_ACTIONS = (
69    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
70    (cros_repair.AutoUpdateRepair,
71     'au', ('ssh',), ('power', 'rwfw', 'python', 'cros',)),
72)
73
74JETSTREAM_VERIFY_DAG = (
75    (repair_utils.SshVerifier, 'ssh', ()),
76    (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
77    (cros_repair.HWIDVerifier,    'hwid',    ('ssh',)),
78    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
79    (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
80    (cros_repair.WritableVerifier, 'writable', ('ssh',)),
81    (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
82    (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
83    (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
84    (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
85    (cros_repair.PythonVerifier, 'python', ('ssh',)),
86    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
87    (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
88    (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)),
89    (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
90     ('ssh',)),
91    (cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)),
92)
93
94JETSTREAM_REPAIR_ACTIONS = (
95    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
96    (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
97    (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
98    (cros_firmware.FirmwareRepair,
99     'firmware', (), ('ssh', 'fwstatus', 'good_au')),
100    (cros_repair.CrosRebootRepair,
101     'reboot', ('ssh',), ('devmode', 'writable',)),
102    (cros_repair.ColdRebootRepair,
103     'coldboot', ('ssh',), ('ec_reset',)),
104    (cros_repair.JetstreamTpmRepair,
105     'jetstream_tpm_repair',
106     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
107     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
108      'jetstream_attestation')),
109    (cros_repair.JetstreamServiceRepair,
110     'jetstream_service_repair',
111     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm',
112      'jetstream_attestation'),
113     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
114      'jetstream_attestation', 'jetstream_services')),
115    (cros_repair.AutoUpdateRepair,
116     'au',
117     ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
118     ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
119      'jetstream_attestation', 'jetstream_services')),
120    (cros_repair.PowerWashRepair,
121     'powerwash',
122     ('ssh', 'writable'),
123     ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
124      'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
125    (cros_repair.ServoInstallRepair,
126     'usb',
127     (),
128     ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python',
129      'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
130)
131
132CRYPTOHOME_STATUS_OWNED = """{
133   "installattrs": {
134      "first_install": true,
135      "initialized": true,
136      "invalid": false,
137      "lockbox_index": 536870916,
138      "lockbox_nvram_version": 2,
139      "secure": true,
140      "size": 0,
141      "version": 1
142   },
143   "mounts": [  ],
144   "tpm": {
145      "being_owned": false,
146      "can_connect": true,
147      "can_decrypt": false,
148      "can_encrypt": false,
149      "can_load_srk": true,
150      "can_load_srk_pubkey": true,
151      "enabled": true,
152      "has_context": true,
153      "has_cryptohome_key": false,
154      "has_key_handle": false,
155      "last_error": 0,
156      "owned": true
157   }
158}
159"""
160
161CRYPTOHOME_STATUS_NOT_OWNED = """{
162   "installattrs": {
163      "first_install": true,
164      "initialized": true,
165      "invalid": false,
166      "lockbox_index": 536870916,
167      "lockbox_nvram_version": 2,
168      "secure": true,
169      "size": 0,
170      "version": 1
171   },
172   "mounts": [  ],
173   "tpm": {
174      "being_owned": false,
175      "can_connect": true,
176      "can_decrypt": false,
177      "can_encrypt": false,
178      "can_load_srk": false,
179      "can_load_srk_pubkey": false,
180      "enabled": true,
181      "has_context": true,
182      "has_cryptohome_key": false,
183      "has_key_handle": false,
184      "last_error": 0,
185      "owned": false
186   }
187}
188"""
189
190CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
191   "installattrs": {
192      "first_install": true,
193      "initialized": true,
194      "invalid": false,
195      "lockbox_index": 536870916,
196      "lockbox_nvram_version": 2,
197      "secure": true,
198      "size": 0,
199      "version": 1
200   },
201   "mounts": [  ],
202   "tpm": {
203      "being_owned": false,
204      "can_connect": true,
205      "can_decrypt": false,
206      "can_encrypt": false,
207      "can_load_srk": false,
208      "can_load_srk_pubkey": false,
209      "enabled": true,
210      "has_context": true,
211      "has_cryptohome_key": false,
212      "has_key_handle": false,
213      "last_error": 0,
214      "owned": true
215   }
216}
217"""
218
219TPM_STATUS_READY = """
220TPM Enabled: true
221TPM Owned: true
222TPM Being Owned: false
223TPM Ready: true
224TPM Password: 9eaee4da8b4c
225"""
226
227TPM_STATUS_NOT_READY = """
228TPM Enabled: true
229TPM Owned: false
230TPM Being Owned: true
231TPM Ready: false
232TPM Password:
233"""
234
235
236class CrosRepairUnittests(unittest.TestCase):
237    # pylint: disable=missing-docstring
238
239    maxDiff = None
240
241    def test_cros_repair(self):
242        verify_dag = cros_repair._cros_verify_dag()
243        self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG)
244        self.check_verify_dag(verify_dag)
245        repair_actions = cros_repair._cros_repair_actions()
246        self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS)
247        self.check_repair_actions(verify_dag, repair_actions)
248
249    def test_moblab_repair(self):
250        verify_dag = cros_repair._moblab_verify_dag()
251        self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG)
252        self.check_verify_dag(verify_dag)
253        repair_actions = cros_repair._moblab_repair_actions()
254        self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS)
255        self.check_repair_actions(verify_dag, repair_actions)
256
257    def test_jetstream_repair(self):
258        verify_dag = cros_repair._jetstream_verify_dag()
259        self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG)
260        self.check_verify_dag(verify_dag)
261        repair_actions = cros_repair._jetstream_repair_actions()
262        self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS)
263        self.check_repair_actions(verify_dag, repair_actions)
264
265    def check_verify_dag(self, verify_dag):
266        """Checks that dependency labels are defined."""
267        labels = [n[1] for n in verify_dag]
268        for node in verify_dag:
269            for dep in node[2]:
270                self.assertIn(dep, labels)
271
272    def check_repair_actions(self, verify_dag, repair_actions):
273        """Checks that dependency and trigger labels are defined."""
274        verify_labels = [n[1] for n in verify_dag]
275        for action in repair_actions:
276            deps = action[2]
277            triggers = action[3]
278            for label in deps + triggers:
279                self.assertIn(label, verify_labels)
280
281    def test_get_cryptohome_status_owned(self):
282        mock_host = mock.Mock()
283        mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
284        status = cros_repair.CryptohomeStatus(mock_host)
285        self.assertDictEqual({
286            'being_owned': False,
287            'can_connect': True,
288            'can_decrypt': False,
289            'can_encrypt': False,
290            'can_load_srk': True,
291            'can_load_srk_pubkey': True,
292            'enabled': True,
293            'has_context': True,
294            'has_cryptohome_key': False,
295            'has_key_handle': False,
296            'last_error': 0,
297            'owned': True,
298            }, status['tpm'])
299        self.assertTrue(status.tpm_enabled)
300        self.assertTrue(status.tpm_owned)
301        self.assertTrue(status.tpm_can_load_srk)
302        self.assertTrue(status.tpm_can_load_srk_pubkey)
303
304    def test_get_cryptohome_status_not_owned(self):
305        mock_host = mock.Mock()
306        mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
307        status = cros_repair.CryptohomeStatus(mock_host)
308        self.assertDictEqual({
309            'being_owned': False,
310            'can_connect': True,
311            'can_decrypt': False,
312            'can_encrypt': False,
313            'can_load_srk': False,
314            'can_load_srk_pubkey': False,
315            'enabled': True,
316            'has_context': True,
317            'has_cryptohome_key': False,
318            'has_key_handle': False,
319            'last_error': 0,
320            'owned': False,
321        }, status['tpm'])
322        self.assertTrue(status.tpm_enabled)
323        self.assertFalse(status.tpm_owned)
324        self.assertFalse(status.tpm_can_load_srk)
325        self.assertFalse(status.tpm_can_load_srk_pubkey)
326
327    @mock.patch.object(cros_repair, '_is_virtual_machine')
328    def test_tpm_status_verifier_owned(self, mock_is_virt):
329        mock_is_virt.return_value = False
330        mock_host = mock.Mock()
331        mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
332        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
333        tpm_verifier.verify(mock_host)
334
335    @mock.patch.object(cros_repair, '_is_virtual_machine')
336    def test_tpm_status_verifier_not_owned(self, mock_is_virt):
337        mock_is_virt.return_value = False
338        mock_host = mock.Mock()
339        mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
340        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
341        tpm_verifier.verify(mock_host)
342
343    @mock.patch.object(cros_repair, '_is_virtual_machine')
344    def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
345        mock_is_virt.return_value = False
346        mock_host = mock.Mock()
347        mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK
348        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
349        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
350            tpm_verifier.verify(mock_host)
351        self.assertEqual('Cannot load the TPM SRK',
352                         ctx.exception.message)
353
354    def test_jetstream_tpm_owned(self):
355        mock_host = mock.Mock()
356        mock_host.run.side_effect = [
357            mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
358            mock.Mock(stdout=TPM_STATUS_READY),
359        ]
360        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
361        tpm_verifier.verify(mock_host)
362
363    @mock.patch.object(retry.logging, 'warning')
364    @mock.patch.object(retry.time, 'time')
365    @mock.patch.object(retry.time, 'sleep')
366    def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
367        mock_time.side_effect = itertools.count(0, 20)
368        mock_host = mock.Mock()
369        mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
370        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
371        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
372            tpm_verifier.verify(mock_host)
373        self.assertEqual('TPM is not owned', ctx.exception.message)
374
375    @mock.patch.object(retry.logging, 'warning')
376    @mock.patch.object(retry.time, 'time')
377    @mock.patch.object(retry.time, 'sleep')
378    def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
379        mock_time.side_effect = itertools.count(0, 20)
380        mock_host = mock.Mock()
381        mock_host.run.side_effect = itertools.cycle([
382            mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
383            mock.Mock(stdout=TPM_STATUS_NOT_READY),
384        ])
385        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
386        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
387            tpm_verifier.verify(mock_host)
388        self.assertEqual('TPM is not ready', ctx.exception.message)
389
390    @mock.patch.object(retry.logging, 'warning')
391    @mock.patch.object(retry.time, 'time')
392    @mock.patch.object(retry.time, 'sleep')
393    def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
394                                          mock_logging):
395        mock_time.side_effect = itertools.count(0, 20)
396        mock_host = mock.Mock()
397        mock_host.run.side_effect = error.AutoservRunError('test', None)
398        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
399        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
400            tpm_verifier.verify(mock_host)
401        self.assertEqual('Could not determine TPM status',
402                         ctx.exception.message)
403
404
405if __name__ == '__main__':
406    unittest.main()
407