1#!/usr/bin/python
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo
18
19import errno
20import os
21import posix
22import random
23from socket import *  # pylint: disable=wildcard-import
24import struct
25import sys
26import threading
27import time
28import unittest
29
30from scapy import all as scapy
31
32import csocket
33import multinetwork_base
34import net_test
35
36
# True if this kernel exposes /proc/net/icmp6. Tests that read that file are
# skipped when it is missing.
HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

# ICMP / ICMPv6 type numbers for echo requests and echo replies.
ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129
# Together these bound how much of the offending packet PingReplyThread quotes
# in the packet too big errors it sends.
IPV6_MIN_MTU = 1280
ICMPV6_HEADER_LEN = 8
# ICMPv6 type of a packet too big error; testIPv6MTU expects it in the extended
# error returned by the kernel.
ICMPV6_PKT_TOOBIG = 2
46
47
class PingReplyThread(threading.Thread):
  """Fake router that answers pings sent out of a tun interface.

  Reads Ethernet frames from the tun file object and, for well-formed ICMP and
  ICMPv6 echo requests, writes back one of:
    - an echo reply;
    - a time exceeded error, if the TTL / hop limit is below MIN_TTL;
    - a packet too big error (IPv6 only), if the payload length is above
      LINK_MTU.
  Anything else is silently dropped.
  """

  # Requests with a TTL / hop limit below this get a time exceeded error.
  MIN_TTL = 10
  # Source addresses of the ICMP errors generated by this thread.
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  # Not referenced anywhere in this class.
  NEIGHBOURS = ["fe80::1"]
  # IPv6 requests whose payload length exceeds this get a packet too big error.
  LINK_MTU = 1300

  def __init__(self, tun, mymac, routermac, routeraddr):
    """Creates the thread; call start() to begin answering pings.

    Args:
      tun: File-like object (must provide fileno()) for the tun interface.
      mymac: MAC address used as the destination of every frame we send.
      routermac: MAC address used as the source of every frame we send.
      routeraddr: The router's IPv6 address. Used as the source of replies to
        multicast pings, and the only link-local destination we answer for.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    # Deliberately not called _started / _stopped: threading.Thread keeps its
    # own state under those names on some Python versions, and overwriting it
    # with a bool breaks start().
    self._reply_started = False
    self._stop_requested = False
    self._mymac = mymac
    self._routermac = routermac
    self._routeraddr = routeraddr

  def IsStarted(self):
    """Returns True once run() has begun executing."""
    return self._reply_started

  def Stop(self):
    """Asks the thread to exit; takes effect when the pending read returns."""
    self._stop_requested = True

  def ChecksumValid(self, packet):
    """Returns True if the IP/ICMP/ICMPv6 checksums in packet are correct.

    Note that this deletes the checksum fields from the layers of the packet
    passed in, so scapy recomputes them the next time it is serialized.
    """
    def GetAndClearChecksum(layer):
      # Returns the layer's checksum (None if the layer is absent) and deletes
      # the field. scapy calls it "chksum" on some layers, "cksum" on others.
      if not layer:
        return None
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet. bytes() is the same as str() on
    # Python 2.
    packet = packet.__class__(bytes(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends a time exceeded error quoting packet back to its source.

    Args:
      version: 4 or 6. Any other value sends nothing.
      packet: The offending IP or IPv6 packet.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def SendPacketTooBig(self, packet):
    """Sends an ICMPv6 packet too big error advertising LINK_MTU.

    The offending IPv6 packet is quoted, truncated to
    IPV6_MIN_MTU - ICMPV6_HEADER_LEN bytes.
    """
    src = packet.getlayer(scapy.IPv6).src
    datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    self.SendPacket(
        scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
        scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
        bytes(packet)[:datalen])

  def IPv4Packet(self, ip):
    """Answers an IPv4 echo request; drops everything else."""
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now. The None check mirrors IPv6Packet and
    # covers packets that claim to be ICMP but have no ICMP layer.
    if (ip.proto != IPPROTO_ICMP or
        icmp is None or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Answers an IPv6 echo request; drops everything else."""
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast ping: reply to the sender from the router's address.
      ipv6.dst = ipv6.src
      for src in [self._routeraddr]:
        ipv6.src = src
        icmpv6.type = ICMPV6_ECHO_REPLY
        self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    elif ipv6.plen > self.LINK_MTU:
      self.SendPacketTooBig(ipv6)
    else:
      # The only link-local address we answer for is the router's own.
      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
        return
      icmpv6.type = ICMPV6_ECHO_REPLY
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Exchanges packet.src and packet.dst in place."""
    packet.src, packet.dst = packet.dst, packet.src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet header and writes it to the tun.

    Errors are ignored once Stop() has been called, because the tun may be
    torn down at any moment after that; before then they propagate.
    """
    frame = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), bytes(frame))
    except Exception:  # pylint: disable=broad-except
      if not self._stop_requested:
        raise

  def run(self):
    """Reads frames from the tun and answers them until stopped.

    The loop ends when Stop() has been called, or when reading fails with
    anything other than EAGAIN.
    """
    self._reply_started = True
    while not self._stop_requested:
      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          continue
        break
      except ValueError:
        # E.g. the tun file object was closed under us. That is expected while
        # shutting down; either way there is no packet to process, so do not
        # fall through to the parsing code below.
        if not self._stop_requested:
          raise
        break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)
