1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This class defines the CrosHost Label class."""
6
7import collections
8import logging
9import os
10import re
11
12import common
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import global_config
16from autotest_lib.client.cros.audio import cras_utils
17from autotest_lib.client.cros.video import constants as video_test_constants
18from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
19from autotest_lib.server.hosts import base_label
20from autotest_lib.server.hosts import common_label
21from autotest_lib.server.hosts import servo_host
22from autotest_lib.site_utils import hwid_lib
23
# pylint: disable=missing-docstring
# Result record produced by _parse_lsb_output(): `unibuild` is the boolean
# unified-build flag and `board` is the CHROMEOS_RELEASE_BOARD value.
LsbOutput = collections.namedtuple('LsbOutput', 'unibuild board')
26
def _parse_lsb_output(host):
    """Read /etc/lsb-release on the host and pull out the labeling fields.

    @param host: Host whose run() method is used to execute the command.
    @returns: LsbOutput whose `unibuild` is True only when
              CHROMEOS_RELEASE_UNIBUILD is '1', and whose `board` is the
              CHROMEOS_RELEASE_BOARD value.
    """
    lsb_release = utils.parse_cmd_output('cat /etc/lsb-release',
                                         run_method=host.run)

    is_unibuild = lsb_release.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
    board_name = lsb_release['CHROMEOS_RELEASE_BOARD']
    return LsbOutput(unibuild=is_unibuild, board=board_name)
38
39
class BoardLabel(base_label.StringPrefixLabel):
    """Label carrying the board name of the device."""

    _NAME = ds_constants.BOARD_PREFIX.rstrip(':')

    def generate_labels(self, host):
        """Return the board label value, preferring already-recorded data.

        The board label is meant to be applied only once, when the host is
        added to the AFE, so that a wrong build landing on the device can
        not flip the label later (crbug.com/624207 records one such
        unexpected switch).  Lookup order: the host info store, then the
        existing AFE labels, and only then /etc/lsb-release on the device.

        @param host: Host to generate the label for.
        @returns: Single-element list holding the board name.
        """
        recorded_board = host.host_info_store.get().board
        if recorded_board:
            return [recorded_board]

        prefix = self._NAME + ':'
        afe_label = next((candidate for candidate in host._afe_host.labels
                          if candidate.startswith(prefix)), None)
        if afe_label is not None:
            return [afe_label.split(':')[-1]]

        return [_parse_lsb_output(host).board]
59
60
class ModelLabel(base_label.StringPrefixLabel):
    """Label carrying the model name of the device."""

    _NAME = ds_constants.MODEL_LABEL

    def generate_labels(self, host):
        """Return the model label value, preferring already-recorded data.

        For the same reason as BoardLabel, an existing model (from the host
        info store or from the AFE labels) is returned as-is.  Otherwise,
        on unified builds the model is queried from the device, and the
        board name is used when no model could be determined.

        @param host: Host to generate the label for.
        @returns: Single-element list holding the model (or board) name.
        """
        recorded_model = host.host_info_store.get().model
        if recorded_model:
            return [recorded_model]

        prefix = self._NAME + ':'
        afe_label = next((candidate for candidate in host._afe_host.labels
                          if candidate.startswith(prefix)), None)
        if afe_label is not None:
            return [afe_label.split(':')[-1]]

        lsb_output = _parse_lsb_output(host)
        detected_model = None

        if lsb_output.unibuild:
            # Try cros_config first; mosys is only consulted when the first
            # command did not yield a non-empty model.
            for query_cmd in ('cros_config / test-label',
                              'mosys platform model'):
                query = host.run(command=query_cmd, ignore_status=True)
                if query.exit_status == 0:
                    detected_model = query.stdout.strip()
                if detected_model:
                    break

        # We need some sort of backwards compatibility for boards that
        # are not yet supported with mosys and unified builds.
        # This is necessary so that we can begin changing cbuildbot to take
        # advantage of the model/board label differentiations for
        # scheduling, while still retaining backwards compatibility.
        return [detected_model or lsb_output.board]
96
97
class LightSensorLabel(base_label.BaseLabel):
    """Label applied when a light sensor is found on the host."""

    _NAME = 'lightsensor'
    _LIGHTSENSOR_SEARCH_DIR = '/sys/bus/iio/devices'
    _LIGHTSENSOR_FILES = [
        "in_illuminance0_input",
        "in_illuminance_input",
        "in_illuminance0_raw",
        "in_illuminance_raw",
        "illuminance0_input",
    ]

    def exists(self, host):
        """Search the IIO device tree for any known illuminance file.

        @param host: Host to run the search on.
        @returns: True if the find/egrep pipeline exits with status 0.
        """
        file_alternatives = '|'.join(self._LIGHTSENSOR_FILES)
        search_cmd = "find -L %s -maxdepth 4 | egrep '%s'" % (
            self._LIGHTSENSOR_SEARCH_DIR, file_alternatives)
        # The search follows symlinks, so a symlink loop may print a few
        # messages to stderr; the command still works, hence stderr_tee is
        # None to keep that noise out of the logs.
        search = host.run(search_cmd,
                          stdout_tee=None,
                          stderr_tee=None,
                          ignore_status=True)
        found = search.exit_status == 0
        return found
121
122
class BluetoothLabel(base_label.BaseLabel):
    """Label applied when a bluetooth adapter is found on the host."""

    _NAME = 'bluetooth'

    def exists(self, host):
        """Check for the hci0 directory in sysfs.

        @param host: Host to check.
        @returns: True if /sys/class/bluetooth/hci0 is a directory.
        """
        probe = host.run('test -d /sys/class/bluetooth/hci0',
                         ignore_status=True)
        has_adapter = probe.exit_status == 0
        return has_adapter
133
134
class ECLabel(base_label.BaseLabel):
    """Label to determine the type of EC on this host."""

    _NAME = 'ec:cros'

    def exists(self, host):
        """Check whether `mosys ec info` reports a Chrome OS EC.

        @param host: Host to query.
        @returns: True if the command succeeds and its output ends with a
                  Chrome OS style EC version; False otherwise.
        """
        cmd = 'mosys ec info'
        # The output should look like these, so that the last field should
        # match our EC version scheme:
        #
        #   stm | stm32f100 | snow_v1.3.139-375eb9f
        #   ti | Unknown-10de | peppy_v1.5.114-5d52788
        #
        # Non-Chrome OS ECs will look like these:
        #
        #   ENE | KB932 | 00BE107A00
        #   ite | it8518 | 3.08
        #
        # And some systems don't have ECs at all (Lumpy, for example).
        regexp = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$'

        ecinfo = host.run(command=cmd, ignore_status=True)
        if ecinfo.exit_status != 0:
            # Only report the exit status when the command actually failed;
            # logging "exited with status 0" on the success path was noise.
            logging.info("%s exited with status %d", cmd, ecinfo.exit_status)
            return False

        res = re.search(regexp, ecinfo.stdout)
        if res:
            logging.info("EC version is %s", res.group(1))
            return True

        # Has an EC, but it's not a Chrome OS EC
        logging.info("%s got: %s", cmd, ecinfo.stdout)
        return False
166
167
class Cr50Label(base_label.StringPrefixLabel):
    """Label indicating the cr50 version."""

    _NAME = 'cr50'

    def __init__(self):
        # Result of the last `gsctool -a -f` run; set by exists().
        self.ver = None


    def exists(self, host):
        """Run the gsctool version command and remember its result.

        @param host: Host to query.
        @returns: True if `gsctool -a -f` exits with status 0.
        """
        # Make sure the gsctool version command runs ok
        self.ver = host.run('gsctool -a -f', ignore_status=True)
        return self.ver.exit_status == 0


    def generate_labels(self, host):
        """Classify the cr50 RW image as PVT or prePVT.

        Relies on exists() having been called first so that self.ver holds
        the gsctool output.

        @param host: Host the label is generated for (unused here).
        @returns: ['pvt'] or ['prepvt'], or an empty list when no RW
                  version line can be found in the gsctool output.
        """
        # Check the major version to determine prePVT vs PVT.  A raw string
        # keeps the regex escapes from being treated as (invalid) string
        # escapes.
        match = re.search(r'RW \d+\.(\d+)\.\d+[\r\n]', self.ver.stdout)
        if match is None:
            # Unparseable output used to raise AttributeError here; report
            # it and emit no label instead.
            logging.warning("Could not find cr50 RW version in: %s",
                            self.ver.stdout)
            return []
        major_ver = int(match.group(1))
        # PVT images have a odd major version prePVT have even
        return ['pvt' if (major_ver % 2) else 'prepvt']
189
190
class AccelsLabel(base_label.BaseLabel):
    """Label applied when the host has active EC accelerometers."""

    _NAME = 'accel:cros-ec'

    def exists(self, host):
        """Probe the EC for active motion sensors through ectool.

        @param host: Host to probe.
        @returns: True if ectool is present, supports the motionsense
                  command and does not report "0" on the first line of
                  `ectool motionsense active`; False otherwise.
        """
        # ectool has to be installed for any of the checks below.
        if host.run('which ectool', ignore_status=True).exit_status:
            logging.info("No ectool cmd found; assuming no EC accelerometers")
            return False

        # The EC has to support the motionsense command.
        if host.run('ectool motionsense', ignore_status=True).exit_status:
            logging.info("EC does not support motionsense command; "
                         "assuming no EC accelerometers")
            return False

        # Finally the motion sensors have to be reported as active.
        active_output = host.run('ectool motionsense active').stdout
        first_line = active_output.split('\n')[0]
        if first_line == "0":
            logging.info("Motion sense inactive; assuming no EC accelerometers")
            return False

        logging.info("EC accelerometers found")
        return True
218
219
class ChameleonLabel(base_label.BaseLabel):
    """Label applied when the host has a Chameleon host attached."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host object carries a Chameleon host.

        @param host: Host to check.
        @returns: True if host._chameleon_host is not None.
        """
        chameleon_host = host._chameleon_host
        return chameleon_host is not None
227
228
class ChameleonConnectionLabel(base_label.StringPrefixLabel):
    """Prefix label describing the Chameleon connection."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host object carries a Chameleon host.

        @param host: Host to check.
        @returns: True if host._chameleon_host is not None.
        """
        chameleon_host = host._chameleon_host
        return chameleon_host is not None


    def generate_labels(self, host):
        """Ask the Chameleon for its label.

        @param host: Host whose chameleon is queried.
        @returns: Single-element list with host.chameleon.get_label().
        """
        connection_label = host.chameleon.get_label()
        return [connection_label]
240
241
class ChameleonPeripheralsLabel(base_label.StringPrefixLabel):
    """Prefix labels for peripherals plugged into the Chameleon board.

    The 'chameleon:bt_hid' label is applied if the bluetooth
    classic hid device, i.e, RN-42 emulation kit, is detected.

    Any peripherals plugged into the chameleon board would be
    detected and applied proper labels in this class.
    """

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host object carries a Chameleon host.

        @param host: Host to check.
        @returns: True if host._chameleon_host is not None.
        """
        chameleon_host = host._chameleon_host
        return chameleon_host is not None


    def generate_labels(self, host):
        """Detect the peripherals attached to the Chameleon.

        @param host: Host whose chameleon is queried.
        @returns: ['bt_hid'] if the bluetooth HID mouse reports a serial
                  connection, otherwise an empty list.
        """
        peripherals = []
        bt_hid_device = host.chameleon.get_bluetooth_hid_mouse()
        if bt_hid_device.CheckSerialConnection():
            peripherals.append('bt_hid')
        return peripherals
