1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import os 7import re 8import shutil 9import subprocess 10import tempfile 11import time 12import urllib 13import urllib2 14 15from autotest_lib.client.bin import test 16from autotest_lib.client.bin import utils 17from autotest_lib.client.common_lib import error 18from autotest_lib.client.common_lib import file_utils 19from autotest_lib.client.cros.input_playback import input_playback 20 21 22class touch_playback_test_base(test.test): 23 """Base class for touch tests involving playback.""" 24 version = 1 25 26 _INPUTCONTROL = '/opt/google/input/inputcontrol' 27 28 29 @property 30 def _has_touchpad(self): 31 """True if device under test has a touchpad; else False.""" 32 return self.player.has('touchpad') 33 34 35 @property 36 def _has_touchscreen(self): 37 """True if device under test has a touchscreen; else False.""" 38 return self.player.has('touchscreen') 39 40 41 @property 42 def _has_mouse(self): 43 """True if device under test has or emulates a USB mouse; else False.""" 44 return self.player.has('mouse') 45 46 47 def warmup(self, mouse_props=None): 48 """Test setup. 49 50 Instantiate player object to find touch devices, if any. 51 These devices can be used for playback later. 52 Emulate a USB mouse if a property file is provided. 53 Check if the inputcontrol script is avaiable on the disk. 54 55 @param mouse_props: optional property file for a mouse to emulate. 56 Created using 'evemu-describe /dev/input/X'. 57 58 """ 59 self.player = input_playback.InputPlayback() 60 if mouse_props: 61 self.player.emulate(input_type='mouse', property_file=mouse_props) 62 self.player.find_connected_inputs() 63 64 self._autotest_ext = None 65 self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL) 66 self._platform = utils.get_board() 67 if 'cheets' in self._platform: 68 self._platform = self._platform[:-len('-cheets')] 69 70 71 def _find_test_files(self, input_type, gestures): 72 """Determine where the playback gesture files for this test are. 73 74 Expected file format is: <boardname>_<input type>_<hwid>_<gesture name> 75 e.g. samus_touchpad_164.17_scroll_down 76 77 @param input_type: device type, e.g. 'touchpad' 78 @param gestures: list of gesture name strings used in filename 79 80 @returns: None if not all files are found. Dictionary of filepaths if 81 they are found, indexed by gesture names as given. 82 @raises: error.TestError if no device is found or if device should have 83 a hw_id but does not. 84 85 """ 86 if not self.player.has(input_type): 87 raise error.TestError('Device does not have a %s!' % input_type) 88 89 if input_type in ['touchpad', 'touchscreen', 'stylus']: 90 hw_id = self.player.devices[input_type].hw_id 91 if not hw_id: 92 raise error.TestError('No valid hw_id for %s!' % input_type) 93 filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id) 94 95 else: 96 device_name = self.player.devices[input_type].name 97 filename_fmt = '%s_%s' % (device_name, input_type) 98 99 filepaths = {} 100 for gesture in gestures: 101 filename = '%s_%s' % (filename_fmt, gesture) 102 filepath = self._download_remote_test_file(filename, input_type) 103 if not filepath: 104 logging.info('Did not find files for this device!') 105 return None 106 107 filepaths[gesture] = filepath 108 109 return filepaths 110 111 112 def _find_test_files_from_directions(self, input_type, fmt_str, directions): 113 """Find gesture files given a list of directions and name format. 114 115 @param input_type: device type, e.g. 'touchpad' 116 @param fmt_str: format string for filename, e.g. 'scroll-%s' 117 @param directions: list of directions for fmt_string 118 119 @returns: None if not all files are found. Dictionary of filepaths if 120 they are found, indexed by directions as given. 121 @raises: error.TestError if no hw_id is found. 122 123 """ 124 gestures = [fmt_str % d for d in directions] 125 temp_filepaths = self._find_test_files(input_type, gestures) 126 127 filepaths = {} 128 if temp_filepaths: 129 filepaths = {d: temp_filepaths[fmt_str % d] for d in directions} 130 131 return filepaths 132 133 134 def _download_remote_test_file(self, filename, input_type): 135 """Download a file from the remote touch playback folder. 136 137 @param filename: string of filename 138 @param input_type: device type, e.g. 'touchpad' 139 140 @returns: Path to local file or None if file is not found. 141 142 """ 143 REMOTE_STORAGE_URL = ('https://storage.googleapis.com/' 144 'chromiumos-test-assets-public/touch_playback') 145 filename = urllib.quote(filename) 146 147 if input_type in ['touchpad', 'touchscreen', 'stylus']: 148 url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename) 149 else: 150 url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename) 151 local_file = os.path.join(self.bindir, filename) 152 153 logging.info('Looking for %s', url) 154 try: 155 file_utils.download_file(url, local_file) 156 except urllib2.URLError as e: 157 logging.info('File download failed!') 158 logging.debug(e.msg) 159 return None 160 161 return local_file 162 163 164 def _emulate_mouse(self, property_file=None): 165 """Emulate a mouse with the given property file. 166 167 player will use default mouse if no file is provided. 168 169 """ 170 self.player.emulate(input_type='mouse', property_file=property_file) 171 self.player.find_connected_inputs() 172 if not self._has_mouse: 173 raise error.TestError('Mouse emulation failed!') 174 175 176 def _playback(self, filepath, touch_type='touchpad'): 177 """Playback a given input file on the given input.""" 178 self.player.playback(filepath, touch_type) 179 180 181 def _blocking_playback(self, filepath, touch_type='touchpad'): 182 """Playback a given input file on the given input; block until done.""" 183 self.player.blocking_playback(filepath, touch_type) 184 185 186 def _set_touch_setting_by_inputcontrol(self, setting, value): 187 """Set a given touch setting the given value by inputcontrol. 188 189 @param setting: Name of touch setting, e.g. 'tapclick'. 190 @param value: True for enabled, False for disabled. 191 192 """ 193 cmd_value = 1 if value else 0 194 utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value)) 195 logging.info('%s turned %s.', setting, 'on' if value else 'off') 196 197 198 def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting, 199 value): 200 """Set a given touch setting the given value. 201 202 @param inputcontrol_setting: Name of touch setting for the inputcontrol 203 script, e.g. 'tapclick'. 204 @param autotest_ext_setting: Name of touch setting for the autotest 205 extension, e.g. 'TapToClick'. 206 @param value: True for enabled, False for disabled. 207 208 """ 209 if self._has_inputcontrol: 210 self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value) 211 elif self._autotest_ext is not None: 212 self._autotest_ext.EvaluateJavaScript( 213 'chrome.autotestPrivate.set%s(%s);' 214 % (autotest_ext_setting, ("%s" % value).lower())) 215 # TODO: remove this sleep once checking for value is available. 216 time.sleep(1) 217 else: 218 raise error.TestFail('Both inputcontrol and the autotest ' 219 'extension are not availble.') 220 221 222 def _set_australian_scrolling(self, value): 223 """Set australian scrolling to the given value. 224 225 @param value: True for enabled, False for disabled. 226 227 """ 228 self._set_touch_setting('australian_scrolling', 'NaturalScroll', value) 229 230 231 def _set_tap_to_click(self, value): 232 """Set tap-to-click to the given value. 233 234 @param value: True for enabled, False for disabled. 235 236 """ 237 self._set_touch_setting('tapclick', 'TapToClick', value) 238 239 240 def _set_tap_dragging(self, value): 241 """Set tap dragging to the given value. 242 243 @param value: True for enabled, False for disabled. 244 245 """ 246 self._set_touch_setting('tapdrag', 'TapDragging', value) 247 248 249 def _set_autotest_ext(self, ext): 250 """Set the autotest extension. 251 252 @ext: the autotest extension object. 253 254 """ 255 self._autotest_ext = ext 256 257 258 def _open_test_page(self, cr, filename='test_page.html'): 259 """Prepare test page for testing. Set self._tab with page. 260 261 @param cr: chrome.Chrome() object 262 @param filename: name of file in self.bindir to open 263 264 """ 265 self._test_page = TestPage(cr, self.bindir, filename) 266 self._tab = self._test_page._tab 267 268 269 def _open_events_page(self, cr): 270 """Open the test events page. Set self._events with EventsPage class. 271 272 Also set self._tab as this page and self.bindir as the http server dir. 273 274 @param cr: chrome.Chrome() object 275 276 """ 277 self._events = EventsPage(cr, self.bindir) 278 self._tab = self._events._tab 279 280 281 def _center_cursor(self): 282 """Playback mouse movement to center cursor. 283 284 Requres that self._emulate_mouse() has been called. 285 286 """ 287 self.player.blocking_playback_of_default_file( 288 'mouse_center_cursor_gesture', input_type='mouse') 289 290 291 def _get_kernel_events_recorder(self, input_type): 292 """Return a kernel event recording object for the given input type. 293 294 @param input_type: device type, e.g. 'touchpad' 295 296 @returns: KernelEventsRecorder instance. 297 298 """ 299 node = self.player.devices[input_type].node 300 return KernelEventsRecorder(node) 301 302 303 def cleanup(self): 304 self.player.close() 305 306 307class KernelEventsRecorder(object): 308 """Object to record kernel events for a particular device.""" 309 310 def __init__(self, node): 311 """Setup to record future evtest output for this node. 312 313 @param input_type: the device which to inspect, e.g. 'mouse' 314 315 """ 316 self.node = node 317 self.fh = tempfile.NamedTemporaryFile() 318 self.evtest_process = None 319 320 321 def start(self): 322 """Start recording events.""" 323 self.evtest_process = subprocess.Popen( 324 ['evtest', self.node], stdout=self.fh) 325 326 # Wait until the initial output has finished before returning. 327 def find_exit(): 328 """Polling function for end of output.""" 329 interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' % 330 self.fh.name) 331 line_count = utils.run(interrupt_cmd).stdout.strip() 332 return line_count != '0' 333 utils.poll_for_condition(find_exit) 334 335 336 def clear(self): 337 """Clear previous events.""" 338 self.stop() 339 self.fh.close() 340 self.fh = tempfile.NamedTemporaryFile() 341 342 343 def stop(self): 344 """Stop recording events.""" 345 if self.evtest_process: 346 self.evtest_process.kill() 347 self.evtest_process = None 348 349 350 def get_recorded_events(self): 351 """Get the evtest output since object was created.""" 352 self.fh.seek(0) 353 events = self.fh.read() 354 return events 355 356 357 def log_recorded_events(self): 358 """Save recorded events into logs.""" 359 events = self.get_recorded_events() 360 logging.info('Kernel events seen:\n%s', events) 361 362 363 def get_last_event_timestamp(self, filter_str=''): 364 """Return the timestamp of the last event since recording started. 365 366 Events are in the form "Event: time <epoch time>, <info>\n" 367 368 @param filter_str: a regex string to match to the <info> section. 369 370 @returns: floats matching 371 372 """ 373 events = self.get_recorded_events() 374 findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str, 375 events, re.MULTILINE) 376 re.findall(r' time (.*?), [^\n]*?%s' % filter_str, events, re.MULTILINE) 377 if not findall: 378 self.log_recorded_events() 379 raise error.TestError('Could not find any kernel timestamps!' 380 ' Filter: %s' % filter_str) 381 return float(findall[-1]) 382 383 384 def close(self): 385 """Clean up this class.""" 386 self.stop() 387 self.fh.close() 388 389 390class TestPage(object): 391 """Wrapper around a Telemtry tab for utility functions. 392 393 Provides functions such as reload and setting scroll height on page. 394 395 """ 396 _DEFAULT_SCROLL = 5000 397 398 def __init__(self, cr, httpdir, filename): 399 """Open a given test page in the given httpdir. 400 401 @param cr: chrome.Chrome() object 402 @param httpdir: the directory to use for SetHTTPServerDirectories 403 @param filename: path to the file to open, relative to httpdir 404 405 """ 406 cr.browser.platform.SetHTTPServerDirectories(httpdir) 407 self._tab = cr.browser.tabs[0] 408 self._tab.Navigate(cr.browser.platform.http_server.UrlOf( 409 os.path.join(httpdir, filename))) 410 self.wait_for_page_ready() 411 412 413 def reload_page(self): 414 """Reloads test page.""" 415 self._tab.Navigate(self._tab.url) 416 self.wait_for_page_ready() 417 418 419 def wait_for_page_ready(self): 420 """Wait for a variable pageReady on the test page to be true. 421 422 Presuposes that a pageReady variable exists on this page. 423 424 @raises error.TestError if page is not ready after timeout. 425 426 """ 427 self._tab.WaitForDocumentReadyStateToBeComplete() 428 utils.poll_for_condition( 429 lambda: self._tab.EvaluateJavaScript('pageReady'), 430 exception=error.TestError('Test page is not ready!')) 431 432 433 def expand_page(self): 434 """Expand the page to be very large, to allow scrolling.""" 435 cmd = 'document.body.style.%s = %d+"px"' % ( 436 '%s', self._DEFAULT_SCROLL * 5) 437 self._tab.ExecuteJavaScript(cmd % 'width') 438 self._tab.ExecuteJavaScript(cmd % 'height') 439 440 441 def set_scroll_position(self, value, scroll_vertical=True): 442 """Set scroll position to given value. 443 444 @param scroll_vertical: True for vertical scroll, 445 False for horizontal Scroll. 446 @param value: True for enabled, False for disabled. 447 448 """ 449 if scroll_vertical: 450 self._tab.ExecuteJavaScript( 451 'document.body.scrollTop=%s' % value) 452 else: 453 self._tab.ExecuteJavaScript( 454 'document.body.scrollLeft=%s' % value) 455 456 457 def set_default_scroll_position(self, scroll_vertical=True): 458 """Set scroll position of page to default. 459 460 @param scroll_vertical: True for vertical scroll, 461 False for horizontal Scroll. 462 @raise: TestError if page is not set to default scroll position 463 464 """ 465 total_tries = 2 466 for i in xrange(total_tries): 467 try: 468 self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical) 469 self.wait_for_default_scroll_position(scroll_vertical) 470 except error.TestError as e: 471 if i == total_tries - 1: 472 pos = self.get_scroll_position(scroll_vertical) 473 logging.error('SCROLL POSITION: %s', pos) 474 raise e 475 else: 476 break 477 478 479 def get_scroll_position(self, scroll_vertical=True): 480 """Return current scroll position of page. 481 482 @param scroll_vertical: True for vertical scroll, 483 False for horizontal Scroll. 484 485 """ 486 if scroll_vertical: 487 return int(self._tab.EvaluateJavaScript('document.body.scrollTop')) 488 else: 489 return int(self._tab.EvaluateJavaScript('document.body.scrollLeft')) 490 491 492 def wait_for_default_scroll_position(self, scroll_vertical=True): 493 """Wait for page to be at the default scroll position. 494 495 @param scroll_vertical: True for vertical scroll, 496 False for horizontal scroll. 497 498 @raise: TestError if page either does not move or does not stop moving. 499 500 """ 501 utils.poll_for_condition( 502 lambda: self.get_scroll_position( 503 scroll_vertical) == self._DEFAULT_SCROLL, 504 exception=error.TestError('Page not set to default scroll!')) 505 506 507 def wait_for_scroll_position_to_settle(self, scroll_vertical=True): 508 """Wait for page to move and then stop moving. 509 510 @param scroll_vertical: True for Vertical scroll and 511 False for horizontal scroll. 512 513 @raise: TestError if page either does not move or does not stop moving. 514 515 """ 516 # Wait until page starts moving. 517 utils.poll_for_condition( 518 lambda: self.get_scroll_position( 519 scroll_vertical) != self._DEFAULT_SCROLL, 520 exception=error.TestError('No scrolling occurred!'), timeout=30) 521 522 # Wait until page has stopped moving. 523 self._previous = self._DEFAULT_SCROLL 524 def _movement_stopped(): 525 current = self.get_scroll_position() 526 result = current == self._previous 527 self._previous = current 528 return result 529 530 utils.poll_for_condition( 531 lambda: _movement_stopped(), sleep_interval=1, 532 exception=error.TestError('Page did not stop moving!'), 533 timeout=30) 534 535 536 def get_page_width(self): 537 """Return window.innerWidth for this page.""" 538 return int(self._tab.EvaluateJavaScript('window.innerWidth')) 539 540 541class EventsPage(TestPage): 542 """Functions to monitor input events on the DUT, as seen by a webpage. 543 544 A subclass of TestPage which uses and interacts with a specific page. 545 546 """ 547 def __init__(self, cr, httpdir): 548 """Open the website and save the tab in self._tab. 549 550 @param cr: chrome.Chrome() object 551 @param httpdir: the directory to use for SetHTTPServerDirectories 552 553 """ 554 filename = 'touch_events_test_page.html' 555 current_dir = os.path.dirname(os.path.realpath(__file__)) 556 shutil.copyfile(os.path.join(current_dir, filename), 557 os.path.join(httpdir, filename)) 558 559 super(EventsPage, self).__init__(cr, httpdir, filename) 560 561 562 def clear_previous_events(self): 563 """Wipe the test page back to its original state.""" 564 self._tab.ExecuteJavaScript('pageReady = false') 565 self._tab.ExecuteJavaScript('clearPreviousEvents()') 566 self.wait_for_page_ready() 567 568 569 def get_events_log(self): 570 """Return the event log from the test page.""" 571 return self._tab.EvaluateJavaScript('eventLog') 572 573 574 def log_events(self): 575 """Put the test page's event log into logging.info.""" 576 logging.info('EVENTS LOG:') 577 logging.info(self.get_events_log()) 578 579 580 def get_time_of_last_event(self): 581 """Return the timestamp of the last seen event (if any).""" 582 return self._tab.EvaluateJavaScript('timeOfLastEvent') 583 584 585 def get_event_count(self): 586 """Return the number of events that the test page has seen.""" 587 return self._tab.EvaluateJavaScript('eventCount') 588 589 590 def get_scroll_delta(self, is_vertical): 591 """Return the net scrolling the test page has seen. 592 593 @param is_vertical: True for vertical scrolling; False for horizontal. 594 595 """ 596 axis = 'y' if is_vertical else 'x' 597 return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis) 598 599 600 def get_click_count(self): 601 """Return the number of clicks the test page has seen.""" 602 return self._tab.EvaluateJavaScript('clickCount') 603 604 605 def wait_for_events_to_complete(self, delay_secs=1, timeout=60): 606 """Wait until test page stops seeing events for delay_secs seconds. 607 608 @param delay_secs: the polling frequency in seconds. 609 @param timeout: the number of seconds to wait for events to complete. 610 @raises: error.TestError if no events occurred. 611 @raises: error.TestError if events did not stop after timeout seconds. 612 613 """ 614 self._tmp_previous_event_count = -1 615 def _events_stopped_coming(): 616 most_recent_event_count = self.get_event_count() 617 delta = most_recent_event_count - self._tmp_previous_event_count 618 self._tmp_previous_event_count = most_recent_event_count 619 return most_recent_event_count != 0 and delta == 0 620 621 try: 622 utils.poll_for_condition( 623 _events_stopped_coming, exception=error.TestError(), 624 sleep_interval=delay_secs, timeout=timeout) 625 except error.TestError: 626 if self._tmp_previous_event_count == 0: 627 raise error.TestError('No touch event was seen!') 628 else: 629 self.log_events() 630 raise error.TestError('Touch events did not stop!') 631 632 633 def set_prevent_defaults(self, value): 634 """Set whether to allow default event actions to go through. 635 636 E.g. if this is True, a two finger horizontal scroll will not actually 637 produce history navigation on the browser. 638 639 @param value: True for prevent defaults; False to allow them. 640 641 """ 642 js_value = str(value).lower() 643 self._tab.ExecuteJavaScript('preventDefaults = %s;' % js_value) 644