1# Copyright (c) 2011-2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections, logging, os
6
7from autotest_lib.client.bin import test, utils
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.cros import rtc
10from autotest_lib.client.cros.power import sys_power
11
12# TODO(tbroch) WOL:
13# - Should we test any of the other modes?  I chose magic as it meant that only
14#   the target device should be awaken.
15
16class network_EthCaps(test.test):
17    """Base class of EthCaps test.
18
19    Verify Capabilities advertised by an ethernet device work.
20    We can't verify much in reality though. But we can verify
21    WOL for built-in devices which is expected to work.
22
23    @param test.test: test instance
24    """
25    version = 1
26
27    # If WOL setting changed during test then restore to original during cleanup
28    _restore_wol = False
29
30
31    def _is_usb(self):
32        """Determine if device is USB (or not)
33
34        Add-on USB devices won't report the same 'Supports Wake-on' value
35        as built-in (ie PCI) ethernet devices.
36        """
37        if not self._bus_info:
38            cmd = "ethtool -i %s | awk '/bus-info/ {print $2}'" % self._ethname
39            self._bus_info = utils.system_output(cmd)
40            logging.debug("bus_info is %s", self._bus_info)
41            if not self._bus_info:
42                logging.error("ethtool -i %s has no bus-info", self._ethname)
43
44        # Two bus_info formats are reported by different device drivers:
45        # 1) "usb-0000:00:1d.0-1.2"
46        #    "0000:00:1d.0" is the "platform" info of the USB host controller
47        #    But it's obvious it's USB since that's the prefix. :)
48        if self._bus_info.startswith('usb-'):
49            return True
50
51        # 2) "2-1.2" where "2-" is USB host controller instance
52        return os.path.exists("/sys/bus/usb/devices/%s" % self._bus_info)
53
54    def _parse_ethtool_caps(self):
55        """Retrieve ethernet capabilities.
56
57        Executes ethtool command and parses various capabilities into a
58        dictionary.
59        """
60        caps = collections.defaultdict(list)
61
62        cmd = "ethtool %s" % self._ethname
63        prev_keyname = None
64        for ln in utils.system_output(cmd).splitlines():
65            cap_str = ln.strip()
66            try:
67                (keyname, value) = cap_str.split(': ')
68                caps[keyname].extend(value.split())
69                prev_keyname = keyname
70            except ValueError:
71                # keyname from previous line, add there
72                if prev_keyname:
73                    caps[prev_keyname].extend(cap_str.split())
74
75        for keyname in caps:
76            logging.debug("cap['%s'] = %s", keyname, caps[keyname])
77
78        self._caps = caps
79
80
81    def _check_eth_caps(self):
82        """Check necessary LAN capabilities are present.
83
84        Hardware and driver should support the following functionality:
85          1000baseT, 100baseT, 10baseT, half-duplex, full-duplex, auto-neg, WOL
86
87        Raises:
88          error.TestError if above LAN capabilities are NOT supported.
89        """
90        default_eth_caps = {
91            'Supported link modes': ['10baseT/Half', '100baseT/Half',
92                                      '1000baseT/Half', '10baseT/Full',
93                                      '100baseT/Full', '1000baseT/Full'],
94            'Supports auto-negotiation': ['Yes'],
95            # TODO(tbroch): Other WOL caps: 'a': arp and 's': magicsecure are
96            # they important?  Are any of these undesirable/security holes?
97            'Supports Wake-on': ['pumbg']
98            }
99        errors = 0
100
101        for keyname in default_eth_caps:
102            if keyname not in self._caps:
103                logging.error("\'%s\' not a capability of %s", keyname,
104                              self._ethname)
105                errors += 1
106                continue
107
108            for value in default_eth_caps[keyname]:
109                if value not in self._caps[keyname]:
110                    # WOL not required for USB Ethernet plug-in devices
111                    # But all USB Ethernet devices to date report "pg".
112                    # Enforce that.
113                    # RTL8153 can report 'pumbag'.
114                    # AX88178 can report 'pumbg'.
115                    if self._is_usb() and keyname == 'Supports Wake-on':
116                        if (self._caps[keyname][0].find('p') >= 0) and \
117                            (self._caps[keyname][0].find('g') >= 0):
118                            continue
119
120                    logging.error("\'%s\' not a supported mode in \'%s\' of %s",
121                                  value, keyname, self._ethname)
122                    errors += 1
123
124        if errors:
125            raise error.TestError("Eth capability checks.  See errors")
126
127
128    def _test_wol_magic_packet(self):
129        """Check the Wake-on-LAN (WOL) magic packet capabilities of a device.
130
131        Raises:
132          error.TestError if WOL functionality fails
133        """
134        # Magic number WOL supported
135        capname = 'Supports Wake-on'
136        if self._caps[capname][0].find('g') != -1:
137            logging.info("%s support magic number WOL", self._ethname)
138        else:
139            raise error.TestError('%s should support magic number WOL' %
140                            self._ethname)
141
142        # Check that WOL works
143        if self._caps['Wake-on'][0] != 'g':
144            utils.system_output("ethtool -s %s wol g" % self._ethname)
145            self._restore_wol = True
146
147        # Set RTC as backup to WOL
148        before_secs = rtc.get_seconds()
149        alarm_secs =  before_secs + self._suspend_secs + self._threshold_secs
150        rtc.set_wake_alarm(alarm_secs)
151
152        sys_power.do_suspend(self._suspend_secs)
153
154        after_secs = rtc.get_seconds()
155        # flush RTC as it may not work subsequently if wake was not RTC
156        rtc.set_wake_alarm(0)
157
158        suspended_secs = after_secs - before_secs
159        if suspended_secs >= (self._suspend_secs + self._threshold_secs):
160            raise error.TestError("Device woke due to RTC not WOL")
161
162
163    def _verify_wol_magic(self):
164        """If possible identify wake source was caused by WOL.
165
166        The bits identifying the wake source may be cleared by the time
167        userspace gets a chance to query the kernel.  However, firmware
168        might have a log and expose the wake source.  Attempt to interrogate
169        the wake source details if they are present on the system.
170
171        Returns:
172          True if verified or unable to verify due to system limitations
173          False otherwise
174        """
175        fw_log = "/sys/firmware/log"
176        if not os.path.isfile(fw_log):
177            logging.warning("Unable to verify wake in s/w due to missing log %s",
178                         fw_log)
179            return True
180
181        log_info_str = utils.system_output("egrep '(SMI|PM1|GPE0)_STS:' %s" %
182                                           fw_log)
183        status_dict = {}
184        for ln in log_info_str.splitlines():
185            logging.debug("f/w line = %s", ln)
186            try:
187                (status_reg, status_values) = ln.strip().split(":")
188                status_dict[status_reg] = status_values.split()
189            except ValueError:
190                # no bits asserted ... empty list
191                status_dict[status_reg] = list()
192
193        for status_reg in status_dict:
194            logging.debug("status_dict[%s] = %s", status_reg,
195                          status_dict[status_reg])
196
197        return ('PM1' in status_dict['SMI_STS']) and \
198            ('WAK' in status_dict['PM1_STS']) and \
199            ('PCIEXPWAK' in status_dict['PM1_STS']) and \
200            len(status_dict['GPE0_STS']) == 0
201
202
203    def cleanup(self):
204        if self._restore_wol:
205            utils.system_output("ethtool -s %s wol %s" %
206                                (self._ethname, self._caps['Wake-on'][0]))
207
208
209    def run_once(self, ethname=None, suspend_secs=5, threshold_secs=10):
210        """Run the test.
211
212        Args:
213          ethname: string of ethernet device under test
214          threshold_secs: integer of seconds to determine whether wake occurred
215            due to WOL versus RTC
216        """
217        if not ethname:
218            raise error.TestError("Name of ethernet device must be declared")
219
220        self._ethname = ethname
221        self._threshold_secs = threshold_secs
222        self._suspend_secs = suspend_secs
223        self._bus_info = None
224
225        self._parse_ethtool_caps()
226        self._check_eth_caps()
227
228        # ChromeOS does not require WOL support for any USB Ethernet Adapters.
229        # In fact, WoL only known to work for PCIe Ethernet devices.
230        # We know _some_ platforms power off all USB ports when suspended.
231        # USB adapters with "pg" capabilities _might_ WoL on _some_ platforms.
232        # White list/black listing of platforms will be required to test
233        # WoL against USB dongles in the future.
234        if self._is_usb():
235            logging.debug("Skipping WOL test on USB Ethernet device.")
236            return
237
238        self._test_wol_magic_packet()
239        # TODO(tbroch) There is evidence in the filesystem of the wake source
240        # for coreboot but its still being flushed out.  For now only produce a
241        # warning for this check.
242        if not self._verify_wol_magic():
243            logging.warning("Unable to see evidence of WOL wake in filesystem")
244