1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import re
8import shutil
9import subprocess
10import tempfile
11import time
12import urllib
13import urllib2
14
15from autotest_lib.client.bin import test
16from autotest_lib.client.bin import utils
17from autotest_lib.client.common_lib import error
18from autotest_lib.client.common_lib import file_utils
19from autotest_lib.client.cros.input_playback import input_playback
20
21
22class touch_playback_test_base(test.test):
23    """Base class for touch tests involving playback."""
24    version = 1
25
26    _INPUTCONTROL = '/opt/google/input/inputcontrol'
27
28
29    @property
30    def _has_touchpad(self):
31        """True if device under test has a touchpad; else False."""
32        return self.player.has('touchpad')
33
34
35    @property
36    def _has_touchscreen(self):
37        """True if device under test has a touchscreen; else False."""
38        return self.player.has('touchscreen')
39
40
41    @property
42    def _has_mouse(self):
43        """True if device under test has or emulates a USB mouse; else False."""
44        return self.player.has('mouse')
45
46
47    def warmup(self, mouse_props=None):
48        """Test setup.
49
50        Instantiate player object to find touch devices, if any.
51        These devices can be used for playback later.
52        Emulate a USB mouse if a property file is provided.
53        Check if the inputcontrol script is avaiable on the disk.
54
55        @param mouse_props: optional property file for a mouse to emulate.
56                            Created using 'evemu-describe /dev/input/X'.
57
58        """
59        self.player = input_playback.InputPlayback()
60        if mouse_props:
61            self.player.emulate(input_type='mouse', property_file=mouse_props)
62        self.player.find_connected_inputs()
63
64        self._autotest_ext = None
65        self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL)
66        self._platform = utils.get_board()
67        if 'cheets' in self._platform:
68            self._platform = self._platform[:-len('-cheets')]
69
70
71    def _find_test_files(self, input_type, gestures):
72        """Determine where the playback gesture files for this test are.
73
74        Expected file format is: <boardname>_<input type>_<hwid>_<gesture name>
75            e.g. samus_touchpad_164.17_scroll_down
76
77        @param input_type: device type, e.g. 'touchpad'
78        @param gestures: list of gesture name strings used in filename
79
80        @returns: None if not all files are found.  Dictionary of filepaths if
81                  they are found, indexed by gesture names as given.
82        @raises: error.TestError if no device is found or if device should have
83                 a hw_id but does not.
84
85        """
86        if type(gestures) is not list:
87            raise error.TestError('find_test_files() takes a LIST, not a '
88                                   '%s!' % type(gestures))
89
90        if not self.player.has(input_type):
91            raise error.TestError('Device does not have a %s!' % input_type)
92
93        if input_type in ['touchpad', 'touchscreen', 'stylus']:
94            hw_id = self.player.devices[input_type].hw_id
95            if not hw_id:
96                raise error.TestError('No valid hw_id for %s!' % input_type)
97            filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id)
98
99        else:
100            device_name = self.player.devices[input_type].name
101            filename_fmt = '%s_%s' % (device_name, input_type)
102
103        filepaths = {}
104        for gesture in gestures:
105            filename = '%s_%s' % (filename_fmt, gesture)
106            filepath = self._download_remote_test_file(filename, input_type)
107            if not filepath:
108                logging.info('Did not find files for this device!')
109                return None
110
111            filepaths[gesture] = filepath
112
113        return filepaths
114
115
116    def _find_test_files_from_directions(self, input_type, fmt_str, directions):
117        """Find gesture files given a list of directions and name format.
118
119        @param input_type: device type, e.g. 'touchpad'
120        @param fmt_str: format string for filename, e.g. 'scroll-%s'
121        @param directions: list of directions for fmt_string
122
123        @returns: None if not all files are found.  Dictionary of filepaths if
124                  they are found, indexed by directions as given.
125        @raises: error.TestError if no hw_id is found.
126
127        """
128        gestures = [fmt_str % d for d in directions]
129        temp_filepaths = self._find_test_files(input_type, gestures)
130
131        filepaths = {}
132        if temp_filepaths:
133            filepaths = {d: temp_filepaths[fmt_str % d] for d in directions}
134
135        return filepaths
136
137
138    def _download_remote_test_file(self, filename, input_type):
139        """Download a file from the remote touch playback folder.
140
141        @param filename: string of filename
142        @param input_type: device type, e.g. 'touchpad'
143
144        @returns: Path to local file or None if file is not found.
145
146        """
147        REMOTE_STORAGE_URL = ('https://storage.googleapis.com/'
148                              'chromiumos-test-assets-public/touch_playback')
149        filename = urllib.quote(filename)
150
151        if input_type in ['touchpad', 'touchscreen', 'stylus']:
152            url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename)
153        else:
154            url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename)
155        local_file = os.path.join(self.bindir, filename)
156
157        logging.info('Looking for %s', url)
158        try:
159            file_utils.download_file(url, local_file)
160        except urllib2.URLError as e:
161            logging.info('File download failed!')
162            logging.debug(e.msg)
163            return None
164
165        return local_file
166
167
168    def _emulate_mouse(self, property_file=None):
169        """Emulate a mouse with the given property file.
170
171        player will use default mouse if no file is provided.
172
173        """
174        self.player.emulate(input_type='mouse', property_file=property_file)
175        self.player.find_connected_inputs()
176        if not self._has_mouse:
177            raise error.TestError('Mouse emulation failed!')
178
179    def _playback(self, filepath, touch_type='touchpad'):
180        """Playback a given input file on the given input."""
181        self.player.playback(filepath, touch_type)
182
183
184    def _blocking_playback(self, filepath, touch_type='touchpad'):
185        """Playback a given input file on the given input; block until done."""
186        self.player.blocking_playback(filepath, touch_type)
187
188
189    def _set_touch_setting_by_inputcontrol(self, setting, value):
190        """Set a given touch setting the given value by inputcontrol.
191
192        @param setting: Name of touch setting, e.g. 'tapclick'.
193        @param value: True for enabled, False for disabled.
194
195        """
196        cmd_value = 1 if value else 0
197        utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value))
198        logging.info('%s turned %s.', setting, 'on' if value else 'off')
199
200
201    def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting,
202                           value):
203        """Set a given touch setting the given value.
204
205        @param inputcontrol_setting: Name of touch setting for the inputcontrol
206                                     script, e.g. 'tapclick'.
207        @param autotest_ext_setting: Name of touch setting for the autotest
208                                     extension, e.g. 'TapToClick'.
209        @param value: True for enabled, False for disabled.
210
211        """
212        if self._has_inputcontrol:
213            self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value)
214        elif self._autotest_ext is not None:
215            self._autotest_ext.EvaluateJavaScript(
216                    'chrome.autotestPrivate.set%s(%s);'
217                    % (autotest_ext_setting, ("%s" % value).lower()))
218            # TODO: remove this sleep once checking for value is available.
219            time.sleep(1)
220        else:
221            raise error.TestFail('Both inputcontrol and the autotest '
222                                 'extension are not availble.')
223
224
225    def _set_australian_scrolling(self, value):
226        """Set australian scrolling to the given value.
227
228        @param value: True for enabled, False for disabled.
229
230        """
231        self._set_touch_setting('australian_scrolling', 'NaturalScroll', value)
232
233
234    def _set_tap_to_click(self, value):
235        """Set tap-to-click to the given value.
236
237        @param value: True for enabled, False for disabled.
238
239        """
240        self._set_touch_setting('tapclick', 'TapToClick', value)
241
242
243    def _set_tap_dragging(self, value):
244        """Set tap dragging to the given value.
245
246        @param value: True for enabled, False for disabled.
247
248        """
249        self._set_touch_setting('tapdrag', 'TapDragging', value)
250
251
252    def _set_autotest_ext(self, ext):
253        """Set the autotest extension.
254
255        @ext: the autotest extension object.
256
257        """
258        self._autotest_ext = ext
259
260
261    def _open_test_page(self, cr, filename='test_page.html'):
262        """Prepare test page for testing.  Set self._tab with page.
263
264        @param cr: chrome.Chrome() object
265        @param filename: name of file in self.bindir to open
266
267        """
268        self._test_page = TestPage(cr, self.bindir, filename)
269        self._tab = self._test_page._tab
270
271
272    def _open_events_page(self, cr):
273        """Open the test events page.  Set self._events with EventsPage class.
274
275        Also set self._tab as this page and self.bindir as the http server dir.
276
277        @param cr: chrome.Chrome() object
278
279        """
280        self._events = EventsPage(cr, self.bindir)
281        self._tab = self._events._tab
282
283
284    def _center_cursor(self):
285        """Playback mouse movement to center cursor.
286
287        Requres that self._emulate_mouse() has been called.
288
289        """
290        self.player.blocking_playback_of_default_file(
291                'mouse_center_cursor_gesture', input_type='mouse')
292
293
294    def _get_kernel_events_recorder(self, input_type):
295        """Return a kernel event recording object for the given input type.
296
297        @param input_type: device type, e.g. 'touchpad'
298
299        @returns: KernelEventsRecorder instance.
300
301        """
302        node = self.player.devices[input_type].node
303        return KernelEventsRecorder(node)
304
305
306    def cleanup(self):
307        """ clean up """
308        self.player.close()
309
310
311class KernelEventsRecorder(object):
312    """Object to record kernel events for a particular device."""
313
314    def __init__(self, node):
315        """Setup to record future evtest output for this node.
316
317        @param input_type: the device which to inspect, e.g. 'mouse'
318
319        """
320        self.node = node
321        self.fh = tempfile.NamedTemporaryFile()
322        self.evtest_process = None
323
324
325    def start(self):
326        """Start recording events."""
327        self.evtest_process = subprocess.Popen(
328                ['evtest', self.node], stdout=self.fh)
329
330        # Wait until the initial output has finished before returning.
331        def find_exit():
332            """Polling function for end of output."""
333            interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' %
334                             self.fh.name)
335            line_count = utils.run(interrupt_cmd).stdout.strip()
336            return line_count != '0'
337        utils.poll_for_condition(find_exit)
338
339
340    def clear(self):
341        """Clear previous events."""
342        self.stop()
343        self.fh.close()
344        self.fh = tempfile.NamedTemporaryFile()
345
346
347    def stop(self):
348        """Stop recording events."""
349        if self.evtest_process:
350            self.evtest_process.kill()
351            self.evtest_process = None
352
353
354    def get_recorded_events(self):
355        """Get the evtest output since object was created."""
356        self.fh.seek(0)
357        events = self.fh.read()
358        return events
359
360
361    def log_recorded_events(self):
362        """Save recorded events into logs."""
363        events = self.get_recorded_events()
364        logging.info('Kernel events seen:\n%s', events)
365
366
367    def get_last_event_timestamp(self, filter_str=''):
368        """Return the timestamp of the last event since recording started.
369
370        Events are in the form "Event: time <epoch time>, <info>\n"
371
372        @param filter_str: a regex string to match to the <info> section.
373
374        @returns: floats matching
375
376        """
377        events = self.get_recorded_events()
378        findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str,
379                             events, re.MULTILINE)
380        re.findall(r' time (.*?), [^\n]*?%s' % filter_str, events, re.MULTILINE)
381        if not findall:
382            self.log_recorded_events()
383            raise error.TestError('Could not find any kernel timestamps!'
384                                  '  Filter: %s' % filter_str)
385        return float(findall[-1])
386
387
388    def close(self):
389        """Clean up this class."""
390        self.stop()
391        self.fh.close()
392
393
394class TestPage(object):
395    """Wrapper around a Telemtry tab for utility functions.
396
397    Provides functions such as reload and setting scroll height on page.
398
399    """
400    _DEFAULT_SCROLL = 5000
401
402    def __init__(self, cr, httpdir, filename):
403        """Open a given test page in the given httpdir.
404
405        @param cr: chrome.Chrome() object
406        @param httpdir: the directory to use for SetHTTPServerDirectories
407        @param filename: path to the file to open, relative to httpdir
408
409        """
410        cr.browser.platform.SetHTTPServerDirectories(httpdir)
411        self._tab = cr.browser.tabs[0]
412        self._tab.Navigate(cr.browser.platform.http_server.UrlOf(
413                os.path.join(httpdir, filename)))
414        self.wait_for_page_ready()
415
416
417    def reload_page(self):
418        """Reloads test page."""
419        self._tab.Navigate(self._tab.url)
420        self.wait_for_page_ready()
421
422
423    def wait_for_page_ready(self):
424        """Wait for a variable pageReady on the test page to be true.
425
426        Presuposes that a pageReady variable exists on this page.
427
428        @raises error.TestError if page is not ready after timeout.
429
430        """
431        self._tab.WaitForDocumentReadyStateToBeComplete()
432        utils.poll_for_condition(
433                lambda: self._tab.EvaluateJavaScript('pageReady'),
434                exception=error.TestError('Test page is not ready!'))
435
436
437    def expand_page(self):
438        """Expand the page to be very large, to allow scrolling."""
439        page_width = self._DEFAULT_SCROLL * 5
440        cmd = 'document.body.style.%s = "%dpx"' % ('%s', page_width)
441        self._tab.ExecuteJavaScript(cmd % 'width')
442        self._tab.ExecuteJavaScript(cmd % 'height')
443
444
445    def set_scroll_position(self, value, scroll_vertical=True):
446        """Set scroll position to given value.
447
448        @param value: integer value in pixels.
449        @param scroll_vertical: True for vertical scroll,
450                                False for horizontal Scroll.
451
452        """
453        cmd = 'window.scrollTo(%d, %d);'
454        if scroll_vertical:
455            self._tab.ExecuteJavaScript(cmd % (0, value))
456        else:
457            self._tab.ExecuteJavaScript(cmd % (value, 0))
458
459
460    def set_default_scroll_position(self, scroll_vertical=True):
461        """Set scroll position of page to default.
462
463        @param scroll_vertical: True for vertical scroll,
464                                False for horizontal Scroll.
465        @raise: TestError if page is not set to default scroll position
466
467        """
468        total_tries = 2
469        for i in xrange(total_tries):
470            try:
471                self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical)
472                self.wait_for_default_scroll_position(scroll_vertical)
473            except error.TestError as e:
474                if i == total_tries - 1:
475                   pos = self.get_scroll_position(scroll_vertical)
476                   logging.error('SCROLL POSITION: %s', pos)
477                   raise e
478                else:
479                   self.expand_page()
480            else:
481                 break
482
483
484    def get_scroll_position(self, scroll_vertical=True):
485        """Return current scroll position of page.
486
487        @param scroll_vertical: True for vertical scroll,
488                                False for horizontal Scroll.
489
490        """
491        if scroll_vertical:
492            return int(self._tab.EvaluateJavaScript('window.scrollY'))
493        else:
494            return int(self._tab.EvaluateJavaScript('window.scrollX'))
495
496
497    def wait_for_default_scroll_position(self, scroll_vertical=True):
498        """Wait for page to be at the default scroll position.
499
500        @param scroll_vertical: True for vertical scroll,
501                                False for horizontal scroll.
502
503        @raise: TestError if page either does not move or does not stop moving.
504
505        """
506        utils.poll_for_condition(
507                lambda: self.get_scroll_position(
508                        scroll_vertical) == self._DEFAULT_SCROLL,
509                exception=error.TestError('Page not set to default scroll!'))
510
511
512    def wait_for_scroll_position_to_settle(self, scroll_vertical=True):
513        """Wait for page to move and then stop moving.
514
515        @param scroll_vertical: True for Vertical scroll and
516                                False for horizontal scroll.
517
518        @raise: TestError if page either does not move or does not stop moving.
519
520        """
521        # Wait until page starts moving.
522        utils.poll_for_condition(
523                lambda: self.get_scroll_position(
524                        scroll_vertical) != self._DEFAULT_SCROLL,
525                exception=error.TestError('No scrolling occurred!'), timeout=30)
526
527        # Wait until page has stopped moving.
528        self._previous = self._DEFAULT_SCROLL
529        def _movement_stopped():
530            current = self.get_scroll_position()
531            result = current == self._previous
532            self._previous = current
533            return result
534
535        utils.poll_for_condition(
536                lambda: _movement_stopped(), sleep_interval=1,
537                exception=error.TestError('Page did not stop moving!'),
538                timeout=30)
539
540
541    def get_page_zoom(self):
542        """Return window.innerWidth for this page."""
543        return float(self._tab.EvaluateJavaScript(
544                            'window.visualViewport.scale'))
545
546
547class EventsPage(TestPage):
548    """Functions to monitor input events on the DUT, as seen by a webpage.
549
550    A subclass of TestPage which uses and interacts with a specific page.
551
552    """
553    def __init__(self, cr, httpdir):
554        """Open the website and save the tab in self._tab.
555
556        @param cr: chrome.Chrome() object
557        @param httpdir: the directory to use for SetHTTPServerDirectories
558
559        """
560        filename = 'touch_events_test_page.html'
561        current_dir = os.path.dirname(os.path.realpath(__file__))
562        shutil.copyfile(os.path.join(current_dir, filename),
563                        os.path.join(httpdir, filename))
564
565        super(EventsPage, self).__init__(cr, httpdir, filename)
566
567
568    def clear_previous_events(self):
569        """Wipe the test page back to its original state."""
570        self._tab.ExecuteJavaScript('pageReady = false')
571        self._tab.ExecuteJavaScript('clearPreviousEvents()')
572        self.wait_for_page_ready()
573
574
575    def get_events_log(self):
576        """Return the event log from the test page."""
577        return self._tab.EvaluateJavaScript('eventLog')
578
579
580    def log_events(self):
581        """Put the test page's event log into logging.info."""
582        logging.info('EVENTS LOG:')
583        logging.info(self.get_events_log())
584
585
586    def get_time_of_last_event(self):
587        """Return the timestamp of the last seen event (if any)."""
588        return self._tab.EvaluateJavaScript('timeOfLastEvent')
589
590
591    def get_event_count(self):
592        """Return the number of events that the test page has seen."""
593        return self._tab.EvaluateJavaScript('eventCount')
594
595
596    def get_scroll_delta(self, is_vertical):
597        """Return the net scrolling the test page has seen.
598
599        @param is_vertical: True for vertical scrolling; False for horizontal.
600
601        """
602        axis = 'y' if is_vertical else 'x'
603        return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis)
604
605
606    def get_click_count(self):
607        """Return the number of clicks the test page has seen."""
608        return self._tab.EvaluateJavaScript('clickCount')
609
610
611    def wait_for_events_to_complete(self, delay_secs=1, timeout=60):
612        """Wait until test page stops seeing events for delay_secs seconds.
613
614        @param delay_secs: the polling frequency in seconds.
615        @param timeout: the number of seconds to wait for events to complete.
616        @raises: error.TestError if no events occurred.
617        @raises: error.TestError if events did not stop after timeout seconds.
618
619        """
620        self._tmp_previous_event_count = -1
621        def _events_stopped_coming():
622            most_recent_event_count = self.get_event_count()
623            delta = most_recent_event_count - self._tmp_previous_event_count
624            self._tmp_previous_event_count = most_recent_event_count
625            return most_recent_event_count != 0 and delta == 0
626
627        try:
628            utils.poll_for_condition(
629                    _events_stopped_coming, exception=error.TestError(),
630                    sleep_interval=delay_secs, timeout=timeout)
631        except error.TestError:
632            if self._tmp_previous_event_count == 0:
633                raise error.TestError('No touch event was seen!')
634            else:
635                self.log_events()
636                raise error.TestError('Touch events did not stop!')
637
638
639    def set_prevent_defaults(self, value):
640        """Set whether to allow default event actions to go through.
641
642        E.g. if this is True, a two finger horizontal scroll will not actually
643        produce history navigation on the browser.
644
645        @param value: True for prevent defaults; False to allow them.
646
647        """
648        js_value = str(value).lower()
649        self._tab.ExecuteJavaScript('preventDefaults = %s;' % js_value)
650