1#!/usr/bin/python
2#
3# Copyright 2015 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import random
19from socket import *  # pylint: disable=wildcard-import
20import subprocess
21import time
22import unittest
23
24from scapy import all as scapy
25
26import multinetwork_base
27import net_test
28
29
# Netlink multicast group value that setUp passes to bind() on its
# NETLINK_ROUTE socket, the socket from which neighbour notifications are read.
RTMGRP_NEIGH = 4

# Neighbour entry states (NUD_*). The tests compare these against the "state"
# field of neighbour messages, and pass them to UpdateNeighbour to force a
# state.
NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80
39
40
41# TODO: Support IPv4.
42class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
43
  # Set a 500-ms retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum. setUpClass writes it,
  # converted to seconds, to each interface's delay_first_probe_time sysctl.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code will
  # not behave correctly (e.g., go straight from REACHABLE into DELAY). This
  # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
  # that's 2x the delay timer.
  BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
  # Upper end of the fuzzed range described above (1.5x the base value). Note
  # that this is a float.
  MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS
57
58  @classmethod
59  def setUpClass(cls):
60    super(NeighbourTest, cls).setUpClass()
61    for netid in cls.tuns:
62      iface = cls.GetInterfaceName(netid)
63      # This can't be set in an RA.
64      cls.SetSysctl(
65          "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
66          cls.DELAY_TIME_MS / 1000)
67
68  def setUp(self):
69    super(NeighbourTest, self).setUp()
70
71    for netid in self.tuns:
72      # Clear the ND cache entries for all routers, so each test starts with
73      # the IPv6 default router in state STALE.
74      addr = self._RouterAddress(netid, 6)
75      ifindex = self.ifindices[netid]
76      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)
77
78      # Configure IPv6 by sending an RA.
79      self.SendRA(netid,
80                  retranstimer=self.RETRANS_TIME_MS,
81                  reachabletime=self.BASE_REACHABLE_TIME_MS)
82
83    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
84    self.sock.bind((0, RTMGRP_NEIGH))
85    net_test.SetNonBlocking(self.sock)
86
87    self.netid = random.choice(self.tuns.keys())
88    self.ifindex = self.ifindices[self.netid]
89
90  def GetNeighbour(self, addr):
91    version = 6 if ":" in addr else 4
92    for msg, args in self.iproute.DumpNeighbours(version):
93      if args["NDA_DST"] == addr:
94        return msg, args
95
96  def GetNdEntry(self, addr):
97    return self.GetNeighbour(addr)
98
99  def CheckNoNdEvents(self):
100    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)
101
102  def assertNeighbourState(self, state, addr):
103    self.assertEquals(state, self.GetNdEntry(addr)[0].state)
104
105  def assertNeighbourAttr(self, addr, name, value):
106    self.assertEquals(value, self.GetNdEntry(addr)[1][name])
107
108  def ExpectNeighbourNotification(self, addr, state, attrs=None):
109    msg = self.sock.recv(4096)
110    msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
111    self.assertEquals(addr, actual_attrs["NDA_DST"])
112    self.assertEquals(state, msg.state)
113    if attrs:
114      for name in attrs:
115        self.assertEquals(attrs[name], actual_attrs[name])
116
117  def ExpectProbe(self, is_unicast, addr):
118    version = 6 if ":" in addr else 4
119    if version == 6:
120      llsrc = self.MyMacAddress(self.netid)
121      if is_unicast:
122        src = self.MyLinkLocalAddress(self.netid)
123        dst = addr
124      else:
125        solicited = inet_pton(AF_INET6, addr)
126        last3bytes = tuple([ord(b) for b in solicited[-3:]])
127        dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
128        src = self.MyAddress(6, self.netid)
129      expected = (
130          scapy.IPv6(src=src, dst=dst) /
131          scapy.ICMPv6ND_NS(tgt=addr) /
132          scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
133      )
134      msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
135      self.ExpectPacketOn(self.netid, msg, expected)
136    else:
137      raise NotImplementedError
138
139  def ExpectUnicastProbe(self, addr):
140    self.ExpectProbe(True, addr)
141
142  def ExpectMulticastNS(self, addr):
143    self.ExpectProbe(False, addr)
144
145  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
146                                  S=1, O=0, R=1):
147    version = 6 if ":" in addr else 4
148    if srcaddr is None:
149      srcaddr = addr
150    if dstaddr is None:
151      dstaddr = self.MyLinkLocalAddress(self.netid)
152    if version == 6:
153      packet = (
154          scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
155          scapy.IPv6(src=srcaddr, dst=dstaddr) /
156          scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
157          scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
158      )
159      self.ReceiveEtherPacketOn(self.netid, packet)
160    else:
161      raise NotImplementedError
162
163  def MonitorSleepMs(self, interval, addr):
164    slept = 0
165    while slept < interval:
166      sleep_ms = min(100, interval - slept)
167      time.sleep(sleep_ms / 1000.0)
168      slept += sleep_ms
169      print self.GetNdEntry(addr)
170
171  def MonitorSleep(self, intervalseconds, addr):
172    self.MonitorSleepMs(intervalseconds * 1000, addr)
173
174  def SleepMs(self, ms):
175    time.sleep(ms / 1000.0)
176
  def testNotifications(self):
    """Tests neighbour notifications.

    Walks the IPv6 router's neighbour entry through STALE, DELAY, PROBE,
    REACHABLE, STALE, DELAY, PROBE and finally FAILED, checking the entry's
    state, the netlink notifications and the unicast probes along the way.

    Relevant kernel commits:
      upstream net-next:
        765c9c6 neigh: Better handling of transition to NUD_PROBE state
        53385d2 neigh: Netlink notification for administrative NUD state change
          (only checked on kernel v3.13+, not on v3.10)

      android-3.10:
        e4a6d6b neigh: Better handling of transition to NUD_PROBE state

      android-3.18:
        2011e72 neigh: Better handling of transition to NUD_PROBE state
    """
    # Starting point: the IPv4 router entry is PERMANENT and the IPv6 router
    # entry is STALE.
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    # Send a packet and check that we go into DELAY.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)

    # Wait for the probe interval, then check that we're in PROBE, and that the
    # kernel has notified us. The sleep is the delay timer plus a 10% margin.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectUnicastProbe(router6)

    # Respond to the NS and verify we're in REACHABLE again.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    self.assertNeighbourState(NUD_REACHABLE, router6)
    if net_test.LINUX_VERSION >= (3, 13, 0):
      # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
      # NUD state change" produces notifications for NUD_REACHABLE, but these
      # are not generated on earlier kernels.
      self.ExpectNeighbourNotification(router6, NUD_REACHABLE)

    # Wait until the reachable time has passed, and verify we're in STALE.
    # The sleep is the largest possible fuzzed reachable time plus a 20% margin.
    self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
    self.assertNeighbourState(NUD_STALE, router6)
    self.ExpectNeighbourNotification(router6, NUD_STALE)

    # Send a packet, and verify we go into DELAY and then to PROBE.
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)

    # Wait for the probes to time out, and expect a FAILED notification.
    # No advertisement is sent this time, so the probes go unanswered.
    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    self.ExpectUnicastProbe(router6)

    # Each retransmit interval should produce one more unicast probe and bump
    # the entry's probe count.
    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)

    # One more interval after the third probe, the entry should be FAILED.
    self.SleepMs(self.RETRANS_TIME_MS)
    self.assertNeighbourState(NUD_FAILED, router6)
    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})
