1import json
2import threading
3
4import time
5from concurrent import futures
6
7from acts import logger
8
9SOCKET_TIMEOUT = 60
10
11# The Session UID when a UID has not been received yet.
12UNKNOWN_UID = -1
13
14
15class Sl4aException(Exception):
16    """The base class for all SL4A exceptions."""
17
18
19class Sl4aStartError(Sl4aException):
20    """Raised when sl4a is not able to be started."""
21
22
23class Sl4aApiError(Sl4aException):
24    """Raised when remote API reports an error."""
25
26
27class Sl4aConnectionError(Sl4aException):
28    """An error raised upon failure to connect to SL4A."""
29
30
31class Sl4aProtocolError(Sl4aException):
32    """Raised when there an error in exchanging data with server on device."""
33    NO_RESPONSE_FROM_HANDSHAKE = 'No response from handshake.'
34    NO_RESPONSE_FROM_SERVER = 'No response from server.'
35    MISMATCHED_API_ID = 'Mismatched API id.'
36
37
38class MissingSl4AError(Sl4aException):
39    """An error raised when an Sl4aClient is created without SL4A installed."""
40
41
42class RpcClient(object):
43    """An RPC client capable of processing multiple RPCs concurrently.
44
45    Attributes:
46        _free_connections: A list of all idle RpcConnections.
47        _working_connections: A list of all working RpcConnections.
48        _lock: A lock used for accessing critical memory.
49        max_connections: The maximum number of RpcConnections at a time.
50            Increasing or decreasing the number of max connections does NOT
51            modify the thread pool size being used for self.future RPC calls.
52        _log: The logger for this RpcClient.
53    """
54    """The default value for the maximum amount of connections for a client."""
55    DEFAULT_MAX_CONNECTION = 15
56
57    class AsyncClient(object):
58        """An object that allows RPC calls to be called asynchronously.
59
60        Attributes:
61            _rpc_client: The RpcClient to use when making calls.
62            _executor: The ThreadPoolExecutor used to keep track of workers
63        """
64
65        def __init__(self, rpc_client):
66            self._rpc_client = rpc_client
67            self._executor = futures.ThreadPoolExecutor(
68                max_workers=max(rpc_client.max_connections - 2, 1))
69
70        def rpc(self, name, *args, **kwargs):
71            future = self._executor.submit(name, *args, **kwargs)
72            return future
73
74        def __getattr__(self, name):
75            """Wrapper for python magic to turn method calls into RPC calls."""
76
77            def rpc_call(*args, **kwargs):
78                future = self._executor.submit(
79                    self._rpc_client.__getattr__(name), *args, **kwargs)
80                return future
81
82            return rpc_call
83
84    def __init__(self,
85                 uid,
86                 serial,
87                 on_error_callback,
88                 _create_connection_func,
89                 max_connections=None):
90        """Creates a new RpcClient object.
91
92        Args:
93            uid: The session uid this client is a part of.
94            serial: The serial of the Android device. Used for logging.
95            on_error_callback: A callback for when a connection error is raised.
96            _create_connection_func: A reference to the function that creates a
97                new session.
98            max_connections: The maximum number of connections the RpcClient
99                can have.
100        """
101        self._serial = serial
102        self.on_error = on_error_callback
103        self._create_connection_func = _create_connection_func
104        self._free_connections = [self._create_connection_func(uid)]
105
106        self.uid = self._free_connections[0].uid
107        self._lock = threading.Lock()
108
109        def _log_formatter(message):
110            """Formats the message to be logged."""
111            return '[RPC Service|%s|%s] %s' % (self._serial, self.uid, message)
112
113        self._log = logger.create_logger(_log_formatter)
114
115        self._working_connections = []
116        if max_connections is None:
117            self.max_connections = RpcClient.DEFAULT_MAX_CONNECTION
118        else:
119            self.max_connections = max_connections
120
121        self._async_client = RpcClient.AsyncClient(self)
122        self.is_alive = True
123
124    def terminate(self):
125        """Terminates all connections to the SL4A server."""
126        if len(self._working_connections) > 0:
127            self._log.warning(
128                '%s connections are still active, and waiting on '
129                'responses.Closing these connections now.' % len(
130                    self._working_connections))
131        connections = self._free_connections + self._working_connections
132        for connection in connections:
133            self._log.debug(
134                'Closing connection over ports %s' % connection.ports)
135            connection.close()
136        self._free_connections = []
137        self._working_connections = []
138        self.is_alive = False
139
140    def _get_free_connection(self):
141        """Returns a free connection to be used for an RPC call.
142
143        This function also adds the client to the working set to prevent
144        multiple users from obtaining the same client.
145        """
146        while True:
147            if len(self._free_connections) > 0:
148                with self._lock:
149                    # Check if another thread grabbed the remaining connection.
150                    # while we were waiting for the lock.
151                    if len(self._free_connections) == 0:
152                        continue
153                    client = self._free_connections.pop()
154                    self._working_connections.append(client)
155                    return client
156
157            client_count = (
158                len(self._free_connections) + len(self._working_connections))
159            if client_count < self.max_connections:
160                with self._lock:
161                    client_count = (len(self._free_connections) +
162                                    len(self._working_connections))
163                    if client_count < self.max_connections:
164                        client = self._create_connection_func(self.uid)
165                        self._working_connections.append(client)
166                        return client
167            time.sleep(.01)
168
169    def _release_working_connection(self, connection):
170        """Marks a working client as free.
171
172        Args:
173            connection: The client to mark as free.
174        Raises:
175            A ValueError if the client is not a known working connection.
176        """
177        # We need to keep this code atomic because the client count is based on
178        # the length of the free and working connection list lengths.
179        with self._lock:
180            self._working_connections.remove(connection)
181            self._free_connections.append(connection)
182
183    def rpc(self, method, *args, timeout=None, retries=1):
184        """Sends an rpc to sl4a.
185
186        Sends an rpc call to sl4a over this RpcClient's corresponding session.
187
188        Args:
189            method: str, The name of the method to execute.
190            args: any, The args to send to sl4a.
191            timeout: The amount of time to wait for a response.
192            retries: Misnomer, is actually the number of tries.
193
194        Returns:
195            The result of the rpc.
196
197        Raises:
198            Sl4aProtocolError: Something went wrong with the sl4a protocol.
199            Sl4aApiError: The rpc went through, however executed with errors.
200        """
201        connection = self._get_free_connection()
202        ticket = connection.get_new_ticket()
203
204        if timeout:
205            connection.set_timeout(timeout)
206        data = {'id': ticket, 'method': method, 'params': args}
207        request = json.dumps(data)
208        response = ''
209        try:
210            for i in range(1, retries + 1):
211                connection.send_request(request)
212                self._log.debug('Sent: %s' % request)
213
214                response = connection.get_response()
215                self._log.debug('Received: %s', response)
216                if not response:
217                    if i < retries:
218                        self._log.warning(
219                            'No response for RPC method %s on iteration %s',
220                            method, i)
221                        continue
222                    else:
223                        self.on_error(connection)
224                        raise Sl4aProtocolError(
225                            Sl4aProtocolError.NO_RESPONSE_FROM_SERVER)
226                else:
227                    break
228        except BrokenPipeError as e:
229            if self.is_alive:
230                self._log.error('Exception %s happened while communicating to '
231                                'SL4A.', e)
232                self.on_error(connection)
233            else:
234                self._log.warning('The connection was killed during cleanup:')
235                self._log.warning(e)
236            raise Sl4aConnectionError(e)
237        finally:
238            if timeout:
239                connection.set_timeout(SOCKET_TIMEOUT)
240            self._release_working_connection(connection)
241        result = json.loads(str(response, encoding='utf8'))
242
243        if result['error']:
244            err_msg = 'RPC call %s to device failed with error %s' % (
245                method, result['error'])
246            self._log.error(err_msg)
247            raise Sl4aApiError(err_msg)
248        if result['id'] != ticket:
249            self._log.error('RPC method %s with mismatched api id %s', method,
250                            result['id'])
251            raise Sl4aProtocolError(Sl4aProtocolError.MISMATCHED_API_ID)
252        return result['result']
253
254    @property
255    def future(self):
256        """Returns a magic function that returns a future running an RPC call.
257
258        This function effectively allows the idiom:
259
260        >>> rpc_client = RpcClient(...)
261        >>> # returns after call finishes
262        >>> rpc_client.someRpcCall()
263        >>> # Immediately returns a reference to the RPC's future, running
264        >>> # the lengthy RPC call on another thread.
265        >>> future = rpc_client.future.someLengthyRpcCall()
266        >>> rpc_client.doOtherThings()
267        >>> ...
268        >>> # Wait for and get the returned value of the lengthy RPC.
269        >>> # Can specify a timeout as well.
270        >>> value = future.result()
271
272        The number of concurrent calls to this method is limited to
273        (max_connections - 2), to prevent future calls from exhausting all free
274        connections.
275        """
276        return self._async_client
277
278    def __getattr__(self, name):
279        """Wrapper for python magic to turn method calls into RPC calls."""
280
281        def rpc_call(*args, **kwargs):
282            return self.rpc(name, *args, **kwargs)
283
284        if not self.is_alive:
285            raise Sl4aStartError(
286                'This SL4A session has already been terminated. You must '
287                'create a new session to continue.')
288        return rpc_call
289