1% send, sniff, sr* tests for Scapy
2
3~ netaccess
4
5############
6############
7+ Test bridge_and_sniff() using tap sockets
8
9~ tap linux
10
11= Create two tap interfaces
12
13import subprocess
14from threading import Thread
15
16tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)]
17
18if LINUX:
19    for i in range(2):
20        assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"]) == 0
21else:
22    for i in range(2):
23        assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"]) == 0
24
25= Run a sniff thread on the tap1 **interface**
26* It will terminate when 5 IP packets from 1.2.3.4 have been sniffed
27t_sniff = Thread(
28    target=sniff,
29    kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
30            "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}
31)
32t_sniff.start()
33
34= Run a bridge_and_sniff thread between the taps **sockets**
35* It will terminate when 5 IP packets from 1.2.3.4 have been forwarded
36t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
37                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
38                          "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"})
39t_bridge.start()
40
41= Send five IP packets from 1.2.3.4 to the tap0 **interface**
42time.sleep(1)
43sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0",
44      count=5)
45
46= Wait for the threads
47t_bridge.join()
48t_sniff.join()
49
50= Run a sniff thread on the tap1 **interface**
51* It will terminate when 5 IP packets from 2.3.4.5 have been sniffed
52t_sniff = Thread(
53    target=sniff,
54    kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
55            "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"}
56)
57t_sniff.start()
58
59= Run a bridge_and_sniff thread between the taps **sockets**
60* It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded
61def nat_1_2(pkt):
62    if IP in pkt and pkt[IP].src == "1.2.3.4":
63        pkt[IP].src = "2.3.4.5"
64        del pkt[IP].chksum
65        return pkt
66    return False
67
68t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
69                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
70                          "xfrm12": nat_1_2,
71                          "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"})
72t_bridge.start()
73
74= Send five IP packets from 1.2.3.4 to the tap0 **interface**
75time.sleep(1)
76sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0",
77      count=5)
78
79= Wait for the threads
80t_bridge.join()
81t_sniff.join()
82
83= Delete the tap interfaces
84del tap0, tap1
85
86
87############
88############
89+ Test bridge_and_sniff() using tun sockets
90
91~ tun linux not_pcapdnet
92
93= Create two tun interfaces
94
95import subprocess
96from threading import Thread
97
98tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)]
99
100if LINUX:
101    for i in range(2):
102        assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"]) == 0
103else:
104    for i in range(2):
105        assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"]) == 0
106
107= Run a sniff thread on the tun1 **interface**
108* It will terminate when 5 IP packets from 1.2.3.4 have been sniffed
109t_sniff = Thread(
110    target=sniff,
111    kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary,
112            "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}
113)
114t_sniff.start()
115
116= Run a bridge_and_sniff thread between the tuns **sockets**
117* It will terminate when 5 IP packets from 1.2.3.4 have been forwarded.
118t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
119                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
120                          "xfrm12": lambda pkt: pkt,
121                          "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"})
122t_bridge.start()
123
124= Send five IP packets from 1.2.3.4 to the tun0 **interface**
125time.sleep(1)
126conf.route.add(net="1.2.3.4/32", dev="tun0")
127send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5)
128conf.route.delt(net="1.2.3.4/32", dev="tun0")
129
130= Wait for the threads
131t_bridge.join()
132t_sniff.join()
133
134= Run a sniff thread on the tun1 **interface**
135* It will terminate when 5 IP packets from 2.3.4.5 have been sniffed
136t_sniff = Thread(
137    target=sniff,
138    kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary,
139            "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"}
140)
141t_sniff.start()
142
143= Run a bridge_and_sniff thread between the tuns **sockets**
144* It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded
145def nat_1_2(pkt):
146    if IP in pkt and pkt[IP].src == "1.2.3.4":
147        pkt[IP].src = "2.3.4.5"
148        del pkt[IP].chksum
149        return pkt
150    return False
151
152t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
153                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
154                          "xfrm12": nat_1_2,
155                          "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"})
156t_bridge.start()
157
158= Send five IP packets from 1.2.3.4 to the tun0 **interface**
159time.sleep(1)
160conf.route.add(net="1.2.3.4/32", dev="tun0")
161send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5)
162conf.route.delt(net="1.2.3.4/32", dev="tun0")
163
164= Wait for the threads
165t_bridge.join()
166t_sniff.join()
167
168= Delete the tun interfaces
169del tun0, tun1
170