1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import json, logging, os, re, tempfile, time, urllib2, zipfile
6from autotest_lib.client.bin import test
7from autotest_lib.client.common_lib import error, file_utils
8from autotest_lib.client.common_lib.cros import chromedriver
9
10
# Update-check URL template. The single %s takes the extension id; the
# doubled %% are literal percent signs of the URL-encoded characters
# (%3D, %26), escaped so they survive %-formatting.
UPDATE_CHECK_URL = ('https://clients2.google.com/service/update2/'
                    'crx?x=id%%3D%s%%26v%%3D0%%26uc&prodversion=32.0.0.0')
# Id of the Cast (Beta) extension that gets downloaded.
EXTENSION_ID_BETA = 'dliochdbjfkdbacpmhlcpmleaejidimm'
# Scheme prefix used to build URLs of pages inside the extension.
CHROME_EXTENSION_BASE = 'chrome-extension://'
# Extension page loaded (with a query string) to turn on TDLS.
EXTENSION_OPTIONS_PAGE = 'options.html'
# File name, under the test's debug directory, of the saved receiver log.
RECEIVER_LOG = 'receiver_debug.log'
# Extension page used to block the setup dialog and to start/stop mirroring.
EXTENSION_TEST_UTILS_PAGE = 'e2e_test_utils.html'
# Page that is mirrored to the receiver during the test.
TEST_URL = 'http://www.vimeo.com'
# Receiver log report URL template; %s takes the receiver's IP address.
GET_LOG_URL = 'http://%s:8008/setup/get_log_report'
# Value written to the 'key' entry of the extension manifest.
# NOTE(review): presumably the extension's public key, added so the unpacked
# extension keeps a stable id -- confirm.
KEY = ('MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+hlN5FB+tjCsBszmBIvIcD/djLLQm2z'
       'ZfFygP4U4/o++ZM91EWtgII10LisoS47qT2TIOg4Un4+G57elZ9PjEIhcJfANqkYrD3t9d'
       'pEzMNr936TLB2u683B5qmbB68Nq1Eel7KVc+F0BqhBondDqhvDvGPEV0vBsbErJFlNH7SQ'
       'IDAQAB')
