1#!/usr/bin/python 2# 3# Copyright 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# pylint: disable=g-bad-todo 18 19import errno 20import os 21import posix 22import random 23from socket import * # pylint: disable=wildcard-import 24import threading 25import time 26import unittest 27 28from scapy import all as scapy 29 30import csocket 31import multinetwork_base 32import net_test 33 34 35HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6") 36 37ICMP_ECHO = 8 38ICMP_ECHOREPLY = 0 39ICMPV6_ECHO_REQUEST = 128 40ICMPV6_ECHO_REPLY = 129 41 42 43class PingReplyThread(threading.Thread): 44 45 MIN_TTL = 10 46 INTERMEDIATE_IPV4 = "192.0.2.2" 47 INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d" 48 NEIGHBOURS = ["fe80::1"] 49 50 def __init__(self, tun, mymac, routermac): 51 super(PingReplyThread, self).__init__() 52 self._tun = tun 53 self._stopped = False 54 self._mymac = mymac 55 self._routermac = routermac 56 57 def Stop(self): 58 self._stopped = True 59 60 def ChecksumValid(self, packet): 61 # Get and clear the checksums. 62 def GetAndClearChecksum(layer): 63 if not layer: 64 return 65 try: 66 checksum = layer.chksum 67 del layer.chksum 68 except AttributeError: 69 checksum = layer.cksum 70 del layer.cksum 71 return checksum 72 73 def GetChecksum(layer): 74 try: 75 return layer.chksum 76 except AttributeError: 77 return layer.cksum 78 79 layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest] 80 sums = {} 81 for name in layers: 82 sums[name] = GetAndClearChecksum(packet.getlayer(name)) 83 84 # Serialize the packet, so scapy recalculates the checksums, and compare 85 # them with the ones in the packet. 86 packet = packet.__class__(str(packet)) 87 for name in layers: 88 layer = packet.getlayer(name) 89 if layer and GetChecksum(layer) != sums[name]: 90 return False 91 92 return True 93 94 def SendTimeExceeded(self, version, packet): 95 if version == 4: 96 src = packet.getlayer(scapy.IP).src 97 self.SendPacket( 98 scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) / 99 scapy.ICMP(type=11, code=0) / 100 packet) 101 elif version == 6: 102 src = packet.getlayer(scapy.IPv6).src 103 self.SendPacket( 104 scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) / 105 scapy.ICMPv6TimeExceeded(code=0) / 106 packet) 107 108 def IPv4Packet(self, ip): 109 icmp = ip.getlayer(scapy.ICMP) 110 111 # We only support ping for now. 112 if (ip.proto != IPPROTO_ICMP or 113 icmp.type != ICMP_ECHO or 114 icmp.code != 0): 115 return 116 117 # Check the checksums. 118 if not self.ChecksumValid(ip): 119 return 120 121 if ip.ttl < self.MIN_TTL: 122 self.SendTimeExceeded(4, ip) 123 return 124 125 icmp.type = ICMP_ECHOREPLY 126 self.SwapAddresses(ip) 127 self.SendPacket(ip) 128 129 def IPv6Packet(self, ipv6): 130 icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest) 131 132 # We only support ping for now. 133 if (ipv6.nh != IPPROTO_ICMPV6 or 134 not icmpv6 or 135 icmpv6.type != ICMPV6_ECHO_REQUEST or 136 icmpv6.code != 0): 137 return 138 139 # Check the checksums. 140 if not self.ChecksumValid(ipv6): 141 return 142 143 if ipv6.dst.startswith("ff02::"): 144 ipv6.dst = ipv6.src 145 for src in self.NEIGHBOURS: 146 ipv6.src = src 147 icmpv6.type = ICMPV6_ECHO_REPLY 148 self.SendPacket(ipv6) 149 elif ipv6.hlim < self.MIN_TTL: 150 self.SendTimeExceeded(6, ipv6) 151 else: 152 icmpv6.type = ICMPV6_ECHO_REPLY 153 self.SwapAddresses(ipv6) 154 self.SendPacket(ipv6) 155 156 def SwapAddresses(self, packet): 157 src = packet.src 158 packet.src = packet.dst 159 packet.dst = src 160 161 def SendPacket(self, packet): 162 packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet 163 try: 164 posix.write(self._tun.fileno(), str(packet)) 165 except ValueError: 166 pass 167 168 def run(self): 169 while not self._stopped: 170 171 try: 172 packet = posix.read(self._tun.fileno(), 4096) 173 except OSError, e: 174 if e.errno == errno.EAGAIN: 175 continue 176 else: 177 break 178 179 ether = scapy.Ether(packet) 180 if ether.type == net_test.ETH_P_IPV6: 181 self.IPv6Packet(ether.payload) 182 elif ether.type == net_test.ETH_P_IP: 183 self.IPv4Packet(ether.payload) 184 185 186class Ping6Test(multinetwork_base.MultiNetworkBaseTest): 187 188 @classmethod 189 def setUpClass(cls): 190 super(Ping6Test, cls).setUpClass() 191 cls.netid = random.choice(cls.NETIDS) 192 cls.reply_thread = PingReplyThread( 193 cls.tuns[cls.netid], 194 cls.MyMacAddress(cls.netid), 195 cls.RouterMacAddress(cls.netid)) 196 cls.SetDefaultNetwork(cls.netid) 197 cls.reply_thread.start() 198 199 @classmethod 200 def tearDownClass(cls): 201 cls.reply_thread.Stop() 202 cls.ClearDefaultNetwork() 203 super(Ping6Test, cls).tearDownClass() 204 205 def setUp(self): 206 self.ifname = self.GetInterfaceName(self.netid) 207 self.ifindex = self.ifindices[self.netid] 208 self.lladdr = net_test.GetLinkAddress(self.ifname, True) 209 self.globaladdr = net_test.GetLinkAddress(self.ifname, False) 210 211 def assertValidPingResponse(self, s, data): 212 family = s.family 213 214 # Receive the reply. 215 rcvd, src = s.recvfrom(32768) 216 self.assertNotEqual(0, len(rcvd), "No data received") 217 218 # If this is a dual-stack socket sending to a mapped IPv4 address, treat it 219 # as IPv4. 220 if src[0].startswith("::ffff:"): 221 family = AF_INET 222 src = (src[0].replace("::ffff:", ""), src[1:]) 223 224 # Check the data being sent is valid. 225 self.assertGreater(len(data), 7, "Not enough data for ping packet") 226 if family == AF_INET: 227 self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request") 228 elif family == AF_INET6: 229 self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request") 230 else: 231 self.fail("Unknown socket address family %d" * s.family) 232 233 # Check address, ICMP type, and ICMP code. 234 if family == AF_INET: 235 addr, unused_port = src 236 self.assertGreaterEqual(len(addr), len("1.1.1.1")) 237 self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply") 238 else: 239 addr, unused_port, flowlabel, scope_id = src # pylint: disable=unbalanced-tuple-unpacking 240 self.assertGreaterEqual(len(addr), len("::")) 241 self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply") 242 # Check that the flow label is zero and that the scope ID is sane. 243 self.assertEqual(flowlabel, 0) 244 if addr.startswith("fe80::"): 245 self.assertTrue(scope_id in self.ifindices.values()) 246 else: 247 self.assertEquals(0, scope_id) 248 249 # TODO: check the checksum. We can't do this easily now for ICMPv6 because 250 # we don't have the IP addresses so we can't construct the pseudoheader. 251 252 # Check the sequence number and the data. 253 self.assertEqual(len(data), len(rcvd)) 254 self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex")) 255 256 def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state, 257 txmem=0, rxmem=0): 258 expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport), 259 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport), 260 "%02X" % state, 261 "%08X:%08X" % (txmem, rxmem), 262 str(os.getuid()), "2", "0"] 263 actual = self.ReadProcNetSocket(name)[-1] 264 self.assertListEqual(expected, actual) 265 266 def testIPv4SendWithNoConnection(self): 267 s = net_test.IPv4PingSocket() 268 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING) 269 270 def testIPv6SendWithNoConnection(self): 271 s = net_test.IPv6PingSocket() 272 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING) 273 274 def testIPv4LoopbackPingWithConnect(self): 275 s = net_test.IPv4PingSocket() 276 s.connect(("127.0.0.1", 55)) 277 data = net_test.IPV4_PING + "foobarbaz" 278 s.send(data) 279 self.assertValidPingResponse(s, data) 280 281 def testIPv6LoopbackPingWithConnect(self): 282 s = net_test.IPv6PingSocket() 283 s.connect(("::1", 55)) 284 s.send(net_test.IPV6_PING) 285 self.assertValidPingResponse(s, net_test.IPV6_PING) 286 287 def testIPv4PingUsingSendto(self): 288 s = net_test.IPv4PingSocket() 289 written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 290 self.assertEquals(len(net_test.IPV4_PING), written) 291 self.assertValidPingResponse(s, net_test.IPV4_PING) 292 293 def testIPv6PingUsingSendto(self): 294 s = net_test.IPv6PingSocket() 295 written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 296 self.assertEquals(len(net_test.IPV6_PING), written) 297 self.assertValidPingResponse(s, net_test.IPV6_PING) 298 299 def testIPv4NoCrash(self): 300 # Python 2.x does not provide either read() or recvmsg. 301 s = net_test.IPv4PingSocket() 302 written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55)) 303 self.assertEquals(len(net_test.IPV4_PING), written) 304 fd = s.fileno() 305 reply = posix.read(fd, 4096) 306 self.assertEquals(written, len(reply)) 307 308 def testIPv6NoCrash(self): 309 # Python 2.x does not provide either read() or recvmsg. 310 s = net_test.IPv6PingSocket() 311 written = s.sendto(net_test.IPV6_PING, ("::1", 55)) 312 self.assertEquals(len(net_test.IPV6_PING), written) 313 fd = s.fileno() 314 reply = posix.read(fd, 4096) 315 self.assertEquals(written, len(reply)) 316 317 def testCrossProtocolCrash(self): 318 # Checks that an ICMP error containing a ping packet that matches the ID 319 # of a socket of the wrong protocol (which can happen when using 464xlat) 320 # doesn't crash the kernel. 321 322 # We can only test this using IPv6 unreachables and IPv4 ping sockets, 323 # because IPv4 packets sent by scapy.send() on loopback are not received by 324 # the kernel. So we don't actually use this function yet. 325 def GetIPv4Unreachable(port): # pylint: disable=unused-variable 326 return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") / 327 scapy.ICMP(type=3, code=0) / 328 scapy.IP(src="127.0.0.1", dst="127.0.0.1") / 329 scapy.ICMP(type=8, id=port, seq=1)) 330 331 def GetIPv6Unreachable(port): 332 return (scapy.IPv6(src="::1", dst="::1") / 333 scapy.ICMPv6DestUnreach() / 334 scapy.IPv6(src="::1", dst="::1") / 335 scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz")) 336 337 # An unreachable matching the ID of a socket of the wrong protocol 338 # shouldn't crash. 339 s = net_test.IPv4PingSocket() 340 s.connect(("127.0.0.1", 12345)) 341 _, port = s.getsockname() 342 scapy.send(GetIPv6Unreachable(port)) 343 # No crash? Good. 344 345 def testCrossProtocolCalls(self): 346 """Tests that passing in the wrong family returns EAFNOSUPPORT. 347 348 Relevant kernel commits: 349 upstream net: 350 91a0b60 net/ping: handle protocol mismatching scenario 351 9145736d net: ping: Return EAFNOSUPPORT when appropriate. 352 353 android-3.10: 354 78a6809 net/ping: handle protocol mismatching scenario 355 428e6d6 net: ping: Return EAFNOSUPPORT when appropriate. 356 """ 357 358 def CheckEAFNoSupport(function, *args): 359 self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args) 360 361 ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53)) 362 363 # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed 364 # IPv4 socket address structures, we need to pass down a socket address 365 # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls 366 # will fail immediately with EINVAL because the passed-in socket length is 367 # too short. So create a sockaddr_in that's as long as a sockaddr_in6. 368 ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53)) 369 ipv4sockaddr = csocket.SockaddrIn6( 370 ipv4sockaddr.Pack() + 371 "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn))) 372 373 s4 = net_test.IPv4PingSocket() 374 s6 = net_test.IPv6PingSocket() 375 376 # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong 377 # address family, because the Python implementation will just pass garbage 378 # down to the kernel. So call the C functions directly. 379 CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr) 380 CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr) 381 CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr) 382 CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr) 383 CheckEAFNoSupport(csocket.Sendmsg, 384 s4, ipv6sockaddr, net_test.IPV4_PING, None, 0) 385 CheckEAFNoSupport(csocket.Sendmsg, 386 s6, ipv4sockaddr, net_test.IPV6_PING, None, 0) 387 388 def testIPv4Bind(self): 389 # Bind to unspecified address. 390 s = net_test.IPv4PingSocket() 391 s.bind(("0.0.0.0", 544)) 392 self.assertEquals(("0.0.0.0", 544), s.getsockname()) 393 394 # Bind to loopback. 395 s = net_test.IPv4PingSocket() 396 s.bind(("127.0.0.1", 99)) 397 self.assertEquals(("127.0.0.1", 99), s.getsockname()) 398 399 # Binding twice is not allowed. 400 self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22)) 401 402 # But binding two different sockets to the same ID is allowed. 403 s2 = net_test.IPv4PingSocket() 404 s2.bind(("127.0.0.1", 99)) 405 self.assertEquals(("127.0.0.1", 99), s2.getsockname()) 406 s3 = net_test.IPv4PingSocket() 407 s3.bind(("127.0.0.1", 99)) 408 self.assertEquals(("127.0.0.1", 99), s3.getsockname()) 409 410 # If two sockets bind to the same port, the first one to call read() gets 411 # the response. 412 s4 = net_test.IPv4PingSocket() 413 s5 = net_test.IPv4PingSocket() 414 s4.bind(("0.0.0.0", 167)) 415 s5.bind(("0.0.0.0", 167)) 416 s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44)) 417 self.assertValidPingResponse(s5, net_test.IPV4_PING) 418 net_test.SetSocketTimeout(s4, 100) 419 self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768) 420 421 # If SO_REUSEADDR is turned off, then we get EADDRINUSE. 422 s6 = net_test.IPv4PingSocket() 423 s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0) 424 self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167)) 425 426 # Can't bind after sendto. 427 s = net_test.IPv4PingSocket() 428 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132)) 429 self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429)) 430 431 def testIPv6Bind(self): 432 # Bind to unspecified address. 433 s = net_test.IPv6PingSocket() 434 s.bind(("::", 769)) 435 self.assertEquals(("::", 769, 0, 0), s.getsockname()) 436 437 # Bind to loopback. 438 s = net_test.IPv6PingSocket() 439 s.bind(("::1", 99)) 440 self.assertEquals(("::1", 99, 0, 0), s.getsockname()) 441 442 # Binding twice is not allowed. 443 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22)) 444 445 # But binding two different sockets to the same ID is allowed. 446 s2 = net_test.IPv6PingSocket() 447 s2.bind(("::1", 99)) 448 self.assertEquals(("::1", 99, 0, 0), s2.getsockname()) 449 s3 = net_test.IPv6PingSocket() 450 s3.bind(("::1", 99)) 451 self.assertEquals(("::1", 99, 0, 0), s3.getsockname()) 452 453 # Binding both IPv4 and IPv6 to the same socket works. 454 s4 = net_test.IPv4PingSocket() 455 s6 = net_test.IPv6PingSocket() 456 s4.bind(("0.0.0.0", 444)) 457 s6.bind(("::", 666, 0, 0)) 458 459 # Can't bind after sendto. 460 s = net_test.IPv6PingSocket() 461 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132)) 462 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429)) 463 464 def testIPv4InvalidBind(self): 465 s = net_test.IPv4PingSocket() 466 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 467 s.bind, ("255.255.255.255", 1026)) 468 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 469 s.bind, ("224.0.0.1", 651)) 470 # Binding to an address we don't have only works with IP_TRANSPARENT. 471 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 472 s.bind, (net_test.IPV4_ADDR, 651)) 473 try: 474 s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1) 475 s.bind((net_test.IPV4_ADDR, 651)) 476 except IOError, e: 477 if e.errno == errno.EACCES: 478 pass # We're not root. let it go for now. 479 480 def testIPv6InvalidBind(self): 481 s = net_test.IPv6PingSocket() 482 self.assertRaisesErrno(errno.EINVAL, 483 s.bind, ("ff02::2", 1026)) 484 485 # Binding to an address we don't have only works with IPV6_TRANSPARENT. 486 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 487 s.bind, (net_test.IPV6_ADDR, 651)) 488 try: 489 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1) 490 s.bind((net_test.IPV6_ADDR, 651)) 491 except IOError, e: 492 if e.errno == errno.EACCES: 493 pass # We're not root. let it go for now. 494 495 def testAfUnspecBind(self): 496 # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0. 497 s4 = net_test.IPv4PingSocket() 498 sockaddr = csocket.Sockaddr(("0.0.0.0", 12996)) 499 sockaddr.family = AF_UNSPEC 500 csocket.Bind(s4, sockaddr) 501 self.assertEquals(("0.0.0.0", 12996), s4.getsockname()) 502 503 # But not if the address is anything else. 504 sockaddr = csocket.Sockaddr(("127.0.0.1", 58234)) 505 sockaddr.family = AF_UNSPEC 506 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr) 507 508 # This doesn't work for IPv6. 509 s6 = net_test.IPv6PingSocket() 510 sockaddr = csocket.Sockaddr(("::1", 58997)) 511 sockaddr.family = AF_UNSPEC 512 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr) 513 514 def testIPv6ScopedBind(self): 515 # Can't bind to a link-local address without a scope ID. 516 s = net_test.IPv6PingSocket() 517 self.assertRaisesErrno(errno.EINVAL, 518 s.bind, (self.lladdr, 1026, 0, 0)) 519 520 # Binding to a link-local address with a scope ID works, and the scope ID is 521 # returned by a subsequent getsockname. Interestingly, Python's getsockname 522 # returns "fe80:1%foo", even though it does not understand it. 523 expected = self.lladdr + "%" + self.ifname 524 s.bind((self.lladdr, 4646, 0, self.ifindex)) 525 self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname()) 526 527 # Of course, for the above to work the address actually has to be configured 528 # on the machine. 529 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 530 s.bind, ("fe80::f00", 1026, 0, 1)) 531 532 # Scope IDs on non-link-local addresses are silently ignored. 533 s = net_test.IPv6PingSocket() 534 s.bind(("::1", 1234, 0, 1)) 535 self.assertEquals(("::1", 1234, 0, 0), s.getsockname()) 536 537 def testBindAffectsIdentifier(self): 538 s = net_test.IPv6PingSocket() 539 s.bind((self.globaladdr, 0xf976)) 540 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 541 self.assertEquals("\xf9\x76", s.recv(32768)[4:6]) 542 543 s = net_test.IPv6PingSocket() 544 s.bind((self.globaladdr, 0xace)) 545 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 546 self.assertEquals("\x0a\xce", s.recv(32768)[4:6]) 547 548 def testLinkLocalAddress(self): 549 s = net_test.IPv6PingSocket() 550 # Sending to a link-local address with no scope fails with EINVAL. 551 self.assertRaisesErrno(errno.EINVAL, 552 s.sendto, net_test.IPV6_PING, ("fe80::1", 55)) 553 # Sending to link-local address with a scope succeeds. Note that Python 554 # doesn't understand the "fe80::1%lo" format, even though it returns it. 555 s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex)) 556 # No exceptions? Good. 557 558 def testMappedAddressFails(self): 559 s = net_test.IPv6PingSocket() 560 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 561 self.assertValidPingResponse(s, net_test.IPV6_PING) 562 s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55)) 563 self.assertValidPingResponse(s, net_test.IPV6_PING) 564 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 565 ("::ffff:192.0.2.1", 55)) 566 567 @unittest.skipUnless(False, "skipping: does not work yet") 568 def testFlowLabel(self): 569 s = net_test.IPv6PingSocket() 570 571 # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but 572 # the flow label in the packet is not set. 573 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 574 self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0. 575 576 # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label 577 # that is not registered with the flow manager should return EINVAL... 578 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1) 579 # ... but this doesn't work yet. 580 if False: 581 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 582 (net_test.IPV6_ADDR, 93, 0xdead, 0)) 583 584 # After registering the flow label, it gets sent properly, appears in the 585 # output packet, and is returned in the response. 586 net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead) 587 self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6, 588 net_test.IPV6_FLOWINFO_SEND)) 589 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 590 _, src = s.recvfrom(32768) 591 _, _, flowlabel, _ = src 592 self.assertEqual(0xdead, flowlabel & 0xfffff) 593 594 def testIPv4Error(self): 595 s = net_test.IPv4PingSocket() 596 s.setsockopt(SOL_IP, IP_TTL, 2) 597 s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1) 598 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 599 # We can't check the actual error because Python 2.7 doesn't implement 600 # recvmsg, but we can at least check that the socket returns an error. 601 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 602 603 def testIPv6Error(self): 604 s = net_test.IPv6PingSocket() 605 s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2) 606 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) 607 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 608 # We can't check the actual error because Python 2.7 doesn't implement 609 # recvmsg, but we can at least check that the socket returns an error. 610 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 611 612 def testIPv6MulticastPing(self): 613 s = net_test.IPv6PingSocket() 614 # Send a multicast ping and check we get at least one duplicate. 615 # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug. 616 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 617 s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex)) 618 self.assertValidPingResponse(s, net_test.IPV6_PING) 619 self.assertValidPingResponse(s, net_test.IPV6_PING) 620 621 def testIPv4LargePacket(self): 622 s = net_test.IPv4PingSocket() 623 data = net_test.IPV4_PING + 20000 * "a" 624 s.sendto(data, ("127.0.0.1", 987)) 625 self.assertValidPingResponse(s, data) 626 627 def testIPv6LargePacket(self): 628 s = net_test.IPv6PingSocket() 629 s.bind(("::", 0xace)) 630 data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa" 631 s.sendto(data, ("::1", 953)) 632 633 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 634 def testIcmpSocketsNotInIcmp6(self): 635 numrows = len(self.ReadProcNetSocket("icmp")) 636 numrows6 = len(self.ReadProcNetSocket("icmp6")) 637 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 638 s.bind(("127.0.0.1", 0xace)) 639 s.connect(("127.0.0.1", 0xbeef)) 640 self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp"))) 641 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) 642 643 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 644 def testIcmp6SocketsNotInIcmp(self): 645 numrows = len(self.ReadProcNetSocket("icmp")) 646 numrows6 = len(self.ReadProcNetSocket("icmp6")) 647 s = net_test.IPv6PingSocket() 648 s.bind(("::1", 0xace)) 649 s.connect(("::1", 0xbeef)) 650 self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp"))) 651 self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) 652 653 def testProcNetIcmp(self): 654 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 655 s.bind(("127.0.0.1", 0xace)) 656 s.connect(("127.0.0.1", 0xbeef)) 657 self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1) 658 659 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 660 def testProcNetIcmp6(self): 661 numrows6 = len(self.ReadProcNetSocket("icmp6")) 662 s = net_test.IPv6PingSocket() 663 s.bind(("::1", 0xace)) 664 s.connect(("::1", 0xbeef)) 665 self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1) 666 667 # Check the row goes away when the socket is closed. 668 s.close() 669 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) 670 671 # Try send, bind and connect to check the addresses and the state. 672 s = net_test.IPv6PingSocket() 673 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 674 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345)) 675 self.assertEqual(1, len(self.ReadProcNetSocket("icmp6"))) 676 677 # Can't bind after sendto, apparently. 678 s = net_test.IPv6PingSocket() 679 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 680 s.bind((self.lladdr, 0xd00d, 0, self.ifindex)) 681 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7) 682 683 # Check receive bytes. 684 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 685 s.connect(("ff02::1", 0xdead)) 686 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1) 687 s.send(net_test.IPV6_PING) 688 time.sleep(0.01) # Give the other thread time to reply. 689 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 690 txmem=0, rxmem=0x300) 691 self.assertValidPingResponse(s, net_test.IPV6_PING) 692 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 693 txmem=0, rxmem=0) 694 695 def testProcNetUdp6(self): 696 s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP) 697 s.bind(("::1", 0xace)) 698 s.connect(("::1", 0xbeef)) 699 self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1) 700 701 def testProcNetRaw6(self): 702 s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW) 703 s.bind(("::1", 0xace)) 704 s.connect(("::1", 0xbeef)) 705 self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1) 706 707 708if __name__ == "__main__": 709 unittest.main() 710