1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import re
8import shutil
9import subprocess
10import tempfile
11import time
12import urllib
13import urllib2
14
15from autotest_lib.client.bin import test
16from autotest_lib.client.bin import utils
17from autotest_lib.client.common_lib import error
18from autotest_lib.client.common_lib import file_utils
19from autotest_lib.client.cros.input_playback import input_playback
20
21
22class touch_playback_test_base(test.test):
23    """Base class for touch tests involving playback."""
24    version = 1
25
26    _INPUTCONTROL = '/opt/google/input/inputcontrol'
27
28
29    @property
30    def _has_touchpad(self):
31        """True if device under test has a touchpad; else False."""
32        return self.player.has('touchpad')
33
34
35    @property
36    def _has_touchscreen(self):
37        """True if device under test has a touchscreen; else False."""
38        return self.player.has('touchscreen')
39
40
41    @property
42    def _has_mouse(self):
43        """True if device under test has or emulates a USB mouse; else False."""
44        return self.player.has('mouse')
45
46
47    def warmup(self, mouse_props=None):
48        """Test setup.
49
50        Instantiate player object to find touch devices, if any.
51        These devices can be used for playback later.
52        Emulate a USB mouse if a property file is provided.
53        Check if the inputcontrol script is avaiable on the disk.
54
55        @param mouse_props: optional property file for a mouse to emulate.
56                            Created using 'evemu-describe /dev/input/X'.
57
58        """
59        self.player = input_playback.InputPlayback()
60        if mouse_props:
61            self.player.emulate(input_type='mouse', property_file=mouse_props)
62        self.player.find_connected_inputs()
63
64        self._autotest_ext = None
65        self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL)
66        self._platform = utils.get_board()
67        if 'cheets' in self._platform:
68            self._platform = self._platform[:-len('-cheets')]
69
70
71    def _find_test_files(self, input_type, gestures):
72        """Determine where the playback gesture files for this test are.
73
74        Expected file format is: <boardname>_<input type>_<hwid>_<gesture name>
75            e.g. samus_touchpad_164.17_scroll_down
76
77        @param input_type: device type, e.g. 'touchpad'
78        @param gestures: list of gesture name strings used in filename
79
80        @returns: None if not all files are found.  Dictionary of filepaths if
81                  they are found, indexed by gesture names as given.
82        @raises: error.TestError if no device is found or if device should have
83                 a hw_id but does not.
84
85        """
86        if not self.player.has(input_type):
87            raise error.TestError('Device does not have a %s!' % input_type)
88
89        if input_type in ['touchpad', 'touchscreen', 'stylus']:
90            hw_id = self.player.devices[input_type].hw_id
91            if not hw_id:
92                raise error.TestError('No valid hw_id for %s!' % input_type)
93            filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id)
94
95        else:
96            device_name = self.player.devices[input_type].name
97            filename_fmt = '%s_%s' % (device_name, input_type)
98
99        filepaths = {}
100        for gesture in gestures:
101            filename = '%s_%s' % (filename_fmt, gesture)
102            filepath = self._download_remote_test_file(filename, input_type)
103            if not filepath:
104                logging.info('Did not find files for this device!')
105                return None
106
107            filepaths[gesture] = filepath
108
109        return filepaths
110
111
112    def _find_test_files_from_directions(self, input_type, fmt_str, directions):
113        """Find gesture files given a list of directions and name format.
114
115        @param input_type: device type, e.g. 'touchpad'
116        @param fmt_str: format string for filename, e.g. 'scroll-%s'
117        @param directions: list of directions for fmt_string
118
119        @returns: None if not all files are found.  Dictionary of filepaths if
120                  they are found, indexed by directions as given.
121        @raises: error.TestError if no hw_id is found.
122
123        """
124        gestures = [fmt_str % d for d in directions]
125        temp_filepaths = self._find_test_files(input_type, gestures)
126
127        filepaths = {}
128        if temp_filepaths:
129            filepaths = {d: temp_filepaths[fmt_str % d] for d in directions}
130
131        return filepaths
132
133
134    def _download_remote_test_file(self, filename, input_type):
135        """Download a file from the remote touch playback folder.
136
137        @param filename: string of filename
138        @param input_type: device type, e.g. 'touchpad'
139
140        @returns: Path to local file or None if file is not found.
141
142        """
143        REMOTE_STORAGE_URL = ('https://storage.googleapis.com/'
144                              'chromiumos-test-assets-public/touch_playback')
145        filename = urllib.quote(filename)
146
147        if input_type in ['touchpad', 'touchscreen', 'stylus']:
148            url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename)
149        else:
150            url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename)
151        local_file = os.path.join(self.bindir, filename)
152
153        logging.info('Looking for %s', url)
154        try:
155            file_utils.download_file(url, local_file)
156        except urllib2.URLError as e:
157            logging.info('File download failed!')
158            logging.debug(e.msg)
159            return None
160
161        return local_file
162
163
164    def _emulate_mouse(self, property_file=None):
165        """Emulate a mouse with the given property file.
166
167        player will use default mouse if no file is provided.
168
169        """
170        self.player.emulate(input_type='mouse', property_file=property_file)
171        self.player.find_connected_inputs()
172        if not self._has_mouse:
173            raise error.TestError('Mouse emulation failed!')
174
175
176    def _playback(self, filepath, touch_type='touchpad'):
177        """Playback a given input file on the given input."""
178        self.player.playback(filepath, touch_type)
179
180
181    def _blocking_playback(self, filepath, touch_type='touchpad'):
182        """Playback a given input file on the given input; block until done."""
183        self.player.blocking_playback(filepath, touch_type)
184
185
186    def _set_touch_setting_by_inputcontrol(self, setting, value):
187        """Set a given touch setting the given value by inputcontrol.
188
189        @param setting: Name of touch setting, e.g. 'tapclick'.
190        @param value: True for enabled, False for disabled.
191
192        """
193        cmd_value = 1 if value else 0
194        utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value))
195        logging.info('%s turned %s.', setting, 'on' if value else 'off')
196
197
198    def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting,
199                           value):
200        """Set a given touch setting the given value.
201
202        @param inputcontrol_setting: Name of touch setting for the inputcontrol
203                                     script, e.g. 'tapclick'.
204        @param autotest_ext_setting: Name of touch setting for the autotest
205                                     extension, e.g. 'TapToClick'.
206        @param value: True for enabled, False for disabled.
207
208        """
209        if self._has_inputcontrol:
210            self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value)
211        elif self._autotest_ext is not None:
212            self._autotest_ext.EvaluateJavaScript(
213                    'chrome.autotestPrivate.set%s(%s);'
214                    % (autotest_ext_setting, ("%s" % value).lower()))
215            # TODO: remove this sleep once checking for value is available.
216            time.sleep(1)
217        else:
218            raise error.TestFail('Both inputcontrol and the autotest '
219                                 'extension are not availble.')
220
221
222    def _set_australian_scrolling(self, value):
223        """Set australian scrolling to the given value.
224
225        @param value: True for enabled, False for disabled.
226
227        """
228        self._set_touch_setting('australian_scrolling', 'NaturalScroll', value)
229
230
231    def _set_tap_to_click(self, value):
232        """Set tap-to-click to the given value.
233
234        @param value: True for enabled, False for disabled.
235
236        """
237        self._set_touch_setting('tapclick', 'TapToClick', value)
238
239
240    def _set_tap_dragging(self, value):
241        """Set tap dragging to the given value.
242
243        @param value: True for enabled, False for disabled.
244
245        """
246        self._set_touch_setting('tapdrag', 'TapDragging', value)
247
248
249    def _set_autotest_ext(self, ext):
250        """Set the autotest extension.
251
252        @ext: the autotest extension object.
253
254        """
255        self._autotest_ext = ext
256
257
258    def _open_test_page(self, cr, filename='test_page.html'):
259        """Prepare test page for testing.  Set self._tab with page.
260
261        @param cr: chrome.Chrome() object
262        @param filename: name of file in self.bindir to open
263
264        """
265        self._test_page = TestPage(cr, self.bindir, filename)
266        self._tab = self._test_page._tab
267
268
269    def _open_events_page(self, cr):
270        """Open the test events page.  Set self._events with EventsPage class.
271
272        Also set self._tab as this page and self.bindir as the http server dir.
273
274        @param cr: chrome.Chrome() object
275
276        """
277        self._events = EventsPage(cr, self.bindir)
278        self._tab = self._events._tab
279
280
281    def _center_cursor(self):
282        """Playback mouse movement to center cursor.
283
284        Requres that self._emulate_mouse() has been called.
285
286        """
287        self.player.blocking_playback_of_default_file(
288                'mouse_center_cursor_gesture', input_type='mouse')
289
290
291    def _get_kernel_events_recorder(self, input_type):
292        """Return a kernel event recording object for the given input type.
293
294        @param input_type: device type, e.g. 'touchpad'
295
296        @returns: KernelEventsRecorder instance.
297
298        """
299        node = self.player.devices[input_type].node
300        return KernelEventsRecorder(node)
301
302
303    def cleanup(self):
304        self.player.close()
305
306
307class KernelEventsRecorder(object):
308    """Object to record kernel events for a particular device."""
309
310    def __init__(self, node):
311        """Setup to record future evtest output for this node.
312
313        @param input_type: the device which to inspect, e.g. 'mouse'
314
315        """
316        self.node = node
317        self.fh = tempfile.NamedTemporaryFile()
318        self.evtest_process = None
319
320
321    def start(self):
322        """Start recording events."""
323        self.evtest_process = subprocess.Popen(
324                ['evtest', self.node], stdout=self.fh)
325
326        # Wait until the initial output has finished before returning.
327        def find_exit():
328            """Polling function for end of output."""
329            interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' %
330                             self.fh.name)
331            line_count = utils.run(interrupt_cmd).stdout.strip()
332            return line_count != '0'
333        utils.poll_for_condition(find_exit)
334
335
336    def clear(self):
337        """Clear previous events."""
338        self.stop()
339        self.fh.close()
340        self.fh = tempfile.NamedTemporaryFile()
341
342
343    def stop(self):
344        """Stop recording events."""
345        if self.evtest_process:
346            self.evtest_process.kill()
347            self.evtest_process = None
348
349
350    def get_recorded_events(self):
351        """Get the evtest output since object was created."""
352        self.fh.seek(0)
353        events = self.fh.read()
354        return events
355
356
357    def log_recorded_events(self):
358        """Save recorded events into logs."""
359        events = self.get_recorded_events()
360        logging.info('Kernel events seen:\n%s', events)
361
362
363    def get_last_event_timestamp(self, filter_str=''):
364        """Return the timestamp of the last event since recording started.
365
366        Events are in the form "Event: time <epoch time>, <info>\n"
367
368        @param filter_str: a regex string to match to the <info> section.
369
370        @returns: floats matching
371
372        """
373        events = self.get_recorded_events()
374        findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str,
375                             events, re.MULTILINE)
376        re.findall(r' time (.*?), [^\n]*?%s' % filter_str, events, re.MULTILINE)
377        if not findall:
378            self.log_recorded_events()
379            raise error.TestError('Could not find any kernel timestamps!'
380                                  '  Filter: %s' % filter_str)
381        return float(findall[-1])
382
383
384    def close(self):
385        """Clean up this class."""
386        self.stop()
387        self.fh.close()
388
389
390class TestPage(object):
391    """Wrapper around a Telemtry tab for utility functions.
392
393    Provides functions such as reload and setting scroll height on page.
394
395    """
396    _DEFAULT_SCROLL = 5000
397
398    def __init__(self, cr, httpdir, filename):
399        """Open a given test page in the given httpdir.
400
401        @param cr: chrome.Chrome() object
402        @param httpdir: the directory to use for SetHTTPServerDirectories
403        @param filename: path to the file to open, relative to httpdir
404
405        """
406        cr.browser.platform.SetHTTPServerDirectories(httpdir)
407        self._tab = cr.browser.tabs[0]
408        self._tab.Navigate(cr.browser.platform.http_server.UrlOf(
409                os.path.join(httpdir, filename)))
410        self.wait_for_page_ready()
411
412
413    def reload_page(self):
414        """Reloads test page."""
415        self._tab.Navigate(self._tab.url)
416        self.wait_for_page_ready()
417
418
419    def wait_for_page_ready(self):
420        """Wait for a variable pageReady on the test page to be true.
421
422        Presuposes that a pageReady variable exists on this page.
423
424        @raises error.TestError if page is not ready after timeout.
425
426        """
427        self._tab.WaitForDocumentReadyStateToBeComplete()
428        utils.poll_for_condition(
429                lambda: self._tab.EvaluateJavaScript('pageReady'),
430                exception=error.TestError('Test page is not ready!'))
431
432
433    def expand_page(self):
434        """Expand the page to be very large, to allow scrolling."""
435        cmd = 'document.body.style.%s = %d+"px"' % (
436                '%s', self._DEFAULT_SCROLL * 5)
437        self._tab.ExecuteJavaScript(cmd % 'width')
438        self._tab.ExecuteJavaScript(cmd % 'height')
439
440
441    def set_scroll_position(self, value, scroll_vertical=True):
442        """Set scroll position to given value.
443
444        @param scroll_vertical: True for vertical scroll,
445                                False for horizontal Scroll.
446        @param value: True for enabled, False for disabled.
447
448         """
449        if scroll_vertical:
450            self._tab.ExecuteJavaScript(
451                'document.body.scrollTop=%s' % value)
452        else:
453            self._tab.ExecuteJavaScript(
454                'document.body.scrollLeft=%s' % value)
455
456
457    def set_default_scroll_position(self, scroll_vertical=True):
458        """Set scroll position of page to default.
459
460        @param scroll_vertical: True for vertical scroll,
461                                False for horizontal Scroll.
462        @raise: TestError if page is not set to default scroll position
463
464        """
465        total_tries = 2
466        for i in xrange(total_tries):
467            try:
468                self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical)
469                self.wait_for_default_scroll_position(scroll_vertical)
470            except error.TestError as e:
471                if i == total_tries - 1:
472                   pos = self.get_scroll_position(scroll_vertical)
473                   logging.error('SCROLL POSITION: %s', pos)
474                   raise e
475            else:
476                 break
477
478
479    def get_scroll_position(self, scroll_vertical=True):
480        """Return current scroll position of page.
481
482        @param scroll_vertical: True for vertical scroll,
483                                False for horizontal Scroll.
484
485        """
486        if scroll_vertical:
487            return int(self._tab.EvaluateJavaScript('document.body.scrollTop'))
488        else:
489            return int(self._tab.EvaluateJavaScript('document.body.scrollLeft'))
490
491
492    def wait_for_default_scroll_position(self, scroll_vertical=True):
493        """Wait for page to be at the default scroll position.
494
495        @param scroll_vertical: True for vertical scroll,
496                                False for horizontal scroll.
497
498        @raise: TestError if page either does not move or does not stop moving.
499
500        """
501        utils.poll_for_condition(
502                lambda: self.get_scroll_position(
503                        scroll_vertical) == self._DEFAULT_SCROLL,
504                exception=error.TestError('Page not set to default scroll!'))
505
506
507    def wait_for_scroll_position_to_settle(self, scroll_vertical=True):
508        """Wait for page to move and then stop moving.
509
510        @param scroll_vertical: True for Vertical scroll and
511                                False for horizontal scroll.
512
513        @raise: TestError if page either does not move or does not stop moving.
514
515        """
516        # Wait until page starts moving.
517        utils.poll_for_condition(
518                lambda: self.get_scroll_position(
519                        scroll_vertical) != self._DEFAULT_SCROLL,
520                exception=error.TestError('No scrolling occurred!'), timeout=30)
521
522        # Wait until page has stopped moving.
523        self._previous = self._DEFAULT_SCROLL
524        def _movement_stopped():
525            current = self.get_scroll_position()
526            result = current == self._previous
527            self._previous = current
528            return result
529
530        utils.poll_for_condition(
531                lambda: _movement_stopped(), sleep_interval=1,
532                exception=error.TestError('Page did not stop moving!'),
533                timeout=30)
534
535
536    def get_page_width(self):
537        """Return window.innerWidth for this page."""
538        return int(self._tab.EvaluateJavaScript('window.innerWidth'))
539
540
541class EventsPage(TestPage):
542    """Functions to monitor input events on the DUT, as seen by a webpage.
543
544    A subclass of TestPage which uses and interacts with a specific page.
545
546    """
547    def __init__(self, cr, httpdir):
548        """Open the website and save the tab in self._tab.
549
550        @param cr: chrome.Chrome() object
551        @param httpdir: the directory to use for SetHTTPServerDirectories
552
553        """
554        filename = 'touch_events_test_page.html'
555        current_dir = os.path.dirname(os.path.realpath(__file__))
556        shutil.copyfile(os.path.join(current_dir, filename),
557                        os.path.join(httpdir, filename))
558
559        super(EventsPage, self).__init__(cr, httpdir, filename)
560
561
562    def clear_previous_events(self):
563        """Wipe the test page back to its original state."""
564        self._tab.ExecuteJavaScript('pageReady = false')
565        self._tab.ExecuteJavaScript('clearPreviousEvents()')
566        self.wait_for_page_ready()
567
568
569    def get_events_log(self):
570        """Return the event log from the test page."""
571        return self._tab.EvaluateJavaScript('eventLog')
572
573
574    def log_events(self):
575        """Put the test page's event log into logging.info."""
576        logging.info('EVENTS LOG:')
577        logging.info(self.get_events_log())
578
579
580    def get_time_of_last_event(self):
581        """Return the timestamp of the last seen event (if any)."""
582        return self._tab.EvaluateJavaScript('timeOfLastEvent')
583
584
585    def get_event_count(self):
586        """Return the number of events that the test page has seen."""
587        return self._tab.EvaluateJavaScript('eventCount')
588
589
590    def get_scroll_delta(self, is_vertical):
591        """Return the net scrolling the test page has seen.
592
593        @param is_vertical: True for vertical scrolling; False for horizontal.
594
595        """
596        axis = 'y' if is_vertical else 'x'
597        return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis)
598
599
600    def get_click_count(self):
601        """Return the number of clicks the test page has seen."""
602        return self._tab.EvaluateJavaScript('clickCount')
603
604
605    def wait_for_events_to_complete(self, delay_secs=1, timeout=60):
606        """Wait until test page stops seeing events for delay_secs seconds.
607
608        @param delay_secs: the polling frequency in seconds.
609        @param timeout: the number of seconds to wait for events to complete.
610        @raises: error.TestError if no events occurred.
611        @raises: error.TestError if events did not stop after timeout seconds.
612
613        """
614        self._tmp_previous_event_count = -1
615        def _events_stopped_coming():
616            most_recent_event_count = self.get_event_count()
617            delta = most_recent_event_count - self._tmp_previous_event_count
618            self._tmp_previous_event_count = most_recent_event_count
619            return most_recent_event_count != 0 and delta == 0
620
621        try:
622            utils.poll_for_condition(
623                    _events_stopped_coming, exception=error.TestError(),
624                    sleep_interval=delay_secs, timeout=timeout)
625        except error.TestError:
626            if self._tmp_previous_event_count == 0:
627                raise error.TestError('No touch event was seen!')
628            else:
629                self.log_events()
630                raise error.TestError('Touch events did not stop!')
631
632
633    def set_prevent_defaults(self, value):
634        """Set whether to allow default event actions to go through.
635
636        E.g. if this is True, a two finger horizontal scroll will not actually
637        produce history navigation on the browser.
638
639        @param value: True for prevent defaults; False to allow them.
640
641        """
642        js_value = str(value).lower()
643        self._tab.ExecuteJavaScript('preventDefaults = %s;' % js_value)
644