247
248  def testRepeatedProbes(self):
249    router4 = self._RouterAddress(self.netid, 4)
250    router6 = self._RouterAddress(self.netid, 6)
251    routermac = self.RouterMacAddress(self.netid)
252    self.assertNeighbourState(NUD_PERMANENT, router4)
253    self.assertNeighbourState(NUD_STALE, router6)
254
255    def ForceProbe(addr, mac):
256      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
257      self.assertNeighbourState(NUD_PROBE, addr)
258      self.SleepMs(1)  # TODO: Why is this necessary?
259      self.assertNeighbourState(NUD_PROBE, addr)
260      self.ExpectUnicastProbe(addr)
261      self.ReceiveUnicastAdvertisement(addr, mac)
262      self.assertNeighbourState(NUD_REACHABLE, addr)
263
264    for _ in xrange(5):
265      ForceProbe(router6, routermac)
266
267  def testIsRouterFlag(self):
268    router6 = self._RouterAddress(self.netid, 6)
269    self.assertNeighbourState(NUD_STALE, router6)
270
271    # Get into FAILED.
272    ifindex = self.ifindices[self.netid]
273    self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
274    self.ExpectNeighbourNotification(router6, NUD_FAILED)
275    self.assertNeighbourState(NUD_FAILED, router6)
276
277    time.sleep(1)
278
279    # Send another packet and expect a multicast NS.
280    routing_mode = random.choice(["mark", "oif", "uid"])
281    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
282    s.connect((net_test.IPV6_ADDR, 53))
283    s.send(net_test.UDP_PAYLOAD)
284    self.ExpectMulticastNS(router6)
285
286    # Receive a unicast NA with the R flag set to 0.
287    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
288                                     srcaddr=self._RouterAddress(self.netid, 6),
289                                     dstaddr=self.MyAddress(6, self.netid),
290                                     S=1, O=0, R=0)
291
292    # Expect that this takes us to REACHABLE.
293    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
294    self.assertNeighbourState(NUD_REACHABLE, router6)
295
296
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
299