1# Lint as: python3
2#
3# Copyright 2020, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Utilities for C-Suite integration tests."""
17
18import argparse
19import contextlib
20import logging
21import os
22import pathlib
23import shlex
24import shutil
25import stat
26import subprocess
27import sys
28import tempfile
29from typing import Sequence, Text
30import zipfile
31import csuite_test
32
# Re-export frequently used csuite_test symbols so that test modules only need
# to import this module.
TestCase = csuite_test.TestCase  # pylint: disable=invalid-name
get_device_serial = csuite_test.get_device_serial

# When true, the cleanup() methods of CSuiteHarness and PackageRepository leave
# their temporary directories on disk so that test failures can be debugged.
# main() sets this from the --keep-temp-dirs flag. Kept directories do not need
# explicit removal since they are created using the system's temporary-file
# facility.
_KEEP_TEMP_DIRS = False
41
42
class CSuiteHarness(contextlib.AbstractContextManager):
  """Interface class for interacting with the C-Suite harness.

  Creating an instance extracts the standalone suite zip into a fresh temporary
  directory; cleanup() (or leaving the `with` block) removes that directory
  unless temporary directories are being kept for debugging.

  WARNING: Explicitly clean up created instances or use as a context manager.
  Not doing so will result in a ResourceWarning for the implicit cleanup which
  confuses the TradeFed Python test output parser.
  """

  def __init__(self):
    self._suite_dir = pathlib.Path(tempfile.mkdtemp(prefix='csuite'))
    logging.debug('Created harness directory: %s', self._suite_dir)

    try:
      with zipfile.ZipFile(_get_standalone_zip_path(), 'r') as f:
        f.extractall(self._suite_dir)

      # Add owner-execute permission on scripts since zip does not preserve
      # them.
      self._launcher_binary = self._suite_dir.joinpath(
          'android-csuite/tools/csuite-tradefed')
      _add_owner_exec_permission(self._launcher_binary)

      self._generate_module_binary = self._suite_dir.joinpath(
          'android-csuite/tools/csuite_generate_module')
      _add_owner_exec_permission(self._generate_module_binary)
    except BaseException:
      # The caller never receives the instance when construction fails, so it
      # cannot clean up; remove the directory here before re-raising.
      self.cleanup()
      raise

    self._testcases_dir = self._suite_dir.joinpath('android-csuite/testcases')

  def __exit__(self, unused_type, unused_value, unused_traceback):
    self.cleanup()

  def cleanup(self):
    """Removes the harness directory unless temp directories are being kept."""
    if _KEEP_TEMP_DIRS:
      return
    shutil.rmtree(self._suite_dir, ignore_errors=True)

  def add_module(self, package_name: Text) -> Text:
    """Generates and adds a test module for the provided package.

    Runs the suite's module generator on a one-entry package list and copies
    the generated AndroidTest.xml into the harness's testcases directory as
    `csuite_<package_name>.config`.

    Args:
      package_name: the name of the package to generate a module for.

    Returns:
      The name of the added module.

    Raises:
      subprocess.CalledProcessError: if the module generator exits with a
        non-zero code.
    """
    module_name = 'csuite_%s' % package_name

    with tempfile.TemporaryDirectory() as o:
      out_dir = pathlib.Path(o)
      package_list_path = out_dir.joinpath('packages.list')
      package_list_path.write_text(package_name + '\n')

      _run_command([
          self._generate_module_binary,
          '--package-list', package_list_path,
          '--root-dir', out_dir,
      ])

      out_file_path = self._testcases_dir.joinpath(module_name + '.config')
      shutil.copy(
          out_dir.joinpath(package_name, 'AndroidTest.xml'), out_file_path)

    return module_name

  def run_and_wait(self, flags: Sequence[Text]) -> subprocess.CompletedProcess:
    """Starts the Tradefed launcher and waits for it to complete.

    Args:
      flags: command-line flags to pass to the launcher; any sequence (list,
        tuple, ...) is accepted.

    Returns:
      An object representing the finished launcher process.

    Raises:
      subprocess.CalledProcessError: if the launcher exits with a non-zero
        code.
    """
    env = os.environ.copy()

    # Unset environment variables that would cause the script to think it's in a
    # build tree.
    env.pop('ANDROID_BUILD_TOP', None)
    env.pop('ANDROID_HOST_OUT', None)

    # Unset environment variables that would cause TradeFed to find test configs
    # other than the ones created by the test.
    env.pop('ANDROID_HOST_OUT_TESTCASES', None)
    env.pop('ANDROID_TARGET_OUT_TESTCASES', None)

    # Unset environment variables that might cause the suite to pick up a
    # connected device that wasn't explicitly specified.
    env.pop('ANDROID_SERIAL', None)

    # Set the environment variable that TradeFed requires to find test modules.
    # Environment values are stored as plain strings rather than path objects.
    env['ANDROID_TARGET_OUT_TESTCASES'] = str(self._testcases_dir)

    # Unpack instead of concatenating so that non-list sequences work too.
    return _run_command([self._launcher_binary, *flags], env=env)
