# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import abc
import logging
import os
import re

import common
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.client.common_lib.cros import gs_cache_client


# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.


class ControlFileGetter(object):
    """
    Interface for classes that can list and fetch known control files.
    """

    # NOTE: this is the Python 2 spelling of the metaclass hook.  Python 3
    # ignores the attribute, so the abstract-method checks below are only
    # enforced when running under Python 2.
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def get_control_file_list(self, suite_name=''):
        """
        Gather a list of paths to control files.

        @param suite_name: The name of a suite we would like control files for.
        @return A list of file paths.
        @throws NoControlFileList if there is an error while listing.
        """
        pass


    @abc.abstractmethod
    def get_control_file_contents(self, test_path):
        """
        Given a path to a control file, return its contents.

        @param test_path: the path to the control file.
        @return the contents of the control file specified by the path.
        @throws ControlFileNotFound if the file cannot be retrieved.
        """
        pass


    @abc.abstractmethod
    def get_control_file_contents_by_name(self, test_name):
        """
        Given the name of a control file, return its contents.

        @param test_name: the name of the test whose control file is desired.
        @return the contents of the control file specified by the name.
        @throws ControlFileNotFound if the file cannot be retrieved.
        """
        pass


class SuiteControlFileGetter(ControlFileGetter):
    """Interface that additionally supports getting by suite."""


    @abc.abstractmethod
    def get_suite_info(self, suite_name=''):
        """
        Gather the control paths and contents of all the control files.

        @param suite_name: The name of a suite we would like control files for.
        @return the control paths and contents of all the control files
        specified by the name.
        @throws SuiteControlFileException if the info cannot be retrieved.
        """
        pass


class CacheingAndFilteringControlFileGetter(ControlFileGetter):
    """Wraps ControlFileGetter to cache the retrieved control file list and
    filter out unwanted control files."""

    # Paths ending with any of these suffixes are dropped from listings.
    CONTROL_FILE_FILTERS = ['src/debian/control']


    def __init__(self):
        super(CacheingAndFilteringControlFileGetter, self).__init__()
        # Cache of the most recently listed (and filtered) control file paths.
        self._files = []


    def _filter_control_files(self, files):
        """
        Drop every path that ends with one of |CONTROL_FILE_FILTERS|.

        Always builds a real list, so the result can be cached, measured
        with len() and iterated more than once regardless of whether the
        input is a list, a dict key view or some other iterable.

        @param files: an iterable of control file paths.
        @return A new list with the unwanted paths removed.
        """
        # str.endswith accepts a tuple of suffixes; an empty tuple matches
        # nothing, so an empty filter list keeps every path.
        unwanted = tuple(self.CONTROL_FILE_FILTERS)
        return [path for path in files if not path.endswith(unwanted)]


    def get_control_file_list(self, suite_name=''):
        """
        Gather a list of paths to control files.

        Gets a list of control files; populates |self._files| with that list
        and then returns the paths to all useful and wanted files in the list.

        @param suite_name: The name of a suite we would like control files for.
        @return A list of file paths.
        @throws NoControlFileList if there is an error while listing.
        """
        files = self._get_control_file_list(suite_name=suite_name)
        self._files = self._filter_control_files(files)
        return self._files


    @abc.abstractmethod
    def _get_control_file_list(self, suite_name=''):
        """
        Produce the unfiltered list of control file paths.

        Implemented by subclasses; get_control_file_list() filters and
        caches whatever this returns.

        @param suite_name: The name of a suite we would like control files for.
        @return An iterable of file paths.
        """
        pass


    def get_control_file_path(self, test_name):
        """
        Given the name of a control file, return its path.

        Searches through previously-compiled list in |self._files| for a
        test named |test_name| and returns the path of the control file for
        that test if it is found.  If nothing has been listed yet, a listing
        is triggered first.

        Note that |test_name| is compiled into a regular expression without
        escaping, so regex metacharacters in it are interpreted as such.

        @param test_name: the name of the test whose control file is desired.
        @return control file path
        @throws ControlFileNotFound if the file cannot be retrieved.
        """
        if not self._files and not self.get_control_file_list():
            raise error.ControlFileNotFound('No control files found.')

        # A bare test name means "<test_name>/control"; a name that already
        # mentions 'control' is matched as the tail of the path as given.
        if 'control' not in test_name:
            regexp = re.compile(os.path.join(test_name, 'control$'))
        else:
            regexp = re.compile(test_name + '$')
        candidates = [path for path in self._files if regexp.search(path)]
        if not candidates:
            raise error.ControlFileNotFound('No control file for ' + test_name)
        if len(candidates) > 1:
            raise error.ControlFileNotFound(test_name + ' is not unique.')
        return candidates[0]


    def get_control_file_contents_by_name(self, test_name):
        """
        Given the name of a control file, return its contents.

        Searches through previously-compiled list in |self._files| for a
        test named |test_name| and returns the contents of the control file
        for that test if it is found.

        @param test_name: the name of the test whose control file is desired.
        @return the contents of the control file specified by the name.
        @throws ControlFileNotFound if the file cannot be retrieved.
        """
        path = self.get_control_file_path(test_name)
        return self.get_control_file_contents(path)


