1#!/usr/bin/env python3
2#
3#   Copyright 2016- The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from concurrent.futures import ThreadPoolExecutor
18import queue
19import re
20import threading
21import time
22
23from acts import logger
24from acts.controllers.sl4a_lib import rpc_client
25
26
27class EventDispatcherError(Exception):
28    """The base class for all EventDispatcher exceptions."""
29
30
31class IllegalStateError(EventDispatcherError):
32    """Raise when user tries to put event_dispatcher into an illegal state."""
33
34
35class DuplicateError(EventDispatcherError):
36    """Raise when two event handlers have been assigned to an event name."""
37
38
39class EventDispatcher:
40    """A class for managing the events for an SL4A Session.
41
42    Attributes:
43        _serial: The serial of the device.
44        _rpc_client: The rpc client for that session.
45        _started: A bool that holds whether or not the event dispatcher is
46                  running.
47        _executor: The thread pool executor for running event handlers and
48                   polling.
49        _event_dict: A dictionary of str eventName = Queue<Event> eventQueue
50        _handlers: A dictionary of str eventName => (lambda, args) handler
51        _lock: A lock that prevents multiple reads/writes to the event queues.
52        log: The EventDispatcher's logger.
53    """
54
55    DEFAULT_TIMEOUT = 60
56
57    def __init__(self, serial, rpc_client):
58        self._serial = serial
59        self._rpc_client = rpc_client
60        self._started = False
61        self._executor = None
62        self._event_dict = {}
63        self._handlers = {}
64        self._lock = threading.RLock()
65
66        def _log_formatter(message):
67            """Defines the formatting used in the logger."""
68            return '[E Dispatcher|%s|%s] %s' % (self._serial,
69                                                self._rpc_client.uid, message)
70
71        self.log = logger.create_logger(_log_formatter)
72
73    def poll_events(self):
74        """Continuously polls all types of events from sl4a.
75
76        Events are sorted by name and store in separate queues.
77        If there are registered handlers, the handlers will be called with
78        corresponding event immediately upon event discovery, and the event
79        won't be stored. If exceptions occur, stop the dispatcher and return
80        """
81        while self._started:
82            try:
83                # 60000 in ms, timeout in second
84                event_obj = self._rpc_client.eventWait(60000, timeout=120)
85            except rpc_client.Sl4aConnectionError as e:
86                if self._rpc_client.is_alive:
87                    self.log.warning('Closing due to closed session.')
88                    break
89                else:
90                    self.log.warning('Closing due to error: %s.' % e)
91                    self.close()
92                    raise e
93            if not event_obj:
94                continue
95            elif 'name' not in event_obj:
96                self.log.error('Received Malformed event {}'.format(event_obj))
97                continue
98            else:
99                event_name = event_obj['name']
100            # if handler registered, process event
101            if event_name == 'EventDispatcherShutdown':
102                self.log.debug('Received shutdown signal.')
103                # closeSl4aSession has been called, which closes the event
104                # dispatcher. Stop execution on this polling thread.
105                return
106            if event_name in self._handlers:
107                self.log.debug(
108                    'Using handler %s for event: %r' %
109                    (self._handlers[event_name].__name__, event_obj))
110                self.handle_subscribed_event(event_obj, event_name)
111            else:
112                self.log.debug('Queuing event: %r' % event_obj)
113                self._lock.acquire()
114                if event_name in self._event_dict:  # otherwise, cache event
115                    self._event_dict[event_name].put(event_obj)
116                else:
117                    q = queue.Queue()
118                    q.put(event_obj)
119                    self._event_dict[event_name] = q
120                self._lock.release()
121
122    def register_handler(self, handler, event_name, args):
123        """Registers an event handler.
124
125        One type of event can only have one event handler associated with it.
126
127        Args:
128            handler: The event handler function to be registered.
129            event_name: Name of the event the handler is for.
130            args: User arguments to be passed to the handler when it's called.
131
132        Raises:
133            IllegalStateError: Raised if attempts to register a handler after
134                the dispatcher starts running.
135            DuplicateError: Raised if attempts to register more than one
136                handler for one type of event.
137        """
138        if self._started:
139            raise IllegalStateError('Cannot register service after polling is '
140                                    'started.')
141        self._lock.acquire()
142        try:
143            if event_name in self._handlers:
144                raise DuplicateError(
145                    'A handler for {} already exists'.format(event_name))
146            self._handlers[event_name] = (handler, args)
147        finally:
148            self._lock.release()
149
150    def start(self):
151        """Starts the event dispatcher.
152
153        Initiates executor and start polling events.
154
155        Raises:
156            IllegalStateError: Can't start a dispatcher again when it's already
157                running.
158        """
159        if not self._started:
160            self._started = True
161            self._executor = ThreadPoolExecutor(max_workers=32)
162            self._executor.submit(self.poll_events)
163        else:
164            raise IllegalStateError("Dispatcher is already started.")
165
166    def close(self):
167        """Clean up and release resources.
168
169        This function should only be called after a
170        rpc_client.closeSl4aSession() call.
171        """
172        if not self._started:
173            return
174        self._started = False
175        self._executor.shutdown(wait=True)
176        self.clear_all_events()
177
178    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
179        """Pop an event from its queue.
180
181        Return and remove the oldest entry of an event.
182        Block until an event of specified name is available or
183        times out if timeout is set.
184
185        Args:
186            event_name: Name of the event to be popped.
187            timeout: Number of seconds to wait when event is not present.
188                Never times out if None.
189
190        Returns:
191            event: The oldest entry of the specified event. None if timed out.
192
193        Raises:
194            IllegalStateError: Raised if pop is called before the dispatcher
195                starts polling.
196        """
197        if not self._started:
198            raise IllegalStateError(
199                'Dispatcher needs to be started before popping.')
200
201        e_queue = self.get_event_q(event_name)
202
203        if not e_queue:
204            raise IllegalStateError(
205                'Failed to get an event queue for {}'.format(event_name))
206
207        try:
208            # Block for timeout
209            if timeout:
210                return e_queue.get(True, timeout)
211            # Non-blocking poll for event
212            elif timeout == 0:
213                return e_queue.get(False)
214            else:
215                # Block forever on event wait
216                return e_queue.get(True)
217        except queue.Empty:
218            msg = 'Timeout after {}s waiting for event: {}'.format(
219                timeout, event_name)
220            self.log.info(msg)
221            raise queue.Empty(msg)
222
223    def wait_for_event(self,
224                       event_name,
225                       predicate,
226                       timeout=DEFAULT_TIMEOUT,
227                       *args,
228                       **kwargs):
229        """Wait for an event that satisfies a predicate to appear.
230
231        Continuously pop events of a particular name and check against the
232        predicate until an event that satisfies the predicate is popped or
233        timed out. Note this will remove all the events of the same name that
234        do not satisfy the predicate in the process.
235
236        Args:
237            event_name: Name of the event to be popped.
238            predicate: A function that takes an event and returns True if the
239                predicate is satisfied, False otherwise.
240            timeout: Number of seconds to wait.
241            *args: Optional positional args passed to predicate().
242            **kwargs: Optional keyword args passed to predicate().
243                consume_ignored_events: Whether or not to consume events while
244                    searching for the desired event. Defaults to True if unset.
245
246        Returns:
247            The event that satisfies the predicate.
248
249        Raises:
250            queue.Empty: Raised if no event that satisfies the predicate was
251                found before time out.
252        """
253        deadline = time.time() + timeout
254        ignored_events = []
255        consume_events = kwargs.pop('consume_ignored_events', True)
256        while True:
257            event = None
258            try:
259                event = self.pop_event(event_name, 1)
260                if consume_events:
261                    self.log.debug('Consuming event: %r' % event)
262                else:
263                    self.log.debug('Peeking at event: %r' % event)
264                    ignored_events.append(event)
265            except queue.Empty:
266                pass
267
268            if event and predicate(event, *args, **kwargs):
269                for ignored_event in ignored_events:
270                    self.get_event_q(event_name).put(ignored_event)
271                self.log.debug('Matched event: %r with %s' %
272                               (event, predicate.__name__))
273                return event
274
275            if time.time() > deadline:
276                for ignored_event in ignored_events:
277                    self.get_event_q(event_name).put(ignored_event)
278                msg = 'Timeout after {}s waiting for event: {}'.format(
279                    timeout, event_name)
280                self.log.info(msg)
281                raise queue.Empty(msg)
282
283    def pop_events(self, regex_pattern, timeout, freq=1):
284        """Pop events whose names match a regex pattern.
285
286        If such event(s) exist, pop one event from each event queue that
287        satisfies the condition. Otherwise, wait for an event that satisfies
288        the condition to occur, with timeout.
289
290        Results are sorted by timestamp in ascending order.
291
292        Args:
293            regex_pattern: The regular expression pattern that an event name
294                should match in order to be popped.
295            timeout: Number of seconds to wait for events in case no event
296                matching the condition exits when the function is called.
297
298        Returns:
299            results: Pop events whose names match a regex pattern.
300                Empty if none exist and the wait timed out.
301
302        Raises:
303            IllegalStateError: Raised if pop is called before the dispatcher
304                starts polling.
305            queue.Empty: Raised if no event was found before time out.
306        """
307        if not self._started:
308            raise IllegalStateError(
309                "Dispatcher needs to be started before popping.")
310        deadline = time.time() + timeout
311        while True:
312            # TODO: fix the sleep loop
313            results = self._match_and_pop(regex_pattern)
314            if len(results) != 0 or time.time() > deadline:
315                break
316            time.sleep(freq)
317        if len(results) == 0:
318            msg = 'Timeout after {}s waiting for event: {}'.format(
319                timeout, regex_pattern)
320            self.log.error(msg)
321            raise queue.Empty(msg)
322
323        return sorted(results, key=lambda event: event['time'])
324
325    def _match_and_pop(self, regex_pattern):
326        """Pop one event from each of the event queues whose names
327        match (in a sense of regular expression) regex_pattern.
328        """
329        results = []
330        self._lock.acquire()
331        for name in self._event_dict.keys():
332            if re.match(regex_pattern, name):
333                q = self._event_dict[name]
334                if q:
335                    try:
336                        results.append(q.get(False))
337                    except queue.Empty:
338                        pass
339        self._lock.release()
340        return results
341
342    def get_event_q(self, event_name):
343        """Obtain the queue storing events of the specified name.
344
345        If no event of this name has been polled, wait for one to.
346
347        Returns: A queue storing all the events of the specified name.
348        """
349        self._lock.acquire()
350        if (event_name not in self._event_dict
351                or self._event_dict[event_name] is None):
352            self._event_dict[event_name] = queue.Queue()
353        self._lock.release()
354
355        event_queue = self._event_dict[event_name]
356        return event_queue
357
358    def handle_subscribed_event(self, event_obj, event_name):
359        """Execute the registered handler of an event.
360
361        Retrieve the handler and its arguments, and execute the handler in a
362            new thread.
363
364        Args:
365            event_obj: Json object of the event.
366            event_name: Name of the event to call handler for.
367        """
368        handler, args = self._handlers[event_name]
369        self._executor.submit(handler, event_obj, *args)
370
371    def _handle(self, event_handler, event_name, user_args, event_timeout,
372                cond, cond_timeout):
373        """Pop an event of specified type and calls its handler on it. If
374        condition is not None, block until condition is met or timeout.
375        """
376        if cond:
377            cond.wait(cond_timeout)
378        event = self.pop_event(event_name, event_timeout)
379        return event_handler(event, *user_args)
380
381    def handle_event(self,
382                     event_handler,
383                     event_name,
384                     user_args,
385                     event_timeout=None,
386                     cond=None,
387                     cond_timeout=None):
388        """Handle events that don't have registered handlers
389
390        In a new thread, poll one event of specified type from its queue and
391        execute its handler. If no such event exists, the thread waits until
392        one appears.
393
394        Args:
395            event_handler: Handler for the event, which should take at least
396                one argument - the event json object.
397            event_name: Name of the event to be handled.
398            user_args: User arguments for the handler; to be passed in after
399                the event json.
400            event_timeout: Number of seconds to wait for the event to come.
401            cond: A condition to wait on before executing the handler. Should
402                be a threading.Event object.
403            cond_timeout: Number of seconds to wait before the condition times
404                out. Never times out if None.
405
406        Returns:
407            worker: A concurrent.Future object associated with the handler.
408                If blocking call worker.result() is triggered, the handler
409                needs to return something to unblock.
410        """
411        worker = self._executor.submit(self._handle, event_handler, event_name,
412                                       user_args, event_timeout, cond,
413                                       cond_timeout)
414        return worker
415
416    def pop_all(self, event_name):
417        """Return and remove all stored events of a specified name.
418
419        Pops all events from their queue. May miss the latest ones.
420        If no event is available, return immediately.
421
422        Args:
423            event_name: Name of the events to be popped.
424
425        Returns:
426           results: List of the desired events.
427
428        Raises:
429            IllegalStateError: Raised if pop is called before the dispatcher
430                starts polling.
431        """
432        if not self._started:
433            raise IllegalStateError(("Dispatcher needs to be started before "
434                                     "popping."))
435        results = []
436        try:
437            self._lock.acquire()
438            while True:
439                e = self._event_dict[event_name].get(block=False)
440                results.append(e)
441        except (queue.Empty, KeyError):
442            return results
443        finally:
444            self._lock.release()
445
446    def clear_events(self, event_name):
447        """Clear all events of a particular name.
448
449        Args:
450            event_name: Name of the events to be popped.
451        """
452        self._lock.acquire()
453        try:
454            q = self.get_event_q(event_name)
455            q.queue.clear()
456        except queue.Empty:
457            return
458        finally:
459            self._lock.release()
460
461    def clear_all_events(self):
462        """Clear all event queues and their cached events."""
463        self._lock.acquire()
464        self._event_dict.clear()
465        self._lock.release()
466
467    def is_event_match(self, event, field, value):
468        return self.is_event_match_for_list(event, field, [value])
469
470    def is_event_match_for_list(self, event, field, value_list):
471        try:
472            value_in_event = event['data'][field]
473        except KeyError:
474            return False
475        for value in value_list:
476            if value_in_event == value:
477                return True
478        return False