120
121
class PackageRepository(contextlib.AbstractContextManager):
  """A temporary on-disk APK repository for tests.

  The repository is a freshly created temporary directory holding one
  sub-directory per added package.

  WARNING: Explicitly clean up created instances or use as a context manager.
  Not doing so will result in a ResourceWarning for the implicit cleanup which
  confuses the TradeFed Python test output parser.
  """

  def __init__(self):
    repo_root = tempfile.mkdtemp(prefix='csuite_apk_dir')
    self._root_dir = pathlib.Path(repo_root)
    logging.info('Created repository directory: %s', self._root_dir)

  def __exit__(self, unused_type, unused_value, unused_traceback):
    self.cleanup()

  def cleanup(self):
    """Deletes the repository directory unless temp directories are kept."""
    if not _KEEP_TEMP_DIRS:
      shutil.rmtree(self._root_dir, ignore_errors=True)

  def get_path(self) -> pathlib.Path:
    """Returns the repository's root directory."""
    return self._root_dir

  def add_package_apks(self, package_name: Text,
                       apk_paths: Sequence[pathlib.Path]):
    """Copies the given APK files into a new directory named after the package.

    Args:
      package_name: name of the package; used as the sub-directory name.
      apk_paths: paths of the APK files to copy into the repository.

    Raises:
      FileExistsError: if a directory for the package already exists.
    """
    package_dir = self._root_dir / package_name

    # mkdir() without exist_ok raises if the package was already added.
    package_dir.mkdir()
    for apk in apk_paths:
      shutil.copy(apk, package_dir)
155
156
class Adb:
  """Encapsulates adb functionality to simplify usage in tests.

  Most methods in this class raise an exception if they fail to execute. This
  behavior can be overridden by using the check parameter: leaving it unset
  (None) or passing True raises on a non-zero adb exit code, while passing
  False returns the completed process regardless of its exit code.
  """

  def __init__(self,
               adb_binary_path: pathlib.Path = None,
               device_serial: Text = None):
    """Initializes the wrapper.

    Args:
      adb_binary_path: path to the adb binary; `adb` is used when not provided.
      device_serial: serial of the device to target; when not provided the
        value returned by get_device_serial() is used, and no `-s` flag is
        added if that is empty as well.
    """
    self._args = [adb_binary_path or 'adb']

    device_serial = device_serial or get_device_serial()
    if device_serial:
      self._args.extend(['-s', device_serial])

  def shell(self,
            args: Sequence[Text],
            check: bool = None) -> subprocess.CompletedProcess:
    """Runs an adb shell command and waits for it to complete.

    Note that the exit code of the returned object corresponds to that of
    the adb command and not the command executed in the shell.

    Args:
      args: a sequence of program arguments to pass to the shell.
      check: whether to raise if the process terminates with a non-zero exit
        code. Defaults to raising when left unset.

    Returns:
      An object representing a process that has finished and that can be
      queried.
    """
    # Unpack instead of concatenating so that any sequence type is accepted.
    return self.run(['shell', *args], check)

  def run(self,
          args: Sequence[Text],
          check: bool = None) -> subprocess.CompletedProcess:
    """Runs an adb command and waits for it to complete.

    Args:
      args: a sequence of arguments to pass to adb.
      check: whether to raise if the process terminates with a non-zero exit
        code. Defaults to raising when left unset.

    Returns:
      An object representing a process that has finished and that can be
      queried.
    """
    # None means "not specified"; forwarding it verbatim would silently
    # disable error checking, contradicting the documented default.
    if check is None:
      check = True
    return _run_command([*self._args, *args], check=check)

  def uninstall(self, package_name: Text, check: bool = None):
    """Uninstalls the specified package."""
    self.run(['uninstall', package_name], check=check)

  def list_packages(self) -> Sequence[Text]:
    """Lists packages installed on the device.

    Returns:
      The package names parsed from the `package:<name>` lines printed by
      `pm list packages`; any other lines (e.g. blank ones) are ignored.
    """
    p = self.shell(['pm', 'list', 'packages'])

    prefix = 'package:'
    stripped_lines = (line.strip() for line in p.stdout.splitlines())
    return [
        line[len(prefix):] for line in stripped_lines if line.startswith(prefix)
    ]
