1#!/usr/bin/python
2#
3# Copyright 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import fcntl
19from socket import *  # pylint: disable=wildcard-import
20import unittest
21
22import csocket
23import cstruct
24import net_test
25
# Loopback addresses handed to net_test.CreateSocketPair and KillAddrIoctl.
IPV4_LOOPBACK_ADDR = "127.0.0.1"
IPV6_LOOPBACK_ADDR = "::1"
# Loopback device name (placed in Ifreq) and interface index (placed in
# In6Ifreq). NOTE(review): assumes "lo" has ifindex 1 on the test host.
LOOPBACK_DEV = "lo"
LOOPBACK_IFINDEX = 1

# ioctl request number that KillAddrIoctl passes to fcntl.ioctl().
SIOCKILLADDR = 0x8939


# Request structures packed by KillAddrIoctl: Ifreq carries a 16-byte name and
# 16 bytes of data (a packed SockaddrIn) for AF_INET; In6Ifreq carries a
# 16-byte address, a prefix length and an interface index for AF_INET6.
Ifreq = cstruct.Struct("Ifreq", "=16s16s", "name data")
In6Ifreq = cstruct.Struct("In6Ifreq", "=16sIi", "addr prefixlen ifindex")
36
def KillAddrIoctl(addr):
  """Issues the SIOCKILLADDR ioctl for a numeric IP address.

  The request is built for the loopback device (LOOPBACK_DEV for IPv4,
  LOOPBACK_IFINDEX for IPv6) and sent on a temporary datagram socket of the
  address's family. Errors from getaddrinfo() or the ioctl itself are not
  caught here and propagate to the caller.

  Args:
    addr: The IP address to pass to the ioctl, as a numeric string.

  Raises:
    ValueError: If addr is of an unsupported address family.
  """
  # AI_NUMERICHOST: only parse the literal, never resolve a hostname.
  addrinfo = getaddrinfo(addr, None, AF_UNSPEC, SOCK_DGRAM, 0, AI_NUMERICHOST)
  family = addrinfo[0][0]

  if family == AF_INET:
    packed_addr = inet_pton(AF_INET, addr)
    sockaddr_in = csocket.SockaddrIn((AF_INET, 0, packed_addr))
    request = Ifreq((LOOPBACK_DEV, sockaddr_in.Pack())).Pack()
  elif family == AF_INET6:
    packed_addr = inet_pton(AF_INET6, addr)
    # Prefix length 128, i.e. the full address.
    request = In6Ifreq((packed_addr, 128, LOOPBACK_IFINDEX)).Pack()
  else:
    raise ValueError('Address family %r not supported.' % family)

  ioctl_socket = socket(family, SOCK_DGRAM)
  try:
    fcntl.ioctl(ioctl_socket.fileno(), SIOCKILLADDR, request)
  finally:
    # Always release the temporary socket, including when the ioctl raises.
    ioctl_socket.close()
62
63# For convenience.
def CreateIPv4SocketPair():
  """Returns net_test.CreateSocketPair() for SOCK_STREAM on 127.0.0.1."""
  socketpair = net_test.CreateSocketPair(AF_INET, SOCK_STREAM,
                                         IPV4_LOOPBACK_ADDR)
  return socketpair
66
def CreateIPv6SocketPair():
  """Returns net_test.CreateSocketPair() for SOCK_STREAM on ::1."""
  socketpair = net_test.CreateSocketPair(AF_INET6, SOCK_STREAM,
                                         IPV6_LOOPBACK_ADDR)
  return socketpair
69
70
@unittest.skipUnless(net_test.LINUX_VERSION >= (4, 4, 0), "grace period")
class TcpNukeAddrTest(net_test.NetworkTest):

  """Tests that SIOCKILLADDR no longer exists.

  The out-of-tree SIOCKILLADDR was replaced by the upstream SOCK_DESTROY
  operation in Linux 4.5. SOCK_DESTROY was backported to common Android trees
  all the way back to android-3.10 and is required by CTS from Android N
  onwards. This test ensures that SIOCKILLADDR no longer works in 4.4 and above
  kernels.

  Relevant kernel commits:
    android-4.4:
      3094efd84c Revert "net: socket ioctl to reset connections matching local address"
  """

  def CheckNukeAddrUnsupported(self, socketpair, addr):
    """Checks that SIOCKILLADDR on addr fails and leaves the sockets usable.

    Expects the ioctl to fail with ENOTTY, then sends a small payload from the
    first socket to the second and checks that both still have a peer.

    Args:
      socketpair: A (sender, receiver) pair of sockets. Both are closed before
        this method returns, whether or not the checks pass.
      addr: The IP address string passed to KillAddrIoctl.
    """
    s1, s2 = socketpair
    # Bytes literal: identical to "foo" on Python 2, and the type that
    # socket.send() requires on Python 3.
    data = b"foo"
    try:
      # Kept inside the try block so the sockets are still closed when the
      # ioctl unexpectedly succeeds or fails with a different errno.
      self.assertRaisesErrno(errno.ENOTTY, KillAddrIoctl, addr)
      self.assertEqual(len(data), s1.send(data))
      self.assertEqual(data, s2.recv(4096))
      self.assertSocketsNotClosed(socketpair)
    finally:
      s1.close()
      s2.close()

  def assertSocketsNotClosed(self, socketpair):
    """Asserts that every socket in socketpair still reports a peer address."""
    for sock in socketpair:
      self.assertTrue(sock.getpeername())

  def testIpv4Unsupported(self):
    """SIOCKILLADDR must fail for the IPv4 loopback and wildcard addresses."""
    self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), IPV4_LOOPBACK_ADDR)
    self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), "0.0.0.0")

  def testIpv6Unsupported(self):
    """SIOCKILLADDR must fail for the IPv6 loopback and wildcard addresses."""
    self.CheckNukeAddrUnsupported(CreateIPv6SocketPair(), IPV6_LOOPBACK_ADDR)
    # NOTE(review): the "::" case checks an IPv4 socket pair. This may be
    # deliberate (wildcard IPv6 address against IPv4 sockets) or a copy/paste
    # of the IPv4 test; confirm before switching to CreateIPv6SocketPair().
    self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), "::")
110
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
113