class FileSystemGetter(CacheingAndFilteringControlFileGetter):
    """
    Class that can list and fetch known control files from disk.

    @var _CONTROL_PATTERN: control file name format to match.
    """

    # Matches 'control' and 'control.<anything>'.
    _CONTROL_PATTERN = r'^control(?:\..+)?$'

    def __init__(self, paths):
        """
        @param paths: base directories to start search.
        """
        super(FileSystemGetter, self).__init__()
        self._paths = paths


    def _is_useful_file(self, name):
        """
        Tell whether |name| is worth returning from a listing.

        @param name: path of a candidate file.
        @return False if the path mentions '__init__.py' or '.svn'.
        """
        return '__init__.py' not in name and '.svn' not in name


    def _get_control_file_list(self, suite_name=''):
        """
        Gather a list of paths to control files under |self._paths|.

        Searches under |self._paths| for files that match
        |self._CONTROL_PATTERN| and returns the paths to all useful files
        found.  Neither |self._paths| nor |self._files| is modified here;
        the caller is responsible for caching the result.

        @param suite_name: The name of a suite we would like control files for.
        @return A list of files that match |self._CONTROL_PATTERN|.
        @throws NoControlFileList if we find no files.
        """
        if suite_name:
            logging.debug('Getting control files for a specific suite has '
                          'not been implemented for FileSystemGetter. '
                          'Getting all control files instead.')

        regexp = re.compile(self._CONTROL_PATTERN)
        # Walk a copy: popping from |self._paths| itself would drain the
        # caller's list, leave the error message below without any
        # directory names, and make later scans a no-op.
        directories = list(self._paths)
        found = []
        # Some of our callers are ill-considered and request that we
        # search all of /usr/local/autotest (crbug.com/771823).
        # Fixing the callers immediately is somewhere between a
        # nuisance and hard.  So, we have a blacklist, hoping two
        # wrongs will somehow make it right.
        blacklist = {
            'site-packages', 'venv', 'results', 'logs', 'containers',
        }
        while directories:
            directory = directories.pop()
            if not os.path.exists(directory):
                continue
            try:
                for name in os.listdir(directory):
                    if name in blacklist:
                        continue
                    fullpath = os.path.join(directory, name)
                    if os.path.isfile(fullpath):
                        if regexp.search(name):
                            # if we are a control file
                            found.append(fullpath)
                    elif (not os.path.islink(fullpath)
                          and os.path.isdir(fullpath)):
                        # Descend into real sub-directories only; symlinked
                        # directories are not followed.
                        directories.append(fullpath)
            except OSError:
                # Some directories under results/ like the Chrome Crash
                # Reports will cause issues when attempted to be searched.
                logging.error('Unable to search directory %s for control '
                              'files.', directory)
        if not found:
            msg = 'No control files under ' + ','.join(self._paths)
            raise error.NoControlFileList(msg)
        return [f for f in found if self._is_useful_file(f)]


    def get_control_file_contents(self, test_path):
        """
        Get the contents of the control file at |test_path|.

        @param test_path: the path to the control file.
        @return The contents of the aforementioned file.
        @throws ControlFileNotFound if the file cannot be retrieved.
        """
        try:
            return utils.read_file(test_path)
        except EnvironmentError as e:
            # Read errno/strerror off the exception instead of unpacking it
            # in the except clause, which is Python 2-only syntax.
            msg = "Can't retrieve {0}: {1} ({2})".format(test_path,
                                                         e.strerror,
                                                         e.errno)
            raise error.ControlFileNotFound(msg)