261
262
class AudioLoopbackDongleLabel(base_label.BaseLabel):
    """Label applied when an audio loopback dongle is plugged in."""

    _NAME = 'audio_loopback_dongle'

    def exists(self, host):
        """Check that both a headphone and a mic node are plugged.

        @param host: Host to query for its CRAS nodes.
        @returns: True only if both 'HEADPHONE' and 'MIC' node types are
                  reported as plugged.
        """
        nodes_cmd = cras_utils.get_cras_nodes_cmd()
        nodes_info = host.run(command=nodes_cmd, ignore_status=True).stdout
        # Stop at the first node type that is not plugged.
        for node_type in ('HEADPHONE', 'MIC'):
            if not cras_utils.node_type_is_plugged(node_type, nodes_info):
                return False
        return True
275
276
class PowerSupplyLabel(base_label.StringPrefixLabel):
    """
    Prefix label describing the power supply type of the host.

    Labels representing this host's power supply.
         * `power:battery` when the device has a battery intended for
                extended use
         * `power:AC_primary` when the device has a battery not intended
                for extended use (for moving the machine, etc)
         * `power:AC_only` when the device has no battery at all.
    """

    _NAME = 'power'

    def __init__(self):
        # Result of the last `mosys psu type` run; set by exists().
        self.psu_cmd_result = None


    def exists(self, host):
        """Run `mosys psu type` and remember its result.

        @param host: Host to query.
        @returns: False only when the command prints 'unknown'.
        """
        self.psu_cmd_result = host.run(command='mosys psu type',
                                       ignore_status=True)
        reported_type = self.psu_cmd_result.stdout.strip()
        return not (reported_type == 'unknown')


    def generate_labels(self, host):
        """Turn the stored mosys result into a power label value.

        @param host: Host the label is generated for (unused here).
        @returns: Single-element list with the reported psu type, or
                  ['battery'] when the mosys command failed.
        """
        psu_result = self.psu_cmd_result
        if not psu_result.exit_status:
            return [psu_result.stdout.strip()]
        # The psu command for mosys is not included for all platforms. The
        # assumption is that the device will have a battery if the command
        # is not found.
        return ['battery']
