1#!/usr/bin/env python3.4 2# 3# Copyright 2016- The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from concurrent.futures import ThreadPoolExecutor 18import queue 19import re 20import threading 21import time 22 23from acts import logger 24from acts.controllers.sl4a_lib import rpc_client 25 26 27class EventDispatcherError(Exception): 28 """The base class for all EventDispatcher exceptions.""" 29 30 31class IllegalStateError(EventDispatcherError): 32 """Raise when user tries to put event_dispatcher into an illegal state.""" 33 34 35class DuplicateError(EventDispatcherError): 36 """Raise when two event handlers have been assigned to an event name.""" 37 38 39class EventDispatcher: 40 """A class for managing the events for an SL4A Session. 41 42 Attributes: 43 _serial: The serial of the device. 44 _rpc_client: The rpc client for that session. 45 _started: A bool that holds whether or not the event dispatcher is 46 running. 47 _executor: The thread pool executor for running event handlers and 48 polling. 49 _event_dict: A dictionary of str eventName = Queue<Event> eventQueue 50 _handlers: A dictionary of str eventName => (lambda, args) handler 51 _lock: A lock that prevents multiple reads/writes to the event queues. 52 log: The EventDispatcher's logger. 53 """ 54 55 DEFAULT_TIMEOUT = 60 56 57 def __init__(self, serial, rpc_client): 58 self._serial = serial 59 self._rpc_client = rpc_client 60 self._started = False 61 self._executor = None 62 self._event_dict = {} 63 self._handlers = {} 64 self._lock = threading.RLock() 65 66 def _log_formatter(message): 67 """Defines the formatting used in the logger.""" 68 return '[E Dispatcher|%s|%s] %s' % (self._serial, 69 self._rpc_client.uid, message) 70 71 self.log = logger.create_logger(_log_formatter) 72 73 def poll_events(self): 74 """Continuously polls all types of events from sl4a. 75 76 Events are sorted by name and store in separate queues. 77 If there are registered handlers, the handlers will be called with 78 corresponding event immediately upon event discovery, and the event 79 won't be stored. If exceptions occur, stop the dispatcher and return 80 """ 81 while self._started: 82 try: 83 event_obj = self._rpc_client.eventWait(50000) 84 except rpc_client.Sl4aConnectionError as e: 85 if self._rpc_client.is_alive: 86 self.log.warning('Closing due to closed session.') 87 break 88 else: 89 self.log.warning('Closing due to error: %s.' % e) 90 self.close() 91 raise e 92 if not event_obj: 93 continue 94 elif 'name' not in event_obj: 95 self.log.error('Received Malformed event {}'.format(event_obj)) 96 continue 97 else: 98 event_name = event_obj['name'] 99 # if handler registered, process event 100 if event_name == 'EventDispatcherShutdown': 101 self.log.debug('Received shutdown signal.') 102 # closeSl4aSession has been called, which closes the event 103 # dispatcher. Stop execution on this polling thread. 104 return 105 if event_name in self._handlers: 106 self.handle_subscribed_event(event_obj, event_name) 107 else: 108 self._lock.acquire() 109 if event_name in self._event_dict: # otherwise, cache event 110 self._event_dict[event_name].put(event_obj) 111 else: 112 q = queue.Queue() 113 q.put(event_obj) 114 self._event_dict[event_name] = q 115 self._lock.release() 116 117 def register_handler(self, handler, event_name, args): 118 """Registers an event handler. 119 120 One type of event can only have one event handler associated with it. 121 122 Args: 123 handler: The event handler function to be registered. 124 event_name: Name of the event the handler is for. 125 args: User arguments to be passed to the handler when it's called. 126 127 Raises: 128 IllegalStateError: Raised if attempts to register a handler after 129 the dispatcher starts running. 130 DuplicateError: Raised if attempts to register more than one 131 handler for one type of event. 132 """ 133 if self._started: 134 raise IllegalStateError('Cannot register service after polling is ' 135 'started.') 136 self._lock.acquire() 137 try: 138 if event_name in self._handlers: 139 raise DuplicateError( 140 'A handler for {} already exists'.format(event_name)) 141 self._handlers[event_name] = (handler, args) 142 finally: 143 self._lock.release() 144 145 def start(self): 146 """Starts the event dispatcher. 147 148 Initiates executor and start polling events. 149 150 Raises: 151 IllegalStateError: Can't start a dispatcher again when it's already 152 running. 153 """ 154 if not self._started: 155 self._started = True 156 self._executor = ThreadPoolExecutor(max_workers=32) 157 self._executor.submit(self.poll_events) 158 else: 159 raise IllegalStateError("Dispatcher is already started.") 160 161 def close(self): 162 """Clean up and release resources. 163 164 This function should only be called after a 165 rpc_client.closeSl4aSession() call. 166 """ 167 if not self._started: 168 return 169 self._started = False 170 self._executor.shutdown(wait=True) 171 self.clear_all_events() 172 173 def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT): 174 """Pop an event from its queue. 175 176 Return and remove the oldest entry of an event. 177 Block until an event of specified name is available or 178 times out if timeout is set. 179 180 Args: 181 event_name: Name of the event to be popped. 182 timeout: Number of seconds to wait when event is not present. 183 Never times out if None. 184 185 Returns: 186 event: The oldest entry of the specified event. None if timed out. 187 188 Raises: 189 IllegalStateError: Raised if pop is called before the dispatcher 190 starts polling. 191 """ 192 if not self._started: 193 raise IllegalStateError( 194 'Dispatcher needs to be started before popping.') 195 196 e_queue = self.get_event_q(event_name) 197 198 if not e_queue: 199 raise IllegalStateError( 200 'Failed to get an event queue for {}'.format(event_name)) 201 202 try: 203 # Block for timeout 204 if timeout: 205 return e_queue.get(True, timeout) 206 # Non-blocking poll for event 207 elif timeout == 0: 208 return e_queue.get(False) 209 else: 210 # Block forever on event wait 211 return e_queue.get(True) 212 except queue.Empty: 213 raise queue.Empty('Timeout after {}s waiting for event: {}'.format( 214 timeout, event_name)) 215 216 def wait_for_event(self, 217 event_name, 218 predicate, 219 timeout=DEFAULT_TIMEOUT, 220 *args, 221 **kwargs): 222 """Wait for an event that satisfies a predicate to appear. 223 224 Continuously pop events of a particular name and check against the 225 predicate until an event that satisfies the predicate is popped or 226 timed out. Note this will remove all the events of the same name that 227 do not satisfy the predicate in the process. 228 229 Args: 230 event_name: Name of the event to be popped. 231 predicate: A function that takes an event and returns True if the 232 predicate is satisfied, False otherwise. 233 timeout: Number of seconds to wait. 234 *args: Optional positional args passed to predicate(). 235 **kwargs: Optional keyword args passed to predicate(). 236 consume_ignored_events: Whether or not to consume events while 237 searching for the desired event. Defaults to True if unset. 238 239 Returns: 240 The event that satisfies the predicate. 241 242 Raises: 243 queue.Empty: Raised if no event that satisfies the predicate was 244 found before time out. 245 """ 246 deadline = time.time() + timeout 247 ignored_events = [] 248 consume_events = kwargs.pop('consume_ignored_events', True) 249 while True: 250 event = None 251 try: 252 event = self.pop_event(event_name, 1) 253 if not consume_events: 254 ignored_events.append(event) 255 except queue.Empty: 256 pass 257 258 if event and predicate(event, *args, **kwargs): 259 for ignored_event in ignored_events: 260 self.get_event_q(event_name).put(ignored_event) 261 return event 262 263 if time.time() > deadline: 264 for ignored_event in ignored_events: 265 self.get_event_q(event_name).put(ignored_event) 266 raise queue.Empty( 267 'Timeout after {}s waiting for event: {}'.format( 268 timeout, event_name)) 269 270 def pop_events(self, regex_pattern, timeout, freq=1): 271 """Pop events whose names match a regex pattern. 272 273 If such event(s) exist, pop one event from each event queue that 274 satisfies the condition. Otherwise, wait for an event that satisfies 275 the condition to occur, with timeout. 276 277 Results are sorted by timestamp in ascending order. 278 279 Args: 280 regex_pattern: The regular expression pattern that an event name 281 should match in order to be popped. 282 timeout: Number of seconds to wait for events in case no event 283 matching the condition exits when the function is called. 284 285 Returns: 286 results: Pop events whose names match a regex pattern. 287 Empty if none exist and the wait timed out. 288 289 Raises: 290 IllegalStateError: Raised if pop is called before the dispatcher 291 starts polling. 292 queue.Empty: Raised if no event was found before time out. 293 """ 294 if not self._started: 295 raise IllegalStateError( 296 "Dispatcher needs to be started before popping.") 297 deadline = time.time() + timeout 298 while True: 299 # TODO: fix the sleep loop 300 results = self._match_and_pop(regex_pattern) 301 if len(results) != 0 or time.time() > deadline: 302 break 303 time.sleep(freq) 304 if len(results) == 0: 305 raise queue.Empty('Timeout after {}s waiting for event: {}'.format( 306 timeout, regex_pattern)) 307 308 return sorted(results, key=lambda event: event['time']) 309 310 def _match_and_pop(self, regex_pattern): 311 """Pop one event from each of the event queues whose names 312 match (in a sense of regular expression) regex_pattern. 313 """ 314 results = [] 315 self._lock.acquire() 316 for name in self._event_dict.keys(): 317 if re.match(regex_pattern, name): 318 q = self._event_dict[name] 319 if q: 320 try: 321 results.append(q.get(False)) 322 except queue.Empty: 323 pass 324 self._lock.release() 325 return results 326 327 def get_event_q(self, event_name): 328 """Obtain the queue storing events of the specified name. 329 330 If no event of this name has been polled, wait for one to. 331 332 Returns: A queue storing all the events of the specified name. 333 """ 334 self._lock.acquire() 335 if (event_name not in self._event_dict 336 or self._event_dict[event_name] is None): 337 self._event_dict[event_name] = queue.Queue() 338 self._lock.release() 339 340 event_queue = self._event_dict[event_name] 341 return event_queue 342 343 def handle_subscribed_event(self, event_obj, event_name): 344 """Execute the registered handler of an event. 345 346 Retrieve the handler and its arguments, and execute the handler in a 347 new thread. 348 349 Args: 350 event_obj: Json object of the event. 351 event_name: Name of the event to call handler for. 352 """ 353 handler, args = self._handlers[event_name] 354 self._executor.submit(handler, event_obj, *args) 355 356 def _handle(self, event_handler, event_name, user_args, event_timeout, 357 cond, cond_timeout): 358 """Pop an event of specified type and calls its handler on it. If 359 condition is not None, block until condition is met or timeout. 360 """ 361 if cond: 362 cond.wait(cond_timeout) 363 event = self.pop_event(event_name, event_timeout) 364 return event_handler(event, *user_args) 365 366 def handle_event(self, 367 event_handler, 368 event_name, 369 user_args, 370 event_timeout=None, 371 cond=None, 372 cond_timeout=None): 373 """Handle events that don't have registered handlers 374 375 In a new thread, poll one event of specified type from its queue and 376 execute its handler. If no such event exists, the thread waits until 377 one appears. 378 379 Args: 380 event_handler: Handler for the event, which should take at least 381 one argument - the event json object. 382 event_name: Name of the event to be handled. 383 user_args: User arguments for the handler; to be passed in after 384 the event json. 385 event_timeout: Number of seconds to wait for the event to come. 386 cond: A condition to wait on before executing the handler. Should 387 be a threading.Event object. 388 cond_timeout: Number of seconds to wait before the condition times 389 out. Never times out if None. 390 391 Returns: 392 worker: A concurrent.Future object associated with the handler. 393 If blocking call worker.result() is triggered, the handler 394 needs to return something to unblock. 395 """ 396 worker = self._executor.submit(self._handle, event_handler, event_name, 397 user_args, event_timeout, cond, 398 cond_timeout) 399 return worker 400 401 def pop_all(self, event_name): 402 """Return and remove all stored events of a specified name. 403 404 Pops all events from their queue. May miss the latest ones. 405 If no event is available, return immediately. 406 407 Args: 408 event_name: Name of the events to be popped. 409 410 Returns: 411 results: List of the desired events. 412 413 Raises: 414 IllegalStateError: Raised if pop is called before the dispatcher 415 starts polling. 416 """ 417 if not self._started: 418 raise IllegalStateError(("Dispatcher needs to be started before " 419 "popping.")) 420 results = [] 421 try: 422 self._lock.acquire() 423 while True: 424 e = self._event_dict[event_name].get(block=False) 425 results.append(e) 426 except (queue.Empty, KeyError): 427 return results 428 finally: 429 self._lock.release() 430 431 def clear_events(self, event_name): 432 """Clear all events of a particular name. 433 434 Args: 435 event_name: Name of the events to be popped. 436 """ 437 self._lock.acquire() 438 try: 439 q = self.get_event_q(event_name) 440 q.queue.clear() 441 except queue.Empty: 442 return 443 finally: 444 self._lock.release() 445 446 def clear_all_events(self): 447 """Clear all event queues and their cached events.""" 448 self._lock.acquire() 449 self._event_dict.clear() 450 self._lock.release() 451