1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18from cert.event_stream import EventStream
19from cert.event_stream import IEventStream
20from cert.captures import HciCaptures
21from cert.closable import Closable
22from cert.closable import safeClose
23from bluetooth_packets_python3 import hci_packets
24from cert.truth import assertThat
25from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
26
27
28class PyAclManagerAclConnection(IEventStream, Closable):
29
30    def __init__(self, acl_manager, remote_addr, handle, event_stream):
31        self.acl_manager = acl_manager
32        self.handle = handle
33        self.remote_addr = remote_addr
34        self.connection_event_stream = event_stream
35        self.acl_stream = EventStream(self.acl_manager.FetchAclData(acl_manager_facade.HandleMsg(handle=self.handle)))
36
37    def disconnect(self, reason):
38        packet_bytes = bytes(hci_packets.DisconnectBuilder(self.handle, reason).Serialize())
39        self.acl_manager.ConnectionCommand(acl_manager_facade.ConnectionCommandMsg(packet=packet_bytes))
40
41    def close(self):
42        safeClose(self.connection_event_stream)
43        safeClose(self.acl_stream)
44
45    def wait_for_disconnection_complete(self):
46        disconnection_complete = HciCaptures.DisconnectionCompleteCapture()
47        assertThat(self.connection_event_stream).emits(disconnection_complete)
48        self.disconnect_reason = disconnection_complete.get().GetReason()
49
50    def send(self, data):
51        self.acl_manager.SendAclData(acl_manager_facade.AclData(handle=self.handle, payload=bytes(data)))
52
53    def get_event_queue(self):
54        return self.acl_stream.get_event_queue()
55
56
57class PyAclManager:
58
59    def __init__(self, device):
60        self.acl_manager = device.hci_acl_manager
61        self.incoming_connection_event_stream = None
62        self.outgoing_connection_event_stream = None
63
64    def close(self):
65        safeClose(self.incoming_connection_event_stream)
66        safeClose(self.outgoing_connection_event_stream)
67
68    def listen_for_an_incoming_connection(self):
69        assertThat(self.incoming_connection_event_stream).isNone()
70        self.incoming_connection_event_stream = EventStream(
71            self.acl_manager.FetchIncomingConnection(empty_proto.Empty()))
72
73    def initiate_connection(self, remote_addr):
74        assertThat(self.outgoing_connection_event_stream).isNone()
75        remote_addr_bytes = bytes(remote_addr, 'utf8') if type(remote_addr) is str else bytes(remote_addr)
76        self.outgoing_connection_event_stream = EventStream(
77            self.acl_manager.CreateConnection(acl_manager_facade.ConnectionMsg(address=remote_addr_bytes)))
78
79    def complete_connection(self, event_stream):
80        connection_complete = HciCaptures.ConnectionCompleteCapture()
81        assertThat(event_stream).emits(connection_complete)
82        complete = connection_complete.get()
83        handle = complete.GetConnectionHandle()
84        address = complete.GetBdAddr()
85        return PyAclManagerAclConnection(self.acl_manager, address, handle, event_stream)
86
87    def complete_incoming_connection(self):
88        assertThat(self.incoming_connection_event_stream).isNotNone()
89        event_stream = self.incoming_connection_event_stream
90        self.incoming_connection_event_stream = None
91        return self.complete_connection(event_stream)
92
93    def complete_outgoing_connection(self):
94        assertThat(self.outgoing_connection_event_stream).isNotNone()
95        event_stream = self.outgoing_connection_event_stream
96        self.outgoing_connection_event_stream = None
97        return self.complete_connection(event_stream)
98