308
309
class StorageLabel(base_label.StringPrefixLabel):
    """
    Return the label describing the storage type.

    Determine if the internal device is SCSI or dw_mmc device.
    Then check that it is SSD or HDD or eMMC or something else.

    Labels representing this host's internal device type:
             * `storage:ssd` when internal device is solid state drive
             * `storage:hdd` when internal device is hard disk drive
             * `storage:mmc` when internal device is mmc drive
             * `storage:nvme` when internal device is NVMe drive
             * `storage:ufs` when internal device is ufs drive
             * None          When internal device is something else or
                             when we are unable to determine the type
    """

    _NAME = 'storage'

    def __init__(self):
        # Storage type found by the last successful exists() call.
        self.type_str = ''


    @staticmethod
    def _read_output(host, cmd):
        """Run cmd on the host and return its stripped stdout.

        @param host: Host to run the command on.
        @param cmd: Shell command to run.
        @returns: Stripped stdout, or None if the command exited non-zero.
        """
        result = host.run(command=cmd, ignore_status=True)
        if result.exit_status:
            logging.info("Fail to run %s", cmd)
            return None
        return result.stdout.strip()


    def exists(self, host):
        """Detect the type of the fixed internal storage device.

        On success the detected type is stored in self.type_str for
        generate_labels().

        @param host: Host to probe.
        @returns: True if the storage type was determined, False if any
                  probe command failed, the device is attached over usb,
                  or the type is not one of the known ones.
        """
        # The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi
        rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;',
                                '. /usr/share/misc/chromeos-common.sh;',
                                'load_base_vars;',
                                'get_fixed_dst_drive'])
        rootdev_str = self._read_output(host, rootdev_cmd)
        # Covers both a failed command (None) and empty output.
        if not rootdev_str:
            return False

        rootdev_base = os.path.basename(rootdev_str)

        if re.match(r'/dev/mmcblk[0-9]', rootdev_str):
            # Use type to determine if the internal device is eMMC or
            # something else. We can assume that MMC is always an internal
            # device.
            dev_type = self._read_output(
                    host, 'cat /sys/block/%s/device/type' % rootdev_base)
            if dev_type == 'MMC':
                self.type_str = 'mmc'
                return True
            # Either the read failed or this is not an eMMC; an mmcblk path
            # can not match the scsi/nvme patterns below.
            return False

        # Match against the stripped device path, like the other checks do,
        # rather than against the raw command output.
        if re.match(r'/dev/sd[a-z]+', rootdev_str):
            # Read symlink for /sys/block/sd* to determine if the internal
            # device is connected via ata or usb.
            link_str = self._read_output(
                    host, 'readlink /sys/block/%s' % rootdev_base)
            if link_str is None or 'usb' in link_str:
                return False
            if 'ufs' in link_str:
                self.type_str = 'ufs'
                return True

            # Read rotation to determine if the internal device is ssd or hdd.
            rotate_str = self._read_output(
                    host, 'cat /sys/block/%s/queue/rotational' % rootdev_base)
            if rotate_str is None:
                return False

            storage_type = {'0': 'ssd', '1': 'hdd'}.get(rotate_str)
            if storage_type is None:
                # An unexpected value must not produce a `storage:None`
                # label; treat it as "unable to determine the type".
                logging.info("Unexpected rotational value '%s' for %s",
                             rotate_str, rootdev_base)
                return False
            self.type_str = storage_type
            return True

        if re.match(r'/dev/nvme[0-9]+n[0-9]+', rootdev_str):
            self.type_str = 'nvme'
            return True

        # All other internal device / error case will always fall here
        return False


    def generate_labels(self, host):
        """Return the storage type found by exists().

        @param host: Host the label is generated for (unused here).
        @returns: Single-element list with the stored storage type.
        """
        return [self.type_str]
