1#!/usr/bin/python
2#
3# Copyright 2015 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import random
19from socket import *  # pylint: disable=wildcard-import
20import time
21import unittest
22
23from scapy import all as scapy
24
25import csocket
26import multinetwork_base
27import net_test
28
29
# Netlink multicast group mask that setUp() passes to bind() so the test's
# socket receives neighbour notifications.
# NOTE(review): presumably mirrors RTMGRP_NEIGH in linux/rtnetlink.h - confirm.
RTMGRP_NEIGH = 4

# Neighbour state values. The tests compare them with the state field of
# dumped neighbour entries and of neighbour notifications, and pass them to
# UpdateNeighbour() to force an entry into a given state.
# NOTE(review): presumably mirror the NUD_* flags in linux/neighbour.h - confirm.
NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80
39
40
41# TODO: Support IPv4.
42class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
43
  # Set a 500-ms retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code will
  # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is
  # fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
  # that's 2x the delay timer.
  BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
  # Upper bound of the fuzzed reachable time (1.5x the base value). Note that
  # this is a float because of the 1.5 multiplier.
  MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS

  # The kernel default for unicast solicits is 3, but a larger value is needed
  # when testing reconfiguration during probing.
  UCAST_SOLICIT_DEFAULT = 3
  UCAST_SOLICIT_LARGE = 10
