import json
import threading
import time
from concurrent import futures

from acts import logger

# The timeout value restored on a connection after an RPC used a custom one.
SOCKET_TIMEOUT = 60

# The Session UID when a UID has not been received yet.
UNKNOWN_UID = -1


class Sl4aException(Exception):
    """The base class for all SL4A exceptions."""


class Sl4aStartError(Sl4aException):
    """Raised when sl4a is not able to be started."""


class Sl4aApiError(Sl4aException):
    """Raised when remote API reports an error."""


class Sl4aConnectionError(Sl4aException):
    """An error raised upon failure to connect to SL4A."""


class Sl4aProtocolError(Sl4aException):
    """Raised when there is an error exchanging data with the device server."""
    NO_RESPONSE_FROM_HANDSHAKE = 'No response from handshake.'
    NO_RESPONSE_FROM_SERVER = 'No response from server.'
    MISMATCHED_API_ID = 'Mismatched API id.'


class MissingSl4AError(Sl4aException):
    """An error raised when an Sl4aClient is created without SL4A installed."""


class RpcClient(object):
    """An RPC client capable of processing multiple RPCs concurrently.

    Any attribute that is not defined on the client is treated as the name of
    a remote method: `client.someCall(x)` is equivalent to
    `client.rpc('someCall', x)`.

    Attributes:
        _free_connections: A list of all idle RpcConnections.
        _working_connections: A list of all working RpcConnections.
        _lock: A lock used for accessing critical memory.
        max_connections: The maximum number of RpcConnections at a time.
            Increasing or decreasing the number of max connections does NOT
            modify the thread pool size being used for self.future RPC calls.
        _log: The logger for this RpcClient.
        uid: The session uid, taken from the first connection created.
        on_error: The callback invoked with the failing connection when a
            connection error is detected.
        is_alive: True until terminate() has been called.
    """

    # The default value for the maximum amount of connections for a client.
    DEFAULT_MAX_CONNECTION = 15

    class AsyncClient(object):
        """An object that allows RPC calls to be called asynchronously.

        Attributes:
            _rpc_client: The RpcClient to use when making calls.
            _executor: The ThreadPoolExecutor used to keep track of workers
        """

        def __init__(self, rpc_client):
            """Creates the executor used to run RPCs in the background.

            Args:
                rpc_client: The RpcClient whose RPCs are run asynchronously.
            """
            self._rpc_client = rpc_client
            # Two connections are held back from the pool size (but at least
            # one worker is always created) so that asynchronous calls cannot
            # use up every connection of the client.
            self._executor = futures.ThreadPoolExecutor(
                max_workers=max(rpc_client.max_connections - 2, 1))

        def rpc(self, name, *args, **kwargs):
            """Schedules an RPC on the executor.

            Args:
                name: The name of the RPC method to execute. A callable is
                    also accepted and is submitted to the executor as-is.
                *args: The positional arguments of the call.
                **kwargs: The keyword arguments of the call, e.g. the
                    `timeout` and `retries` options of RpcClient.rpc.

            Returns:
                The future of the scheduled call.
            """
            if callable(name):
                return self._executor.submit(name, *args, **kwargs)
            # A method name is not callable by itself; it has to be sent
            # through the owning client.
            return self._executor.submit(
                self._rpc_client.rpc, name, *args, **kwargs)

        def __getattr__(self, name):
            """Wrapper for python magic to turn method calls into RPC calls."""

            def rpc_call(*args, **kwargs):
                # The client lookup happens at call time, in the caller's
                # thread, so a terminated session is reported immediately.
                future = self._executor.submit(
                    self._rpc_client.__getattr__(name), *args, **kwargs)
                return future

            return rpc_call

    def __init__(self,
                 uid,
                 serial,
                 on_error_callback,
                 _create_connection_func,
                 max_connections=None):
        """Creates a new RpcClient object.

        Args:
            uid: The session uid this client is a part of.
            serial: The serial of the Android device. Used for logging.
            on_error_callback: A callback for when a connection error is raised.
            _create_connection_func: A reference to the function that creates a
                new session.
            max_connections: The maximum number of connections the RpcClient
                can have.
        """
        self._serial = serial
        self.on_error = on_error_callback
        self._create_connection_func = _create_connection_func
        self._free_connections = [self._create_connection_func(uid)]

        # The first connection reports the uid that is used from now on.
        self.uid = self._free_connections[0].uid
        self._lock = threading.Lock()

        def _log_formatter(message):
            """Formats the message to be logged."""
            return '[RPC Service|%s|%s] %s' % (self._serial, self.uid, message)

        self._log = logger.create_logger(_log_formatter)

        self._working_connections = []
        if max_connections is None:
            self.max_connections = RpcClient.DEFAULT_MAX_CONNECTION
        else:
            self.max_connections = max_connections

        self._async_client = RpcClient.AsyncClient(self)
        self.is_alive = True

    def terminate(self):
        """Terminates all connections to the SL4A server."""
        if self._working_connections:
            self._log.warning(
                '%s connections are still active, and waiting on '
                'responses.Closing these connections now.',
                len(self._working_connections))
        connections = self._free_connections + self._working_connections
        for connection in connections:
            self._log.debug('Closing connection over ports %s',
                            connection.ports)
            connection.close()
        self._free_connections = []
        self._working_connections = []
        self.is_alive = False

    def _get_free_connection(self):
        """Returns a free connection to be used for an RPC call.

        This function also adds the client to the working set to prevent
        multiple users from obtaining the same client.

        Loops, sleeping between attempts, until an idle connection can be
        taken or a new one may be created without exceeding max_connections.
        """
        while True:
            if self._free_connections:
                with self._lock:
                    # Check if another thread grabbed the remaining connection
                    # while we were waiting for the lock.
                    if not self._free_connections:
                        continue
                    client = self._free_connections.pop()
                    self._working_connections.append(client)
                    return client

            client_count = (
                len(self._free_connections) + len(self._working_connections))
            if client_count < self.max_connections:
                with self._lock:
                    # Count again under the lock; another thread may have
                    # created a connection in the meantime.
                    client_count = (len(self._free_connections) +
                                    len(self._working_connections))
                    if client_count < self.max_connections:
                        client = self._create_connection_func(self.uid)
                        self._working_connections.append(client)
                        return client
            time.sleep(.01)

    def _release_working_connection(self, connection):
        """Marks a working client as free.

        Args:
            connection: The client to mark as free.
        Raises:
            A ValueError if the client is not a known working connection.
        """
        # We need to keep this code atomic because the client count is based on
        # the length of the free and working connection list lengths.
        with self._lock:
            self._working_connections.remove(connection)
            self._free_connections.append(connection)

    def rpc(self, method, *args, timeout=None, retries=1):
        """Sends an rpc to sl4a.

        Sends an rpc call to sl4a over this RpcClient's corresponding session.

        Args:
            method: str, The name of the method to execute.
            args: any, The args to send to sl4a.
            timeout: The amount of time to wait for a response.
            retries: Misnomer, is actually the number of tries.

        Returns:
            The result of the rpc.

        Raises:
            Sl4aProtocolError: Something went wrong with the sl4a protocol.
            Sl4aApiError: The rpc went through, however executed with errors.
            Sl4aConnectionError: The connection broke while communicating.
        """
        connection = self._get_free_connection()
        response = ''
        try:
            # Everything after the connection has been checked out runs inside
            # the try block, so that a failure while preparing the request
            # (e.g. arguments that cannot be serialized) still returns the
            # connection to the pool.
            ticket = connection.get_new_ticket()
            if timeout:
                connection.set_timeout(timeout)
            request = json.dumps(
                {'id': ticket, 'method': method, 'params': args})

            for i in range(1, retries + 1):
                connection.send_request(request)
                self._log.debug('Sent: %s', request)

                response = connection.get_response()
                self._log.debug('Received: %s', response)
                if response:
                    break
                if i < retries:
                    self._log.warning(
                        'No response for RPC method %s on iteration %s',
                        method, i)
                    continue
                # Out of tries: report the connection and give up.
                self.on_error(connection)
                raise Sl4aProtocolError(
                    Sl4aProtocolError.NO_RESPONSE_FROM_SERVER)
        except BrokenPipeError as e:
            if self.is_alive:
                self._log.error('Exception %s happened while communicating to '
                                'SL4A.', e)
                self.on_error(connection)
            else:
                self._log.warning('The connection was killed during cleanup:')
                self._log.warning(e)
            raise Sl4aConnectionError(e) from e
        finally:
            if timeout:
                connection.set_timeout(SOCKET_TIMEOUT)
            try:
                self._release_working_connection(connection)
            except ValueError:
                # terminate() empties the working list while RPCs may still be
                # in flight. In that case there is nothing left to release and
                # the error of the RPC itself must not be replaced.
                if self.is_alive:
                    raise
        result = json.loads(str(response, encoding='utf8'))

        if result['error']:
            err_msg = 'RPC call %s to device failed with error %s' % (
                method, result['error'])
            self._log.error(err_msg)
            raise Sl4aApiError(err_msg)
        if result['id'] != ticket:
            self._log.error('RPC method %s with mismatched api id %s', method,
                            result['id'])
            raise Sl4aProtocolError(Sl4aProtocolError.MISMATCHED_API_ID)
        return result['result']

    @property
    def future(self):
        """Returns a magic function that returns a future running an RPC call.

        This function effectively allows the idiom:

        >>> rpc_client = RpcClient(...)
        >>> # returns after call finishes
        >>> rpc_client.someRpcCall()
        >>> # Immediately returns a reference to the RPC's future, running
        >>> # the lengthy RPC call on another thread.
        >>> future = rpc_client.future.someLengthyRpcCall()
        >>> rpc_client.doOtherThings()
        >>> ...
        >>> # Wait for and get the returned value of the lengthy RPC.
        >>> # Can specify a timeout as well.
        >>> value = future.result()

        The number of concurrent calls to this method is limited to
        (max_connections - 2), but never less than one, to prevent future
        calls from exhausting all free connections.
        """
        return self._async_client

    def __getattr__(self, name):
        """Wrapper for python magic to turn method calls into RPC calls.

        Raises:
            Sl4aStartError: The session has already been terminated.
            AttributeError: `is_alive` was requested before it was assigned.
        """
        if name == 'is_alive':
            # Only reached when is_alive has not been assigned yet. Reading
            # self.is_alive below would re-enter this method without end.
            raise AttributeError(name)

        def rpc_call(*args, **kwargs):
            return self.rpc(name, *args, **kwargs)

        if not self.is_alive:
            raise Sl4aStartError(
                'This SL4A session has already been terminated. You must '
                'create a new session to continue.')
        return rpc_call