1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18# TODO(b/147454897): Keep the logic in sync with
19#                    test/vts/utils/python/controllers/android_device.py until
20#                    it is removed.
21import gzip
22import logging
23import os
24import subprocess
25import tempfile
26
27class AndroidDevice(object):
28    """This class controls the device via adb commands."""
29
30    def __init__(self, serial_number):
31        self._serial_number = serial_number
32
33    def AdbPull(self, src, dst):
34        cmd = ["adb", "-s", self._serial_number, "pull", src, dst]
35        env = os.environ.copy()
36        if "ADB_COMPRESSION" not in env:
37            env["ADB_COMPRESSION"] = "0"
38        subprocess.check_call(cmd, shell=False, env=env, stdin=subprocess.PIPE,
39                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
40
41    def Execute(self, *args):
42        """Executes a command.
43
44        Args:
45            args: Strings, the arguments.
46
47        Returns:
48            Stdout as a string, stderr as a string, and return code as an
49            integer.
50        """
51        cmd = ["adb", "-s", self._serial_number, "shell"]
52        cmd.extend(args)
53        proc = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE,
54                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
55        out, err = proc.communicate()
56        # Compatible with python2 and python3
57        if not isinstance(out, str):
58            out = out.decode("utf-8")
59        if not isinstance(err, str):
60            err = err.decode("utf-8")
61        return out, err, proc.returncode
62
63    def _GetProp(self, name):
64        """Gets an Android system property.
65
66        Args:
67            name: A string, the property name.
68
69        Returns:
70            A string, the value of the property.
71
72        Raises:
73            IOError if the command fails.
74        """
75        out, err, return_code = self.Execute("getprop", name)
76        if err.strip() or return_code != 0:
77            raise IOError("`getprop %s` stdout: %s\nstderr: %s" %
78                          (name, out, err))
79        return out.strip()
80
81    def GetCpuAbiList(self, bitness=""):
82        """Gets the list of supported ABIs from property.
83
84        Args:
85            bitness: 32 or 64. If the argument is not specified, this method
86                     returns both 32 and 64-bit ABIs.
87
88        Returns:
89            A list of strings, the supported ABIs.
90        """
91        out = self._GetProp("ro.product.cpu.abilist" + str(bitness))
92        return out.lower().split(",") if out else []
93
94    def GetLaunchApiLevel(self):
95        """Gets the API level that the device was initially launched with.
96
97        This method reads ro.product.first_api_level from the device. If the
98        value is 0, it then reads ro.build.version.sdk.
99
100        Returns:
101            An integer, the API level.
102        """
103        level_str = self._GetProp("ro.product.first_api_level")
104        level = int(level_str)
105        if level != 0:
106            return level
107
108        level_str = self._GetProp("ro.build.version.sdk")
109        return int(level_str)
110
111    def getLaunchApiLevel(self, strict=True):
112        """Gets the API level that the device was initially launched with.
113
114        This method is compatible with vndk_utils in vts package.
115
116        Args:
117            strict: A boolean, whether to raise an error if the property is
118                    not an integer or not defined.
119
120        Returns:
121            An integer, the API level.
122            0 if the value is undefined and strict is False.
123
124        Raises:
125            ValueError: if the value is undefined and strict is True.
126        """
127        try:
128            return self.GetLaunchApiLevel()
129        except ValueError as e:
130            if strict:
131                raise
132            logging.exception(e)
133            return 0
134
135    @property
136    def vndk_lite(self):
137        """Checks whether the vendor partition requests lite VNDK enforcement.
138
139        This method is compatible with vndk_utils in vts package.
140
141        Returns:
142            A boolean, True for lite vndk enforcement.
143        """
144        return self._GetProp("ro.vndk.lite").lower() == "true"
145
146    def GetVndkVersion(self):
147        """Gets the VNDK version that the vendor partition requests."""
148        return self._GetProp("ro.vndk.version")
149
150    def GetKernelConfig(self, config_name):
151        """Gets kernel config from the device.
152
153        Args:
154            config_name: A string, the name of the configuration.
155
156        Returns:
157            "y" or "m" if the config is set.
158            "" if the config is not set.
159            None if fails to read config.
160        """
161        line_prefix = config_name + "="
162        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
163            config_path = temp_file.name
164        try:
165            logging.debug("Pull config.gz to %s", config_path)
166            self.AdbPull("/proc/config.gz", config_path)
167            with gzip.open(config_path, "rt") as config_file:
168                for line in config_file:
169                    if line.strip().startswith(line_prefix):
170                        logging.debug("Found config: %s", line)
171                        return line.strip()[len(line_prefix):]
172            logging.debug("%s is not set.", config_name)
173            return ""
174        except (subprocess.CalledProcessError, IOError) as e:
175            logging.exception("Cannot read kernel config.", e)
176            return None
177        finally:
178            os.remove(config_path)
179
180    def GetBinderBitness(self):
181        """Returns the value of BINDER_IPC_32BIT in kernel config.
182
183        Returns:
184            32 or 64, binder bitness of the device.
185            None if fails to read config.
186        """
187        config_value = self.GetKernelConfig("CONFIG_ANDROID_BINDER_IPC_32BIT")
188        if config_value is None:
189            return None
190        elif config_value:
191            return 32
192        else:
193            return 64
194
195    def IsRoot(self):
196        """Returns whether adb has root privilege on the device."""
197        out, err, return_code = self.Execute("id")
198        if err.strip() or return_code != 0:
199            raise IOError("`id` stdout: %s\nstderr: %s \n" % (out, err))
200        return "uid=0(root)" in out.strip()
201
202    def _Test(self, *args):
203        """Tests file types and status."""
204        out, err, return_code = self.Execute("test", *args)
205        if out.strip() or err.strip():
206            raise IOError("`test` args: %s\nstdout: %s\nstderr: %s" %
207                          (args, out, err))
208        return return_code == 0
209
210    def Exists(self, path):
211        """Returns whether a path on the device exists."""
212        return self._Test("-e", path)
213
214    def IsDirectory(self, path):
215        """Returns whether a path on the device is a directory."""
216        return self._Test("-d", path)
217
218    def _Stat(self, fmt, path):
219        """Executes stat command."""
220        out, err, return_code = self.Execute("stat", "--format", fmt, path)
221        if return_code != 0 or err.strip():
222            raise IOError("`stat --format %s %s` stdout: %s\nstderr: %s" %
223                          (fmt, path, out, err))
224        return out.strip()
225
226    def IsExecutable(self, path):
227        """Returns if execute permission is granted to a path on the device."""
228        return "x" in self._Stat("%A", path)
229
230    def FindFiles(self, path, name_pattern, *options):
231        """Executes find command.
232
233        Args:
234            path: A string, the path on the device.
235            name_pattern: A string, the pattern of the file name.
236            options: Strings, extra options passed to the command.
237
238        Returns:
239            A list of strings, the paths to the found files.
240
241        Raises:
242            ValueError if the pattern contains quotes.
243            IOError if the path does not exist.
244        """
245        if '"' in name_pattern or "'" in name_pattern:
246            raise ValueError("File name pattern contains quotes.")
247        out, err, return_code = self.Execute("find", path, "-name",
248                                             "'" + name_pattern + "'",
249                                             *options)
250        if return_code != 0 or err.strip():
251            raise IOError("`find %s -name '%s' %s` stdout: %s\nstderr: %s" %
252                          (path, name_pattern, " ".join(options), out, err))
253
254        # Return empty file list when out is an empty string.
255        out = out.strip()
256        return out.split("\n") if out else []
257