#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=g-bad-todo

import errno
import os
import posix
import random
from socket import *  # pylint: disable=wildcard-import
import struct
import sys
import threading
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129
IPV6_MIN_MTU = 1280
ICMPV6_HEADER_LEN = 8
ICMPV6_PKT_TOOBIG = 2


class PingReplyThread(threading.Thread):
  """Thread that answers echo requests read from a tun file object.

  The thread reads Ethernet frames from the tun, and for IPv4/IPv6 echo
  requests with valid checksums writes back an echo reply, an ICMP time
  exceeded error (TTL / hop limit below MIN_TTL) or, for IPv6 only, a packet
  too big error (payload length above LINK_MTU).
  """

  MIN_TTL = 10
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  NEIGHBOURS = ["fe80::1"]
  LINK_MTU = 1300

  def __init__(self, tun, mymac, routermac, routeraddr):
    """Initializes the thread.

    Args:
      tun: File-like object (must provide fileno()) to read/write frames on.
      mymac: MAC address used as the destination of frames we send.
      routermac: MAC address used as the source of frames we send.
      routeraddr: IPv6 address used as the source of multicast ping replies,
          and the only fe80: destination that gets a unicast reply.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    self._started = False
    self._stopped = False
    self._mymac = mymac
    self._routermac = routermac
    self._routeraddr = routeraddr

  def IsStarted(self):
    """Returns True once run() has begun executing."""
    return self._started

  def Stop(self):
    """Asks the read loop in run() to exit; does not wait for it."""
    self._stopped = True

  def ChecksumValid(self, packet):
    """Returns True if the IP/ICMP/ICMPv6 checksums in packet are correct.

    The checksum fields are deleted from the passed-in packet's layers so that
    scapy recomputes them when the packet is serialized; the recomputed values
    are then compared with the ones originally present.
    """
    # Get and clear the checksums.
    def GetAndClearChecksum(layer):
      if not layer:
        return
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet.
    packet = packet.__class__(str(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends an ICMP(v6) time exceeded error quoting packet to its source.

    Args:
      version: 4 or 6; any other value sends nothing.
      packet: The offending scapy IP or IPv6 packet.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def SendPacketTooBig(self, packet):
    """Sends an ICMPv6 packet too big error advertising LINK_MTU.

    The quoted packet is truncated to IPV6_MIN_MTU - ICMPV6_HEADER_LEN bytes.
    """
    src = packet.getlayer(scapy.IPv6).src
    datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    self.SendPacket(
        scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
        scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
        str(packet)[:datalen])

  def IPv4Packet(self, ip):
    """Handles a received IPv4 packet, replying only to valid echo requests."""
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now. The explicit None check mirrors
    # IPv6Packet and avoids an AttributeError if no ICMP layer was parsed.
    if (ip.proto != IPPROTO_ICMP or
        icmp is None or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Handles a received IPv6 packet, replying only to valid echo requests."""
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast ping: reply from the router address.
      ipv6.dst = ipv6.src
      for src in [self._routeraddr]:
        ipv6.src = src
        icmpv6.type = ICMPV6_ECHO_REPLY
        self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    elif ipv6.plen > self.LINK_MTU:
      self.SendPacketTooBig(ipv6)
    else:
      icmpv6.type = ICMPV6_ECHO_REPLY
      # Link-local destinations other than the router address get no reply.
      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
        return
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Exchanges the src and dst attributes of packet in place."""
    packet.src, packet.dst = packet.dst, packet.src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet header and writes it to the tun.

    Write errors are ignored once Stop() has been called, and re-raised
    otherwise.
    """
    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), str(packet))
    except Exception:  # pylint: disable=broad-except
      if not self._stopped:
        raise  # Bare raise keeps the original traceback.

  def run(self):
    """Reads frames from the tun and dispatches them until stopped."""
    self._started = True
    while not self._stopped:
      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          continue
        else:
          break
      except ValueError:
        # Presumably raised by fileno() once the tun has been closed.
        if not self._stopped:
          raise
        # We were asked to stop and have no packet to process, so exit rather
        # than falling through with an unbound or stale packet.
        break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)


