1from autotest_lib.client.common_lib.cros.cfm import cras_input_node 2from autotest_lib.client.common_lib.cros.cfm import cras_output_node 3 4class CrasNodeCollector(object): 5 """Utility class for obtaining node data from cras_test_client.""" 6 7 DEVICE_COLUMN_COUNT = 2 8 NODE_COLUMN_COUNT = 8 9 DEVICE_HEADERS = ['ID', 'Name'] 10 OUTPUT_NODE_HEADERS = ['Stable Id', 'ID', 'Vol', 'Plugged', 'L/R swapped', 11 'Time Hotword', 'Type', 'Name'] 12 INPUT_NODE_HEADERS = ['Stable Id', 'ID', 'Gain', 'Plugged', 'L/Rswapped', 13 'Time Hotword', 'Type', 'Name'] 14 15 def __init__(self, host): 16 """ 17 Constructor 18 @param host the device under test (CrOS). 19 """ 20 self._host = host 21 22 def _replace_multiple_whitespace_with_one(self, string): 23 """ 24 Replace multiple sequential whitespaces with a single whitespace. 25 @returns a string 26 """ 27 return ' '.join(string.split()) 28 29 def _construct_columns(self, columns_str, columns_count): 30 """ 31 Constructs a list of strings from a single string. 32 33 @param columns_str A whitespace separated list of values. 34 @param columns_count number of columns to create 35 @returns a list with strings. 36 """ 37 # 1) Replace multiple whitespaces with one 38 # 2) Split on whitespace, create nof_columns columns 39 columns_str = self._replace_multiple_whitespace_with_one(columns_str) 40 return columns_str.split(None, columns_count-1) 41 42 def _collect_input_device_data(self): 43 cmd = ("cras_test_client --dump_server_info " 44 " | awk '/Input Devices:/,/Input Nodes:/'") 45 lines = self._host.run_output(cmd).split('\n') 46 # Ignore the first two lines ("Input Devices" and headers) and the 47 # last line ("Input Nodes") 48 lines = lines[2:-1] 49 rows = [self._construct_columns(line, self.DEVICE_COLUMN_COUNT) 50 for line in lines] 51 return [dict(zip(self.DEVICE_HEADERS, row)) for row in rows] 52 53 def _collect_output_device_data(self): 54 cmd = ("cras_test_client --dump_server_info " 55 " | awk '/Output Devices:/,/Output Nodes:/'") 56 lines = self._host.run_output(cmd).split('\n') 57 # Ignore the first two lines ("Output Devices" and headers) and the 58 # last line ("Output Nodes") 59 lines = lines[2:-1] 60 rows = [self._construct_columns(line, self.DEVICE_COLUMN_COUNT) 61 for line in lines] 62 return [dict(zip(self.DEVICE_HEADERS, row)) for row in rows] 63 64 def _collect_output_node_cras_data(self): 65 """ 66 Collects output nodes data using cras_test_client. 67 68 @returns a list of dictionaries where keys are in OUTPUT_NODE_HEADERS 69 """ 70 # It's a bit hacky to use awk; we should probably do the parsing 71 # in Python instead using textfsm or some other lib. 72 cmd = ("cras_test_client --dump_server_info" 73 "| awk '/Output Nodes:/,/Input Devices:/'") 74 lines = self._host.run_output(cmd).split('\n') 75 # Ignore the first two lines ("Output Nodes:" and headers) and the 76 # last line ("Input Devices:") 77 lines = lines[2:-1] 78 rows = [self._construct_columns(line, self.NODE_COLUMN_COUNT) 79 for line in lines] 80 return [dict(zip(self.OUTPUT_NODE_HEADERS, row)) for row in rows] 81 82 def _collect_input_node_cras_data(self): 83 """ 84 Collects input nodes data using cras_test_client. 85 86 @returns a list of dictionaries where keys are in INPUT_NODE_HEADERS 87 """ 88 cmd = ("cras_test_client --dump_server_info " 89 " | awk '/Input Nodes:/,/Attached clients:/'") 90 lines = self._host.run_output(cmd).split('\n') 91 # Ignore the first two lines ("Input Nodes:" and headers) and the 92 # last line ("Attached clients:") 93 lines = lines[2:-1] 94 rows = [self._construct_columns(line, self.NODE_COLUMN_COUNT) 95 for line in lines] 96 return [dict(zip(self.INPUT_NODE_HEADERS, row)) for row in rows] 97 98 def _get_device_name(self, device_data, device_id): 99 for device in device_data: 100 if device['ID'] == device_id: 101 return device['Name'] 102 return None 103 104 def _create_input_node(self, node_data, device_data): 105 """ 106 Create a CrasInputNode. 107 @param node_data Input Node data 108 @param device_data Input Device data 109 @return CrasInputNode 110 """ 111 device_id = node_data['ID'].split(':')[0] 112 device_name = self._get_device_name(device_data, device_id) 113 return cras_input_node.CrasInputNode( 114 node_id=node_data['ID'], 115 node_name=node_data['Name'], 116 gain=node_data['Gain'], 117 node_type=node_data['Type'], 118 device_id=device_id, 119 device_name=device_name) 120 121 def _create_output_node(self, node_data, device_data): 122 """ 123 Create a CrasOutputNode. 124 @param node_data Output Node data 125 @param device_data Output Device data 126 @return CrasOutputNode 127 """ 128 device_id = node_data['ID'].split(':')[0] 129 device_name = self._get_device_name(device_data, device_id) 130 return cras_output_node.CrasOutputNode( 131 node_id=node_data['ID'], 132 node_type=node_data['Type'], 133 node_name=node_data['Name'], 134 volume=node_data['Vol'], 135 device_id=device_id, 136 device_name=device_name) 137 138 def get_input_nodes(self): 139 device_data = self._collect_input_device_data() 140 node_data = self._collect_input_node_cras_data() 141 return [self._create_input_node(d, device_data) for d in node_data] 142 143 def get_output_nodes(self): 144 device_data = self._collect_output_device_data() 145 node_data = self._collect_output_node_cras_data() 146 return [self._create_output_node(d, device_data) for d in node_data] 147