405
406
class ServoLabel(base_label.BaseLabel):
    """Label applied when a working servo host is attached."""

    _NAME = 'servo'

    def exists(self, host):
        """
        Decide whether the servo label applies to the host.

        @param host: Host whose servo arguments are looked up.
        @returns True if a servo host is detected, False otherwise.
        """
        servo_args = servo_host.get_servo_args_for_host(host)
        if not servo_args:
            return False

        hostname = servo_args.get(servo_host.SERVO_HOST_ATTR)
        if hostname is None:
            return False

        return servo_host.servo_host_is_up(hostname)
424
425
class ArcLabel(base_label.BaseLabel):
    """Label applied when the host has ARC support."""

    _NAME = 'arc'

    @base_label.forever_exists_decorate
    def exists(self, host):
        """Look for an ARC version entry in /etc/lsb-release.

        @param host: Host to check.
        @returns: True if grep finds CHROMEOS_ARC_VERSION.
        """
        grep_result = host.run('grep CHROMEOS_ARC_VERSION /etc/lsb-release',
                               ignore_status=True)
        return grep_result.exit_status == 0
436
437
class CtsArchLabel(base_label.StringLabel):
    """Labels describing the CTS bundle ABIs and CPUs (arm or x86 only)."""

    _NAME = ['cts_abi_arm', 'cts_abi_x86', 'cts_cpu_arm', 'cts_cpu_x86']

    def _get_cts_abis(self, arch):
        """Return supported CTS ABIs.

        @param arch: CPU architecture name of the host.
        @return List of supported CTS bundle ABIs; empty for an
                architecture that is not listed.
        """
        if arch == 'x86_64':
            return ['arm', 'x86']
        if arch == 'arm':
            return ['arm']
        return []

    def _get_cts_cpus(self, arch):
        """Return supported CTS native CPUs.

        This is needed for CTS_Instant scheduling.

        @param arch: CPU architecture name of the host.
        @return List of supported CTS native CPUs; empty for an
                architecture that is not listed.
        """
        if arch == 'x86_64':
            return ['x86']
        if arch == 'arm':
            return ['arm']
        return []

    def generate_labels(self, host):
        """Build the cts_abi_* labels followed by the cts_cpu_* labels.

        @param host: Host whose CPU architecture is queried.
        @returns: List of label strings.
        """
        arch = host.get_cpu_arch()
        labels = ['cts_abi_' + abi for abi in self._get_cts_abis(arch)]
        for cpu in self._get_cts_cpus(arch):
            labels.append('cts_cpu_' + cpu)
        return labels