211
212
213class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
214
215  @classmethod
216  def WaitForReplyThreads(cls):
217    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
218    # that would cause tearDownClass not to be called and thus not clean up
219    # routing configuration, breaking subsequent tests. Instead, just let these
220    # tests fail.
221    _INTERVAL = 0.1
222    _ATTEMPTS = 20
223    for i in range(0, _ATTEMPTS):
224      for netid in cls.NETIDS:
225        if all(thread.IsStarted() for thread in list(cls.reply_threads.values())):
226          return
227        time.sleep(_INTERVAL)
228    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
229        _ATTEMPTS * _INTERVAL)
230    sys.stderr.write(msg)
231
232  @classmethod
233  def StopReplyThreads(cls):
234    for thread in list(cls.reply_threads.values()):
235      thread.Stop()
236
237  @classmethod
238  def setUpClass(cls):
239    super(Ping6Test, cls).setUpClass()
240    cls.reply_threads = {}
241    for netid in cls.NETIDS:
242      cls.reply_threads[netid] = PingReplyThread(
243        cls.tuns[netid],
244        cls.MyMacAddress(netid),
245        cls.RouterMacAddress(netid),
246        cls._RouterAddress(netid, 6))
247      cls.reply_threads[netid].start()
248    cls.WaitForReplyThreads()
249    cls.netid = random.choice(cls.NETIDS)
250    cls.SetDefaultNetwork(cls.netid)
251
  @classmethod
  def tearDownClass(cls):
    """Undoes setUpClass: stops the reply threads and clears the default network."""
    # Only requests the stop; the threads are not joined here.
    cls.StopReplyThreads()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()
