#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import empty_pb2 as empty_proto

from blueberry.tests.gd.cert.event_stream import EventStream
from blueberry.tests.gd.cert.event_stream import IEventStream
from blueberry.tests.gd.cert.captures import HciCaptures
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.truth import assertThat
from datetime import timedelta
from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade
import hci_packets as hci


class PyLeAclManagerAclConnection(IEventStream, Closable):
    """
    A single LE ACL connection in a GD certification test.

    Holds the connection's own event stream plus a stream of the ACL data
    fetched for its handle, and wraps the facade calls that take that handle.
    """

    def __init__(self, le_acl_manager, address, remote_addr, remote_addr_type, handle, event_stream):
        """
        An abstract representation for an LE ACL connection in GD certification test
        :param le_acl_manager: The LeAclManager from this GD device
        :param address: The local device address
        :param remote_addr: Remote device address
        :param remote_addr_type: Remote device address type
        :param handle: Connection handle
        :param event_stream: The connection event stream for this connection
        """
        self.le_acl_manager = le_acl_manager
        # todo enable filtering after sorting out handles
        # self.our_acl_stream = FilteringEventStream(acl_stream, None)
        self.handle = handle
        self.connection_event_stream = event_stream
        # ACL data for this handle is fetched as soon as the object is built.
        self.acl_stream = EventStream(
            self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle)))
        self.remote_address = remote_addr
        self.remote_address_type = remote_addr_type
        self.own_address = address
        # Filled in by wait_for_disconnection_complete().
        self.disconnect_reason = None

    def close(self):
        """
        Close the connection event stream and the ACL data stream
        """
        safeClose(self.connection_event_stream)
        safeClose(self.acl_stream)

    def disconnect(self):
        """
        Ask the LeAclManager facade to disconnect this connection's handle
        """
        self.le_acl_manager.Disconnect(le_acl_manager_facade.LeHandleMsg(handle=self.handle))

    def wait_for_disconnection_complete(self, timeout=timedelta(seconds=30)):
        """
        Assert that a disconnection complete event arrives on the connection
        event stream and record its reason in self.disconnect_reason
        :param timeout: How long to wait for the event
        """
        disconnection_complete = HciCaptures.DisconnectionCompleteCapture()
        assertThat(self.connection_event_stream).emits(disconnection_complete, timeout=timeout)
        self.disconnect_reason = disconnection_complete.get().reason

    def send(self, data):
        """
        Send ACL data on this connection's handle
        :param data: The payload; it is converted with bytes() before sending
        """
        self.le_acl_manager.SendAclData(le_acl_manager_facade.LeAclData(handle=self.handle, payload=bytes(data)))

    def get_event_queue(self):
        """
        :return: The event queue of the ACL data stream
        """
        return self.acl_stream.get_event_queue()