62
63  @classmethod
64  def setUpClass(cls):
65    super(NeighbourTest, cls).setUpClass()
66    for netid in cls.tuns:
67      iface = cls.GetInterfaceName(netid)
68      # This can't be set in an RA.
69      for proto in ["ipv4", "ipv6"]:
70          cls.SetSysctl(
71              "/proc/sys/net/%s/neigh/%s/delay_first_probe_time" % (proto, iface),
72              cls.DELAY_TIME_MS / 1000)
73          cls.SetSysctl(
74              "/proc/sys/net/%s/neigh/%s/retrans_time_ms" % (proto, iface),
75              cls.RETRANS_TIME_MS)
76
77  def setUp(self):
78    super(NeighbourTest, self).setUp()
79
80    for netid in self.tuns:
81      # Clear the ND cache entries for all routers, so each test starts with
82      # the IPv6 default router in state STALE.
83      addr = self._RouterAddress(netid, 6)
84      ifindex = self.ifindices[netid]
85      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)
86
87      # Configure IPv6 by sending an RA.
88      self.SendRA(netid,
89                  retranstimer=self.RETRANS_TIME_MS,
90                  reachabletime=self.BASE_REACHABLE_TIME_MS)
91
92    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
93    self.sock.bind((0, RTMGRP_NEIGH))
94    net_test.SetNonBlocking(self.sock)
95
96    self.netid = random.choice(list(self.tuns.keys()))
97    self.ifindex = self.ifindices[self.netid]
98
99    # MultinetworkBaseTest always uses NUD_PERMANENT for router ARP entries.
100    # Temporarily change those entries to NUD_STALE so we can test them.
101    if net_test.LINUX_VERSION < (4, 9, 0):
102      # Cannot change state from NUD_PERMANENT to NUD_STALE directly,
103      # so delete it to make it NUD_FAILED then change it to NUD_STALE.
104      router = self._RouterAddress(self.netid, 4)
105      macaddr = self.RouterMacAddress(self.netid)
106      self.iproute.DelNeighbour(4, router, macaddr, self.ifindex)
107      self.ExpectNeighbourNotification(router, NUD_FAILED)
108      self.assertNeighbourState(NUD_FAILED, router)
109    self.ChangeRouterNudState(4, NUD_STALE)
110
111  def SetUnicastSolicit(self, proto, iface, value):
112    self.SetSysctl(
113        "/proc/sys/net/%s/neigh/%s/ucast_solicit" % (proto, iface), value)
114
115  def tearDown(self):
116    super(NeighbourTest, self).tearDown()
117    # It is already reset to default by TearDownClass,
118    # but here we need to set it to default after each testcase.
119    iface = self.GetInterfaceName(self.netid)
120    for proto in ["ipv4", "ipv6"]:
121      self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)
122
123    # Change router ARP entries back to NUD_PERMANENT,
124    # so as not to affect other tests.
125    self.ChangeRouterNudState(4, NUD_PERMANENT)
126
127  def ChangeRouterNudState(self, version, state):
128    router = self._RouterAddress(self.netid, version)
129    macaddr = self.RouterMacAddress(self.netid)
130    self.iproute.UpdateNeighbour(version, router, macaddr, self.ifindex, state)
131    self.ExpectNeighbourNotification(router, state)
132    self.assertNeighbourState(state, router)
133
134  def GetNeighbour(self, addr, ifindex):
135    version = csocket.AddressVersion(addr)
136    for msg, args in self.iproute.DumpNeighbours(version, ifindex):
137      if args["NDA_DST"] == addr:
138        return msg, args
139
140  def GetNdEntry(self, addr):
141    return self.GetNeighbour(addr, self.ifindex)
142
143  def CheckNoNdEvents(self):
144    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)
145
146  def assertNeighbourState(self, state, addr):
147    self.assertEqual(state, self.GetNdEntry(addr)[0].state)
148
149  def assertNeighbourAttr(self, addr, name, value):
150    self.assertEqual(value, self.GetNdEntry(addr)[1][name])
151
152  def ExpectNeighbourNotification(self, addr, state, attrs=None):
153    msg = self.sock.recv(4096)
154    msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
155    self.assertEqual(addr, actual_attrs["NDA_DST"])
156    self.assertEqual(state, msg.state)
157    if attrs:
158      for name in attrs:
159        self.assertEqual(attrs[name], actual_attrs[name])
160
161  def ExpectProbe(self, is_unicast, addr):
162    version = csocket.AddressVersion(addr)
163    llsrc = self.MyMacAddress(self.netid)
164    if version == 6:
165      if is_unicast:
166        src = self.MyLinkLocalAddress(self.netid)
167        dst = addr
168      else:
169        solicited = inet_pton(AF_INET6, addr)
170        last3bytes = tuple([ord(b) for b in solicited[-3:]])
171        dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
172        src = self.MyAddress(6, self.netid)
173      expected = (
174          scapy.IPv6(src=src, dst=dst) /
175          scapy.ICMPv6ND_NS(tgt=addr) /
176          scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
177      )
178      msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
179      self.ExpectPacketOn(self.netid, msg, expected)
180    else:  # version == 4
181      if is_unicast:
182        src = self._MyIPv4Address(self.netid)
183        dst = addr
184      else:
185        raise NotImplementedError("This test does not support broadcast ARP")
186      expected = scapy.ARP(psrc=src, pdst=dst, hwsrc=llsrc, op=1)
187      msg = "Unicast ARP probe"
188      self.ExpectPacketOn(self.netid, msg, expected)
189
190  def ExpectUnicastProbe(self, addr):
191    self.ExpectProbe(True, addr)
192
193  def ExpectMulticastNS(self, addr):
194    self.ExpectProbe(False, addr)
195
196  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
197                                  S=1, O=0, R=1):
198    version = csocket.AddressVersion(addr)
199    if srcaddr is None:
200      srcaddr = addr
201    if dstaddr is None:
202      dstaddr = self.MyLinkLocalAddress(self.netid)
203    if version == 6:
204      packet = (
205          scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
206          scapy.IPv6(src=srcaddr, dst=dstaddr) /
207          scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
208          scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
209      )
210      self.ReceiveEtherPacketOn(self.netid, packet)
211    else:
212      raise NotImplementedError
213
214  def SendDnsRequest(self, addr):
215    version = csocket.AddressVersion(addr)
216    routing_mode = random.choice(["mark", "oif", "uid"])
217    s = self.BuildSocket(version, net_test.UDPSocket, self.netid, routing_mode)
218    s.connect((addr, 53))
219    s.send(net_test.UDP_PAYLOAD)
220    return s
221
222  def MonitorSleepMs(self, interval, addr):
223    slept = 0
224    while slept < interval:
225      sleep_ms = min(100, interval - slept)
226      time.sleep(sleep_ms / 1000.0)
227      slept += sleep_ms
228      print(self.GetNdEntry(addr))
229
230  def MonitorSleep(self, intervalseconds, addr):
231    self.MonitorSleepMs(intervalseconds * 1000, addr)
232
233  def SleepMs(self, ms):
234    time.sleep(ms / 1000.0)
235
236  def testNotifications(self):
237    """Tests neighbour notifications.
238
239    Relevant kernel commits:
240      upstream net-next:
241        765c9c6 neigh: Better handling of transition to NUD_PROBE state
242        53385d2 neigh: Netlink notification for administrative NUD state change
243          (only checked on kernel v3.13+, not on v3.10)
244
245      android-3.10:
246        e4a6d6b neigh: Better handling of transition to NUD_PROBE state
247
248      android-3.18:
249        2011e72 neigh: Better handling of transition to NUD_PROBE state
250    """
251    router4 = self._RouterAddress(self.netid, 4)
252    router6 = self._RouterAddress(self.netid, 6)
253    self.assertNeighbourState(NUD_STALE, router4)
254    self.assertNeighbourState(NUD_STALE, router6)
255
256    # Send a packet and check that we go into DELAY.
257    s = self.SendDnsRequest(net_test.IPV6_ADDR)
258    self.assertNeighbourState(NUD_DELAY, router6)
259
260    # Wait for the probe interval, then check that we're in PROBE, and that the
261    # kernel has notified us.
262    self.SleepMs(self.DELAY_TIME_MS * 1.1)
263    self.ExpectNeighbourNotification(router6, NUD_PROBE)
264    self.assertNeighbourState(NUD_PROBE, router6)
265    self.ExpectUnicastProbe(router6)
266
267    # Respond to the NS and verify we're in REACHABLE again.
268    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
269    self.assertNeighbourState(NUD_REACHABLE, router6)
270    if net_test.LINUX_VERSION >= (3, 13, 0):
271      # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
272      # NUD state change" produces notifications for NUD_REACHABLE, but these
273      # are not generated on earlier kernels.
274      self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
275
276    # Wait until the reachable time has passed, and verify we're in STALE.
277    self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
278    self.assertNeighbourState(NUD_STALE, router6)
279    self.ExpectNeighbourNotification(router6, NUD_STALE)
280
281    # Send a packet, and verify we go into DELAY and then to PROBE.
282    s.send(net_test.UDP_PAYLOAD)
283    self.assertNeighbourState(NUD_DELAY, router6)
284    self.SleepMs(self.DELAY_TIME_MS * 1.1)
285    self.assertNeighbourState(NUD_PROBE, router6)
286    self.ExpectNeighbourNotification(router6, NUD_PROBE)
287
288    # Wait for the probes to time out, and expect a FAILED notification.
289    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
290    self.ExpectUnicastProbe(router6)
291
292    self.SleepMs(self.RETRANS_TIME_MS)
293    self.ExpectUnicastProbe(router6)
294    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)
295
296    self.SleepMs(self.RETRANS_TIME_MS)
297    self.ExpectUnicastProbe(router6)
298    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)
299
300    self.SleepMs(self.RETRANS_TIME_MS)
301    self.assertNeighbourState(NUD_FAILED, router6)
302    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})
303
304  def testRepeatedProbes(self):
305    router4 = self._RouterAddress(self.netid, 4)
306    router6 = self._RouterAddress(self.netid, 6)
307    routermac = self.RouterMacAddress(self.netid)
308    self.assertNeighbourState(NUD_STALE, router4)
309    self.assertNeighbourState(NUD_STALE, router6)
310
311    def ForceProbe(addr, mac):
312      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
313      self.assertNeighbourState(NUD_PROBE, addr)
314      self.ExpectNeighbourNotification(addr, NUD_PROBE)
315      self.SleepMs(1)  # TODO: Why is this necessary?
316      self.assertNeighbourState(NUD_PROBE, addr)
317      self.ExpectUnicastProbe(addr)
318      self.ReceiveUnicastAdvertisement(addr, mac)
319      self.assertNeighbourState(NUD_REACHABLE, addr)
320      self.ExpectNeighbourNotification(addr, NUD_REACHABLE)
321
322    for _ in range(5):
323      ForceProbe(router6, routermac)
324
325  def testIsRouterFlag(self):
326    router6 = self._RouterAddress(self.netid, 6)
327    self.assertNeighbourState(NUD_STALE, router6)
328
329    # Get into FAILED.
330    ifindex = self.ifindices[self.netid]
331    self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
332    self.ExpectNeighbourNotification(router6, NUD_FAILED)
333    self.assertNeighbourState(NUD_FAILED, router6)
334
335    time.sleep(1)
336
337    # Send another packet and expect a multicast NS.
338    self.SendDnsRequest(net_test.IPV6_ADDR)
339    self.ExpectMulticastNS(router6)
340
341    # Receive a unicast NA with the R flag set to 0.
342    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
343                                     srcaddr=self._RouterAddress(self.netid, 6),
344                                     dstaddr=self.MyAddress(6, self.netid),
345                                     S=1, O=0, R=0)
346
347    # Expect that this takes us to REACHABLE.
348    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
349    self.assertNeighbourState(NUD_REACHABLE, router6)
350
351  def DoReconfigureDuringProbing(self, version):
352    if version == 6:
353      proto = "ipv6"
354      ip_addr = net_test.IPV6_ADDR
355    else:
356      proto = "ipv4"
357      ip_addr = net_test.IPV4_ADDR
358    router = self._RouterAddress(self.netid, version)
359    self.assertNeighbourState(NUD_STALE, router)
360
361    iface = self.GetInterfaceName(self.netid)
362    # set unicast solicit larger.
363    self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_LARGE)
364
365    # Send a packet and check that we go into DELAY.
366    self.SendDnsRequest(ip_addr)
367    self.assertNeighbourState(NUD_DELAY, router)
368
369    # Probing 4 times but no reponse
370    self.SleepMs(self.DELAY_TIME_MS * 1.1)
371    self.ExpectNeighbourNotification(router, NUD_PROBE)
372    self.assertNeighbourState(NUD_PROBE, router)
373    self.ExpectUnicastProbe(router)
374
375    for i in range(0, 3):
376      self.SleepMs(self.RETRANS_TIME_MS)
377      self.ExpectUnicastProbe(router)
378
379    # reconfiguration to 3 while probing and the state change to NUD_FAILED
380    self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)
381    self.SleepMs(self.RETRANS_TIME_MS)
382    self.ExpectNeighbourNotification(router, NUD_FAILED)
383    self.assertNeighbourState(NUD_FAILED, router)
384
385  # Check neighbor state after re-config ARP probe times.
386  def testReconfigureDuringProbing(self):
387    self.DoReconfigureDuringProbing(4)
388    self.DoReconfigureDuringProbing(6)
389
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
392