465
466
class VideoGlitchLabel(base_label.BaseLabel):
    """Label applied when the host supports video glitch detection tests."""

    _NAME = 'video_glitch_detection'

    def exists(self, host):
        """Check the host's board against the supported-board list.

        @param host: Host to check.
        @returns: True if the board name, with the board prefix removed,
                  is in video_test_constants.SUPPORTED_BOARDS.
        """
        prefixed_board = host.get_board()
        board_name = prefixed_board.replace(ds_constants.BOARD_PREFIX, '')
        return board_name in video_test_constants.SUPPORTED_BOARDS
476
477
class InternalDisplayLabel(base_label.StringLabel):
    """Label applied when the device has an internal display."""

    _NAME = 'internal_display'

    def generate_labels(self, host):
        """Run the client-side display check against the remote host.

        @param host: Host to check.
        @returns: [_NAME] if graphics_utils.has_internal_display() is true,
                  otherwise an empty list.
        """
        from autotest_lib.client.cros.graphics import graphics_utils
        from autotest_lib.client.common_lib import utils as common_utils

        def _remote_system_output(cmd):
            return host.run(cmd).stdout

        def _remote_read_file(remote_path):
            return host.run('cat %s' % remote_path).stdout

        # Temporarily swap the client helpers for versions that run on the
        # remote host, so the client library can be reused here.
        # FIXME: find a less hacky way than this
        saved_system_output = utils.system_output
        saved_read_file = common_utils.read_file
        utils.system_output = _remote_system_output
        common_utils.read_file = _remote_read_file
        try:
            if graphics_utils.has_internal_display():
                return [self._NAME]
            return []
        finally:
            # Always put the original client helpers back.
            utils.system_output = saved_system_output
            common_utils.read_file = saved_read_file
