1#!/usr/bin/python
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo
18
19import errno
20import os
21import posix
22import random
23from socket import *  # pylint: disable=wildcard-import
24import struct
25import sys
26import threading
27import time
28import unittest
29
30from scapy import all as scapy
31
32import csocket
33import multinetwork_base
34import net_test
35
36
37HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
38
39ICMP_ECHO = 8
40ICMP_ECHOREPLY = 0
41ICMPV6_ECHO_REQUEST = 128
42ICMPV6_ECHO_REPLY = 129
43IPV6_MIN_MTU = 1280
44ICMPV6_HEADER_LEN = 8
45ICMPV6_PKT_TOOBIG = 2
46
47
48class PingReplyThread(threading.Thread):
49
50  MIN_TTL = 10
51  INTERMEDIATE_IPV4 = "192.0.2.2"
52  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
53  NEIGHBOURS = ["fe80::1"]
54  LINK_MTU = 1300
55
56  def __init__(self, tun, mymac, routermac, routeraddr):
57    super(PingReplyThread, self).__init__()
58    self._tun = tun
59    self._started = False
60    self._stopped = False
61    self._mymac = mymac
62    self._routermac = routermac
63    self._routeraddr = routeraddr
64
65  def IsStarted(self):
66    return self._started
67
68  def Stop(self):
69    self._stopped = True
70
71  def ChecksumValid(self, packet):
72    # Get and clear the checksums.
73    def GetAndClearChecksum(layer):
74      if not layer:
75        return
76      try:
77        checksum = layer.chksum
78        del layer.chksum
79      except AttributeError:
80        checksum = layer.cksum
81        del layer.cksum
82      return checksum
83
84    def GetChecksum(layer):
85      try:
86        return layer.chksum
87      except AttributeError:
88        return layer.cksum
89
90    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
91    sums = {}
92    for name in layers:
93      sums[name] = GetAndClearChecksum(packet.getlayer(name))
94
95    # Serialize the packet, so scapy recalculates the checksums, and compare
96    # them with the ones in the packet.
97    packet = packet.__class__(str(packet))
98    for name in layers:
99      layer = packet.getlayer(name)
100      if layer and GetChecksum(layer) != sums[name]:
101        return False
102
103    return True
104
105  def SendTimeExceeded(self, version, packet):
106    if version == 4:
107      src = packet.getlayer(scapy.IP).src
108      self.SendPacket(
109          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
110          scapy.ICMP(type=11, code=0) /
111          packet)
112    elif version == 6:
113      src = packet.getlayer(scapy.IPv6).src
114      self.SendPacket(
115          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
116          scapy.ICMPv6TimeExceeded(code=0) /
117          packet)
118
119  def SendPacketTooBig(self, packet):
120      src = packet.getlayer(scapy.IPv6).src
121      datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
122      self.SendPacket(
123          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
124          scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
125          str(packet)[:datalen])
126
127  def IPv4Packet(self, ip):
128    icmp = ip.getlayer(scapy.ICMP)
129
130    # We only support ping for now.
131    if (ip.proto != IPPROTO_ICMP or
132        icmp.type != ICMP_ECHO or
133        icmp.code != 0):
134      return
135
136    # Check the checksums.
137    if not self.ChecksumValid(ip):
138      return
139
140    if ip.ttl < self.MIN_TTL:
141      self.SendTimeExceeded(4, ip)
142      return
143
144    icmp.type = ICMP_ECHOREPLY
145    self.SwapAddresses(ip)
146    self.SendPacket(ip)
147
148  def IPv6Packet(self, ipv6):
149    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
150
151    # We only support ping for now.
152    if (ipv6.nh != IPPROTO_ICMPV6 or
153        not icmpv6 or
154        icmpv6.type != ICMPV6_ECHO_REQUEST or
155        icmpv6.code != 0):
156      return
157
158    # Check the checksums.
159    if not self.ChecksumValid(ipv6):
160      return
161
162    if ipv6.dst.startswith("ff02::"):
163      ipv6.dst = ipv6.src
164      for src in [self._routeraddr]:
165        ipv6.src = src
166        icmpv6.type = ICMPV6_ECHO_REPLY
167        self.SendPacket(ipv6)
168    elif ipv6.hlim < self.MIN_TTL:
169      self.SendTimeExceeded(6, ipv6)
170    elif ipv6.plen > self.LINK_MTU:
171      self.SendPacketTooBig(ipv6)
172    else:
173      icmpv6.type = ICMPV6_ECHO_REPLY
174      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
175        return
176      self.SwapAddresses(ipv6)
177      self.SendPacket(ipv6)
178
179  def SwapAddresses(self, packet):
180    src = packet.src
181    packet.src = packet.dst
182    packet.dst = src
183
184  def SendPacket(self, packet):
185    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
186    try:
187      posix.write(self._tun.fileno(), str(packet))
188    except Exception, e:
189      if not self._stopped:
190        raise e
191
192  def run(self):
193    self._started = True
194    while not self._stopped:
195      try:
196        packet = posix.read(self._tun.fileno(), 4096)
197      except OSError, e:
198        if e.errno == errno.EAGAIN:
199          continue
200        else:
201          break
202      except ValueError, e:
203        if not self._stopped:
204          raise e
205
206      ether = scapy.Ether(packet)
207      if ether.type == net_test.ETH_P_IPV6:
208        self.IPv6Packet(ether.payload)
209      elif ether.type == net_test.ETH_P_IP:
210        self.IPv4Packet(ether.payload)
211
212
213class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
214
215  @classmethod
216  def WaitForReplyThreads(cls):
217    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
218    # that would cause tearDownClass not to be called and thus not clean up
219    # routing configuration, breaking subsequent tests. Instead, just let these
220    # tests fail.
221    _INTERVAL = 0.1
222    _ATTEMPTS = 20
223    for i in xrange(0, _ATTEMPTS):
224      for netid in cls.NETIDS:
225        if all(thread.IsStarted() for thread in cls.reply_threads.values()):
226          return
227        time.sleep(_INTERVAL)
228    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
229        _ATTEMPTS * _INTERVAL)
230    sys.stderr.write(msg)
231
232  @classmethod
233  def StopReplyThreads(cls):
234    for thread in cls.reply_threads.values():
235      thread.Stop()
236
237  @classmethod
238  def setUpClass(cls):
239    super(Ping6Test, cls).setUpClass()
240    cls.reply_threads = {}
241    for netid in cls.NETIDS:
242      cls.reply_threads[netid] = PingReplyThread(
243        cls.tuns[netid],
244        cls.MyMacAddress(netid),
245        cls.RouterMacAddress(netid),
246        cls._RouterAddress(netid, 6))
247      cls.reply_threads[netid].start()
248    cls.WaitForReplyThreads()
249    cls.netid = random.choice(cls.NETIDS)
250    cls.SetDefaultNetwork(cls.netid)
251
252  @classmethod
253  def tearDownClass(cls):
254    cls.StopReplyThreads()
255    cls.ClearDefaultNetwork()
256    super(Ping6Test, cls).tearDownClass()
257
258  def setUp(self):
259    self.ifname = self.GetInterfaceName(self.netid)
260    self.ifindex = self.ifindices[self.netid]
261    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
262    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
263
264  def assertValidPingResponse(self, s, data):
265    family = s.family
266
267    # Receive the reply.
268    rcvd, src = s.recvfrom(32768)
269    self.assertNotEqual(0, len(rcvd), "No data received")
270
271    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
272    # as IPv4.
273    if src[0].startswith("::ffff:"):
274      family = AF_INET
275      src = (src[0].replace("::ffff:", ""), src[1:])
276
277    # Check the data being sent is valid.
278    self.assertGreater(len(data), 7, "Not enough data for ping packet")
279    if family == AF_INET:
280      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
281    elif family == AF_INET6:
282      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
283    else:
284      self.fail("Unknown socket address family %d" * s.family)
285
286    # Check address, ICMP type, and ICMP code.
287    if family == AF_INET:
288      addr, unused_port = src
289      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
290      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
291    else:
292      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
293      self.assertGreaterEqual(len(addr), len("::"))
294      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
295      # Check that the flow label is zero and that the scope ID is sane.
296      self.assertEqual(flowlabel, 0)
297      if addr.startswith("fe80::"):
298        self.assertTrue(scope_id in self.ifindices.values())
299      else:
300        self.assertEquals(0, scope_id)
301
302    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
303    # we don't have the IP addresses so we can't construct the pseudoheader.
304
305    # Check the sequence number and the data.
306    self.assertEqual(len(data), len(rcvd))
307    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
308
309  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
310                        txmem=0, rxmem=0):
311    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
312                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
313                "%02X" % state,
314                "%08X:%08X" % (txmem, rxmem),
315                str(os.getuid()), "2", "0"]
316    actual = self.ReadProcNetSocket(name)[-1]
317    self.assertListEqual(expected, actual)
318
319  def testIPv4SendWithNoConnection(self):
320    s = net_test.IPv4PingSocket()
321    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
322
323  def testIPv6SendWithNoConnection(self):
324    s = net_test.IPv6PingSocket()
325    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
326
327  def testIPv4LoopbackPingWithConnect(self):
328    s = net_test.IPv4PingSocket()
329    s.connect(("127.0.0.1", 55))
330    data = net_test.IPV4_PING + "foobarbaz"
331    s.send(data)
332    self.assertValidPingResponse(s, data)
333
334  def testIPv6LoopbackPingWithConnect(self):
335    s = net_test.IPv6PingSocket()
336    s.connect(("::1", 55))
337    s.send(net_test.IPV6_PING)
338    self.assertValidPingResponse(s, net_test.IPV6_PING)
339
340  def testIPv4PingUsingSendto(self):
341    s = net_test.IPv4PingSocket()
342    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
343    self.assertEquals(len(net_test.IPV4_PING), written)
344    self.assertValidPingResponse(s, net_test.IPV4_PING)
345
346  def testIPv6PingUsingSendto(self):
347    s = net_test.IPv6PingSocket()
348    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
349    self.assertEquals(len(net_test.IPV6_PING), written)
350    self.assertValidPingResponse(s, net_test.IPV6_PING)
351
352  def testIPv4NoCrash(self):
353    # Python 2.x does not provide either read() or recvmsg.
354    s = net_test.IPv4PingSocket()
355    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
356    self.assertEquals(len(net_test.IPV4_PING), written)
357    fd = s.fileno()
358    reply = posix.read(fd, 4096)
359    self.assertEquals(written, len(reply))
360
361  def testIPv6NoCrash(self):
362    # Python 2.x does not provide either read() or recvmsg.
363    s = net_test.IPv6PingSocket()
364    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
365    self.assertEquals(len(net_test.IPV6_PING), written)
366    fd = s.fileno()
367    reply = posix.read(fd, 4096)
368    self.assertEquals(written, len(reply))
369
370  def testCrossProtocolCrash(self):
371    # Checks that an ICMP error containing a ping packet that matches the ID
372    # of a socket of the wrong protocol (which can happen when using 464xlat)
373    # doesn't crash the kernel.
374
375    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
376    # because IPv4 packets sent by scapy.send() on loopback are not received by
377    # the kernel. So we don't actually use this function yet.
378    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
379      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
380              scapy.ICMP(type=3, code=0) /
381              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
382              scapy.ICMP(type=8, id=port, seq=1))
383
384    def GetIPv6Unreachable(port):
385      return (scapy.IPv6(src="::1", dst="::1") /
386              scapy.ICMPv6DestUnreach() /
387              scapy.IPv6(src="::1", dst="::1") /
388              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
389
390    # An unreachable matching the ID of a socket of the wrong protocol
391    # shouldn't crash.
392    s = net_test.IPv4PingSocket()
393    s.connect(("127.0.0.1", 12345))
394    _, port = s.getsockname()
395    scapy.send(GetIPv6Unreachable(port), verbose=False)
396    # No crash? Good.
397
398  def testCrossProtocolCalls(self):
399    """Tests that passing in the wrong family returns EAFNOSUPPORT.
400
401    Relevant kernel commits:
402      upstream net:
403        91a0b60 net/ping: handle protocol mismatching scenario
404        9145736d net: ping: Return EAFNOSUPPORT when appropriate.
405
406      android-3.10:
407        78a6809 net/ping: handle protocol mismatching scenario
408        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
409    """
410
411    def CheckEAFNoSupport(function, *args):
412      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
413
414    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
415
416    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
417    # IPv4 socket address structures, we need to pass down a socket address
418    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
419    # will fail immediately with EINVAL because the passed-in socket length is
420    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
421    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
422    ipv4sockaddr = csocket.SockaddrIn6(
423        ipv4sockaddr.Pack() +
424        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
425
426    s4 = net_test.IPv4PingSocket()
427    s6 = net_test.IPv6PingSocket()
428
429    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
430    # address family, because the Python implementation will just pass garbage
431    # down to the kernel. So call the C functions directly.
432    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
433    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
434    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
435    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
436    CheckEAFNoSupport(csocket.Sendmsg,
437                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
438    CheckEAFNoSupport(csocket.Sendmsg,
439                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
440
441  def testIPv4Bind(self):
442    # Bind to unspecified address.
443    s = net_test.IPv4PingSocket()
444    s.bind(("0.0.0.0", 544))
445    self.assertEquals(("0.0.0.0", 544), s.getsockname())
446
447    # Bind to loopback.
448    s = net_test.IPv4PingSocket()
449    s.bind(("127.0.0.1", 99))
450    self.assertEquals(("127.0.0.1", 99), s.getsockname())
451
452    # Binding twice is not allowed.
453    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
454
455    # But binding two different sockets to the same ID is allowed.
456    s2 = net_test.IPv4PingSocket()
457    s2.bind(("127.0.0.1", 99))
458    self.assertEquals(("127.0.0.1", 99), s2.getsockname())
459    s3 = net_test.IPv4PingSocket()
460    s3.bind(("127.0.0.1", 99))
461    self.assertEquals(("127.0.0.1", 99), s3.getsockname())
462
463    # If two sockets bind to the same port, the first one to call read() gets
464    # the response.
465    s4 = net_test.IPv4PingSocket()
466    s5 = net_test.IPv4PingSocket()
467    s4.bind(("0.0.0.0", 167))
468    s5.bind(("0.0.0.0", 167))
469    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
470    self.assertValidPingResponse(s5, net_test.IPV4_PING)
471    net_test.SetSocketTimeout(s4, 100)
472    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
473
474    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
475    s6 = net_test.IPv4PingSocket()
476    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
477    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
478
479    # Can't bind after sendto.
480    s = net_test.IPv4PingSocket()
481    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
482    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
483
484  def testIPv6Bind(self):
485    # Bind to unspecified address.
486    s = net_test.IPv6PingSocket()
487    s.bind(("::", 769))
488    self.assertEquals(("::", 769, 0, 0), s.getsockname())
489
490    # Bind to loopback.
491    s = net_test.IPv6PingSocket()
492    s.bind(("::1", 99))
493    self.assertEquals(("::1", 99, 0, 0), s.getsockname())
494
495    # Binding twice is not allowed.
496    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
497
498    # But binding two different sockets to the same ID is allowed.
499    s2 = net_test.IPv6PingSocket()
500    s2.bind(("::1", 99))
501    self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
502    s3 = net_test.IPv6PingSocket()
503    s3.bind(("::1", 99))
504    self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
505
506    # Binding both IPv4 and IPv6 to the same socket works.
507    s4 = net_test.IPv4PingSocket()
508    s6 = net_test.IPv6PingSocket()
509    s4.bind(("0.0.0.0", 444))
510    s6.bind(("::", 666, 0, 0))
511
512    # Can't bind after sendto.
513    s = net_test.IPv6PingSocket()
514    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
515    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
516
517  def testIPv4InvalidBind(self):
518    s = net_test.IPv4PingSocket()
519    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
520                           s.bind, ("255.255.255.255", 1026))
521    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
522                           s.bind, ("224.0.0.1", 651))
523    # Binding to an address we don't have only works with IP_TRANSPARENT.
524    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
525                           s.bind, (net_test.IPV4_ADDR, 651))
526    try:
527      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
528      s.bind((net_test.IPV4_ADDR, 651))
529    except IOError, e:
530      if e.errno == errno.EACCES:
531        pass  # We're not root. let it go for now.
532
533  def testIPv6InvalidBind(self):
534    s = net_test.IPv6PingSocket()
535    self.assertRaisesErrno(errno.EINVAL,
536                           s.bind, ("ff02::2", 1026))
537
538    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
539    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
540                           s.bind, (net_test.IPV6_ADDR, 651))
541    try:
542      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
543      s.bind((net_test.IPV6_ADDR, 651))
544    except IOError, e:
545      if e.errno == errno.EACCES:
546        pass  # We're not root. let it go for now.
547
548  def testAfUnspecBind(self):
549    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
550    s4 = net_test.IPv4PingSocket()
551    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
552    sockaddr.family = AF_UNSPEC
553    csocket.Bind(s4, sockaddr)
554    self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
555
556    # But not if the address is anything else.
557    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
558    sockaddr.family = AF_UNSPEC
559    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
560
561    # This doesn't work for IPv6.
562    s6 = net_test.IPv6PingSocket()
563    sockaddr = csocket.Sockaddr(("::1", 58997))
564    sockaddr.family = AF_UNSPEC
565    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
566
567  def testIPv6ScopedBind(self):
568    # Can't bind to a link-local address without a scope ID.
569    s = net_test.IPv6PingSocket()
570    self.assertRaisesErrno(errno.EINVAL,
571                           s.bind, (self.lladdr, 1026, 0, 0))
572
573    # Binding to a link-local address with a scope ID works, and the scope ID is
574    # returned by a subsequent getsockname. Interestingly, Python's getsockname
575    # returns "fe80:1%foo", even though it does not understand it.
576    expected = self.lladdr + "%" + self.ifname
577    s.bind((self.lladdr, 4646, 0, self.ifindex))
578    self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
579
580    # Of course, for the above to work the address actually has to be configured
581    # on the machine.
582    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
583                           s.bind, ("fe80::f00", 1026, 0, 1))
584
585    # Scope IDs on non-link-local addresses are silently ignored.
586    s = net_test.IPv6PingSocket()
587    s.bind(("::1", 1234, 0, 1))
588    self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
589
590  def testBindAffectsIdentifier(self):
591    s = net_test.IPv6PingSocket()
592    s.bind((self.globaladdr, 0xf976))
593    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
594    self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
595
596    s = net_test.IPv6PingSocket()
597    s.bind((self.globaladdr, 0xace))
598    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
599    self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
600
601  def testLinkLocalAddress(self):
602    s = net_test.IPv6PingSocket()
603    # Sending to a link-local address with no scope fails with EINVAL.
604    self.assertRaisesErrno(errno.EINVAL,
605                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
606    # Sending to link-local address with a scope succeeds. Note that Python
607    # doesn't understand the "fe80::1%lo" format, even though it returns it.
608    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
609    # No exceptions? Good.
610
611  def testLinkLocalOif(self):
612    """Checks that ping to link-local addresses works correctly.
613
614    Relevant kernel commits:
615      upstream net:
616        5e45789 net: ipv6: Fix ping to link-local addresses.
617    """
618    s = net_test.IPv6PingSocket()
619    for mode in ["oif", "ucast_oif", None]:
620      s = net_test.IPv6PingSocket()
621      for netid in self.NETIDS:
622        dst = self._RouterAddress(netid, 6)
623        self.assertTrue(dst.startswith("fe80:"))
624
625        if mode:
626          self.SelectInterface(s, netid, mode)
627          scopeid = 0
628        else:
629          scopeid = self.ifindices[netid]
630
631        if mode == "oif":
632          # If SO_BINDTODEVICE has been set, any attempt to send on another
633          # interface returns EINVAL.
634          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
635                                   % len(self.NETIDS)]
636          otherscopeid = self.ifindices[othernetid]
637          self.assertRaisesErrno(
638              errno.EINVAL,
639              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
640
641        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
642        # If we got a reply, we sent the packet out on the right interface.
643        self.assertValidPingResponse(s, net_test.IPV6_PING)
644
645  def testMappedAddressFails(self):
646    s = net_test.IPv6PingSocket()
647    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
648    self.assertValidPingResponse(s, net_test.IPV6_PING)
649    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
650    self.assertValidPingResponse(s, net_test.IPV6_PING)
651    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
652                           ("::ffff:192.0.2.1", 55))
653
654  @unittest.skipUnless(False, "skipping: does not work yet")
655  def testFlowLabel(self):
656    s = net_test.IPv6PingSocket()
657
658    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
659    # the flow label in the packet is not set.
660    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
661    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
662
663    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
664    # that is not registered with the flow manager should return EINVAL...
665    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
666    # ... but this doesn't work yet.
667    if False:
668      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
669                             (net_test.IPV6_ADDR, 93, 0xdead, 0))
670
671    # After registering the flow label, it gets sent properly, appears in the
672    # output packet, and is returned in the response.
673    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
674    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
675                                     net_test.IPV6_FLOWINFO_SEND))
676    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
677    _, src = s.recvfrom(32768)
678    _, _, flowlabel, _ = src
679    self.assertEqual(0xdead, flowlabel & 0xfffff)
680
681  def testIPv4Error(self):
682    s = net_test.IPv4PingSocket()
683    s.setsockopt(SOL_IP, IP_TTL, 2)
684    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
685    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
686    # We can't check the actual error because Python 2.7 doesn't implement
687    # recvmsg, but we can at least check that the socket returns an error.
688    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
689
690  def testIPv6Error(self):
691    s = net_test.IPv6PingSocket()
692    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
693    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
694    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
695    # We can't check the actual error because Python 2.7 doesn't implement
696    # recvmsg, but we can at least check that the socket returns an error.
697    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
698
699  def testIPv6MulticastPing(self):
700    s = net_test.IPv6PingSocket()
701    # Send a multicast ping and check we get at least one duplicate.
702    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
703    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
704    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
705    self.assertValidPingResponse(s, net_test.IPV6_PING)
706    self.assertValidPingResponse(s, net_test.IPV6_PING)
707
708  def testIPv4LargePacket(self):
709    s = net_test.IPv4PingSocket()
710    data = net_test.IPV4_PING + 20000 * "a"
711    s.sendto(data, ("127.0.0.1", 987))
712    self.assertValidPingResponse(s, data)
713
714  def testIPv6LargePacket(self):
715    s = net_test.IPv6PingSocket()
716    s.bind(("::", 0xace))
717    data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
718    s.sendto(data, ("::1", 953))
719
720  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
721  def testIcmpSocketsNotInIcmp6(self):
722    numrows = len(self.ReadProcNetSocket("icmp"))
723    numrows6 = len(self.ReadProcNetSocket("icmp6"))
724    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
725    s.bind(("127.0.0.1", 0xace))
726    s.connect(("127.0.0.1", 0xbeef))
727    self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
728    self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
729
730  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
731  def testIcmp6SocketsNotInIcmp(self):
732    numrows = len(self.ReadProcNetSocket("icmp"))
733    numrows6 = len(self.ReadProcNetSocket("icmp6"))
734    s = net_test.IPv6PingSocket()
735    s.bind(("::1", 0xace))
736    s.connect(("::1", 0xbeef))
737    self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
738    self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
739
740  def testProcNetIcmp(self):
741    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
742    s.bind(("127.0.0.1", 0xace))
743    s.connect(("127.0.0.1", 0xbeef))
744    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
745
746  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
747  def testProcNetIcmp6(self):
748    numrows6 = len(self.ReadProcNetSocket("icmp6"))
749    s = net_test.IPv6PingSocket()
750    s.bind(("::1", 0xace))
751    s.connect(("::1", 0xbeef))
752    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
753
754    # Check the row goes away when the socket is closed.
755    s.close()
756    self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
757
758    # Try send, bind and connect to check the addresses and the state.
759    s = net_test.IPv6PingSocket()
760    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
761    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
762    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
763
764    # Can't bind after sendto, apparently.
765    s = net_test.IPv6PingSocket()
766    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
767    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
768    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
769
770    # Check receive bytes.
771    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
772    s.connect(("ff02::1", 0xdead))
773    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
774    s.send(net_test.IPV6_PING)
775    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
776    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
777                           txmem=0, rxmem=0x300)
778    self.assertValidPingResponse(s, net_test.IPV6_PING)
779    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
780                           txmem=0, rxmem=0)
781
782  def testProcNetUdp6(self):
783    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
784    s.bind(("::1", 0xace))
785    s.connect(("::1", 0xbeef))
786    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
787
788  def testProcNetRaw6(self):
789    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
790    s.bind(("::1", 0xace))
791    s.connect(("::1", 0xbeef))
792    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
793
794  def testIPv6MTU(self):
795    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.
796
797    Relevant kernel commits:
798      upstream net-next:
799        dcb94b8 ipv6: fix endianness error in icmpv6_err
800    """
801    s = net_test.IPv6PingSocket()
802    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
803    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
804    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
805    s.connect((net_test.IPV6_ADDR, 55))
806    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
807    s.send(pkt)
808    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
809    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)
810
811    # Compare the offending packet with the one we sent. To do this we need to
812    # calculate the ident of the packet we sent and blank out the checksum of
813    # the one we received.
814    ident = struct.pack("!H", s.getsockname()[1])
815    pkt = pkt[:4] + ident + pkt[6:]
816    data = data[:2] + "\x00\x00" + pkt[4:]
817    self.assertEquals(pkt, data)
818
819    # Check the address that the packet was sent to.
820    # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
821    # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
822    #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
823    #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
824    #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
825    #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
826    if net_test.LINUX_VERSION != (4, 1, 0):
827      self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
828
829    # Check the cmsg data, including the link MTU.
830    mtu = PingReplyThread.LINK_MTU
831    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
832    msglist = [
833        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
834         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
835                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
836          csocket.Sockaddr((src, 0))))
837    ]
838
839    # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
840    # The fix might have been in 676d236, but we don't have that in 3.10 and it
841    # touches code all over the tree. Instead, just don't check the port.
842    if net_test.LINUX_VERSION <= (3, 14, 0):
843      msglist[0][2][1].port = cmsg[0][2][1].port
844
845    self.assertEquals(msglist, cmsg)
846
847
848if __name__ == "__main__":
849  unittest.main()
850