1#!/usr/bin/python 2# 3# Copyright 2015 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import errno 18import random 19from socket import * # pylint: disable=wildcard-import 20import subprocess 21import time 22import unittest 23 24from scapy import all as scapy 25 26import multinetwork_base 27import net_test 28 29 30RTMGRP_NEIGH = 4 31 32NUD_INCOMPLETE = 0x01 33NUD_REACHABLE = 0x02 34NUD_STALE = 0x04 35NUD_DELAY = 0x08 36NUD_PROBE = 0x10 37NUD_FAILED = 0x20 38NUD_PERMANENT = 0x80 39 40 41# TODO: Support IPv4. 42class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): 43 44 # Set a 500-ms retrans timer so we can test for ND retransmits without 45 # waiting too long. Apparently this cannot go below 500ms. 46 RETRANS_TIME_MS = 500 47 48 # This can only be in seconds, so 1000 is the minimum. 49 DELAY_TIME_MS = 1000 50 51 # Unfortunately, this must be above the delay timer or the kernel ND code will 52 # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is 53 # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value 54 # that's 2x the delay timer. 55 BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS 56 MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS 57 58 @classmethod 59 def setUpClass(cls): 60 super(NeighbourTest, cls).setUpClass() 61 for netid in cls.tuns: 62 iface = cls.GetInterfaceName(netid) 63 # This can't be set in an RA. 64 cls.SetSysctl( 65 "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface, 66 cls.DELAY_TIME_MS / 1000) 67 68 def setUp(self): 69 super(NeighbourTest, self).setUp() 70 71 for netid in self.tuns: 72 # Clear the ND cache entries for all routers, so each test starts with 73 # the IPv6 default router in state STALE. 74 addr = self._RouterAddress(netid, 6) 75 ifindex = self.ifindices[netid] 76 self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED) 77 78 # Configure IPv6 by sending an RA. 79 self.SendRA(netid, 80 retranstimer=self.RETRANS_TIME_MS, 81 reachabletime=self.BASE_REACHABLE_TIME_MS) 82 83 self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE) 84 self.sock.bind((0, RTMGRP_NEIGH)) 85 net_test.SetNonBlocking(self.sock) 86 87 self.netid = random.choice(self.tuns.keys()) 88 self.ifindex = self.ifindices[self.netid] 89 90 def GetNeighbour(self, addr): 91 version = 6 if ":" in addr else 4 92 for msg, args in self.iproute.DumpNeighbours(version): 93 if args["NDA_DST"] == addr: 94 return msg, args 95 96 def GetNdEntry(self, addr): 97 return self.GetNeighbour(addr) 98 99 def CheckNoNdEvents(self): 100 self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK) 101 102 def assertNeighbourState(self, state, addr): 103 self.assertEquals(state, self.GetNdEntry(addr)[0].state) 104 105 def assertNeighbourAttr(self, addr, name, value): 106 self.assertEquals(value, self.GetNdEntry(addr)[1][name]) 107 108 def ExpectNeighbourNotification(self, addr, state, attrs=None): 109 msg = self.sock.recv(4096) 110 msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg) 111 self.assertEquals(addr, actual_attrs["NDA_DST"]) 112 self.assertEquals(state, msg.state) 113 if attrs: 114 for name in attrs: 115 self.assertEquals(attrs[name], actual_attrs[name]) 116 117 def ExpectProbe(self, is_unicast, addr): 118 version = 6 if ":" in addr else 4 119 if version == 6: 120 llsrc = self.MyMacAddress(self.netid) 121 if is_unicast: 122 src = self.MyLinkLocalAddress(self.netid) 123 dst = addr 124 else: 125 solicited = inet_pton(AF_INET6, addr) 126 last3bytes = tuple([ord(b) for b in solicited[-3:]]) 127 dst = "ff02::1:ff%02x:%02x%02x" % last3bytes 128 src = self.MyAddress(6, self.netid) 129 expected = ( 130 scapy.IPv6(src=src, dst=dst) / 131 scapy.ICMPv6ND_NS(tgt=addr) / 132 scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc) 133 ) 134 msg = "%s probe" % ("Unicast" if is_unicast else "Multicast") 135 self.ExpectPacketOn(self.netid, msg, expected) 136 else: 137 raise NotImplementedError 138 139 def ExpectUnicastProbe(self, addr): 140 self.ExpectProbe(True, addr) 141 142 def ExpectMulticastNS(self, addr): 143 self.ExpectProbe(False, addr) 144 145 def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None, 146 S=1, O=0, R=1): 147 version = 6 if ":" in addr else 4 148 if srcaddr is None: 149 srcaddr = addr 150 if dstaddr is None: 151 dstaddr = self.MyLinkLocalAddress(self.netid) 152 if version == 6: 153 packet = ( 154 scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) / 155 scapy.IPv6(src=srcaddr, dst=dstaddr) / 156 scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) / 157 scapy.ICMPv6NDOptDstLLAddr(lladdr=mac) 158 ) 159 self.ReceiveEtherPacketOn(self.netid, packet) 160 else: 161 raise NotImplementedError 162 163 def MonitorSleepMs(self, interval, addr): 164 slept = 0 165 while slept < interval: 166 sleep_ms = min(100, interval - slept) 167 time.sleep(sleep_ms / 1000.0) 168 slept += sleep_ms 169 print self.GetNdEntry(addr) 170 171 def MonitorSleep(self, intervalseconds, addr): 172 self.MonitorSleepMs(intervalseconds * 1000, addr) 173 174 def SleepMs(self, ms): 175 time.sleep(ms / 1000.0) 176 177 def testNotifications(self): 178 """Tests neighbour notifications. 179 180 Relevant kernel commits: 181 upstream net-next: 182 765c9c6 neigh: Better handling of transition to NUD_PROBE state 183 53385d2 neigh: Netlink notification for administrative NUD state change 184 (only checked on kernel v3.13+, not on v3.10) 185 186 android-3.10: 187 e4a6d6b neigh: Better handling of transition to NUD_PROBE state 188 189 android-3.18: 190 2011e72 neigh: Better handling of transition to NUD_PROBE state 191 """ 192 router4 = self._RouterAddress(self.netid, 4) 193 router6 = self._RouterAddress(self.netid, 6) 194 self.assertNeighbourState(NUD_PERMANENT, router4) 195 self.assertNeighbourState(NUD_STALE, router6) 196 197 # Send a packet and check that we go into DELAY. 198 routing_mode = random.choice(["mark", "oif", "uid"]) 199 s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode) 200 s.connect((net_test.IPV6_ADDR, 53)) 201 s.send(net_test.UDP_PAYLOAD) 202 self.assertNeighbourState(NUD_DELAY, router6) 203 204 # Wait for the probe interval, then check that we're in PROBE, and that the 205 # kernel has notified us. 206 self.SleepMs(self.DELAY_TIME_MS * 1.1) 207 self.ExpectNeighbourNotification(router6, NUD_PROBE) 208 self.assertNeighbourState(NUD_PROBE, router6) 209 self.ExpectUnicastProbe(router6) 210 211 # Respond to the NS and verify we're in REACHABLE again. 212 self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid)) 213 self.assertNeighbourState(NUD_REACHABLE, router6) 214 if net_test.LINUX_VERSION >= (3, 13, 0): 215 # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative 216 # NUD state change" produces notifications for NUD_REACHABLE, but these 217 # are not generated on earlier kernels. 218 self.ExpectNeighbourNotification(router6, NUD_REACHABLE) 219 220 # Wait until the reachable time has passed, and verify we're in STALE. 221 self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2) 222 self.assertNeighbourState(NUD_STALE, router6) 223 self.ExpectNeighbourNotification(router6, NUD_STALE) 224 225 # Send a packet, and verify we go into DELAY and then to PROBE. 226 s.send(net_test.UDP_PAYLOAD) 227 self.assertNeighbourState(NUD_DELAY, router6) 228 self.SleepMs(self.DELAY_TIME_MS * 1.1) 229 self.assertNeighbourState(NUD_PROBE, router6) 230 self.ExpectNeighbourNotification(router6, NUD_PROBE) 231 232 # Wait for the probes to time out, and expect a FAILED notification. 233 self.assertNeighbourAttr(router6, "NDA_PROBES", 1) 234 self.ExpectUnicastProbe(router6) 235 236 self.SleepMs(self.RETRANS_TIME_MS) 237 self.ExpectUnicastProbe(router6) 238 self.assertNeighbourAttr(router6, "NDA_PROBES", 2) 239 240 self.SleepMs(self.RETRANS_TIME_MS) 241 self.ExpectUnicastProbe(router6) 242 self.assertNeighbourAttr(router6, "NDA_PROBES", 3) 243 244 self.SleepMs(self.RETRANS_TIME_MS) 245 self.assertNeighbourState(NUD_FAILED, router6) 246 self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3}) 247 248 def testRepeatedProbes(self): 249 router4 = self._RouterAddress(self.netid, 4) 250 router6 = self._RouterAddress(self.netid, 6) 251 routermac = self.RouterMacAddress(self.netid) 252 self.assertNeighbourState(NUD_PERMANENT, router4) 253 self.assertNeighbourState(NUD_STALE, router6) 254 255 def ForceProbe(addr, mac): 256 self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE) 257 self.assertNeighbourState(NUD_PROBE, addr) 258 self.SleepMs(1) # TODO: Why is this necessary? 259 self.assertNeighbourState(NUD_PROBE, addr) 260 self.ExpectUnicastProbe(addr) 261 self.ReceiveUnicastAdvertisement(addr, mac) 262 self.assertNeighbourState(NUD_REACHABLE, addr) 263 264 for _ in xrange(5): 265 ForceProbe(router6, routermac) 266 267 def testIsRouterFlag(self): 268 router6 = self._RouterAddress(self.netid, 6) 269 self.assertNeighbourState(NUD_STALE, router6) 270 271 # Get into FAILED. 272 ifindex = self.ifindices[self.netid] 273 self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED) 274 self.ExpectNeighbourNotification(router6, NUD_FAILED) 275 self.assertNeighbourState(NUD_FAILED, router6) 276 277 time.sleep(1) 278 279 # Send another packet and expect a multicast NS. 280 routing_mode = random.choice(["mark", "oif", "uid"]) 281 s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode) 282 s.connect((net_test.IPV6_ADDR, 53)) 283 s.send(net_test.UDP_PAYLOAD) 284 self.ExpectMulticastNS(router6) 285 286 # Receive a unicast NA with the R flag set to 0. 287 self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid), 288 srcaddr=self._RouterAddress(self.netid, 6), 289 dstaddr=self.MyAddress(6, self.netid), 290 S=1, O=0, R=0) 291 292 # Expect that this takes us to REACHABLE. 293 self.ExpectNeighbourNotification(router6, NUD_REACHABLE) 294 self.assertNeighbourState(NUD_REACHABLE, router6) 295 296 297if __name__ == "__main__": 298 unittest.main() 299