#!/usr/bin/python
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import multinetwork_base
import net_test


# Netlink group value used when binding the notification socket in setUp().
RTMGRP_NEIGH = 4

# Neighbour entry states, compared against the state field of neighbour
# messages returned by the iproute helper.
NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80


# TODO: Support IPv4.
class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
  """Tests neighbour cache state transitions and netlink notifications."""

  # Use a short retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code
  # will not behave correctly (e.g., go straight from REACHABLE into DELAY).
  # This is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a
  # value that's 2x the delay timer.
  REACHABLE_TIME_MS = 2 * DELAY_TIME_MS

  @classmethod
  def setUpClass(cls):
    """Sets delay_first_probe_time on every test interface."""
    super(NeighbourTest, cls).setUpClass()
    for netid in cls.tuns:
      iface = cls.GetInterfaceName(netid)
      # This can't be set in an RA. The sysctl is in whole seconds, so use
      # floor division to always write an integer.
      cls.SetSysctl(
          "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
          cls.DELAY_TIME_MS // 1000)

  def setUp(self):
    """Resets router neighbour entries and opens the notification socket."""
    super(NeighbourTest, self).setUp()

    for netid in self.tuns:
      # Clear the ND cache entries for all routers, so each test starts with
      # the IPv6 default router in state STALE.
      addr = self._RouterAddress(netid, 6)
      ifindex = self.ifindices[netid]
      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)

      # Configure IPv6 by sending an RA.
      self.SendRA(netid,
                  retranstimer=self.RETRANS_TIME_MS,
                  reachabletime=self.REACHABLE_TIME_MS)

    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
    # Close the socket after every test (even if the rest of setUp fails) so
    # that netlink sockets do not pile up across tests.
    self.addCleanup(self.sock.close)
    self.sock.bind((0, RTMGRP_NEIGH))
    net_test.SetNonBlocking(self.sock)

    # list() keeps random.choice working when keys() returns a view.
    self.netid = random.choice(list(self.tuns.keys()))
    self.ifindex = self.ifindices[self.netid]

  def GetNeighbour(self, addr):
    """Returns the (msg, attrs) neighbour dump entry for addr, or None."""
    version = 6 if ":" in addr else 4
    for msg, args in self.iproute.DumpNeighbours(version):
      if args["NDA_DST"] == addr:
        return msg, args
    return None

  def GetNdEntry(self, addr):
    """Alias for GetNeighbour."""
    return self.GetNeighbour(addr)

  def _RequireNdEntry(self, addr):
    """Returns the neighbour entry for addr, failing the test if missing."""
    entry = self.GetNdEntry(addr)
    self.assertIsNotNone(entry, "No neighbour entry found for %s" % addr)
    return entry

  def CheckNoNdEvents(self):
    """Asserts that no message is pending on the notification socket."""
    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)

  def assertNeighbourState(self, state, addr):
    """Asserts that the neighbour entry for addr is in the given state."""
    msg, _ = self._RequireNdEntry(addr)
    self.assertEqual(state, msg.state)

  def assertNeighbourAttr(self, addr, name, value):
    """Asserts that attribute name of addr's neighbour entry equals value."""
    _, attrs = self._RequireNdEntry(addr)
    self.assertEqual(value, attrs[name])

  def ExpectNeighbourNotification(self, addr, state, attrs=None):
    """Reads one notification and checks its address, state and attributes.

    Args:
      addr: The expected NDA_DST of the notification.
      state: The expected neighbour state.
      attrs: Optional dict of further attribute names to expected values.
    """
    data = self.sock.recv(4096)
    msg, actual_attrs = self.iproute.ParseNeighbourMessage(data)
    self.assertEqual(addr, actual_attrs["NDA_DST"])
    self.assertEqual(state, msg.state)
    if attrs:
      for name, expected in attrs.items():
        self.assertEqual(expected, actual_attrs[name])

  def ExpectProbe(self, is_unicast, addr):
    """Expects a neighbour solicitation for addr on the test network.

    Args:
      is_unicast: If true, expect the NS to be sent from our link-local
        address to addr itself; otherwise expect it to be sent from our
        address to the solicited-node multicast address derived from addr.
      addr: The address being solicited.

    Raises:
      NotImplementedError: addr is not an IPv6 address.
    """
    version = 6 if ":" in addr else 4
    if version != 6:
      raise NotImplementedError

    llsrc = self.MyMacAddress(self.netid)
    if is_unicast:
      src = self.MyLinkLocalAddress(self.netid)
      dst = addr
    else:
      solicited = inet_pton(AF_INET6, addr)
      # bytearray yields integers whether inet_pton returned str or bytes.
      last3bytes = tuple(bytearray(solicited[-3:]))
      dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
      src = self.MyAddress(6, self.netid)
    expected = (
        scapy.IPv6(src=src, dst=dst) /
        scapy.ICMPv6ND_NS(tgt=addr) /
        scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
    )
    msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
    self.ExpectPacketOn(self.netid, msg, expected)

  def ExpectUnicastProbe(self, addr):
    """Expects a unicast neighbour solicitation for addr."""
    self.ExpectProbe(True, addr)

  def ExpectMulticastNS(self, addr):
    """Expects a multicast neighbour solicitation for addr."""
    self.ExpectProbe(False, addr)

  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
                                  S=1, O=0, R=1):
    """Injects a neighbour advertisement for addr on the test network.

    Args:
      addr: The target address of the advertisement.
      mac: The Ethernet source and advertised link-layer address.
      srcaddr: The IPv6 source address. Defaults to addr.
      dstaddr: The IPv6 destination. Defaults to our link-local address.
      S: Value of the NA's S flag.
      O: Value of the NA's O flag.
      R: Value of the NA's R flag.

    Raises:
      NotImplementedError: addr is not an IPv6 address.
    """
    version = 6 if ":" in addr else 4
    if version != 6:
      raise NotImplementedError

    if srcaddr is None:
      srcaddr = addr
    if dstaddr is None:
      dstaddr = self.MyLinkLocalAddress(self.netid)
    packet = (
        scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
        scapy.IPv6(src=srcaddr, dst=dstaddr) /
        scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
        scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
    )
    self.ReceiveEtherPacketOn(self.netid, packet)

  def MonitorSleepMs(self, interval, addr):
    """Sleeps interval ms, printing addr's entry after each <=100ms step."""
    slept = 0
    while slept < interval:
      sleep_ms = min(100, interval - slept)
      time.sleep(sleep_ms / 1000.0)
      slept += sleep_ms
      # Single-argument parenthesised form prints identically on Python 2
      # and Python 3.
      print(self.GetNdEntry(addr))

  def MonitorSleep(self, intervalseconds, addr):
    """Like MonitorSleepMs, but the interval is given in seconds."""
    self.MonitorSleepMs(intervalseconds * 1000, addr)

  def SleepMs(self, ms):
    """Sleeps for the given number of milliseconds."""
    time.sleep(ms / 1000.0)

  def testNotifications(self):
    """Tests neighbour notifications.

    Relevant kernel commits:
      upstream net-next:
        765c9c6 neigh: Better handling of transition to NUD_PROBE state
        53385d2 neigh: Netlink notification for administrative NUD state change
          (only checked on kernel v3.13+, not on v3.10)

      android-3.10:
        e4a6d6b neigh: Better handling of transition to NUD_PROBE state

      android-3.18:
        2011e72 neigh: Better handling of transition to NUD_PROBE state
    """

    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    # Send a packet and check that we go into DELAY.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)

    # Wait for the probe interval, then check that we're in PROBE, and that
    # the kernel has notified us.
    self.SleepMs(self.DELAY_TIME_MS)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectUnicastProbe(router6)

    # Respond to the NS and verify we're in REACHABLE again.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    self.assertNeighbourState(NUD_REACHABLE, router6)
    if net_test.LINUX_VERSION >= (3, 13, 0):
      # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
      # NUD state change" produces notifications for NUD_REACHABLE, but these
      # are not generated on earlier kernels.
      self.ExpectNeighbourNotification(router6, NUD_REACHABLE)

    # Wait until the reachable time has passed, and verify we're in STALE.
    self.SleepMs(self.REACHABLE_TIME_MS * 1.5)
    self.assertNeighbourState(NUD_STALE, router6)
    self.ExpectNeighbourNotification(router6, NUD_STALE)

    # Send a packet, and verify we go into DELAY and then to PROBE.
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)
    self.SleepMs(self.DELAY_TIME_MS)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)

    # Wait for the probes to time out, and expect a FAILED notification.
    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    self.ExpectUnicastProbe(router6)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.assertNeighbourState(NUD_FAILED, router6)
    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})

  def testRepeatedProbes(self):
    """Forces the router entry into PROBE five times, answering each probe."""
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    routermac = self.RouterMacAddress(self.netid)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    def ForceProbe(addr, mac):
      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
      self.assertNeighbourState(NUD_PROBE, addr)
      self.SleepMs(1)  # TODO: Why is this necessary?
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectUnicastProbe(addr)
      self.ReceiveUnicastAdvertisement(addr, mac)
      self.assertNeighbourState(NUD_REACHABLE, addr)

    for _ in range(5):
      ForceProbe(router6, routermac)

  def testIsRouterFlag(self):
    """Checks that an NA with R=0 takes a FAILED router entry to REACHABLE."""
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router6)

    # Get into FAILED.
    self.iproute.UpdateNeighbour(6, router6, None, self.ifindex, NUD_FAILED)
    self.ExpectNeighbourNotification(router6, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router6)

    time.sleep(1)

    # Send another packet and expect a multicast NS.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.ExpectMulticastNS(router6)

    # Receive a unicast NA with the R flag set to 0.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
                                     srcaddr=self._RouterAddress(self.netid, 6),
                                     dstaddr=self.MyAddress(6, self.netid),
                                     S=1, O=0, R=0)

    # Expect that this takes us to REACHABLE.
    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    self.assertNeighbourState(NUD_REACHABLE, router6)


if __name__ == "__main__":
  unittest.main()