# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import json import os import time import selenium from autotest_lib.client.bin import utils from extension_pages import e2e_test_utils from extension_pages import options class TestUtils(object): """Contains all the helper functions for Chrome mirroring automation.""" short_wait_secs = 3 step_timeout_secs = 60 cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq', 'softirq', 'steal', 'guest', 'guest_nice'] cpu_idle_fields = ['idle', 'iowait'] def __init__(self): """Constructor""" def set_mirroring_options(self, driver, extension_id, settings): """Apply all the settings given by the user to the option page. @param driver: The chromedriver instance of the test @param extension_id: The id of the Cast extension @param settings: The settings and information about the test """ options_page = options.OptionsPage(driver, extension_id) options_page.open_hidden_options_menu() time.sleep(self.short_wait_secs) for key in settings.keys(): options_page.set_value(key, settings[key]) def start_v2_mirroring_test_utils( self, driver, extension_id, receiver_ip, url, fullscreen, udp_proxy_server=None, network_profile=None): """Use test util page to start mirroring session on specific device. @param driver: The chromedriver instance @param extension_id: The id of the Cast extension @param receiver_ip: The ip of the Eureka dongle to launch the activity @param url: The URL to navigate to @param fullscreen: click the fullscreen button or not @param udp_proxy_server: the address of udp proxy server, it should be a http address, http://: @param network_profile: the network profile, it should be one of wifi, bad and evil. @return True if the function finishes """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) time.sleep(self.short_wait_secs) tab_handles = driver.window_handles e2e_test_utils_page.receiver_ip_or_name_v2_text_box().set_value( receiver_ip) e2e_test_utils_page.url_to_open_v2_text_box().set_value(url) if udp_proxy_server: e2e_test_utils_page.udp_proxy_server_text_box().set_value( udp_proxy_server) if network_profile: e2e_test_utils_page.network_profile_text_box().set_value( network_profile) time.sleep(self.short_wait_secs) e2e_test_utils_page.open_then_mirror_v2_button().click() time.sleep(self.short_wait_secs) all_handles = driver.window_handles video_handle = [x for x in all_handles if x not in tab_handles].pop() driver.switch_to_window(video_handle) self.navigate_to_test_url(driver, url, fullscreen) return True def stop_v2_mirroring_test_utils(self, driver, extension_id): """Use test util page to stop a mirroring session on a specific device. @param driver: The chromedriver instance @param extension_id: The id of the Cast extension @param activity_id: The id of the mirroring activity """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(driver, extension_id) e2e_test_utils_page.go_to_page() time.sleep(self.short_wait_secs) e2e_test_utils_page.stop_v2_mirroring_button().click() def start_v2_mirroring_sdk(self, driver, device_ip, url, extension_id): """Use SDK to start a mirroring session on a specific device. @param driver: The chromedriver instance @param device_ip: The IP of the Eureka device @param url: The URL to navigate to @param extension_id: The id of the Cast extension @return True if the function finishes @raise RuntimeError for timeouts """ self.set_auto_testing_ip(driver, extension_id, device_ip) self.nagviate(driver, url, False) time.sleep(self.short_wait_secs) driver.execute_script('loadScript()') self._wait_for_result( lambda: driver.execute_script('return isSuccessful'), 'Timeout when initiating mirroring... ...') driver.execute_script('startV2Mirroring()') self._wait_for_result( lambda: driver.execute_script('return isSuccessful'), 'Timeout when triggering mirroring... ...') return True def stop_v2_mirroring_sdk(self, driver, activity_id=None): """Use SDK to stop the mirroring activity in Chrome. @param driver: The chromedriver instance @param activity_id: The id of the mirroring activity @raise RuntimeError for timeouts """ driver.execute_script('stopV2Mirroring()') self._wait_for_result( lambda: driver.execute_script('return isSuccessful'), self.step_timeout_secs) def set_auto_testing_ip(self, driver, extension_id, device_ip): """Set the auto testing IP on the extension page. @param driver: The chromedriver instance @param extension_id: The id of the Cast extension @param device_ip: The IP of the device to test against """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.execute_script( 'localStorage["AutoTestingIp"] = "%s";' % device_ip) def upload_v2_mirroring_logs(self, driver, extension_id): """Upload v2 mirroring logs for the latest mirroring session. @param driver: The chromedriver instance of the browser @param extension_id: The extension ID of the Cast extension @return The report id in crash staging server. @raises RuntimeError if an error occurred during uploading """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.go_to_page() time.sleep(self.short_wait_secs) e2e_test_utils_page.upload_v2_mirroring_logs_button().click() report_id = self._wait_for_result( e2e_test_utils_page.v2_mirroring_logs_scroll_box().get_value, 'Failed to get v2 mirroring logs') if 'Failed to upload logs' in report_id: raise RuntimeError('Failed to get v2 mirroring logs') return report_id def get_chrome_version(self, driver): """Return the Chrome version that is being used for running test. @param driver: The chromedriver instance @return The Chrome version """ get_chrome_version_js = 'return window.navigator.appVersion;' app_version = driver.execute_script(get_chrome_version_js) for item in app_version.split(): if 'Chrome/' in item: return item.split('/')[1] return None def get_chrome_revision(self, driver): """Return Chrome revision number that is being used for running test. @param driver: The chromedriver instance @return The Chrome revision number """ get_chrome_revision_js = ('return document.getElementById("version").' 'getElementsByTagName("span")[2].innerHTML;') driver.get('chrome://version') return driver.execute_script(get_chrome_revision_js) def get_extension_id_from_flag(self, extra_flags): """Gets the extension ID based on the whitelisted extension id flag. @param extra_flags: A string which contains all the extra chrome flags @return The ID of the extension. Return None if nothing is found. """ extra_flags_list = extra_flags.split() for flag in extra_flags_list: if 'whitelisted-extension-id=' in flag: return flag.split('=')[1] return None def navigate_to_test_url(self, driver, url, fullscreen): """Navigate to a given URL. Click fullscreen button if needed. @param driver: The chromedriver instance @param url: The URL of the site to navigate to @param fullscreen: True and the video will play in full screen mode. Otherwise, set to False """ driver.get(url) driver.refresh() if fullscreen: self.request_full_screen(driver) def request_full_screen(self, driver): """Request full screen. @param driver: The chromedriver instance """ try: time.sleep(self.short_wait_secs) driver.find_element_by_id('fsbutton').click() except selenium.common.exceptions.NoSuchElementException as error_message: print 'Full screen button is not found. ' + str(error_message) def set_focus_tab(self, driver, tab_handle): """Set the focus on a tab. @param driver: The chromedriver instance @param tab_handle: The chrome driver handle of the tab """ driver.switch_to_window(tab_handle) driver.get_screenshot_as_base64() def block_setup_dialog(self, driver, extension_id): """Tab cast through the extension. @param driver: A chromedriver instance that has the extension loaded. @param extension_id: Id of the extension to use. """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.go_to_page() time.sleep(self.short_wait_secs) driver.execute_script( 'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"') def close_popup_tabs(self, driver): """Close any popup windows the extension might open by default. Since we're going to handle the extension ourselves all we need is the main browser window with a single tab. The safest way to handle the popup however, is to close the currently active tab, so we don't mess with chromedrivers ui debugger. @param driver: Chromedriver instance. @raises Exception If you close the tab associated with the ui debugger. """ # TODO: There are several, albeit hacky ways, to handle this popup # that might need to change with different versions of the extension # until the core issue is resolved. See crbug.com/338399. current_tab_handle = driver.current_window_handle for handle in driver.window_handles: if current_tab_handle != handle: try: time.sleep(self.short_wait_secs) driver.switch_to_window(handle) driver.close() except: pass driver.switch_to_window(current_tab_handle) def output_dict_to_file(self, dictionary, file_name, path=None, sort_keys=False): """Output a dictionary into a file. @param dictionary: The dictionary to be output as JSON @param file_name: The name of the file that is being output @param path: The path of the file. The default is None @param sort_keys: Sort dictionary by keys when output. False by default """ if path is None: path = os.path.abspath(os.path.dirname(__file__)) # if json file exists, read the existing one and append to it json_file = os.path.join(path, file_name) if os.path.isfile(json_file) and dictionary: with open(json_file, 'r') as existing_json_data: json_data = json.load(existing_json_data) dictionary = dict(json_data.items() + dictionary.items()) output_json = json.dumps(dictionary, sort_keys=sort_keys) with open(json_file, 'w') as file_handler: file_handler.write(output_json) def compute_cpu_utilization(self, cpu_dict): """Generate the upper/lower bound and the average CPU consumption. @param cpu_dict: The dictionary that contains CPU usage every sec. @returns A dict that contains upper/lower bound and average cpu usage. """ cpu_bound = {} cpu_usage = sorted(cpu_dict.values()) cpu_bound['lower_bound'] = ( '%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)]) cpu_bound['upper_bound'] = ( '%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)]) cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage))) return cpu_bound def cpu_usage_interval(self, duration, interval=1): """Get the CPU usage over a period of time based on interval. @param duration: The duration of getting the CPU usage. @param interval: The interval to check the CPU usage. Default is 1 sec. @return A dict that contains CPU usage over different time intervals. """ current_time = 0 cpu_usage = {} while current_time < duration: pre_times = self._get_system_times() time.sleep(interval) post_times = self._get_system_times() cpu_usage[current_time] = self._get_avg_cpu_usage( pre_times, post_times) current_time += interval return cpu_usage def _get_avg_cpu_usage(self, pre_times, post_times): """Calculate the average CPU usage of two different periods of time. @param pre_times: The CPU usage information of the start point. @param post_times: The CPU usage information of the end point. @return Average CPU usage over a time period. """ diff_times = {} for field in self.cpu_fields: diff_times[field] = post_times[field] - pre_times[field] idle_time = sum(diff_times[field] for field in self.cpu_idle_fields) total_time = sum(diff_times[field] for field in self.cpu_fields) return float(total_time - idle_time) / total_time * 100.0 def _get_system_times(self): """Get the CPU information from the system times. @return An list with CPU usage of different processes. """ proc_stat = utils.read_file('/proc/stat') for line in proc_stat.split('\n'): if line.startswith('cpu '): times = line[4:].strip().split(' ') times = [int(jiffies) for jiffies in times] return dict(zip(self.cpu_fields, times)) def _wait_for_result(self, get_result, error_message): """Wait for the result. @param get_result: the function to get result. @param error_message: the error message in the exception if it is failed to get result. @return The result. @raises RuntimeError if it is failed to get result within self.step_timeout_secs. """ start = time.time() while (((time.time() - start) < self.step_timeout_secs) and not get_result()): time.sleep(self.step_timeout_secs/10.0) if not get_result(): raise RuntimeError(error_message) return get_result()