257
258  def setUp(self):
259    self.ifname = self.GetInterfaceName(self.netid)
260    self.ifindex = self.ifindices[self.netid]
261    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
262    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
263
264  def assertValidPingResponse(self, s, data):
265    family = s.family
266
267    # Receive the reply.
268    rcvd, src = s.recvfrom(32768)
269    self.assertNotEqual(0, len(rcvd), "No data received")
270
271    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
272    # as IPv4.
273    if src[0].startswith("::ffff:"):
274      family = AF_INET
275      src = (src[0].replace("::ffff:", ""), src[1:])
276
277    # Check the data being sent is valid.
278    self.assertGreater(len(data), 7, "Not enough data for ping packet")
279    if family == AF_INET:
280      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
281    elif family == AF_INET6:
282      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
283    else:
284      self.fail("Unknown socket address family %d" * s.family)
285
286    # Check address, ICMP type, and ICMP code.
287    if family == AF_INET:
288      addr, unused_port = src
289      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
290      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
291    else:
292      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
293      self.assertGreaterEqual(len(addr), len("::"))
294      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
295      # Check that the flow label is zero and that the scope ID is sane.
296      self.assertEqual(flowlabel, 0)
297      if addr.startswith("fe80::"):
298        self.assertTrue(scope_id in list(self.ifindices.values()))
299      else:
300        self.assertEqual(0, scope_id)
301
302    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
303    # we don't have the IP addresses so we can't construct the pseudoheader.
304
305    # Check the sequence number and the data.
306    self.assertEqual(len(data), len(rcvd))
307    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
308
309  @staticmethod
310  def IsAlmostEqual(expected, actual, delta):
311    return abs(expected - actual) < delta
312
313  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
314                        txmem=0, rxmem=0):
315    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
316                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
317                "%02X" % state,
318                "%08X:%08X" % (txmem, rxmem),
319                str(os.getuid()), "2", "0"]
320    for actual in self.ReadProcNetSocket(name):
321      # Check that rxmem and txmem don't differ too much from each other.
322      actual_txmem, actual_rxmem = expected[3].split(":")
323      if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4):
324        return
325      if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4):
326        return
327
328      # Check all the parameters except rxmem and txmem.
329      expected[3] = actual[3]
330      if expected == actual:
331        return
332
333    self.fail("Cound not find socket matching %s" % expected)
334
335  def testIPv4SendWithNoConnection(self):
336    s = net_test.IPv4PingSocket()
337    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
338
339  def testIPv6SendWithNoConnection(self):
340    s = net_test.IPv6PingSocket()
341    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
342
343  def testIPv4LoopbackPingWithConnect(self):
344    s = net_test.IPv4PingSocket()
345    s.connect(("127.0.0.1", 55))
346    data = net_test.IPV4_PING + "foobarbaz"
347    s.send(data)
348    self.assertValidPingResponse(s, data)
349
350  def testIPv6LoopbackPingWithConnect(self):
351    s = net_test.IPv6PingSocket()
352    s.connect(("::1", 55))
353    s.send(net_test.IPV6_PING)
354    self.assertValidPingResponse(s, net_test.IPV6_PING)
355
356  def testIPv4PingUsingSendto(self):
357    s = net_test.IPv4PingSocket()
358    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
359    self.assertEqual(len(net_test.IPV4_PING), written)
360    self.assertValidPingResponse(s, net_test.IPV4_PING)
361
362  def testIPv6PingUsingSendto(self):
363    s = net_test.IPv6PingSocket()
364    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
365    self.assertEqual(len(net_test.IPV6_PING), written)
366    self.assertValidPingResponse(s, net_test.IPV6_PING)
367
368  def testIPv4NoCrash(self):
369    # Python 2.x does not provide either read() or recvmsg.
370    s = net_test.IPv4PingSocket()
371    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
372    self.assertEqual(len(net_test.IPV4_PING), written)
373    fd = s.fileno()
374    reply = posix.read(fd, 4096)
375    self.assertEqual(written, len(reply))
376
377  def testIPv6NoCrash(self):
378    # Python 2.x does not provide either read() or recvmsg.
379    s = net_test.IPv6PingSocket()
380    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
381    self.assertEqual(len(net_test.IPV6_PING), written)
382    fd = s.fileno()
383    reply = posix.read(fd, 4096)
384    self.assertEqual(written, len(reply))
385
386  def testCrossProtocolCrash(self):
387    # Checks that an ICMP error containing a ping packet that matches the ID
388    # of a socket of the wrong protocol (which can happen when using 464xlat)
389    # doesn't crash the kernel.
390
391    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
392    # because IPv4 packets sent by scapy.send() on loopback are not received by
393    # the kernel. So we don't actually use this function yet.
394    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
395      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
396              scapy.ICMP(type=3, code=0) /
397              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
398              scapy.ICMP(type=8, id=port, seq=1))
399
400    def GetIPv6Unreachable(port):
401      return (scapy.IPv6(src="::1", dst="::1") /
402              scapy.ICMPv6DestUnreach() /
403              scapy.IPv6(src="::1", dst="::1") /
404              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
405
406    # An unreachable matching the ID of a socket of the wrong protocol
407    # shouldn't crash.
408    s = net_test.IPv4PingSocket()
409    s.connect(("127.0.0.1", 12345))
410    _, port = s.getsockname()
411    scapy.send(GetIPv6Unreachable(port), verbose=False)
412    # No crash? Good.
413
414  def testCrossProtocolCalls(self):
415    """Tests that passing in the wrong family returns EAFNOSUPPORT.
416
417    Relevant kernel commits:
418      upstream net:
419        91a0b60 net/ping: handle protocol mismatching scenario
420        9145736d net: ping: Return EAFNOSUPPORT when appropriate.
421
422      android-3.10:
423        78a6809 net/ping: handle protocol mismatching scenario
424        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
425    """
426
427    def CheckEAFNoSupport(function, *args):
428      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
429
430    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
431
432    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
433    # IPv4 socket address structures, we need to pass down a socket address
434    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
435    # will fail immediately with EINVAL because the passed-in socket length is
436    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
437    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
438    ipv4sockaddr = csocket.SockaddrIn6(
439        ipv4sockaddr.Pack() +
440        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
441
442    s4 = net_test.IPv4PingSocket()
443    s6 = net_test.IPv6PingSocket()
444
445    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
446    # address family, because the Python implementation will just pass garbage
447    # down to the kernel. So call the C functions directly.
448    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
449    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
450    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
451    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
452    CheckEAFNoSupport(csocket.Sendmsg,
453                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
454    CheckEAFNoSupport(csocket.Sendmsg,
455                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
456
  def testIPv4Bind(self):
    """Checks what bind() allows and refuses on IPv4 ping sockets.

    The port passed to bind() is the ping identifier. The steps below depend on
    each other and on the sockets created earlier still being open, so their
    order matters.
    """
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEqual(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    # s5 consumed the only reply, so s4 must time out rather than block.
    csocket.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
499
  def testIPv6Bind(self):
    """Checks what bind() allows and refuses on IPv6 ping sockets.

    The port passed to bind() is the ping identifier. The steps below depend on
    the sockets created earlier still being open, so their order matters.
    """
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEqual(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())

    # Binding an IPv4 and an IPv6 ping socket at the same time works.
    # NOTE(review): the two ports differ (444 vs 666), so this does not
    # exercise both families sharing one ID - confirm whether that was the
    # intent.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
532
533  def testIPv4InvalidBind(self):
534    s = net_test.IPv4PingSocket()
535    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
536                           s.bind, ("255.255.255.255", 1026))
537    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
538                           s.bind, ("224.0.0.1", 651))
539    # Binding to an address we don't have only works with IP_TRANSPARENT.
540    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
541                           s.bind, (net_test.IPV4_ADDR, 651))
542    try:
543      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
544      s.bind((net_test.IPV4_ADDR, 651))
545    except IOError as e:
546      if e.errno == errno.EACCES:
547        pass  # We're not root. let it go for now.
548
549  def testIPv6InvalidBind(self):
550    s = net_test.IPv6PingSocket()
551    self.assertRaisesErrno(errno.EINVAL,
552                           s.bind, ("ff02::2", 1026))
553
554    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
555    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
556                           s.bind, (net_test.IPV6_ADDR, 651))
557    try:
558      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
559      s.bind((net_test.IPV6_ADDR, 651))
560    except IOError as e:
561      if e.errno == errno.EACCES:
562        pass  # We're not root. let it go for now.
563
564  def testAfUnspecBind(self):
565    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
566    s4 = net_test.IPv4PingSocket()
567    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
568    sockaddr.family = AF_UNSPEC
569    csocket.Bind(s4, sockaddr)
570    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())
571
572    # But not if the address is anything else.
573    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
574    sockaddr.family = AF_UNSPEC
575    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
576
577    # This doesn't work for IPv6.
578    s6 = net_test.IPv6PingSocket()
579    sockaddr = csocket.Sockaddr(("::1", 58997))
580    sockaddr.family = AF_UNSPEC
581    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
582
  def testIPv6ScopedBind(self):
    """Checks how bind() treats scope IDs on IPv6 ping sockets.

    Uses the link-local address and interface index that setUp cached for the
    default network.
    """
    # Can't bind to a link-local address without a scope ID.
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, (self.lladdr, 1026, 0, 0))

    # Binding to a link-local address with a scope ID works, and the scope ID is
    # returned by a subsequent getsockname. Interestingly, Python's getsockname
    # returns "fe80:1%foo", even though it does not understand it.
    expected = self.lladdr + "%" + self.ifname
    s.bind((self.lladdr, 4646, 0, self.ifindex))
    self.assertEqual((expected, 4646, 0, self.ifindex), s.getsockname())

    # Of course, for the above to work the address actually has to be configured
    # on the machine.
    # NOTE(review): s is already bound at this point, yet EADDRNOTAVAIL (not
    # the "binding twice" EINVAL) is what is expected here.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("fe80::f00", 1026, 0, 1))

    # Scope IDs on non-link-local addresses are silently ignored.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 1234, 0, 1))
    self.assertEqual(("::1", 1234, 0, 0), s.getsockname())
