1#!/usr/bin/python3.4
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts import asserts
18from acts.test_decorators import test_tracker_info
19from acts_contrib.test_utils.net import connectivity_const as cconsts
20from acts_contrib.test_utils.wifi.aware import aware_const as aconsts
21from acts.controllers.ap_lib.hostapd_constants import BAND_2G
22from acts.controllers.ap_lib.hostapd_constants import BAND_5G
23from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
24from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
25from acts_contrib.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
26from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
27from scapy.all import *
28
29
30class MacRandomNoLeakageTest(AwareBaseTest, WifiBaseTest):
31    """Set of tests for Wi-Fi Aware MAC address randomization of NMI (NAN
32    management interface) and NDI (NAN data interface)."""
33
34    SERVICE_NAME = "GoogleTestServiceXYZ"
35    ping_msg = 'PING'
36
37    AWARE_DEFAULT_CHANNEL_5_BAND = 149
38    AWARE_DEFAULT_CHANNEL_24_BAND = 6
39
40    ENCR_TYPE_OPEN = 0
41    ENCR_TYPE_PASSPHRASE = 1
42    ENCR_TYPE_PMK = 2
43
44    PASSPHRASE = "This is some random passphrase - very very secure!!"
45    PMK = "ODU0YjE3YzdmNDJiNWI4NTQ2NDJjNDI3M2VkZTQyZGU="
46
47    def setup_class(self):
48        super().setup_class()
49
50        asserts.assert_true(hasattr(self, 'packet_capture'),
51                            "Needs packet_capture attribute to support sniffing.")
52        self.configure_packet_capture(channel_5g=self.AWARE_DEFAULT_CHANNEL_5_BAND,
53                                      channel_2g=self.AWARE_DEFAULT_CHANNEL_24_BAND)
54
55    def setup_test(self):
56        WifiBaseTest.setup_test(self)
57        AwareBaseTest.setup_test(self)
58
59    def teardown_test(self):
60        WifiBaseTest.teardown_test(self)
61        AwareBaseTest.teardown_test(self)
62
63    def verify_mac_no_leakage(self, pcap_procs, factory_mac_addresses, mac_addresses):
64        # Get 2G and 5G pcaps
65        pcap_fname = '%s_%s.pcap' % (pcap_procs[BAND_5G][1], BAND_5G.upper())
66        pcap_5g = rdpcap(pcap_fname)
67
68        pcap_fname = '%s_%s.pcap' % (pcap_procs[BAND_2G][1], BAND_2G.upper())
69        pcap_2g = rdpcap(pcap_fname)
70        pcaps = pcap_5g + pcap_2g
71
72        # Verify factory MAC is not leaked in both 2G and 5G pcaps
73        ads = [self.android_devices[0], self.android_devices[1]]
74        for i, mac in enumerate(factory_mac_addresses):
75            wutils.verify_mac_not_found_in_pcap(ads[i], mac, pcaps)
76
77        # Verify random MACs are being used and in pcaps
78        for i, mac in enumerate(mac_addresses):
79            wutils.verify_mac_is_found_in_pcap(ads[i], mac, pcaps)
80
81    def transfer_mac_format(self, mac):
82        """add ':' to mac String, and transfer to lower case
83
84    Args:
85        mac: String of mac without ':'
86        @return: Lower case String of mac like "xx:xx:xx:xx:xx:xx"
87    """
88        return re.sub(r"(?<=\w)(?=(?:\w\w)+$)", ":", mac.lower())
89
90    def start_aware(self, dut, is_publish):
91        """Start Aware attach, then start Publish/Subscribe based on role
92
93     Args:
94         dut: Aware device
95         is_publish: True for Publisher, False for subscriber
96         @:return: dict with Aware discovery session info
97    """
98        aware_id = dut.droid.wifiAwareAttach(True)
99        autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
100        event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
101        mac = self.transfer_mac_format(event["data"]["mac"])
102        dut.log.info("NMI=%s", mac)
103
104        if is_publish:
105            config = autils.create_discovery_config(self.SERVICE_NAME,
106                                                    aconsts.PUBLISH_TYPE_UNSOLICITED)
107            disc_id = dut.droid.wifiAwarePublish(aware_id, config)
108            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
109        else:
110            config = autils.create_discovery_config(self.SERVICE_NAME,
111                                                    aconsts.SUBSCRIBE_TYPE_PASSIVE)
112            disc_id = dut.droid.wifiAwareSubscribe(aware_id, config)
113            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED)
114        aware_session = {"awareId": aware_id, "discId": disc_id, "mac": mac}
115        return aware_session
116
117    def create_date_path(self, p_dut, pub_session, s_dut, sub_session, sec_type):
118        """Create NDP based on the security type(open, PMK, PASSPHRASE), run socket connect
119
120    Args:
121        p_dut: Publish device
122        p_disc_id: Publisher discovery id
123        peer_id_on_pub: peer id on publisher
124        s_dut: Subscribe device
125        s_disc_id: Subscriber discovery id
126        peer_id_on_sub: peer id on subscriber
127        sec_type: NDP security type(open, PMK or PASSPHRASE)
128        @:return: dict with NDP info
129    """
130
131        passphrase = None
132        pmk = None
133
134        if sec_type == self.ENCR_TYPE_PASSPHRASE:
135            passphrase = self.PASSPHRASE
136        if sec_type == self.ENCR_TYPE_PMK:
137            pmk = self.PMK
138
139        p_req_key = autils.request_network(
140            p_dut,
141            p_dut.droid.wifiAwareCreateNetworkSpecifier(pub_session["discId"],
142                                                        None,
143                                                        passphrase, pmk))
144        s_req_key = autils.request_network(
145            s_dut,
146            s_dut.droid.wifiAwareCreateNetworkSpecifier(sub_session["discId"],
147                                                        sub_session["peerId"],
148                                                        passphrase, pmk))
149
150        p_net_event_nc = autils.wait_for_event_with_keys(
151            p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
152            (cconsts.NETWORK_CB_KEY_EVENT,
153             cconsts.NETWORK_CB_CAPABILITIES_CHANGED),
154            (cconsts.NETWORK_CB_KEY_ID, p_req_key))
155        s_net_event_nc = autils.wait_for_event_with_keys(
156            s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
157            (cconsts.NETWORK_CB_KEY_EVENT,
158             cconsts.NETWORK_CB_CAPABILITIES_CHANGED),
159            (cconsts.NETWORK_CB_KEY_ID, s_req_key))
160        p_net_event_lp = autils.wait_for_event_with_keys(
161            p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
162            (cconsts.NETWORK_CB_KEY_EVENT,
163             cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
164            (cconsts.NETWORK_CB_KEY_ID, p_req_key))
165        s_net_event_lp = autils.wait_for_event_with_keys(
166            s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
167            (cconsts.NETWORK_CB_KEY_EVENT,
168             cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
169            (cconsts.NETWORK_CB_KEY_ID, s_req_key))
170
171        p_aware_if = p_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
172        s_aware_if = s_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
173        p_if_mac = self.transfer_mac_format(autils.get_mac_addr(p_dut, p_aware_if))
174        p_dut.log.info("NDI %s=%s", p_aware_if, p_if_mac)
175        s_if_mac = self.transfer_mac_format(autils.get_mac_addr(s_dut, s_aware_if))
176        s_dut.log.info("NDI %s=%s", s_aware_if, s_if_mac)
177
178        s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6]
179        p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6]
180        asserts.assert_true(
181            autils.verify_socket_connect(p_dut, s_dut, p_ipv6, s_ipv6, 0),
182            "Failed socket link with Pub as Server")
183        asserts.assert_true(
184            autils.verify_socket_connect(s_dut, p_dut, s_ipv6, p_ipv6, 0),
185            "Failed socket link with Sub as Server")
186
187        ndp_info = {"pubReqKey": p_req_key, "pubIfMac": p_if_mac,
188                    "subReqKey": s_req_key, "subIfMac": s_if_mac}
189        return ndp_info
190
191    @test_tracker_info(uuid="c9c66873-a8e0-4830-8baa-ada03223bcef")
192    def test_ib_multi_data_path_mac_random_test(self):
193        """Verify there is no factory MAC Address leakage during the Aware discovery, NDP creation,
194        socket setup and IP service connection."""
195
196        p_dut = self.android_devices[0]
197        s_dut = self.android_devices[1]
198        mac_addresses = []
199        factory_mac_addresses = []
200        sec_types = [self.ENCR_TYPE_PMK, self.ENCR_TYPE_PASSPHRASE]
201
202        self.log.info("Starting packet capture")
203        pcap_procs = wutils.start_pcap(
204            self.packet_capture, 'dual', self.test_name)
205
206        factory_mac_1 = p_dut.droid.wifigetFactorymacAddresses()[0]
207        p_dut.log.info("Factory Address: %s", factory_mac_1)
208        factory_mac_2 = s_dut.droid.wifigetFactorymacAddresses()[0]
209        s_dut.log.info("Factory Address: %s", factory_mac_2)
210        factory_mac_addresses.append(factory_mac_1)
211        factory_mac_addresses.append(factory_mac_2)
212
213        # Start Aware and exchange messages
214        publish_session = self.start_aware(p_dut, True)
215        subscribe_session = self.start_aware(s_dut, False)
216        mac_addresses.append(publish_session["mac"])
217        mac_addresses.append(subscribe_session["mac"])
218        discovery_event = autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED)
219        subscribe_session["peerId"] = discovery_event['data'][aconsts.SESSION_CB_KEY_PEER_ID]
220
221        msg_id = self.get_next_msg_id()
222        s_dut.droid.wifiAwareSendMessage(subscribe_session["discId"], subscribe_session["peerId"],
223                                         msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
224        autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
225        pub_rx_msg_event = autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)
226        publish_session["peerId"] = pub_rx_msg_event['data'][aconsts.SESSION_CB_KEY_PEER_ID]
227
228        msg_id = self.get_next_msg_id()
229        p_dut.droid.wifiAwareSendMessage(publish_session["discId"], publish_session["peerId"],
230                                         msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
231        autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
232        autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)
233
234        # Create Aware NDP
235        p_req_keys = []
236        s_req_keys = []
237        for sec in sec_types:
238            ndp_info = self.create_date_path(p_dut, publish_session, s_dut, subscribe_session, sec)
239            p_req_keys.append(ndp_info["pubReqKey"])
240            s_req_keys.append(ndp_info["subReqKey"])
241            mac_addresses.append(ndp_info["pubIfMac"])
242            mac_addresses.append(ndp_info["subIfMac"])
243
244        # clean-up
245        for p_req_key in p_req_keys:
246            p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key)
247        for s_req_key in s_req_keys:
248            s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key)
249        p_dut.droid.wifiAwareDestroyAll()
250        s_dut.droid.wifiAwareDestroyAll()
251
252        self.log.info("Stopping packet capture")
253        wutils.stop_pcap(self.packet_capture, pcap_procs, False)
254
255        self.verify_mac_no_leakage(pcap_procs, factory_mac_addresses, mac_addresses)
256