from autotest_lib.client.common_lib.cros.cfm import cras_input_node from autotest_lib.client.common_lib.cros.cfm import cras_output_node class CrasNodeCollector(object): """Utility class for obtaining node data from cras_test_client.""" DEVICE_COLUMN_COUNT = 2 NODE_COLUMN_COUNT = 8 DEVICE_HEADERS = ['ID', 'Name'] OUTPUT_NODE_HEADERS = ['Stable Id', 'ID', 'Vol', 'Plugged', 'L/R swapped', 'Time Hotword', 'Type', 'Name'] INPUT_NODE_HEADERS = ['Stable Id', 'ID', 'Gain', 'Plugged', 'L/Rswapped', 'Time Hotword', 'Type', 'Name'] def __init__(self, host): """ Constructor @param host the device under test (CrOS). """ self._host = host def _replace_multiple_whitespace_with_one(self, string): """ Replace multiple sequential whitespaces with a single whitespace. @returns a string """ return ' '.join(string.split()) def _construct_columns(self, columns_str, columns_count): """ Constructs a list of strings from a single string. @param columns_str A whitespace separated list of values. @param columns_count number of columns to create @returns a list with strings. """ # 1) Replace multiple whitespaces with one # 2) Split on whitespace, create nof_columns columns columns_str = self._replace_multiple_whitespace_with_one(columns_str) return columns_str.split(None, columns_count-1) def _collect_input_device_data(self): cmd = ("cras_test_client --dump_server_info " " | awk '/Input Devices:/,/Input Nodes:/'") lines = self._host.run_output(cmd).split('\n') # Ignore the first two lines ("Input Devices" and headers) and the # last line ("Input Nodes") lines = lines[2:-1] rows = [self._construct_columns(line, self.DEVICE_COLUMN_COUNT) for line in lines] return [dict(zip(self.DEVICE_HEADERS, row)) for row in rows] def _collect_output_device_data(self): cmd = ("cras_test_client --dump_server_info " " | awk '/Output Devices:/,/Output Nodes:/'") lines = self._host.run_output(cmd).split('\n') # Ignore the first two lines ("Output Devices" and headers) and the # last line ("Output Nodes") lines = lines[2:-1] rows = [self._construct_columns(line, self.DEVICE_COLUMN_COUNT) for line in lines] return [dict(zip(self.DEVICE_HEADERS, row)) for row in rows] def _collect_output_node_cras_data(self): """ Collects output nodes data using cras_test_client. @returns a list of dictionaries where keys are in OUTPUT_NODE_HEADERS """ # It's a bit hacky to use awk; we should probably do the parsing # in Python instead using textfsm or some other lib. cmd = ("cras_test_client --dump_server_info" "| awk '/Output Nodes:/,/Input Devices:/'") lines = self._host.run_output(cmd).split('\n') # Ignore the first two lines ("Output Nodes:" and headers) and the # last line ("Input Devices:") lines = lines[2:-1] rows = [self._construct_columns(line, self.NODE_COLUMN_COUNT) for line in lines] return [dict(zip(self.OUTPUT_NODE_HEADERS, row)) for row in rows] def _collect_input_node_cras_data(self): """ Collects input nodes data using cras_test_client. @returns a list of dictionaries where keys are in INPUT_NODE_HEADERS """ cmd = ("cras_test_client --dump_server_info " " | awk '/Input Nodes:/,/Attached clients:/'") lines = self._host.run_output(cmd).split('\n') # Ignore the first two lines ("Input Nodes:" and headers) and the # last line ("Attached clients:") lines = lines[2:-1] rows = [self._construct_columns(line, self.NODE_COLUMN_COUNT) for line in lines] return [dict(zip(self.INPUT_NODE_HEADERS, row)) for row in rows] def _get_device_name(self, device_data, device_id): for device in device_data: if device['ID'] == device_id: return device['Name'] return None def _create_input_node(self, node_data, device_data): """ Create a CrasInputNode. @param node_data Input Node data @param device_data Input Device data @return CrasInputNode """ device_id = node_data['ID'].split(':')[0] device_name = self._get_device_name(device_data, device_id) return cras_input_node.CrasInputNode( node_id=node_data['ID'], node_name=node_data['Name'], gain=node_data['Gain'], node_type=node_data['Type'], device_id=device_id, device_name=device_name) def _create_output_node(self, node_data, device_data): """ Create a CrasOutputNode. @param node_data Output Node data @param device_data Output Device data @return CrasOutputNode """ device_id = node_data['ID'].split(':')[0] device_name = self._get_device_name(device_data, device_id) return cras_output_node.CrasOutputNode( node_id=node_data['ID'], node_type=node_data['Type'], node_name=node_data['Name'], volume=node_data['Vol'], device_id=device_id, device_name=device_name) def get_input_nodes(self): device_data = self._collect_input_device_data() node_data = self._collect_input_node_cras_data() return [self._create_input_node(d, device_data) for d in node_data] def get_output_nodes(self): device_data = self._collect_output_device_data() node_data = self._collect_output_node_cras_data() return [self._create_output_node(d, device_data) for d in node_data]