605
606  def testBindAffectsIdentifier(self):
607    s = net_test.IPv6PingSocket()
608    s.bind((self.globaladdr, 0xf976))
609    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
610    self.assertEqual("\xf9\x76", s.recv(32768)[4:6])
611
612    s = net_test.IPv6PingSocket()
613    s.bind((self.globaladdr, 0xace))
614    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
615    self.assertEqual("\x0a\xce", s.recv(32768)[4:6])
616
617  def testLinkLocalAddress(self):
618    s = net_test.IPv6PingSocket()
619    # Sending to a link-local address with no scope fails with EINVAL.
620    self.assertRaisesErrno(errno.EINVAL,
621                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
622    # Sending to link-local address with a scope succeeds. Note that Python
623    # doesn't understand the "fe80::1%lo" format, even though it returns it.
624    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
625    # No exceptions? Good.
626
  def testLinkLocalOif(self):
    """Checks that ping to link-local addresses works correctly.

    For every interface selection mode and every network, pings that network's
    router at its link-local address and expects a reply. When no mode is used,
    the interface is given through the scope ID of the destination instead.

    Relevant kernel commits:
      upstream net:
        5e45789 net: ipv6: Fix ping to link-local addresses.
    """
    for mode in ["oif", "ucast_oif", None]:
      # s is reused across all networks within a mode; s2 is created afresh
      # for each network because it gets connected.
      s = net_test.IPv6PingSocket()
      for netid in self.NETIDS:
        s2 = net_test.IPv6PingSocket()
        dst = self._RouterAddress(netid, 6)
        self.assertTrue(dst.startswith("fe80:"))

        if mode:
          self.SelectInterface(s, netid, mode)
          self.SelectInterface(s2, netid, mode)
          # The socket option picks the interface, so no scope ID is needed.
          scopeid = 0
        else:
          scopeid = self.ifindices[netid]

        if mode == "oif":
          # If SO_BINDTODEVICE has been set, any attempt to send on another
          # interface returns EINVAL.
          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
                                   % len(self.NETIDS)]
          otherscopeid = self.ifindices[othernetid]
          self.assertRaisesErrno(
              errno.EINVAL,
              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
          self.assertRaisesErrno(
              errno.EINVAL,
              s.connect, (dst, 55, 0, otherscopeid))

        # Try using both sendto and connect/send.
        # If we get a reply, we sent the packet out on the right interface.
        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
        self.assertValidPingResponse(s, net_test.IPV6_PING)

        # IPV6_UNICAST_IF doesn't work on connected sockets.
        if mode != "ucast_oif":
          s2.connect((dst, 123, 0, scopeid))
          s2.send(net_test.IPV6_PING)
          self.assertValidPingResponse(s2, net_test.IPV6_PING)
671
672  def testMappedAddressFails(self):
673    s = net_test.IPv6PingSocket()
674    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
675    self.assertValidPingResponse(s, net_test.IPV6_PING)
676    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
677    self.assertValidPingResponse(s, net_test.IPV6_PING)
678    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
679                           ("::ffff:192.0.2.1", 55))
680
681  @unittest.skipUnless(False, "skipping: does not work yet")
682  def testFlowLabel(self):
683    s = net_test.IPv6PingSocket()
684
685    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
686    # the flow label in the packet is not set.
687    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
688    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
689
690    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
691    # that is not registered with the flow manager should return EINVAL...
692    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
693    # ... but this doesn't work yet.
694    if False:
695      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
696                             (net_test.IPV6_ADDR, 93, 0xdead, 0))
697
698    # After registering the flow label, it gets sent properly, appears in the
699    # output packet, and is returned in the response.
700    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
701    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
702                                     net_test.IPV6_FLOWINFO_SEND))
703    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
704    _, src = s.recvfrom(32768)
705    _, _, flowlabel, _ = src
706    self.assertEqual(0xdead, flowlabel & 0xfffff)
707
708  def testIPv4Error(self):
709    s = net_test.IPv4PingSocket()
710    s.setsockopt(SOL_IP, IP_TTL, 2)
711    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
712    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
713    # We can't check the actual error because Python 2.7 doesn't implement
714    # recvmsg, but we can at least check that the socket returns an error.
715    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
716
717  def testIPv6Error(self):
718    s = net_test.IPv6PingSocket()
719    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
720    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
721    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
722    # We can't check the actual error because Python 2.7 doesn't implement
723    # recvmsg, but we can at least check that the socket returns an error.
724    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
725
726  def testIPv6MulticastPing(self):
727    s = net_test.IPv6PingSocket()
728    # Send a multicast ping and check we get at least one duplicate.
729    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
730    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
731    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
732    self.assertValidPingResponse(s, net_test.IPV6_PING)
733    self.assertValidPingResponse(s, net_test.IPV6_PING)
734
735  def testIPv4LargePacket(self):
736    s = net_test.IPv4PingSocket()
737    data = net_test.IPV4_PING + 20000 * "a"
738    s.sendto(data, ("127.0.0.1", 987))
739    self.assertValidPingResponse(s, data)
740
741  def testIPv6LargePacket(self):
742    s = net_test.IPv6PingSocket()
743    s.bind(("::", 0xace))
744    data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
745    s.sendto(data, ("::1", 953))
746
747  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
748  def testIcmpSocketsNotInIcmp6(self):
749    numrows = len(self.ReadProcNetSocket("icmp"))
750    numrows6 = len(self.ReadProcNetSocket("icmp6"))
751    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
752    s.bind(("127.0.0.1", 0xace))
753    s.connect(("127.0.0.1", 0xbeef))
754    self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
755    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
756
757  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
758  def testIcmp6SocketsNotInIcmp(self):
759    numrows = len(self.ReadProcNetSocket("icmp"))
760    numrows6 = len(self.ReadProcNetSocket("icmp6"))
761    s = net_test.IPv6PingSocket()
762    s.bind(("::1", 0xace))
763    s.connect(("::1", 0xbeef))
764    self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
765    self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
766
767  def testProcNetIcmp(self):
768    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
769    s.bind(("127.0.0.1", 0xace))
770    s.connect(("127.0.0.1", 0xbeef))
771    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
772
773  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
774  def testProcNetIcmp6(self):
775    numrows6 = len(self.ReadProcNetSocket("icmp6"))
776    s = net_test.IPv6PingSocket()
777    s.bind(("::1", 0xace))
778    s.connect(("::1", 0xbeef))
779    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
780
781    # Check the row goes away when the socket is closed.
782    s.close()
783    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
784
785    # Try send, bind and connect to check the addresses and the state.
786    s = net_test.IPv6PingSocket()
787    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
788    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
789    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
790
791    # Can't bind after sendto, apparently.
792    s = net_test.IPv6PingSocket()
793    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
794    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
795    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
796
797    # Check receive bytes.
798    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
799    s.connect(("ff02::1", 0xdead))
800    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
801    s.send(net_test.IPV6_PING)
802    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
803    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
804                           txmem=0, rxmem=0x300)
805    self.assertValidPingResponse(s, net_test.IPV6_PING)
806    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
807                           txmem=0, rxmem=0)
808
809  def testProcNetUdp6(self):
810    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
811    s.bind(("::1", 0xace))
812    s.connect(("::1", 0xbeef))
813    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
814
815  def testProcNetRaw6(self):
816    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
817    s.bind(("::1", 0xace))
818    s.connect(("::1", 0xbeef))
819    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
820
821  def testIPv6MTU(self):
822    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.
823
824    Relevant kernel commits:
825      upstream net-next:
826        dcb94b8 ipv6: fix endianness error in icmpv6_err
827    """
828    s = net_test.IPv6PingSocket()
829    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
830    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
831    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
832    s.connect((net_test.IPV6_ADDR, 55))
833    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
834    s.send(pkt)
835    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
836    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)
837
838    # Compare the offending packet with the one we sent. To do this we need to
839    # calculate the ident of the packet we sent and blank out the checksum of
840    # the one we received.
841    ident = struct.pack("!H", s.getsockname()[1])
842    pkt = pkt[:4] + ident + pkt[6:]
843    data = data[:2] + "\x00\x00" + pkt[4:]
844    self.assertEqual(pkt, data)
845
846    # Check the address that the packet was sent to.
847    # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
848    # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
849    #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
850    #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
851    #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
852    #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
853    if net_test.LINUX_VERSION != (4, 1, 0):
854      self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
855
856    # Check the cmsg data, including the link MTU.
857    mtu = PingReplyThread.LINK_MTU
858    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
859    msglist = [
860        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
861         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
862                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
863          csocket.Sockaddr((src, 0))))
864    ]
865
866    # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
867    # The fix might have been in 676d236, but we don't have that in 3.10 and it
868    # touches code all over the tree. Instead, just don't check the port.
869    if net_test.LINUX_VERSION <= (3, 14, 0):
870      msglist[0][2][1].port = cmsg[0][2][1].port
871
872    self.assertEqual(msglist, cmsg)
873
874
# Run every test in this file when it is executed as a script.
if __name__ == "__main__":
  unittest.main()
877