1#!/usr/bin/env python3.4
2#
3#   Copyright 2016- The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from concurrent.futures import ThreadPoolExecutor
18import queue
19import re
20import threading
21import time
22
23from acts import logger
24from acts.controllers.sl4a_lib import rpc_client
25
26
class EventDispatcherError(Exception):
    """Root of the exception hierarchy raised by the EventDispatcher."""
29
30
class IllegalStateError(EventDispatcherError):
    """Raised when an operation is not valid in the dispatcher's current state.

    In this module that means starting a dispatcher twice, registering a
    handler after polling has started, or popping events before it has.
    """
33
34
class DuplicateError(EventDispatcherError):
    """Raised when a second handler is registered for the same event name."""
37
38
class EventDispatcher:
    """A class for managing the events for an SL4A Session.

    A background task polls the session's rpc client for events. Events whose
    name has a registered handler are passed to that handler on the executor;
    all other events are cached in a per-name queue until a caller pops them.

    Attributes:
        _serial: The serial of the device.
        _rpc_client: The rpc client for that session.
        _started: A bool that holds whether or not the event dispatcher is
                  running.
        _executor: The thread pool executor for running event handlers and
                   polling. None until start() is called.
        _event_dict: A dictionary of str eventName => Queue<Event> eventQueue
        _handlers: A dictionary of str eventName => (lambda, args) handler
        _lock: A re-entrant lock guarding _event_dict and _handlers.
        log: The EventDispatcher's logger.
    """

    DEFAULT_TIMEOUT = 60

    def __init__(self, serial, rpc_client):
        """Creates a stopped dispatcher; call start() to begin polling.

        Args:
            serial: The serial of the device.
            rpc_client: The rpc client for the SL4A session. Note that inside
                this method the name shadows the rpc_client module.
        """
        self._serial = serial
        self._rpc_client = rpc_client
        self._started = False
        self._executor = None
        self._event_dict = {}
        self._handlers = {}
        self._lock = threading.RLock()

        def _log_formatter(message):
            """Defines the formatting used in the logger."""
            return '[E Dispatcher|%s|%s] %s' % (self._serial,
                                                self._rpc_client.uid, message)

        self.log = logger.create_logger(_log_formatter)

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are sorted by name and stored in separate queues.
        If there is a registered handler for an event name, the handler is
        called with the event immediately upon discovery and the event is not
        stored. Polling ends when the dispatcher is stopped, when the
        'EventDispatcherShutdown' event arrives, or when the rpc client raises
        a connection error.

        Raises:
            rpc_client.Sl4aConnectionError: Re-raised after stopping the
                dispatcher when the error arrives while the rpc client does
                not report itself as alive.
        """
        while self._started:
            try:
                # The argument is handed to the rpc client unchanged;
                # presumably a wait time in milliseconds -- TODO confirm.
                event_obj = self._rpc_client.eventWait(50000)
            except rpc_client.Sl4aConnectionError as e:
                # NOTE(review): the is_alive branch logs "closed session"
                # while the other branch logs an error; confirm the meaning
                # of is_alive against the rpc client implementation.
                if self._rpc_client.is_alive:
                    self.log.warning('Closing due to closed session.')
                    break
                self.log.warning('Closing due to error: %s.' % e)
                # This method runs on one of the executor's own threads, so
                # waiting for the executor here would try to join the current
                # thread and raise RuntimeError, hiding the real error.
                if self._started:
                    self._stop(wait=False)
                raise

            if not event_obj:
                continue
            if 'name' not in event_obj:
                self.log.error('Received Malformed event {}'.format(event_obj))
                continue
            event_name = event_obj['name']

            if event_name == 'EventDispatcherShutdown':
                self.log.debug('Received shutdown signal.')
                # closeSl4aSession has been called, which closes the event
                # dispatcher. Stop execution on this polling thread.
                return

            if event_name in self._handlers:
                # A handler is registered: process the event right away.
                self.handle_subscribed_event(event_obj, event_name)
            else:
                # Otherwise cache the event. The lock is held across the
                # lookup and the put so the queue cannot be dropped by
                # clear_all_events() in between.
                with self._lock:
                    self.get_event_q(event_name).put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                handler for one type of event.
        """
        if self._started:
            raise IllegalStateError('Cannot register service after polling is '
                                    'started.')
        with self._lock:
            if event_name in self._handlers:
                raise DuplicateError(
                    'A handler for {} already exists'.format(event_name))
            self._handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                running.
        """
        if self._started:
            raise IllegalStateError("Dispatcher is already started.")
        self._started = True
        self._executor = ThreadPoolExecutor(max_workers=32)
        self._executor.submit(self.poll_events)

    def _stop(self, wait):
        """Marks the dispatcher stopped, shuts the executor down, drops events.

        Args:
            wait: Whether to block until the executor's threads have finished.
                Must be False when called from one of those threads.
        """
        self._started = False
        self._executor.shutdown(wait=wait)
        self.clear_all_events()

    def close(self):
        """Clean up and release resources.

        This function should only be called after a
        rpc_client.closeSl4aSession() call. It blocks until the executor's
        threads have finished, and the polling thread only finishes once its
        pending eventWait call returns. Does nothing if the dispatcher is not
        running.
        """
        if not self._started:
            return
        self._stop(wait=True)

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking. Never times out if None.

        Returns:
            event: The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event arrived before the timeout.
        """
        if not self._started:
            raise IllegalStateError(
                'Dispatcher needs to be started before popping.')

        e_queue = self.get_event_q(event_name)

        # A Queue object is always truthy, so test for None explicitly.
        if e_queue is None:
            raise IllegalStateError(
                'Failed to get an event queue for {}'.format(event_name))

        try:
            if timeout:
                # Block for at most timeout seconds.
                return e_queue.get(True, timeout)
            if timeout == 0:
                # Non-blocking poll for event.
                return e_queue.get(False)
            # timeout is None: block forever on event wait.
            return e_queue.get(True)
        except queue.Empty:
            # Re-raise with a message naming the event; the bare Empty carries
            # no information, so its context is suppressed.
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, event_name)) from None

    def wait_for_event(self,
                       event_name,
                       predicate,
                       timeout=DEFAULT_TIMEOUT,
                       *args,
                       **kwargs):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. By default this removes all the events of the same name
        that do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().
                consume_ignored_events: Whether or not to consume events while
                    searching for the desired event. Defaults to True if unset.
                    When False, every event that was popped but not returned
                    is put back at the end of the queue, including when the
                    wait times out or the predicate raises.

        Returns:
            The event that satisfies the predicate.

        Raises:
            IllegalStateError: Raised if the dispatcher is not running.
            queue.Empty: Raised if no event that satisfies the predicate was
                found before time out.
        """
        deadline = time.monotonic() + timeout
        consume_events = kwargs.pop('consume_ignored_events', True)
        held_events = []
        try:
            while True:
                # Wait in slices of at most one second, but never past the
                # deadline; a remaining time of 0 makes pop_event poll.
                remaining = max(0, deadline - time.monotonic())
                event = None
                try:
                    event = self.pop_event(event_name, min(1, remaining))
                except queue.Empty:
                    pass

                if event is not None:
                    if not consume_events:
                        held_events.append(event)
                    if predicate(event, *args, **kwargs):
                        if not consume_events:
                            # The match is handed to the caller, so it must
                            # not be put back with the ignored events.
                            held_events.pop()
                        return event

                if time.monotonic() >= deadline:
                    raise queue.Empty(
                        'Timeout after {}s waiting for event: {}'.format(
                            timeout, event_name))
        finally:
            if held_events:
                event_queue = self.get_event_q(event_name)
                for held_event in held_events:
                    event_queue.put(held_event)

    def pop_events(self, regex_pattern, timeout, freq=1):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                matching the condition exists when the function is called.
            freq: Number of seconds to sleep between two checks of the queues.

        Returns:
            results: The popped events, sorted by their 'time' value.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self._started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")
        deadline = time.monotonic() + timeout
        while True:
            # TODO: fix the sleep loop
            results = self._match_and_pop(regex_pattern)
            remaining = deadline - time.monotonic()
            if results or remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(freq, remaining))
        if not results:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, regex_pattern))

        return sorted(results, key=lambda event: event['time'])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Matching uses re.match semantics, i.e. the pattern is anchored at the
        start of the event name. Queues that are currently empty are skipped.

        Returns:
            A list with at most one event per matching queue.
        """
        # Compile before taking the lock so an invalid pattern cannot leave
        # the lock held.
        matcher = re.compile(regex_pattern)
        results = []
        with self._lock:
            for name, event_queue in self._event_dict.items():
                if event_queue is None or not matcher.match(name):
                    continue
                try:
                    results.append(event_queue.get_nowait())
                except queue.Empty:
                    pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists for this name yet, an empty one is created and
        stored, so later events of that name land in the returned queue.

        Args:
            event_name: Name of the event whose queue is wanted.

        Returns: A queue storing all the events of the specified name.
        """
        with self._lock:
            event_queue = self._event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self._event_dict[event_name] = event_queue
            # Return the reference obtained under the lock; reading the dict
            # again afterwards could race with clear_all_events().
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and submit the handler to the
        executor so that it runs off the calling thread.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self._handlers[event_name]
        self._executor.submit(handler, event_obj, *args)

    def _handle(self, event_handler, event_name, user_args, event_timeout,
                cond, cond_timeout):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond is not None:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(self,
                     event_handler,
                     event_name,
                     user_args,
                     event_timeout=None,
                     cond=None,
                     cond_timeout=None):
        """Handle events that don't have registered handlers

        On the executor, pop one event of specified type from its queue and
        execute its handler. If no such event exists, the task waits until
        one appears or event_timeout expires.

        Args:
            event_handler: Handler for the event, which should take at least
                one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                the event json.
            event_timeout: Number of seconds to wait for the event to come.
                Never times out if None.
            cond: A condition to wait on before executing the handler. Should
                be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                out. Never times out if None.

        Returns:
            worker: A concurrent.Future object associated with the handler.
                If blocking call worker.result() is triggered, the handler
                needs to return something to unblock.
        """
        return self._executor.submit(self._handle, event_handler, event_name,
                                     user_args, event_timeout, cond,
                                     cond_timeout)

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
           results: List of the desired events, oldest first. Empty if no
               queue exists for event_name.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                starts polling.
        """
        if not self._started:
            raise IllegalStateError(("Dispatcher needs to be started before "
                                     "popping."))
        results = []
        with self._lock:
            event_queue = self._event_dict.get(event_name)
            if event_queue is None:
                return results
            try:
                while True:
                    results.append(event_queue.get_nowait())
            except queue.Empty:
                # The queue is drained.
                pass
        return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        The queue itself is kept (and created if it did not exist).

        Args:
            event_name: Name of the events to be cleared.
        """
        with self._lock:
            event_queue = self.get_event_q(event_name)
            # Hold the queue's own mutex while emptying its underlying deque
            # so the clear cannot interleave with a concurrent get() or put().
            with event_queue.mutex:
                event_queue.queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self._lock:
            self._event_dict.clear()
451