1#!/usr/bin/python
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import random
19from socket import *  # pylint: disable=wildcard-import
20import time
21import unittest
22
23from scapy import all as scapy
24
25import csocket
26import iproute
27import multinetwork_base
28import packets
29import net_test
30
31# Setsockopt values.
32IPV6_ADDR_PREFERENCES = 72
33IPV6_PREFER_SRC_PUBLIC = 0x0002
34
35# The retrans timer is also the DAD timeout. We set this to a value that's not
36# so short that DAD will complete before we attempt to use the network, but
37# short enough that we don't have to wait too long for DAD to complete.
38RETRANS_TIMER = 150
39
40
41class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
42  """Test for IPv6 source address selection.
43
44  Relevant kernel commits:
45    upstream net-next:
46      7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
47      c58da4c net: ipv6: allow explicitly choosing optimistic addresses
48      9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
49      c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
50      c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
51      3985e8a ipv6: sysctl to restrict candidate source addresses
52
53    android-3.10:
54      2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
55      0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
56      0633924 ipv6: sysctl to restrict candidate source addresses
57  """
58
59  def SetIPv6Sysctl(self, ifname, sysctl, value):
60    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)
61
62  def SetDAD(self, ifname, value):
63    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
64    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)
65
66  def SetOptimisticDAD(self, ifname, value):
67    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)
68
69  def SetUseTempaddrs(self, ifname, value):
70    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)
71
72  def SetUseOptimistic(self, ifname, value):
73    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)
74
75  def GetSourceIP(self, netid, mode="mark"):
76    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
77    # Because why not...testing for temporary addresses is a separate thing.
78    s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
79
80    s.connect((net_test.IPV6_ADDR, 123))
81    src_addr = s.getsockname()[0]
82    self.assertTrue(src_addr)
83    return src_addr
84
85  def assertAddressNotPresent(self, address):
86    self.assertRaises(IOError, self.iproute.GetAddress, address)
87
88  def assertAddressHasExpectedAttributes(
89      self, address, expected_ifindex, expected_flags):
90    ifa_msg = self.iproute.GetAddress(address)[0]
91    self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
92    self.assertEquals(64, ifa_msg.prefixlen)
93    self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
94    self.assertEquals(expected_ifindex, ifa_msg.index)
95    self.assertEquals(expected_flags, ifa_msg.flags & expected_flags)
96
97  def AddressIsTentative(self, address):
98    ifa_msg = self.iproute.GetAddress(address)[0]
99    return ifa_msg.flags & iproute.IFA_F_TENTATIVE
100
101  def BindToAddress(self, address):
102    s = net_test.UDPSocket(AF_INET6)
103    s.bind((address, 0, 0, 0))
104
105  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
106    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
107    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
108    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
109    return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
110
111  def assertAddressUsable(self, address, netid):
112    self.BindToAddress(address)
113    self.SendWithSourceAddress(address, netid)
114    # No exceptions? Good.
115
116  def assertAddressNotUsable(self, address, netid):
117    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
118    self.assertRaisesErrno(errno.EINVAL,
119                           self.SendWithSourceAddress, address, netid)
120
121  def assertAddressSelected(self, address, netid):
122    self.assertEquals(address, self.GetSourceIP(netid))
123
124  def assertAddressNotSelected(self, address, netid):
125    self.assertNotEquals(address, self.GetSourceIP(netid))
126
127  def WaitForDad(self, address):
128    for _ in xrange(20):
129      if not self.AddressIsTentative(address):
130        return
131      time.sleep(0.1)
132    raise AssertionError("%s did not complete DAD after 2 seconds")
133
134
135class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
136
137  def setUp(self):
138    # [0]  Make sure DAD, optimistic DAD, and the use_optimistic option
139    # are all consistently disabled at the outset.
140    for netid in self.tuns:
141      ifname = self.GetInterfaceName(netid)
142      self.SetDAD(ifname, 0)
143      self.SetOptimisticDAD(ifname, 0)
144      self.SetUseTempaddrs(ifname, 0)
145      self.SetUseOptimistic(ifname, 0)
146      self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0)
147
148    # [1]  Pick an interface on which to test.
149    self.test_netid = random.choice(self.tuns.keys())
150    self.test_ip = self.MyAddress(6, self.test_netid)
151    self.test_ifindex = self.ifindices[self.test_netid]
152    self.test_ifname = self.GetInterfaceName(self.test_netid)
153    self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)
154
155    # [2]  Delete the test interface's IPv6 address.
156    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
157    self.assertAddressNotPresent(self.test_ip)
158
159    self.assertAddressNotUsable(self.test_ip, self.test_netid)
160    # Verify that the link-local address is not tentative.
161    # Even though we disable DAD above, without this change occasionally the
162    # test fails. This might be due to us disabling DAD only after the
163    # link-local address is generated.
164    self.WaitForDad(self.test_lladdr)
165
166
167class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
168
169  def testRfc6724Behaviour(self):
170    # [3]  Get an IPv6 address back, in DAD start-up.
171    self.SetDAD(self.test_ifname, 1)  # Enable DAD
172    # Send a RA to start SLAAC and subsequent DAD.
173    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
174    # Get flags and prove tentative-ness.
175    self.assertAddressHasExpectedAttributes(
176        self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE)
177
178    # Even though the interface has an IPv6 address, its tentative nature
179    # prevents it from being selected.
180    self.assertAddressNotUsable(self.test_ip, self.test_netid)
181    self.assertAddressNotSelected(self.test_ip, self.test_netid)
182
183    # Busy wait for DAD to complete (should be less than 1 second).
184    self.WaitForDad(self.test_ip)
185
186    # The test_ip should have completed DAD by now, and should be the
187    # chosen source address, eligible to bind to, etc.
188    self.assertAddressUsable(self.test_ip, self.test_netid)
189    self.assertAddressSelected(self.test_ip, self.test_netid)
190
191
192class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
193
194  def testRfc6724Behaviour(self):
195    # [3]  Get an IPv6 address back, in optimistic DAD start-up.
196    self.SetDAD(self.test_ifname, 1)  # Enable DAD
197    self.SetOptimisticDAD(self.test_ifname, 1)
198    # Send a RA to start SLAAC and subsequent DAD.
199    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
200    # Get flags and prove optimism.
201    self.assertAddressHasExpectedAttributes(
202        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
203
204    # Optimistic addresses are usable but are not selected.
205    if net_test.LINUX_VERSION >= (3, 18, 0):
206      # The version checked in to android kernels <= 3.10 requires the
207      # use_optimistic sysctl to be turned on.
208      self.assertAddressUsable(self.test_ip, self.test_netid)
209    self.assertAddressNotSelected(self.test_ip, self.test_netid)
210
211    # Busy wait for DAD to complete (should be less than 1 second).
212    self.WaitForDad(self.test_ip)
213
214    # The test_ip should have completed DAD by now, and should be the
215    # chosen source address.
216    self.assertAddressUsable(self.test_ip, self.test_netid)
217    self.assertAddressSelected(self.test_ip, self.test_netid)
218
219
220class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
221
222  def testModifiedRfc6724Behaviour(self):
223    # [3]  Get an IPv6 address back, in optimistic DAD start-up.
224    self.SetDAD(self.test_ifname, 1)  # Enable DAD
225    self.SetOptimisticDAD(self.test_ifname, 1)
226    self.SetUseOptimistic(self.test_ifname, 1)
227    # Send a RA to start SLAAC and subsequent DAD.
228    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
229    # Get flags and prove optimistism.
230    self.assertAddressHasExpectedAttributes(
231        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
232
233    # The interface has an IPv6 address and, despite its optimistic nature,
234    # the use_optimistic option allows it to be selected.
235    self.assertAddressUsable(self.test_ip, self.test_netid)
236    self.assertAddressSelected(self.test_ip, self.test_netid)
237
238
239class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
240
241  def testModifiedRfc6724Behaviour(self):
242    # [3]  Add a valid IPv6 address to this interface and verify it is
243    # selected as the source address.
244    preferred_ip = self.OnlinkPrefix(6, self.test_netid) + "cafe"
245    self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex)
246    self.assertAddressHasExpectedAttributes(
247        preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT)
248    self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid))
249
250    # [4]  Get another IPv6 address, in optimistic DAD start-up.
251    self.SetDAD(self.test_ifname, 1)  # Enable DAD
252    self.SetOptimisticDAD(self.test_ifname, 1)
253    self.SetUseOptimistic(self.test_ifname, 1)
254    # Send a RA to start SLAAC and subsequent DAD.
255    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
256    # Get flags and prove optimism.
257    self.assertAddressHasExpectedAttributes(
258        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
259
260    # Since the interface has another IPv6 address, the optimistic address
261    # is not selected--the other, valid address is chosen.
262    self.assertAddressUsable(self.test_ip, self.test_netid)
263    self.assertAddressNotSelected(self.test_ip, self.test_netid)
264    self.assertAddressSelected(preferred_ip, self.test_netid)
265
266
267class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
268
269  def testDadFailure(self):
270    # [3]  Get an IPv6 address back, in optimistic DAD start-up.
271    self.SetDAD(self.test_ifname, 1)  # Enable DAD
272    self.SetOptimisticDAD(self.test_ifname, 1)
273    self.SetUseOptimistic(self.test_ifname, 1)
274    # Send a RA to start SLAAC and subsequent DAD.
275    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
276    # Prove optimism and usability.
277    self.assertAddressHasExpectedAttributes(
278        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
279    self.assertAddressUsable(self.test_ip, self.test_netid)
280    self.assertAddressSelected(self.test_ip, self.test_netid)
281
282    # Send a NA for the optimistic address, indicating address conflict
283    # ("DAD defense").
284    conflict_macaddr = "02:00:0b:ad:d0:0d"
285    dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") /
286                   scapy.IPv6(src=self.test_ip, dst="ff02::1") /
287                   scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) /
288                   scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr))
289    self.ReceiveEtherPacketOn(self.test_netid, dad_defense)
290
291    # The address should have failed DAD, and therefore no longer be usable.
292    self.assertAddressNotUsable(self.test_ip, self.test_netid)
293    self.assertAddressNotSelected(self.test_ip, self.test_netid)
294
295    # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.
296
297
298class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
299
300  def testSendToOnlinkDestination(self):
301    # [3]  Get an IPv6 address back, in optimistic DAD start-up.
302    self.SetDAD(self.test_ifname, 1)  # Enable DAD
303    self.SetOptimisticDAD(self.test_ifname, 1)
304    self.SetUseOptimistic(self.test_ifname, 1)
305    # Send a RA to start SLAAC and subsequent DAD.
306    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
307    # Prove optimism and usability.
308    self.assertAddressHasExpectedAttributes(
309        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
310    self.assertAddressUsable(self.test_ip, self.test_netid)
311    self.assertAddressSelected(self.test_ip, self.test_netid)
312
313    # [4]  Send to an on-link destination and observe a Neighbor Solicitation
314    # packet with a source address that is NOT the optimistic address.
315    # In this setup, the only usable address is the link-local address.
316    onlink_dest = self.GetRandomDestination(
317        self.OnlinkPrefix(6, self.test_netid))
318    self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest)
319
320    if net_test.LINUX_VERSION >= (3, 18, 0):
321      # Older versions will actually choose the optimistic address to
322      # originate Neighbor Solications (RFC violation).
323      expected_ns = packets.NS(
324          self.test_lladdr,
325          onlink_dest,
326          self.MyMacAddress(self.test_netid))[1]
327      self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns)
328
329
330# TODO(ek): add tests listening for netlink events.
331
332
333class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
334
335  def testChoosesNonInterfaceSourceAddress(self):
336    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
337    src_ip = self.GetSourceIP(self.test_netid)
338    self.assertFalse(src_ip in [self.test_ip, self.test_lladdr])
339    self.assertTrue(src_ip in
340                    [self.MyAddress(6, netid)
341                     for netid in self.tuns if netid != self.test_netid])
342
343
344class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
345
346  def testChoosesOnlyInterfaceSourceAddress(self):
347    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1)
348    # self.test_ifname does not have a global IPv6 address, so the only
349    # candidate is the existing link-local address.
350    self.assertAddressSelected(self.test_lladdr, self.test_netid)
351
352
353if __name__ == "__main__":
354  unittest.main()
355