507
508
class LucidSleepLabel(base_label.BaseLabel):
    """Label applied when the device supports lucid sleep."""

    # TODO(kevcheng): See if we can determine if this label is applicable a
    # better way (crbug.com/592146).
    _NAME = 'lucidsleep'
    LUCID_SLEEP_BOARDS = ['nocturne', 'poppy']

    def exists(self, host):
        """Check the host's board against LUCID_SLEEP_BOARDS.

        @param host: Host to check.
        @returns: True if the board name, with the board prefix removed,
                  is listed in LUCID_SLEEP_BOARDS.
        """
        prefixed_board = host.get_board()
        board_name = prefixed_board.replace(ds_constants.BOARD_PREFIX, '')
        return board_name in self.LUCID_SLEEP_BOARDS
520
521
class HWIDLabel(base_label.StringLabel):
    """Labels derived from the device's hwid."""

    # We leave out _NAME because hwid_lib will generate everything for us.

    def __init__(self):
        # Grab the key file needed to access the hwid service.
        config = global_config.global_config
        self.key_file = config.get_config_value('CROS', 'HWID_KEY', type=str)


    def generate_labels(self, host):
        """Look up the labels that hwid_lib reports for the host's hwid.

        @param host: Host whose hwid is read via `crossystem hwid`.
        @returns: List of label strings, 'name:value' for entries that
                  carry a value and plain 'name' otherwise.
        """
        hwid = host.run_output('crossystem hwid').strip()
        hwid_info = hwid_lib.get_hwid_info(hwid, hwid_lib.HWID_INFO_LABEL,
                                           self.key_file)

        labels = []
        for entry in hwid_info.get('labels', []):
            # If it's a prefix, we'll have:
            # {'name': prefix_label, 'value': postfix_label} and create
            # 'prefix_label:postfix_label'; otherwise it'll just be
            # {'name': label} which should just be 'label'.
            name = entry.get('name', '')
            # There should always be a name but just in case there is not.
            if not name:
                continue
            value = entry.get('value', '')
            if value:
                labels.append('%s:%s' % (name, value))
            else:
                labels.append(name)
        return labels


    def get_all_labels(self):
        """We need to try all labels as a prefix and as standalone.

        We don't know for sure which labels are prefix labels and which are
        standalone so we try all of them as both.

        @returns: Tuple holding the same list of possible labels twice; the
                  list is empty when the hwid service can not be queried.
        """
        possible_labels = []
        try:
            possible_labels = hwid_lib.get_all_possible_dut_labels(
                    self.key_file)
        except IOError:
            logging.error('Can not open key file: %s', self.key_file)
        except hwid_lib.HwIdException as e:
            logging.error('hwid service: %s', e)
        return possible_labels, possible_labels
