1#   Copyright 2016 - The Android Open Source Project
2#
3#   Licensed under the Apache License, Version 2.0 (the "License");
4#   you may not use this file except in compliance with the License.
5#   You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#   Unless required by applicable law or agreed to in writing, software
10#   distributed under the License is distributed on an "AS IS" BASIS,
11#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#   See the License for the specific language governing permissions and
13#   limitations under the License.
14
15import ipaddress
16import re
17
18from acts.libs.proc import job
19
20
class LinuxIpCommand(object):
    """Interface for doing standard IP commands on a linux system.

    Wraps the `ip addr` shell commands in a python object so that callers can
    query and modify the ipv4 addresses of a network interface without
    building and parsing the command lines themselves.
    """

    # Matches an ipv4 entry of `ip addr show`, e.g.
    #   inet 192.168.1.1/24 brd 192.168.1.255 scope global eth0
    # The broadcast part is optional; 'inet ' (with the trailing space) does
    # not match the 'inet6 ...' lines.
    _INET_LINE = re.compile(
        r'inet (?P<address>\S*)(?: brd (?P<bcast>\S*))?')

    # stderr text produced by `ip addr del` when the address is not (or no
    # longer) assigned to the interface.
    _ALREADY_REMOVED_MSG = 'RTNETLINK answers: Cannot assign requested address'

    def __init__(self, runner):
        """
        Args:
            runner: Object that can take unix commands and run them in an
                    environment (eg. connection.SshConnection).
        """
        self._runner = runner

    def get_ipv4_addresses(self, net_interface):
        """Gets all ipv4 addresses of a network interface.

        Each address is reported exactly once, together with its broadcast
        address when the interface lists one.

        Args:
            net_interface: string, The network interface to get info on
                           (eg. wlan0).

        Returns: An iterator of tuples that contain (address, broadcast).
                 where address is a ipaddress.IPv4Interface and broadcast
                 is an ipaddress.IPv4Address, or None when the entry has no
                 broadcast address.
        """
        results = self._runner.run('ip addr show dev %s' % net_interface)

        # Example stdout:
        # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP group default qlen 1000
        #   link/ether 48:0f:cf:3c:9d:89 brd ff:ff:ff:ff:ff:ff
        #   inet 192.168.1.1/24 brd 192.168.1.255 scope global eth0
        #       valid_lft forever preferred_lft forever
        #   inet6 2620:0:1000:1500:a968:a776:2d80:a8b3/64 scope global temporary dynamic
        #       valid_lft 599919sec preferred_lft 80919sec

        for line in results.stdout.splitlines():
            match = self._INET_LINE.search(line.strip())
            if not match:
                continue

            address = ipaddress.IPv4Interface(match.group('address'))
            bcast = match.group('bcast')
            if bcast is not None:
                bcast = ipaddress.IPv4Address(bcast)
            # A single pattern with an optional broadcast group guarantees
            # one tuple per line; matching with and without 'brd' separately
            # would report addresses that have a broadcast twice.
            yield (address, bcast)

    def add_ipv4_address(self, net_interface, address, broadcast=None):
        """Adds an ipv4 address to a net_interface.

        Args:
            net_interface: string, The network interface
                           to get the new ipv4 (eg. wlan0).
            address: ipaddress.IPv4Interface, The new ipaddress and netmask
                     to add to an interface.
            broadcast: ipaddress.IPv4Address, The broadcast address to use for
                       this net_interfaces subnet. When omitted, no broadcast
                       address is passed to the command.
        """
        if broadcast:
            self._runner.run('ip addr add %s broadcast %s dev %s' %
                             (address, broadcast, net_interface))
        else:
            self._runner.run('ip addr add %s dev %s' %
                             (address, net_interface))

    def remove_ipv4_address(self, net_interface, address, ignore_status=False):
        """Remove an ipv4 address.

        Removes an ipv4 address from a network interface.

        Args:
            net_interface: string, The network interface to remove the
                           ipv4 address from (eg. wlan0).
            address: ipaddress.IPv4Interface or ipaddress.IPv4Address,
                     The ip address to remove from the net_interface.
            ignore_status: True if the exit status can be ignored
        Returns:
            The job result from the command
        """
        return self._runner.run(
            'ip addr del %s dev %s' % (address, net_interface),
            ignore_status=ignore_status)

    def set_ipv4_address(self, net_interface, address, broadcast=None):
        """Set the ipv4 address.

        Sets the ipv4 address of a network interface. If the network interface
        has any other ipv4 addresses these will be cleared.

        Args:
            net_interface: string, The network interface to set the ip address
                           on (eg. wlan0).
            address: ipaddress.IPv4Interface, The ip address and subnet to give
                     the net_interface.
            broadcast: ipaddress.IPv4Address, The broadcast address to use for
                       the subnet.
        """
        self.clear_ipv4_addresses(net_interface)
        self.add_ipv4_address(net_interface, address, broadcast)

    def clear_ipv4_addresses(self, net_interface):
        """Clears all ipv4 addresses registered to a net_interface.

        Args:
            net_interface: string, The network interface to clear addresses from
                           (eg. wlan0).

        Raises:
            job.Error: If removing an address fails and the address is still
                       registered to the interface, or if the removal fails
                       for any other reason.
        """
        for address, _ in self.get_ipv4_addresses(net_interface):
            result = self.remove_ipv4_address(net_interface, address,
                                              ignore_status=True)
            if result.exit_status == 0:
                continue

            # It is possible that the address has already been removed by the
            # time this command has been called. In such a case, we would get
            # this error message.
            if self._ALREADY_REMOVED_MSG in result.stderr:
                # get_ipv4_addresses yields (address, broadcast) tuples, so
                # compare against the addresses only.
                remaining = {
                    current
                    for current, _ in self.get_ipv4_addresses(net_interface)
                }
                # If it was removed by another process, log a warning
                if address not in remaining:
                    self._runner.log.warning(
                        'Unable to remove address %s. The address was '
                        'removed by another process.' % address)
                    continue
                # If it was not removed, raise an error
                self._runner.log.error(
                    'Unable to remove address %s. The address is still '
                    'registered to %s, despite call for removal.' %
                    (address, net_interface))
            raise job.Error(result)
157