1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import multiprocessing
19import time
20
21from datetime import datetime
22from uuid import uuid4
23
24from acts import signals
25from acts import tracelogger
26from acts import utils
27from acts.controllers import iperf_client
28from acts.controllers import iperf_server
29
30AC_VO = 'AC_VO'
31AC_VI = 'AC_VI'
32AC_BE = 'AC_BE'
33AC_BK = 'AC_BK'
34
35# TODO(fxb/61421): Add tests to check all DSCP classes are mapped to the correct
36# AC (there are many that aren't included here). Requires implementation of
37# sniffer.
38DEFAULT_AC_TO_TOS_TAG_MAP = {
39    AC_VO: '0xC0',
40    AC_VI: '0x80',
41    AC_BE: '0x0',
42    AC_BK: '0x20'
43}
44UDP = 'udp'
45TCP = 'tcp'
46DEFAULT_IPERF_PORT = 5201
47DEFAULT_STREAM_TIME = 10
48DEFAULT_IP_ADDR_TIMEOUT = 15
49PROCESS_JOIN_TIMEOUT = 60
50AVAILABLE = True
51UNAVAILABLE = False
52
53
54class WmmTransceiverError(signals.ControllerError):
55    pass
56
57
58def create(config, identifier=None, wlan_devices=None, access_points=None):
59    """Creates a WmmTransceiver from a config.
60
61    Args:
62        config: dict, config parameters for the transceiver. Contains:
63            - iperf_config: dict, the config to use for creating IPerfClients
64                and IPerfServers (excluding port).
65            - port_range_start: int, the lower bound of the port range to use
66                for creating IPerfServers. Defaults to 5201.
67            - wlan_device: string, the identifier of the wlan_device used for
68                this WmmTransceiver (optional)
69
70        identifier: string, identifier for the WmmTransceiver. Must be provided
71            either as arg or in the config.
72        wlan_devices: list of WlanDevice objects from which to get the
73            wlan_device, if any, used as this transceiver
74        access_points: list of AccessPoint objects from which to get the
75            access_point, if any, used as this transceiver
76    """
77    try:
78        # If identifier is not provided as func arg, it must be provided via
79        # config file.
80        if not identifier:
81            identifier = config['identifier']
82        iperf_config = config['iperf_config']
83
84    except KeyError as err:
85        raise WmmTransceiverError(
86            'Parameter not provided as func arg, nor found in config: %s' %
87            err)
88
89    if wlan_devices is None:
90        wlan_devices = []
91
92    if access_points is None:
93        access_points = []
94
95    port_range_start = config.get('port_range_start', DEFAULT_IPERF_PORT)
96
97    wd = None
98    ap = None
99    if 'wlan_device' in config:
100        wd = _find_wlan_device(config['wlan_device'], wlan_devices)
101    elif 'access_point' in config:
102        ap = _find_access_point(config['access_point'], access_points)
103
104    return WmmTransceiver(iperf_config,
105                          identifier,
106                          wlan_device=wd,
107                          access_point=ap,
108                          port_range_start=port_range_start)
109
110
111def _find_wlan_device(wlan_device_identifier, wlan_devices):
112    """Returns WlanDevice based on string identifier (e.g. ip, serial, etc.)
113
114    Args:
115        wlan_device_identifier: string, identifier for the desired WlanDevice
116        wlan_devices: list, WlanDevices to search through
117
118    Returns:
119        WlanDevice, with identifier matching wlan_device_identifier
120
121    Raises:
122        WmmTransceiverError, if no WlanDevice matches identifier
123    """
124    for wd in wlan_devices:
125        if wlan_device_identifier == wd.identifier:
126            return wd
127    raise WmmTransceiverError('No WlanDevice with identifier: %s' %
128                              wlan_device_identifier)
129
130
131def _find_access_point(access_point_ip, access_points):
132    """Returns AccessPoint based on string ip address
133
134    Args:
135        access_point_ip: string, control plane ip addr of the desired AP,
136        access_points: list, AccessPoints to search through
137
138    Returns:
139        AccessPoint, with hostname matching access_point_ip
140
141    Raises:
142        WmmTransceiverError, if no AccessPoint matches ip"""
143    for ap in access_points:
144        if ap.ssh_settings.hostname == access_point_ip:
145            return ap
146    raise WmmTransceiverError('No AccessPoint with ip: %s' % access_point_ip)
147
148
149class WmmTransceiver(object):
150    """Object for handling WMM tagged streams between devices"""
151    def __init__(self,
152                 iperf_config,
153                 identifier,
154                 wlan_device=None,
155                 access_point=None,
156                 port_range_start=5201):
157
158        self.identifier = identifier
159        self.log = tracelogger.TraceLogger(
160            WmmTransceiverLoggerAdapter(logging.getLogger(),
161                                        {'identifier': self.identifier}))
162        # WlanDevice or AccessPoint, that is used as the transceiver. Only one
163        # will be set. This helps consolodate association, setup, teardown, etc.
164        self.wlan_device = wlan_device
165        self.access_point = access_point
166
167        # Parameters used to create IPerfClient and IPerfServer objects on
168        # device
169        self._iperf_config = iperf_config
170        self._test_interface = self._iperf_config.get('test_interface')
171        self._port_range_start = port_range_start
172        self._next_server_port = port_range_start
173
174        # Maps IPerfClients, used for streams from this device, to True if
175        # available, False if reserved
176        self._iperf_clients = {}
177
178        # Maps IPerfServers, used to receive streams from other devices, to True
179        # if available, False if reserved
180        self._iperf_servers = {}
181
182        # Maps ports of servers, which are provided to other transceivers, to
183        # the actual IPerfServer objects
184        self._iperf_server_ports = {}
185
186        # Maps stream UUIDs to IPerfClients reserved for that streams use
187        self._reserved_clients = {}
188
189        # Maps stream UUIDs to (WmmTransceiver, IPerfServer) tuples, where the
190        # server is reserved on the transceiver for that streams use
191        self._reserved_servers = {}
192
193        # Maps with shared memory functionality to be used across the parallel
194        # streams. active_streams holds UUIDs of streams that are currently
195        # running on this device (mapped to True, since there is no
196        # multiprocessing set). stream_results maps UUIDs of streams completed
197        # on this device to IPerfResult results for that stream.
198        self._manager = multiprocessing.Manager()
199        self._active_streams = self._manager.dict()
200        self._stream_results = self._manager.dict()
201
202        # Holds parameters for streams that are prepared to run asynchronously
203        # (i.e. resources have been allocated). Maps UUIDs of the future streams
204        # to a dict, containing the stream parameters.
205        self._pending_async_streams = {}
206
207        # Set of UUIDs of asynchronous streams that have at least started, but
208        # have not had their resources reclaimed yet
209        self._ran_async_streams = set()
210
211        # Set of stream parallel process, which can be joined if completed
212        # successfully, or  terminated and joined in the event of an error
213        self._running_processes = set()
214
215    def run_synchronous_traffic_stream(self, stream_parameters, subnet):
216        """Runs a traffic stream with IPerf3 between two WmmTransceivers and
217        saves the results.
218
219        Args:
220            stream_parameters: dict, containing parameters to used for the
221                stream. See _parse_stream_parameters for details.
222            subnet: string, the subnet of the network to use for the stream
223
224        Returns:
225            uuid: UUID object, identifier of the stream
226        """
227        (receiver, access_category, bandwidth,
228         stream_time) = self._parse_stream_parameters(stream_parameters)
229        uuid = uuid4()
230
231        (client, server_ip,
232         server_port) = self._get_stream_resources(uuid, receiver, subnet)
233
234        self._validate_server_address(server_ip, uuid)
235
236        self.log.info('Running synchronous stream to %s WmmTransceiver' %
237                      receiver.identifier)
238        self._run_traffic(uuid,
239                          client,
240                          server_ip,
241                          server_port,
242                          self._active_streams,
243                          self._stream_results,
244                          access_category=access_category,
245                          bandwidth=bandwidth,
246                          stream_time=stream_time)
247
248        self._return_stream_resources(uuid)
249        return uuid
250
251    def prepare_asynchronous_stream(self, stream_parameters, subnet):
252        """Reserves resources and saves configs for upcoming asynchronous
253        traffic streams, so they can be started more simultaneously.
254
255        Args:
256            stream_parameters: dict, containing parameters to used for the
257                stream. See _parse_stream_parameters for details.
258            subnet: string, the subnet of the network to use for the stream
259
260        Returns:
261            uuid: UUID object, identifier of the stream
262        """
263        (receiver, access_category, bandwidth,
264         time) = self._parse_stream_parameters(stream_parameters)
265        uuid = uuid4()
266
267        (client, server_ip,
268         server_port) = self._get_stream_resources(uuid, receiver, subnet)
269
270        self._validate_server_address(server_ip, uuid)
271
272        pending_stream_config = {
273            'client': client,
274            'server_ip': server_ip,
275            'server_port': server_port,
276            'access_category': access_category,
277            'bandwidth': bandwidth,
278            'time': time
279        }
280
281        self._pending_async_streams[uuid] = pending_stream_config
282        self.log.info('Stream to %s WmmTransceiver prepared.' %
283                      receiver.identifier)
284        return uuid
285
286    def start_asynchronous_streams(self, start_time=None):
287        """Starts pending asynchronous streams between two WmmTransceivers as
288        parallel processes.
289
290        Args:
291            start_time: float, time, seconds since epoch, at which to start the
292                stream (for better synchronicity). If None, start immediately.
293        """
294        for uuid in self._pending_async_streams:
295            pending_stream_config = self._pending_async_streams[uuid]
296            client = pending_stream_config['client']
297            server_ip = pending_stream_config['server_ip']
298            server_port = pending_stream_config['server_port']
299            access_category = pending_stream_config['access_category']
300            bandwidth = pending_stream_config['bandwidth']
301            time = pending_stream_config['time']
302
303            process = multiprocessing.Process(target=self._run_traffic,
304                                              args=[
305                                                  uuid, client, server_ip,
306                                                  server_port,
307                                                  self._active_streams,
308                                                  self._stream_results
309                                              ],
310                                              kwargs={
311                                                  'access_category':
312                                                  access_category,
313                                                  'bandwidth': bandwidth,
314                                                  'stream_time': time,
315                                                  'start_time': start_time
316                                              })
317
318            # This needs to be set here to ensure its marked active before
319            # it even starts.
320            self._active_streams[uuid] = True
321            process.start()
322            self._ran_async_streams.add(uuid)
323            self._running_processes.add(process)
324
325        self._pending_async_streams.clear()
326
327    def cleanup_asynchronous_streams(self, timeout=PROCESS_JOIN_TIMEOUT):
328        """Releases reservations on resources (IPerfClients and IPerfServers)
329        that were held for asynchronous streams, both pending and finished.
330        Attempts to join any running processes, logging an error if timeout is
331        exceeded.
332
333        Args:
334            timeout: time, in seconds, to wait for each running process, if any,
335                to join
336        """
337        self.log.info('Cleaning up any asynchronous streams.')
338
339        # Releases resources for any streams that were prepared, but no run
340        for uuid in self._pending_async_streams:
341            self.log.error(
342                'Pending asynchronous stream %s never ran. Cleaning.' % uuid)
343            self._return_stream_resources(uuid)
344        self._pending_async_streams.clear()
345
346        # Attempts to join any running streams, terminating them after timeout
347        # if necessary.
348        while self._running_processes:
349            process = self._running_processes.pop()
350            process.join(timeout)
351            if process.is_alive():
352                self.log.error(
353                    'Stream process failed to join in %s seconds. Terminating.'
354                    % timeout)
355                process.terminate()
356                process.join()
357        self._active_streams.clear()
358
359        # Release resources for any finished streams
360        while self._ran_async_streams:
361            uuid = self._ran_async_streams.pop()
362            self._return_stream_resources(uuid)
363
364    def get_results(self, uuid):
365        """Retrieves a streams IPerfResults from stream_results
366
367        Args:
368            uuid: UUID object, identifier of the stream
369        """
370        return self._stream_results.get(uuid, None)
371
372    def destroy_resources(self):
373        for server in self._iperf_servers:
374            server.stop()
375        self._iperf_servers.clear()
376        self._iperf_server_ports.clear()
377        self._iperf_clients.clear()
378        self._next_server_port = self._port_range_start
379        self._stream_results.clear()
380
381    @property
382    def has_active_streams(self):
383        return bool(self._active_streams)
384
385    # Helper Functions
386
387    def _run_traffic(self,
388                     uuid,
389                     client,
390                     server_ip,
391                     server_port,
392                     active_streams,
393                     stream_results,
394                     access_category=None,
395                     bandwidth=None,
396                     stream_time=DEFAULT_STREAM_TIME,
397                     start_time=None):
398        """Runs an iperf3 stream.
399
400        1. Adds stream UUID to active_streams
401        2. Runs stream
402        3. Saves results to stream_results
403        4. Removes stream UUID from active_streams
404
405        Args:
406            uuid: UUID object, identifier for stream
407            client: IPerfClient object on device
408            server_ip: string, ip address of IPerfServer for stream
409            server_port: int, port of the IPerfServer for stream
410            active_streams: multiprocessing.Manager.dict, which holds stream
411                UUIDs of active streams on the device
412            stream_results: multiprocessing.Manager.dict, which maps stream
413                UUIDs of streams to IPerfResult objects
414            access_category: string, WMM access category to use with iperf
415                (AC_BK, AC_BE, AC_VI, AC_VO). Unset if None.
416            bandwidth: int, bandwidth in mbps to use with iperf. Implies UDP.
417                Unlimited if None.
418            stream_time: int, time in seconds, to run iperf stream
419            start_time: float, time, seconds since epoch, at which to start the
420                stream (for better synchronicity). If None, start immediately.
421        """
422        active_streams[uuid] = True
423        # SSH sessions must be started within the process that is going to
424        # use it.
425        if type(client) == iperf_client.IPerfClientOverSsh:
426            with utils.SuppressLogOutput():
427                client.start_ssh()
428
429        ac_flag = ''
430        bandwidth_flag = ''
431        time_flag = '-t %s' % stream_time
432
433        if access_category:
434            ac_flag = ' -S %s' % DEFAULT_AC_TO_TOS_TAG_MAP[access_category]
435
436        if bandwidth:
437            bandwidth_flag = ' -u -b %sM' % bandwidth
438
439        iperf_flags = '-p %s -i 1 %s%s%s -J' % (server_port, time_flag,
440                                                ac_flag, bandwidth_flag)
441        if not start_time:
442            start_time = time.time()
443        time_str = datetime.fromtimestamp(start_time).strftime('%H:%M:%S.%f')
444        self.log.info(
445            'At %s, starting %s second stream to %s:%s with (AC: %s, Bandwidth: %s)'
446            % (time_str, stream_time, server_ip, server_port, access_category,
447               bandwidth if bandwidth else 'Unlimited'))
448
449        # If present, wait for stream start time
450        if start_time:
451            current_time = time.time()
452            while current_time < start_time:
453                current_time = time.time()
454        path = client.start(server_ip, iperf_flags, '%s' % uuid)
455        stream_results[uuid] = iperf_server.IPerfResult(
456            path, reporting_speed_units='mbps')
457
458        if type(client) == iperf_client.IPerfClientOverSsh:
459            client.close_ssh()
460        active_streams.pop(uuid)
461
462    def _get_stream_resources(self, uuid, receiver, subnet):
463        """Reserves an IPerfClient and IPerfServer for a stream.
464
465        Args:
466            uuid: UUID object, identifier of the stream
467            receiver: WmmTransceiver object, which will be the streams receiver
468            subnet: string, subnet of test network, to retrieve the appropriate
469                server address
470
471        Returns:
472            (IPerfClient, string, int) representing the client, server address,
473            and server port to use for the stream
474        """
475        client = self._get_client(uuid)
476        server_ip, server_port = self._get_server(receiver, uuid, subnet)
477        return (client, server_ip, server_port)
478
479    def _return_stream_resources(self, uuid):
480        """Releases reservations on a streams IPerfClient and IPerfServer, so
481        they can be used by a future stream.
482
483        Args:
484            uuid: UUID object, identifier of the stream
485        """
486        if uuid in self._active_streams:
487            raise EnvironmentError('Resource still being used by stream %s' %
488                                   uuid)
489        (receiver, server_port) = self._reserved_servers.pop(uuid)
490        receiver._release_server(server_port)
491        client = self._reserved_clients.pop(uuid)
492        self._iperf_clients[client] = AVAILABLE
493
494    def _get_client(self, uuid):
495        """Retrieves and reserves IPerfClient for use in a stream. If none are
496        available, a new one is created.
497
498        Args:
499            uuid: UUID object, identifier for stream, used to link client to
500                stream for teardown
501
502        Returns:
503            IPerfClient on device
504        """
505        reserved_client = None
506        for client in self._iperf_clients:
507            if self._iperf_clients[client] == AVAILABLE:
508                reserved_client = client
509                break
510        else:
511            reserved_client = iperf_client.create([self._iperf_config])[0]
512            # Due to the nature of multiprocessing, ssh connections must
513            # be started inside the parallel processes, so it must be closed
514            # here.
515            if type(reserved_client) == iperf_client.IPerfClientOverSsh:
516                reserved_client.close_ssh()
517
518        self._iperf_clients[reserved_client] = UNAVAILABLE
519        self._reserved_clients[uuid] = reserved_client
520        return reserved_client
521
522    def _get_server(self, receiver, uuid, subnet):
523        """Retrieves the address and port of a reserved IPerfServer object from
524        the receiver object for use in a stream.
525
526        Args:
527            receiver: WmmTransceiver, to get an IPerfServer from
528            uuid: UUID, identifier for stream, used to link server to stream
529                for teardown
530            subnet: string, subnet of test network, to retrieve the appropriate
531                server address
532
533        Returns:
534            (string, int) representing the IPerfServer address and port
535        """
536        (server_ip, server_port) = receiver._reserve_server(subnet)
537        self._reserved_servers[uuid] = (receiver, server_port)
538        return (server_ip, server_port)
539
540    def _reserve_server(self, subnet):
541        """Reserves an available IPerfServer for use in a stream from another
542        WmmTransceiver. If none are available, a new one is created.
543
544        Args:
545            subnet: string, subnet of test network, to retrieve the appropriate
546                server address
547
548        Returns:
549            (string, int) representing the IPerfServer address and port
550        """
551        reserved_server = None
552        for server in self._iperf_servers:
553            if self._iperf_servers[server] == AVAILABLE:
554                reserved_server = server
555                break
556        else:
557            iperf_server_config = self._iperf_config
558            iperf_server_config.update({'port': self._next_server_port})
559            self._next_server_port += 1
560            reserved_server = iperf_server.create([iperf_server_config])[0]
561            self._iperf_server_ports[reserved_server.port] = reserved_server
562
563        self._iperf_servers[reserved_server] = UNAVAILABLE
564        reserved_server.start()
565        end_time = time.time() + DEFAULT_IP_ADDR_TIMEOUT
566        while time.time() < end_time:
567            if self.wlan_device:
568                addresses = utils.get_interface_ip_addresses(
569                    self.wlan_device.device, self._test_interface)
570            else:
571                addresses = reserved_server.get_interface_ip_addresses(
572                    self._test_interface)
573            for addr in addresses['ipv4_private']:
574                if utils.ip_in_subnet(addr, subnet):
575                    return (addr, reserved_server.port)
576        raise AttributeError(
577            'Reserved server has no ipv4 address in the %s subnet' % subnet)
578
579    def _release_server(self, server_port):
580        """Releases reservation on IPerfServer, which was held for a stream
581        from another WmmTransceiver.
582
583        Args:
584            server_port: int, the port of the IPerfServer being returned (since)
585                it is the identifying characteristic
586        """
587        server = self._iperf_server_ports[server_port]
588        server.stop()
589        self._iperf_servers[server] = AVAILABLE
590
591    def _validate_server_address(self, server_ip, uuid, timeout=60):
592        """ Verifies server address can be pinged before attempting to run
593        traffic, since iperf is unforgiving when the server is unreachable.
594
595        Args:
596            server_ip: string, ip address of the iperf server
597            uuid: string, uuid of the stream to use this server
598            timeout: int, time in seconds to wait for server to respond to pings
599
600        Raises:
601            WmmTransceiverError, if, after timeout, server ip is unreachable.
602        """
603        self.log.info('Verifying server address (%s) is reachable.' %
604                      server_ip)
605        end_time = time.time() + timeout
606        while time.time() < end_time:
607            if self.can_ping(server_ip):
608                break
609            else:
610                self.log.debug(
611                    'Could not ping server address (%s). Retrying in 1 second.'
612                    % (server_ip))
613                time.sleep(1)
614        else:
615            self._return_stream_resources(uuid)
616            raise WmmTransceiverError('IPerfServer address (%s) unreachable.' %
617                                      server_ip)
618
619    def can_ping(self, dest_ip):
620        """ Utilizes can_ping function in wlan_device or access_point device to
621        ping dest_ip
622
623        Args:
624            dest_ip: string, ip address to ping
625
626        Returns:
627            True, if dest address is reachable
628            False, otherwise
629        """
630        if self.wlan_device:
631            return self.wlan_device.can_ping(dest_ip)
632        else:
633            return self.access_point.can_ping(dest_ip)
634
635    def _parse_stream_parameters(self, stream_parameters):
636        """Parses stream_parameters from dictionary.
637
638        Args:
639            stream_parameters: dict of stream parameters
640                'receiver': WmmTransceiver, the receiver for the stream
641                'access_category': String, the access category to use for the
642                    stream. Unset if None.
643                'bandwidth': int, bandwidth in mbps for the stream. If set,
644                    implies UDP. If unset, implies TCP and unlimited bandwidth.
645                'time': int, time in seconds to run stream.
646
647        Returns:
648            (receiver, access_category, bandwidth, time) as
649            (WmmTransceiver, String, int, int)
650        """
651        receiver = stream_parameters['receiver']
652        access_category = stream_parameters.get('access_category', None)
653        bandwidth = stream_parameters.get('bandwidth', None)
654        time = stream_parameters.get('time', DEFAULT_STREAM_TIME)
655        return (receiver, access_category, bandwidth, time)
656
657
658class WmmTransceiverLoggerAdapter(logging.LoggerAdapter):
659    def process(self, msg, kwargs):
660        if self.extra['identifier']:
661            log_identifier = ' | %s' % self.extra['identifier']
662        else:
663            log_identifier = ''
664        msg = "[WmmTransceiver%s] %s" % (log_identifier, msg)
665        return (msg, kwargs)
666