206
207
def _run_command(args, check=True, **kwargs) -> subprocess.CompletedProcess:
  """A wrapper for subprocess.run that overrides defaults and adds logging.

  Args:
    args: the command tokens; each token may be a string or a path object.
    check: whether to raise if the process exits with a non-zero code. None is
      treated as "not specified" and falls back to True so that callers
      forwarding an unset optional parameter keep the raising default.
    **kwargs: additional keyword arguments forwarded to subprocess.run. Unless
      overridden, output is captured and decoded as text.

  Returns:
    An object representing the finished process.

  Raises:
    subprocess.CalledProcessError: if check is enabled and the process exits
      with a non-zero code.
  """
  if check is None:
    check = True

  # Override some defaults. Note that 'check' deviates from this pattern to
  # avoid getting warnings about using subprocess.run without an explicitly set
  # `check` parameter. capture_output cannot be combined with explicit
  # stdout/stderr arguments, so it is only defaulted when neither is given.
  if 'stdout' not in kwargs and 'stderr' not in kwargs:
    kwargs.setdefault('capture_output', True)
  kwargs.setdefault('universal_newlines', True)

  # Log the command-line for debugging failed tests. The `env -i` prefix is
  # only shown when an explicit environment replaces the inherited one. Note
  # that we convert tokens to strings for _shlex_join.
  env = kwargs.get('env')
  env_str = []
  if env is not None:
    env_str = ['env', '-i'] + ['%s=%s' % (k, v) for k, v in env.items()]
  args_str = [str(t) for t in args]
  logging.debug('Running command: %s', _shlex_join(env_str + args_str))

  return subprocess.run(args, check=check, **kwargs)
226
227
228def _add_owner_exec_permission(path: pathlib.Path):
229  path.chmod(path.stat().st_mode | stat.S_IEXEC)
230
231
def get_test_app_apks(app_module_name: Text) -> Sequence[pathlib.Path]:
  """Returns a one-element list with the path of `<app_module_name>.apk`.

  The file is looked up through _get_test_file, which raises if it is missing.
  """
  apk_file_name = app_module_name + '.apk'
  apk_path = _get_test_file(apk_file_name)
  return [apk_path]
235
236
def _get_standalone_zip_path():
  """Locates `csuite-standalone.zip` among the test files and returns its path."""
  zip_name = 'csuite-standalone.zip'
  return _get_test_file(zip_name)
240
241
def _get_test_file(name: Text) -> pathlib.Path:
  """Returns the path of the named file inside the test directory.

  Args:
    name: the file name, relative to the directory from _get_test_dir().

  Returns:
    The path to the existing file.

  Raises:
    RuntimeError: if no such file exists in the test directory.
  """
  search_dir = _get_test_dir()
  candidate = search_dir.joinpath(name)

  if candidate.exists():
    return candidate

  raise RuntimeError('Unable to find the file `%s` in the test execution dir '
                     '`%s`; are you missing a data dependency in the build '
                     'module?' % (name, search_dir))
252
253
254def _shlex_join(split_command: Sequence[Text]) -> Text:
255  """Concatenate tokens and return a shell-escaped string."""
256  # This is an alternative to shlex.join that doesn't exist in Python versions
257  # < 3.8.
258  return ' '.join(shlex.quote(t) for t in split_command)
259
260
261def _get_test_dir() -> pathlib.Path:
262  return pathlib.Path(__file__).parent
263
264
def _parse_bool_flag(value: Text) -> bool:
  """Converts the textual value of a boolean command-line flag to a bool.

  argparse's `type=bool` treats every non-empty string, including 'False', as
  True; this parser interprets the common spellings instead.

  Args:
    value: the raw flag value from the command line.

  Returns:
    The boolean the value spells out; the empty string maps to False.

  Raises:
    argparse.ArgumentTypeError: if the value is not a recognized spelling.
  """
  normalized = value.strip().lower()
  if normalized in ('1', 'true', 't', 'yes', 'y', 'on'):
    return True
  if normalized in ('', '0', 'false', 'f', 'no', 'n', 'off'):
    return False
  raise argparse.ArgumentTypeError('expected a boolean value, got %r' % value)


def main():
  """Parses the utility flags, configures logging and runs the tests.

  Flags recognized here (--log-level and --keep-temp-dirs, plus those declared
  by csuite_test.create_arg_parser()) are consumed; the remaining arguments
  are passed on to csuite_test.run_tests together with the parsed flags. The
  complete sys.argv, including the program name, is handed to the parser.
  """
  global _KEEP_TEMP_DIRS

  parser = argparse.ArgumentParser(parents=[csuite_test.create_arg_parser()])
  parser.add_argument(
      '--log-level',
      choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
      default='WARNING',
      help='sets the logging level threshold')
  parser.add_argument(
      '--keep-temp-dirs',
      # Not `type=bool`: that would turn '--keep-temp-dirs False' into True.
      type=_parse_bool_flag,
      default=False,
      help='keeps any created temporary directories for debugging failures')
  args, unittest_argv = parser.parse_known_args(sys.argv)

  _KEEP_TEMP_DIRS = args.keep_temp_dirs
  logging.basicConfig(level=getattr(logging, args.log_level))

  csuite_test.run_tests(args, unittest_argv)
284