568
569
class DetachableBaseLabel(base_label.BaseLabel):
    """Label applied when the device has a detachable keyboard."""

    _NAME = 'detachablebase'

    def exists(self, host):
        """Check whether the hammerd binary is available on the host.

        @param host: Host to check.
        @returns: True if `which hammerd` exits with status 0.
        """
        which_result = host.run('which hammerd', ignore_status=True)
        return which_result.exit_status == 0
577
578
class FingerprintLabel(base_label.BaseLabel):
    """Label applied when the device has a fingerprint sensor."""

    _NAME = 'fingerprint'

    def exists(self, host):
        """Check for the fingerprint character device.

        @param host: Host to check.
        @returns: True if /dev/cros_fp exists as a character device.
        """
        probe = host.run('test -c /dev/cros_fp', ignore_status=True)
        return probe.exit_status == 0
587
588
class ReferenceDesignLabel(base_label.StringPrefixLabel):
    """Determine the correct reference design label for the device. """

    _NAME = 'reference_design'

    def __init__(self):
        # Result of the last `mosys platform family` run; set by exists().
        self.response = None

    def exists(self, host):
        """Run `mosys platform family` and remember its result.

        @param host: Host to query.
        @returns: True if the command exits with status 0.
        """
        self.response = host.run('mosys platform family', ignore_status=True)
        return self.response.exit_status == 0

    def generate_labels(self, host):
        """Return the platform family reported by mosys.

        The mosys command is re-run through exists() so that the result is
        current even if exists() was not called beforehand.

        @param host: Host to query.
        @returns: Single-element list with the platform family, or an
                  empty list (rather than None) when the command fails, so
                  that the result is always a list.
        """
        if self.exists(host):
            return [self.response.stdout.strip()]
        return []
604
605
# One shared instance of each label class defined above, plus
# common_label.OSLabel.  The instances are created once at import time, so
# labels that cache command results on themselves (Cr50Label,
# PowerSupplyLabel, StorageLabel, ReferenceDesignLabel) keep that state
# between uses.
CROS_LABELS = [
    AccelsLabel(),
    ArcLabel(),
    AudioLoopbackDongleLabel(),
    BluetoothLabel(),
    BoardLabel(),
    ModelLabel(),
    ChameleonConnectionLabel(),
    ChameleonLabel(),
    ChameleonPeripheralsLabel(),
    common_label.OSLabel(),
    Cr50Label(),
    CtsArchLabel(),
    DetachableBaseLabel(),
    ECLabel(),
    FingerprintLabel(),
    HWIDLabel(),
    InternalDisplayLabel(),
    LightSensorLabel(),
    LucidSleepLabel(),
    PowerSupplyLabel(),
    ReferenceDesignLabel(),
    ServoLabel(),
    StorageLabel(),
    VideoGlitchLabel(),
]
632