class PyLeAclManager(Closable):
    """
    Test-side wrapper around a GD device's LE ACL manager facade.

    Outgoing connection attempts are tracked by integer token; each token maps
    to an (event stream, remote address) pair until the attempt is completed
    or cancelled.
    """

    def __init__(self, device):
        """
        LE ACL Manager for GD Certification test
        :param device: The GD device
        """
        self.le_acl_manager = device.hci_le_acl_manager

        self.incoming_connection_event_stream = None
        # token -> (EventStream, remote_addr)
        self.outgoing_connection_event_streams = {}
        self.active_connections = []
        self.next_token = 1

    def close(self):
        """
        Close the incoming stream, every pending outgoing stream and every
        connection created through this manager
        """
        safeClose(self.incoming_connection_event_stream)
        for v in self.outgoing_connection_event_streams.values():
            safeClose(v[0])
        for connection in self.active_connections:
            safeClose(connection)

    def listen_for_incoming_connections(self):
        """
        Start fetching incoming connection events; asserts that no incoming
        stream is already open
        """
        assertThat(self.incoming_connection_event_stream).isNone()
        self.incoming_connection_event_stream = EventStream(
            self.le_acl_manager.FetchIncomingConnection(empty_proto.Empty()))

    def connect_to_remote(self, remote_addr):
        """
        Initiate a connection and wait for it to complete
        :param remote_addr: Remote device address
        :return: The resulting PyLeAclManagerAclConnection
        """
        token = self.initiate_connection(remote_addr)
        return self.complete_outgoing_connection(token)

    def wait_for_connection(self):
        """
        Listen for an incoming connection and wait for it to complete
        :return: The resulting PyLeAclManagerAclConnection
        """
        self.listen_for_incoming_connections()
        return self.complete_incoming_connection()

    def wait_for_connection_fail(self, token):
        """
        Assert that the outgoing attempt identified by token finishes with
        status CONNECTION_ACCEPT_TIMEOUT within 35 seconds
        :param token: Token returned by initiate_connection

        The stream stays registered under the token, so close() or
        cancel_connection() still closes it afterwards.
        """
        # Membership check (as in cancel_connection) so an unknown token is
        # reported as an assertion failure instead of a bare KeyError.
        assertThat(token in self.outgoing_connection_event_streams).isTrue()
        event_stream = self.outgoing_connection_event_streams[token][0]
        connection_fail = HciCaptures.LeConnectionCompleteCapture()
        assertThat(event_stream).emits(connection_fail, timeout=timedelta(seconds=35))
        complete = connection_fail.get()
        assertThat(complete.status == hci.ErrorCode.CONNECTION_ACCEPT_TIMEOUT).isTrue()

    def cancel_connection(self, token):
        """
        Cancel a pending outgoing attempt: forget the token, close its event
        stream, then ask the facade to cancel using the stored remote address
        :param token: Token returned by initiate_connection
        """
        assertThat(token in self.outgoing_connection_event_streams).isTrue()
        pair = self.outgoing_connection_event_streams.pop(token)
        safeClose(pair[0])
        self.le_acl_manager.CancelConnection(pair[1])

    def initiate_connection(self, remote_addr, is_direct=True):
        """
        Start an outgoing connection attempt without waiting for the result
        :param remote_addr: Remote device address
        :param is_direct: Passed through as is_direct in the CreateConnectionMsg
        :return: Token identifying this attempt
        """
        assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse()
        create_connection_msg = le_acl_manager_facade.CreateConnectionMsg(peer_address=remote_addr, is_direct=is_direct)
        # Keep the remote address next to the stream; cancel_connection needs it.
        self.outgoing_connection_event_streams[self.next_token] = (EventStream(
            self.le_acl_manager.CreateConnection(create_connection_msg)), remote_addr)
        token = self.next_token
        self.next_token += 1
        return token

    def is_on_background_list(self, remote_addr):
        """
        :param remote_addr: Remote device address
        :return: The facade's IsOnBackgroundList response for that address
        """
        return self.le_acl_manager.IsOnBackgroundList(
            le_acl_manager_facade.BackgroundRequestMsg(peer_address=remote_addr))

    def remove_from_background_list(self, remote_addr):
        """
        Ask the facade to remove the address from the background list
        :param remote_addr: Remote device address
        """
        self.le_acl_manager.RemoveFromBackgroundList(
            le_acl_manager_facade.BackgroundRequestMsg(peer_address=remote_addr))

    def complete_connection(self, event_stream):
        """
        Wait for an LE connection complete event on event_stream and wrap the
        result in a PyLeAclManagerAclConnection, which is also recorded in
        self.active_connections
        :param event_stream: The connection event stream to wait on
        :return: The new PyLeAclManagerAclConnection
        """
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(event_stream).emits(connection_complete)
        complete = connection_complete.get()
        handle = complete.connection_handle
        remote = repr(complete.peer_address)
        remote_address_type = complete.peer_address_type
        # The local address is only read from the enhanced connection complete event.
        if complete.subevent_code == hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE:
            address = complete.local_resolvable_private_address
        else:
            address = None
        connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, remote_address_type, handle,
                                                 event_stream)
        self.active_connections.append(connection)
        return connection

    def complete_incoming_connection(self):
        """
        Wait for the incoming connection to complete; the incoming stream is
        handed over to the new connection and cleared on this manager
        :return: The resulting PyLeAclManagerAclConnection
        """
        assertThat(self.incoming_connection_event_stream).isNotNone()
        event_stream = self.incoming_connection_event_stream
        self.incoming_connection_event_stream = None
        return self.complete_connection(event_stream)

    def complete_outgoing_connection(self, token):
        """
        Wait for the outgoing attempt identified by token to complete; the
        token is forgotten and its stream handed over to the new connection
        :param token: Token returned by initiate_connection
        :return: The resulting PyLeAclManagerAclConnection
        """
        # Membership check (as in cancel_connection) so an unknown token is
        # reported as an assertion failure instead of a bare KeyError.
        assertThat(token in self.outgoing_connection_event_streams).isTrue()
        event_stream = self.outgoing_connection_event_streams.pop(token)[0]
        return self.complete_connection(event_stream)