class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
  """Tests for IPv4 and IPv6 ping sockets.

  setUpClass starts one PingReplyThread per network in NETIDS and makes a
  randomly chosen network the default.
  """

  @classmethod
  def WaitForReplyThreads(cls):
    """Waits up to 2 seconds for every reply thread to report it started."""
    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    # that would cause tearDownClass not to be called and thus not clean up
    # routing configuration, breaking subsequent tests. Instead, just let these
    # tests fail.
    _INTERVAL = 0.1
    _ATTEMPTS = 20
    for _ in xrange(_ATTEMPTS):
      if all(thread.IsStarted() for thread in cls.reply_threads.values()):
        return
      time.sleep(_INTERVAL)
    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
        _ATTEMPTS * _INTERVAL)
    sys.stderr.write(msg)

  @classmethod
  def StopReplyThreads(cls):
    """Asks every reply thread to stop."""
    for thread in cls.reply_threads.values():
      thread.Stop()

  @classmethod
  def setUpClass(cls):
    super(Ping6Test, cls).setUpClass()
    cls.reply_threads = {}
    for netid in cls.NETIDS:
      cls.reply_threads[netid] = PingReplyThread(
          cls.tuns[netid],
          cls.MyMacAddress(netid),
          cls.RouterMacAddress(netid),
          cls._RouterAddress(netid, 6))
      cls.reply_threads[netid].start()
    cls.WaitForReplyThreads()
    cls.netid = random.choice(cls.NETIDS)
    cls.SetDefaultNetwork(cls.netid)

  @classmethod
  def tearDownClass(cls):
    cls.StopReplyThreads()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()

  def setUp(self):
    self.ifname = self.GetInterfaceName(self.netid)
    self.ifindex = self.ifindices[self.netid]
    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)

  def assertValidPingResponse(self, s, data):
    """Receives one packet on s and checks it is the echo reply to data.

    Args:
      s: The ping socket to receive on.
      data: The echo request that was sent, as a string.
    """
    family = s.family

    # Receive the reply.
    rcvd, src = s.recvfrom(32768)
    self.assertNotEqual(0, len(rcvd), "No data received")

    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    # as IPv4.
    if src[0].startswith("::ffff:"):
      family = AF_INET
      # The second element is the remainder of the tuple rather than the bare
      # port; it is only ever unpacked into unused_port below.
      src = (src[0].replace("::ffff:", ""), src[1:])

    # Check the data being sent is valid.
    self.assertGreater(len(data), 7, "Not enough data for ping packet")
    if family == AF_INET:
      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    elif family == AF_INET6:
      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    else:
      self.fail("Unknown socket address family %d" % s.family)

    # Check address, ICMP type, and ICMP code.
    if family == AF_INET:
      addr, unused_port = src
      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    else:
      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
      self.assertGreaterEqual(len(addr), len("::"))
      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
      # Check that the flow label is zero and that the scope ID is sane.
      self.assertEqual(flowlabel, 0)
      if addr.startswith("fe80::"):
        self.assertTrue(scope_id in self.ifindices.values())
      else:
        self.assertEquals(0, scope_id)

    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    # we don't have the IP addresses so we can't construct the pseudoheader.

    # Check the sequence number and the data.
    self.assertEqual(len(data), len(rcvd))
    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))

  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
                        txmem=0, rxmem=0):
    """Checks the last row read for socket table `name` has the given values.

    The row is obtained from self.ReadProcNetSocket(name)[-1].
    """
    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
                "%02X" % state,
                "%08X:%08X" % (txmem, rxmem),
                str(os.getuid()), "2", "0"]
    actual = self.ReadProcNetSocket(name)[-1]
    self.assertListEqual(expected, actual)

  def testIPv4SendWithNoConnection(self):
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)

  def testIPv6SendWithNoConnection(self):
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)

  def testIPv4LoopbackPingWithConnect(self):
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 55))
    data = net_test.IPV4_PING + "foobarbaz"
    s.send(data)
    self.assertValidPingResponse(s, data)

  def testIPv6LoopbackPingWithConnect(self):
    s = net_test.IPv6PingSocket()
    s.connect(("::1", 55))
    s.send(net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4PingUsingSendto(self):
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    self.assertEquals(len(net_test.IPV4_PING), written)
    self.assertValidPingResponse(s, net_test.IPV4_PING)

  def testIPv6PingUsingSendto(self):
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEquals(len(net_test.IPV6_PING), written)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4NoCrash(self):
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    self.assertEquals(len(net_test.IPV4_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEquals(written, len(reply))

  def testIPv6NoCrash(self):
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    self.assertEquals(len(net_test.IPV6_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEquals(written, len(reply))

  def testCrossProtocolCrash(self):
    # Checks that an ICMP error containing a ping packet that matches the ID
    # of a socket of the wrong protocol (which can happen when using 464xlat)
    # doesn't crash the kernel.

    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    # because IPv4 packets sent by scapy.send() on loopback are not received by
    # the kernel. So we don't actually use this function yet.
    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
              scapy.ICMP(type=3, code=0) /
              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
              scapy.ICMP(type=8, id=port, seq=1))

    def GetIPv6Unreachable(port):
      return (scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6DestUnreach() /
              scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))

    # An unreachable matching the ID of a socket of the wrong protocol
    # shouldn't crash.
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 12345))
    _, port = s.getsockname()
    scapy.send(GetIPv6Unreachable(port), verbose=False)
    # No crash? Good.

  def testCrossProtocolCalls(self):
    """Tests that passing in the wrong family returns EAFNOSUPPORT.

    Relevant kernel commits:
      upstream net:
        91a0b60 net/ping: handle protocol mismatching scenario
        9145736d net: ping: Return EAFNOSUPPORT when appropriate.

      android-3.10:
        78a6809 net/ping: handle protocol mismatching scenario
        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    """

    def CheckEAFNoSupport(function, *args):
      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)

    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))

    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    # IPv4 socket address structures, we need to pass down a socket address
    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    # will fail immediately with EINVAL because the passed-in socket length is
    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    ipv4sockaddr = csocket.SockaddrIn6(
        ipv4sockaddr.Pack() +
        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))

    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()

    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    # address family, because the Python implementation will just pass garbage
    # down to the kernel. So call the C functions directly.
    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)

  def testIPv4Bind(self):
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEquals(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEquals(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEquals(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEquals(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    net_test.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))

  def testIPv6Bind(self):
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEquals(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEquals(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEquals(("::1", 99, 0, 0), s3.getsockname())

    # Binding both IPv4 and IPv6 to the same socket works.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))

  def testIPv4InvalidBind(self):
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("255.255.255.255", 1026))
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("224.0.0.1", 651))
    # Binding to an address we don't have only works with IP_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV4_ADDR, 651))
    try:
      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
      s.bind((net_test.IPV4_ADDR, 651))
    except IOError as e:
      # NOTE(review): IOErrors with any other errno are also ignored here;
      # confirm whether they should be re-raised.
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.

  def testIPv6InvalidBind(self):
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, ("ff02::2", 1026))

    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV6_ADDR, 651))
    try:
      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
      s.bind((net_test.IPV6_ADDR, 651))
    except IOError as e:
      # NOTE(review): IOErrors with any other errno are also ignored here;
      # confirm whether they should be re-raised.
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.

  def testAfUnspecBind(self):
    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    s4 = net_test.IPv4PingSocket()
    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    sockaddr.family = AF_UNSPEC
    csocket.Bind(s4, sockaddr)
    self.assertEquals(("0.0.0.0", 12996), s4.getsockname())

    # But not if the address is anything else.
    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)

    # This doesn't work for IPv6.
    s6 = net_test.IPv6PingSocket()
    sockaddr = csocket.Sockaddr(("::1", 58997))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)

  def testIPv6ScopedBind(self):
    # Can't bind to a link-local address without a scope ID.
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, (self.lladdr, 1026, 0, 0))

    # Binding to a link-local address with a scope ID works, and the scope ID is
    # returned by a subsequent getsockname. Interestingly, Python's getsockname
    # returns "fe80:1%foo", even though it does not understand it.
    expected = self.lladdr + "%" + self.ifname
    s.bind((self.lladdr, 4646, 0, self.ifindex))
    self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())

    # Of course, for the above to work the address actually has to be configured
    # on the machine.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("fe80::f00", 1026, 0, 1))

    # Scope IDs on non-link-local addresses are silently ignored.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 1234, 0, 1))
    self.assertEquals(("::1", 1234, 0, 0), s.getsockname())

  def testBindAffectsIdentifier(self):
    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xf976))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEquals("\xf9\x76", s.recv(32768)[4:6])

    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xace))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEquals("\x0a\xce", s.recv(32768)[4:6])

  def testLinkLocalAddress(self):
    s = net_test.IPv6PingSocket()
    # Sending to a link-local address with no scope fails with EINVAL.
    self.assertRaisesErrno(errno.EINVAL,
                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    # Sending to link-local address with a scope succeeds. Note that Python
    # doesn't understand the "fe80::1%lo" format, even though it returns it.
    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    # No exceptions? Good.

  def testLinkLocalOif(self):
    """Checks that ping to link-local addresses works correctly.

    Relevant kernel commits:
      upstream net:
        5e45789 net: ipv6: Fix ping to link-local addresses.
    """
    for mode in ["oif", "ucast_oif", None]:
      # Use a fresh socket for every interface selection mode.
      s = net_test.IPv6PingSocket()
      for netid in self.NETIDS:
        dst = self._RouterAddress(netid, 6)
        self.assertTrue(dst.startswith("fe80:"))

        if mode:
          self.SelectInterface(s, netid, mode)
          scopeid = 0
        else:
          scopeid = self.ifindices[netid]

        if mode == "oif":
          # If SO_BINDTODEVICE has been set, any attempt to send on another
          # interface returns EINVAL.
          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
                                   % len(self.NETIDS)]
          otherscopeid = self.ifindices[othernetid]
          self.assertRaisesErrno(
              errno.EINVAL,
              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))

        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
        # If we got a reply, we sent the packet out on the right interface.
        self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testMappedAddressFails(self):
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                           ("::ffff:192.0.2.1", 55))

  @unittest.skipUnless(False, "skipping: does not work yet")
  def testFlowLabel(self):
    s = net_test.IPv6PingSocket()

    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    # the flow label in the packet is not set.
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.

    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    # that is not registered with the flow manager should return EINVAL...
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    # ... but this doesn't work yet.
    if False:
      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                             (net_test.IPV6_ADDR, 93, 0xdead, 0))

    # After registering the flow label, it gets sent properly, appears in the
    # output packet, and is returned in the response.
    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
                                     net_test.IPV6_FLOWINFO_SEND))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    _, src = s.recvfrom(32768)
    _, _, flowlabel, _ = src
    self.assertEqual(0xdead, flowlabel & 0xfffff)

  def testIPv4Error(self):
    s = net_test.IPv4PingSocket()
    s.setsockopt(SOL_IP, IP_TTL, 2)
    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6Error(self):
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6MulticastPing(self):
    s = net_test.IPv6PingSocket()
    # Send a multicast ping and check we get at least one duplicate.
    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4LargePacket(self):
    s = net_test.IPv4PingSocket()
    data = net_test.IPV4_PING + 20000 * "a"
    s.sendto(data, ("127.0.0.1", 987))
    self.assertValidPingResponse(s, data)

  def testIPv6LargePacket(self):
    s = net_test.IPv6PingSocket()
    s.bind(("::", 0xace))
    data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    s.sendto(data, ("::1", 953))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmpSocketsNotInIcmp6(self):
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmp6SocketsNotInIcmp(self):
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
    self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))

  def testProcNetIcmp(self):
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testProcNetIcmp6(self):
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)

    # Check the row goes away when the socket is closed.
    s.close()
    self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))

    # Try send, bind and connect to check the addresses and the state.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))

    # Can't bind after sendto, apparently.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)

    # Check receive bytes.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.connect(("ff02::1", 0xdead))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    s.send(net_test.IPV6_PING)
    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0x300)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0)

  def testProcNetUdp6(self):
    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)

  def testProcNetRaw6(self):
    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)

  def testIPv6MTU(self):
    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.

    Relevant kernel commits:
      upstream net-next:
        dcb94b8 ipv6: fix endianness error in icmpv6_err
    """
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.connect((net_test.IPV6_ADDR, 55))
    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
    s.send(pkt)
    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)

    # Compare the offending packet with the one we sent. To do this we need to
    # calculate the ident of the packet we sent and blank out the checksum of
    # the one we received.
    # NOTE(review): data is rebuilt from pkt[4:], not data[4:], so only the
    # first two bytes of the received packet are really compared. Confirm
    # whether that is intended before tightening this check.
    ident = struct.pack("!H", s.getsockname()[1])
    pkt = pkt[:4] + ident + pkt[6:]
    data = data[:2] + "\x00\x00" + pkt[4:]
    self.assertEquals(pkt, data)

    # Check the address that the packet was sent to.
    # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
    # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
    #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
    #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
    #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
    #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
    if net_test.LINUX_VERSION != (4, 1, 0):
      self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)

    # Check the cmsg data, including the link MTU.
    mtu = PingReplyThread.LINK_MTU
    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    msglist = [
        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
          csocket.Sockaddr((src, 0))))
    ]

    # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
    # The fix might have been in 676d236, but we don't have that in 3.10 and it
    # touches code all over the tree. Instead, just don't check the port.
    if net_test.LINUX_VERSION <= (3, 14, 0):
      msglist[0][2][1].port = cmsg[0][2][1].port

    self.assertEquals(msglist, cmsg)


if __name__ == "__main__":
  unittest.main()