class DevServerGetter(CacheingAndFilteringControlFileGetter,
                      SuiteControlFileGetter):
    """Class that can list and fetch known control files from DevServer."""

    def __init__(self, build, ds):
        """
        @param build: The build from which to get control files.
        @param ds: An existing dev_server.DevServer object to use.
        """
        super(DevServerGetter, self).__init__()
        self._dev_server = ds
        self._build = build


    @staticmethod
    def create(build, ds=None):
        """Wraps constructor.  Can be mocked for testing purposes.
        @param build: The build from which to get control files.
        @param ds: An existing dev_server.DevServer object to use
                  (default=None)
        @returns: New DevServerGetter.
        """
        return DevServerGetter(build, ds)


    def _get_control_file_list(self, suite_name=''):
        """
        Gather a list of paths to control files from |self._dev_server|.

        Asks |self._dev_server| for a listing of the control files for
        |self._build|, passing |suite_name| through to the dev server.

        @param suite_name: The name of a suite we would like control files for.
        @return A list of control file paths.
        @throws NoControlFileList if there is an error while listing.
        """
        try:
            return self._dev_server.list_control_files(self._build,
                                                       suite_name=suite_name)
        except dev_server.DevServerException as e:
            raise error.NoControlFileList(e)


    def get_control_file_contents(self, test_path):
        """
        Return the contents of |test_path| from |self._dev_server|.

        Get the contents of the control file at |test_path| for |self._build|
        on |self._dev_server|.

        @param test_path: the path to the control file.
        @return The contents of |test_path|.
        @throws ControlFileNotFound if the file cannot be retrieved.
        """
        try:
            return self._dev_server.get_control_file(self._build, test_path)
        except dev_server.DevServerException as e:
            raise error.ControlFileNotFound(e)


    def _list_suite_controls(self, suite_name=''):
        """
        Gather a dict {path:content} of all control files from
        |self._dev_server|.

        Get a dict of contents of all the control files for |self._build| on
        |self._dev_server|: path is the key, and the control file content is
        the value.  The lookup goes through a GsCacheClient wrapped around
        |self._dev_server|.

        @param suite_name: The name of a suite we would like control files for.
        @return A dict of paths and contents of all control files.
        @throws NoControlFileList if there is an error while listing.
        """
        cache_client = gs_cache_client.GsCacheClient(self._dev_server)
        return cache_client.list_suite_controls(self._build, suite_name)


    def get_suite_info(self, suite_name=''):
        """
        Gather info of a list of control files from |self._dev_server|.

        The info is a dict: {control_path: control_file_content} for
        |self._build| on |self._dev_server|.  As a side effect,
        |self._files| is replaced with the filtered list of paths.

        @param suite_name: The name of a suite we would like control files for.
        @return A dict of paths and contents of all control files:
            {path1: content1, path2: content2, ..., pathX: contentX}
        """
        file_contents = self._list_suite_controls(suite_name=suite_name)
        # Iterating the dict yields its keys, i.e. the control file paths.
        files = self._filter_control_files(file_contents)
        self._files = files
        return {f: file_contents[f] for f in files}