1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, re
6from autotest_lib.client.bin import utils
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.cros import crash_test
9
10
class logging_KernelCrash(crash_test.CrashTest):
    """
    Validates the contents of a kernel crash report.

    run_once() is called with is_before=True to check only that the crash
    reporter announced its startup state, and with is_before=False to also
    check that the kernel crash report was stored and can be sent.
    """
    version = 1

    def _test_reporter_startup(self):
        """Test that the crash_reporter is handling kernel crashes.

        Passes when the log contains either the "enabling" message or the
        message saying the kernel does not support crash dumping.

        Raises:
            error.TestFail: neither message is present in the log.
        """
        reader = self._log_reader
        if reader.can_find('Enabling kernel crash handling'):
            return
        if reader.can_find('Kernel does not support crash dumping'):
            return
        raise error.TestFail('Could not find kernel crash enabling message')


    def _get_kcrash_name(self):
        """Return the kcrash path announced in the log.

        Returns:
            The first non-whitespace token following 'Stored kcrash to ' in
            the log, or None when the log has no such message.
        """
        filename_match = re.search(r'Stored kcrash to (\S+)',
                                   self._log_reader.get_logs())
        if filename_match is None:
            return None
        return filename_match.group(1)


    def _is_signature_match(self, signature):
        """Return True if signature looks like an expected crash signature.

        The expected form is 'kernel-<function>-<8 uppercase hex digits>',
        where <function> is one of the known crashing functions below.
        """
        # Update these as kernels evolve:
        functions = (
            r'write_breakme',    # for 2.6.38 kernels and 3.0.13 x86
            r'breakme_do_bug',   # for 3.2 kernels
            r'__bug',            # for 3.0.13 ARM
            r'lkdtm_do_action',  # for 3.8.11 with lkdtm
        )
        regex = r'kernel-(' + '|'.join(functions) + r')-[0-9A-F]{8}$'
        return re.match(regex, signature) is not None


    def _is_handled_reason(self, reason):
        """Return True if reason says the crash is being handled/dumped."""
        return re.match(r'(handling|developer build - always dumping)$',
                        reason) is not None


    def _test_reporter_kcrash_storage(self):
        """Test that crash_reporter has properly stored the kcrash report.

        With consent, the report file must have been announced, must exist
        and must contain the expected 'kernel BUG at' line.  Without
        consent, no report file may have been announced and the announced
        reason must be 'ignoring - no consent'.

        Raises:
            error.TestFail: any of the above expectations is not met.
        """
        announce_match = re.search(
            r'Received .* from kernel \(signature ([^\)]+)\) \(([^\)]+)\)',
            self._log_reader.get_logs())

        if not announce_match:
            raise error.TestFail('Could not find kernel crash announcement')

        signature = announce_match.group(1)
        reason = announce_match.group(2)
        logging.info('Signature: [%s]', signature)
        logging.info('Reason: [%s]', reason)

        if not self._is_signature_match(signature):
            raise error.TestFail(
                'Kernel crash signature (%s) did not match expected pattern' %
                signature)

        kcrash_report = self._get_kcrash_name()

        if not self._consent:
            if kcrash_report is not None:
                raise error.TestFail('Should not have found kcrash filename')
            if reason != 'ignoring - no consent':
                raise error.TestFail('Did not announce ignoring of kcrash '
                                     '(%s)' % reason)
            # Nothing was stored, so there is no report file to inspect.
            return

        if kcrash_report is None:
            raise error.TestFail(
                'Could not find message with kcrash filename')
        if not self._is_handled_reason(reason):
            raise error.TestFail('Did not announce handling of kcrash '
                                 '(%s)' % reason)

        if not os.path.exists(kcrash_report):
            raise error.TestFail('Crash report %s gone' % kcrash_report)
        report_contents = utils.read_file(kcrash_report)
        src_re = r'kernel BUG at .*(fs/proc/breakme.c|drivers/misc/lkdtm.c)'
        if re.search(src_re, report_contents) is None:
            raise error.TestFail('Crash report has unexpected contents')


    def _test_sender_send_kcrash(self):
        """Test that crash_sender properly sends the crash report.

        Does nothing when consent was not given.

        Raises:
            error.TestFail: the report was never announced, is missing on
                disk, or the sender result does not describe a successful
                send of this kernel crash report.
        """
        if not self._consent:
            return
        kcrash_report = self._get_kcrash_name()
        # Without this guard os.path.exists(None) raises TypeError, which
        # would surface as a test error rather than a test failure.
        if kcrash_report is None:
            raise error.TestFail(
                'Could not find message with kcrash filename')
        if not os.path.exists(kcrash_report):
            raise error.TestFail('Crash report %s gone' % kcrash_report)
        result = self._call_sender_one_crash(
            report=os.path.basename(kcrash_report))
        if (not result['send_attempt'] or not result['send_success'] or
            result['report_exists']):
            raise error.TestFail('kcrash not sent properly')
        if result['exec_name'] != 'kernel' or result['report_kind'] != 'kcrash':
            raise error.TestFail('kcrash exec name or report kind wrong '
                                 '(exec_name: [%s] report_kind: [%s])' %
                                 (result['exec_name'], result['report_kind']))
        if result['report_payload'] != kcrash_report:
            raise error.TestFail('Sent the wrong kcrash report')
        if not self._is_signature_match(result['sig']):
            raise error.TestFail('Sent the wrong kcrash signature')


    def run_once(self, is_before, consent):
        """Run the kernel crash checks for one phase of the test.

        Args:
            is_before: if true, run only the 'reporter_startup' check and
                leave crash sending paused; otherwise run the startup,
                storage and sender checks without clearing the spool first.
            consent: stored as self._consent; selects which expectations
                the storage and sender checks apply.
        """
        # NOTE(review): presumably positions the log reader relative to a
        # reboot marker; confirm against the log reader implementation.
        self._log_reader.set_start_by_reboot(-1)
        # We manage consent saving across tests.
        self._automatic_consent_saving = False
        self._consent = consent
        if is_before:
            self.run_crash_tests(['reporter_startup'], must_run_all=False)
            # Leave crash sending paused for the kernel crash.
            self._leave_crash_sending = False
        else:
            self.run_crash_tests(['reporter_startup',
                                  'reporter_kcrash_storage',
                                  'sender_send_kcrash'],
                                 clear_spool_first=False)
125