# Pause, in seconds, inserted between browser interactions.
WAIT_SECONDS = 3
# How long, in seconds, the mirroring session is left running.
MIRRORING_DURATION_SECONDS = 20
26
27
28class network_CastTDLS(test.test):
29    """Test loading the sonic extension through chromedriver."""
30    version = 1
31
32    def _navigate_url(self, driver, url):
33        """Go to options page if current page is not options page.
34
35        @param driver: The chromedriver instance.
36        @param url: The URL to navigate to.
37        """
38        if driver.current_url != url:
39            driver.get(url)
40            driver.refresh()
41
42    def _set_focus_tab(self, driver, tab_handle):
43      """Set the focus on a tab.
44
45      @param driver: The chromedriver instance.
46      @param tab_handle: The chrome driver handle of the tab.
47      """
48      driver.switch_to_window(tab_handle)
49      driver.get_screenshot_as_base64()
50
51    def _block_setup_dialog(self, driver, extension_id):
52        """Tab cast through the extension.
53
54        @param driver: A chromedriver instance that has the extension loaded.
55        @param extension_id: Id of the extension to use.
56        """
57        test_utils_page = '%s%s/%s' % (
58                CHROME_EXTENSION_BASE, extension_id, EXTENSION_TEST_UTILS_PAGE)
59        self._navigate_url(driver, test_utils_page)
60        time.sleep(WAIT_SECONDS)
61        driver.execute_script(
62            'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"')
63
64    def _close_popup_tabs(self, driver):
65        """Close any popup windows the extension might open by default.
66
67        Since we're going to handle the extension ourselves all we need is
68        the main browser window with a single tab. The safest way to handle
69        the popup however, is to close the currently active tab, so we don't
70        mess with chromedrivers ui debugger.
71
72        @param driver: Chromedriver instance.
73        @raises Exception If you close the tab associated with the ui debugger.
74        """
75        current_tab_handle = driver.current_window_handle
76        for handle in driver.window_handles:
77            if handle != current_tab_handle:
78                try:
79                    time.sleep(WAIT_SECONDS)
80                    driver.switch_to_window(handle)
81                    driver.close()
82                except:
83                    pass
84        driver.switch_to_window(current_tab_handle)
85
86    def _download_extension_crx(self, output_file):
87        """Download Cast (Beta) extension from Chrome Web Store to a file.
88
89        @param output_file: The output file of the extension.
90        """
91        update_check_url = UPDATE_CHECK_URL % EXTENSION_ID_BETA
92        response = urllib2.urlopen(update_check_url).read()
93        logging.info('Response: %s', response)
94        pattern = r'codebase="(.*crx)"'
95        regex = re.compile(pattern)
96        match = regex.search(response)
97        extension_download_url = match.groups()[0]
98        logging.info('Extension download link: %s', extension_download_url)
99        file_utils.download_file(extension_download_url, output_file)
100
101    def _unzip_file(self, zip_file, output_folder):
102        """Unzip a zip file to a folder.
103
104        @param zip_file: Path of the zip file.
105        @param output_folder: Output file path.
106        """
107        zip_ref = zipfile.ZipFile(zip_file, 'r')
108        zip_ref.extractall(output_folder)
109        zip_ref.close()
110
111    def _modify_manifest(self, extension_folder):
112        """Update the Cast extension manifest file by adding a key to it.
113
114        @param extension_folder: Path to the unzipped extension folder.
115        """
116        manifest_file = os.path.join(extension_folder, 'manifest.json')
117        with open(manifest_file) as f:
118            manifest_dict = json.load(f)
119        manifest_dict['key'] = KEY
120        with open(manifest_file, 'wb') as f:
121            json.dump(manifest_dict, f)
122
123    def _turn_on_tdls(self, driver, extension_id):
124        """Turn on TDLS in the extension option page.
125
126        @param driver: The chromedriver instance of the test.
127        @param extension_id: The id of the Cast extension.
128        """
129        turn_on_tdls_url = ('%s%s/%s''?disableTDLS=false') % (
130                CHROME_EXTENSION_BASE, extension_id, EXTENSION_OPTIONS_PAGE)
131        self._navigate_url(driver, turn_on_tdls_url)
132        time.sleep(WAIT_SECONDS)
133
134    def _start_mirroring(self, driver, extension_id, device_ip, url):
135        """Use test util page to start mirroring session on specific device.
136
137        @param driver: The chromedriver instance.
138        @param extension_id: The id of the Cast extension.
139        @param device_ip: The IP of receiver device to launch mirroring.
140        @param url: The URL to mirror.
141        """
142        test_utils_page = '%s%s/%s' % (
143                CHROME_EXTENSION_BASE, extension_id, EXTENSION_TEST_UTILS_PAGE)
144        self._navigate_url(driver, test_utils_page)
145        time.sleep(WAIT_SECONDS)
146        tab_handles = driver.window_handles
147        driver.find_element_by_id('receiverIpAddressV2').send_keys(device_ip)
148        driver.find_element_by_id('urlToOpenV2').send_keys(url)
149        time.sleep(WAIT_SECONDS)
150        driver.find_element_by_id('mirrorUrlV2').click()
151        time.sleep(WAIT_SECONDS)
152        all_handles = driver.window_handles
153        test_handle = [x for x in all_handles if x not in tab_handles].pop()
154        driver.switch_to_window(test_handle)
155        driver.refresh()
156
157    def _stop_mirroring(self, driver, extension_id):
158      """Use test util page to stop a mirroring session on a specific device.
159
160      @param driver: The chromedriver instance.
161      @param extension_id: The id of the Cast extension.
162      """
163      test_utils_page = '%s%s/%s' % (
164            CHROME_EXTENSION_BASE, extension_id, EXTENSION_TEST_UTILS_PAGE)
165      self._navigate_url(driver, test_utils_page)
166      time.sleep(WAIT_SECONDS)
167      driver.find_element_by_id('stopV2Mirroring').click()
168
169    def _check_tdls_status(self, device_ip):
170        """Check the TDLS status of a mirroring session using receiver's log.
171
172        @param device_ip: IP of receiver device used in the mirroring session.
173        @raises error.TestFail If there is log about TDLS status.
174        @raises error.TestFail If TDLS status is invalid.
175        @raises error.TestFail TDLS is not being used in the mirroring session.
176        """
177        response = urllib2.urlopen(GET_LOG_URL % device_ip).read()
178        logging.info('Receiver log is under: %s', self.debugdir)
179        with open(os.path.join(self.debugdir, RECEIVER_LOG), 'wb') as f:
180            f.write(response)
181        tdls_status_list = re.findall(
182                r'.*TDLS status.*last TDLS status:.*', response)
183        if not len(tdls_status_list):
184            raise error.TestFail('There is no TDLS log on %s.', device_ip)
185        pattern = r'last TDLS status: (.*)'
186        regex = re.compile(pattern)
187        match = regex.search(tdls_status_list[-1])
188        if not len(match.groups()):
189            raise error.TestFail('Invalid TDLS status.')
190        if match.groups()[0].lower() != 'enabled':
191            raise error.TestFail(
192                'TDLS is not initiated in the last mirroring session.')
193
194    def initialize(self):
195        """Initialize the test.
196
197        @param extension_dir: Directory of a custom extension.
198            If one isn't supplied, the latest ToT extension is
199            downloaded and loaded into chromedriver.
200        @param live: Use a live url if True. Start a test server
201            and server a hello world page if False.
202        """
203        super(network_CastTDLS, self).initialize()
204        self._tmp_folder = tempfile.mkdtemp()
205
206
207    def cleanup(self):
208        """Clean up the test environment, e.g., stop local http server."""
209        super(network_CastTDLS, self).cleanup()
210
211
212    def run_once(self, device_ip):
213        """Run the test code.
214
215        @param  device_ip: Device IP to use in the test.
216        """
217        logging.info('Download Cast extension ... ...')
218        crx_file = tempfile.NamedTemporaryFile(
219                dir=self._tmp_folder, suffix='.crx').name
220        unzip_crx_folder = tempfile.mkdtemp(dir=self._tmp_folder)
221        self._download_extension_crx(crx_file)
222        self._unzip_file(crx_file, unzip_crx_folder)
223        self._modify_manifest(unzip_crx_folder)
224        kwargs = { 'extension_paths': [unzip_crx_folder] }
225        with chromedriver.chromedriver(**kwargs) as chromedriver_instance:
226            driver = chromedriver_instance.driver
227            extension_id = chromedriver_instance.get_extension(
228                    unzip_crx_folder).extension_id
229            # Close popup dialog if there is any
230            self._close_popup_tabs(driver)
231            self._block_setup_dialog(driver, extension_id)
232            time.sleep(WAIT_SECONDS)
233            logging.info('Enable TDLS in extension: %s', extension_id)
234            self._turn_on_tdls(driver, extension_id)
235            extension_tab_handle = driver.current_window_handle
236            logging.info('Start mirroring on device: %s', device_ip)
237            self._start_mirroring(
238                    driver, extension_id, device_ip, TEST_URL)
239            time.sleep(MIRRORING_DURATION_SECONDS)
240            self._set_focus_tab(driver, extension_tab_handle)
241            driver.switch_to_window(extension_tab_handle)
242            logging.info('Stop mirroring on device: %s', device_ip)
243            self._stop_mirroring(driver, extension_id)
244            time.sleep(WAIT_SECONDS * 3)
245            logging.info('Verify TDLS status in the mirroring session ... ... ')
246            self._check_tdls_status(device_ip)
247