# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities for PyAuto."""

from __future__ import print_function

import logging
import os
import shutil
import socket
import sys
import tempfile
import unittest
import zipfile

try:
    import httplib
except ImportError:  # Python 3 ships this module as http.client.
    import http.client as httplib

try:
    import urlparse
except ImportError:  # Python 3 ships this module as urllib.parse.
    import urllib.parse as urlparse


# Upper bound on the number of redirects DoesUrlExist() will follow.
_MAX_REDIRECTS = 10

# Base class for _GTestTextTestResult. Older unittest versions only expose the
# private name _TextTestResult; newer ones only expose TextTestResult.
_TextTestResultBase = getattr(unittest, 'TextTestResult', None)
if _TextTestResultBase is None:
    _TextTestResultBase = unittest._TextTestResult


class ExistingPathReplacer(object):
    """Facilitates backing up a given path (file or dir).

    Often you want to manipulate a directory or file for testing but don't
    want to meddle with the existing contents. This class lets you make a
    backup, and reinstate the backup when done. A backup is made in an
    adjacent directory, so you need to make sure you have write permissions to
    the parent directory.

    Works seamlessly in cases where the requested path already exists, or not.

    Automatically reinstates the backed up path (if any) when object is
    deleted.
    """
    _path = ''
    _backup_dir = None  # dir to which existing content is backed up
    _backup_basename = ''
    # True once whatever lives at |_path| belongs to this object (the original
    # content was moved away, or nothing was there to begin with). Guards
    # __del__ against deleting content it never backed up.
    _owns_path = False

    def __init__(self, path, path_type='dir'):
        """Initialize the object, making backups if necessary.

        Args:
          path: the requested path to file or directory
          path_type: path type. Options: 'file', 'dir'. Default: 'dir'

        Raises:
          AssertionError: if |path_type| is invalid, or |path| exists but is
              not of the requested type. The existing path is left untouched.
        """
        assert path_type in ('file', 'dir'), (
            'Invalid path_type: %s' % path_type)
        self._path_type = path_type
        self._path = path
        if os.path.exists(self._path):
            if 'dir' == self._path_type:
                assert os.path.isdir(self._path), (
                    '%s is not a directory' % self._path)
            else:
                assert os.path.isfile(self._path), (
                    '%s is not a file' % self._path)
            # Take a backup next to the original so the move stays on the
            # same filesystem.
            self._backup_basename = os.path.basename(self._path)
            self._backup_dir = tempfile.mkdtemp(
                dir=os.path.dirname(self._path),
                prefix='bkp-' + self._backup_basename)
            logging.info('Backing up %s in %s', self._path, self._backup_dir)
            shutil.move(self._path,
                        os.path.join(self._backup_dir, self._backup_basename))
        # From here on anything found at |_path| was put there by this object.
        self._owns_path = True
        self._CreateRequestedPath()

    def __del__(self):
        """Cleanup. Reinstate backup."""
        if self._owns_path:
            self._CleanupRequestedPath()
            self._owns_path = False
        if self._backup_dir:  # Reinstate, if backed up.
            from_path = os.path.join(self._backup_dir, self._backup_basename)
            # The backup is missing if __init__ failed before the move.
            if os.path.exists(from_path):
                logging.info('Reinstating backup from %s to %s',
                             from_path, self._path)
                shutil.move(from_path, self._path)
            self._RemoveBackupDir()
            # Makes a second invocation a no-op.
            self._backup_dir = None

    def _CreateRequestedPath(self):
        """Create an empty file or directory at the requested path."""
        # Create intermediate dirs if needed. The parent is '' for a bare
        # relative name, in which case there is nothing to create.
        parent = os.path.dirname(self._path)
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        if 'dir' == self._path_type:
            os.mkdir(self._path)
        else:
            open(self._path, 'w').close()

    def _CleanupRequestedPath(self):
        """Remove whatever currently exists at the requested path."""
        if os.path.exists(self._path):
            if os.path.isdir(self._path):
                shutil.rmtree(self._path, ignore_errors=True)
            else:
                os.remove(self._path)

    def _RemoveBackupDir(self):
        """Delete the backup directory, if one was created."""
        if self._backup_dir and os.path.isdir(self._backup_dir):
            shutil.rmtree(self._backup_dir, ignore_errors=True)


def RemovePath(path):
    """Remove the given path (file or dir).

    Best effort: errors while removing are ignored, including the path not
    existing.
    """
    if os.path.isdir(path):
        shutil.rmtree(path, ignore_errors=True)
        return
    try:
        os.remove(path)
    except OSError:
        pass


def UnzipFilenameToDir(filename, dir):
    """Unzip |filename| to directory |dir|.

    |dir| is created if it does not exist (its parent must exist). Entries are
    written relative to |dir| without changing the process working directory.

    Args:
      filename: path to the zip archive.
      dir: directory to extract into.
    """
    zf = zipfile.ZipFile(filename)
    try:
        if not os.path.isdir(dir):
            os.mkdir(dir)
        # Extract files.
        for info in zf.infolist():
            name = info.filename
            target = os.path.join(dir, name)
            if name.endswith('/'):  # dir
                if not os.path.isdir(target):
                    os.makedirs(target)
            else:  # file
                parent = os.path.dirname(target)
                if parent and not os.path.isdir(parent):
                    os.makedirs(parent)
                with open(target, 'wb') as out:
                    out.write(zf.read(info))
            # Set permissions. Permission info in external_attr is shifted 16
            # bits. A value of 0 means the archive carries no mode for this
            # entry; applying it would make the entry inaccessible.
            mode = info.external_attr >> 16
            if mode:
                os.chmod(target, mode)
    finally:
        zf.close()


def GetCurrentPlatform():
    """Get a string representation for the current platform.

    Returns:
      'mac', 'win' or 'linux'

    Raises:
      RuntimeError: if sys.platform is none of the recognized values.
    """
    if sys.platform == 'darwin':
        return 'mac'
    if sys.platform == 'win32':
        return 'win'
    if sys.platform.startswith('linux'):
        return 'linux'
    raise RuntimeError('Unknown platform')


def PrintPerfResult(graph_name, series_name, data_point, units,
                    show_on_waterfall=False):
    """Prints a line to stdout that is specially formatted for the perf bots.

    Args:
      graph_name: String name for the graph on which to plot the data.
      series_name: String name for the series (line on the graph) associated
          with the data. This is also the string displayed on the waterfall
          if |show_on_waterfall| is True.
      data_point: Numeric data value to plot on the graph for the current
          build. This can be a single value or an array of values. If an
          array, the graph will plot the average of the values, along with
          error bars.
      units: The string unit of measurement for the given |data_point|.
      show_on_waterfall: Whether or not to display this result directly on the
          buildbot waterfall itself (in the buildbot step running this test on
          the waterfall page, not the stdio page).
    """
    waterfall_indicator = '*' if show_on_waterfall else ''
    print('%sRESULT %s: %s= %s %s' % (
        waterfall_indicator, graph_name, series_name,
        str(data_point).replace(' ', ''), units))
    sys.stdout.flush()


def Shard(ilist, shard_index, num_shards):
    """Shard a given list and return the group at index |shard_index|.

    Every shard gets len(ilist) // num_shards items; the last shard also gets
    whatever remains.

    Args:
      ilist: input list
      shard_index: 0-based sharding index
      num_shards: shard count

    Returns:
      The slice of |ilist| belonging to shard |shard_index|.
    """
    # Floor division keeps the slice bounds integral on Python 3 as well.
    chunk_size = len(ilist) // num_shards
    chunk_start = shard_index * chunk_size
    if shard_index == num_shards - 1:
        # Exhaust the remainder in the last shard.
        chunk_end = len(ilist)
    else:
        chunk_end = chunk_start + chunk_size
    return ilist[chunk_start:chunk_end]


def WaitForDomElement(pyauto, driver, xpath):
    """Wait for the UI element to appear.

    Args:
      pyauto: an instance of pyauto.PyUITest.
      driver: an instance of chrome driver or a web element.
      xpath: the xpath of the element to wait for.

    Returns:
      The element if it is found.

    Raises:
      NoSuchElementException if it is not found.
    """
    pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0)
    return driver.find_element_by_xpath(xpath)


def DoesUrlExist(url):
    """Determines whether a resource exists at the given URL.

    Issues HEAD requests, following permanent (301) and temporary (302)
    redirects up to a fixed limit.

    Args:
      url: URL to be verified.

    Returns:
      True if url exists, otherwise False.
    """
    for _ in range(_MAX_REDIRECTS + 1):
        parsed = urlparse.urlparse(url)
        if parsed.scheme == 'https' and hasattr(httplib, 'HTTPSConnection'):
            connection_class = httplib.HTTPSConnection
        else:
            connection_class = httplib.HTTPConnection
        conn = None
        try:
            conn = connection_class(parsed.netloc)
            # An empty path would produce a malformed request line.
            conn.request('HEAD', parsed.path or '/')
            response = conn.getresponse()
            status = response.status
            location = response.getheader('location')
        except (socket.gaierror, socket.error):
            return False
        finally:
            if conn is not None:
                conn.close()
        if status not in (301, 302):
            return status == 200
        if not location:
            return False
        # The Location header may be relative to the URL just requested.
        url = urlparse.urljoin(url, location)
    # Too many redirects; treat as not found rather than looping forever.
    return False


class _GTestTextTestResult(_TextTestResultBase):
    """A test result class that can print formatted text results to a stream.

    Results printed in conformance with gtest output format, like:
    [ RUN ] autofill.AutofillTest.testAutofillInvalid: "test desc."
    [ OK ] autofill.AutofillTest.testAutofillInvalid
    [ RUN ] autofill.AutofillTest.testFillProfile: "test desc."
    [ OK ] autofill.AutofillTest.testFillProfile
    [ RUN ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
    [ OK ] autofill.AutofillTest.testFillProfileCrazyCharacters
    """
    # NOTE(review): the status tags here are reproduced exactly as they appear
    # in the reviewed copy of this file, whose whitespace had been collapsed.
    # gtest itself pads its tags to a fixed width (e.g. '[ RUN      ]') --
    # confirm against the pristine file before relying on log parsers.

    def __init__(self, stream, descriptions, verbosity):
        _TextTestResultBase.__init__(self, stream, descriptions, verbosity)

    def _GetTestURI(self, test):
        """Return 'module.Class.method' for |test|."""
        # Built directly from the class rather than via the private
        # unittest._strclass helper, which newer unittest versions do not
        # expose.
        test_class = test.__class__
        return '%s.%s.%s' % (test_class.__module__, test_class.__name__,
                             test._testMethodName)

    def getDescription(self, test):
        """Return the test URI followed by its quoted short description."""
        return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())

    # The hooks below call unittest.TestResult directly so that only the
    # gtest-style line written here is emitted for each event.

    def startTest(self, test):
        unittest.TestResult.startTest(self, test)
        self.stream.writeln('[ RUN ] %s' % self.getDescription(test))

    def addSuccess(self, test):
        unittest.TestResult.addSuccess(self, test)
        self.stream.writeln('[ OK ] %s' % self._GetTestURI(test))

    def addError(self, test, err):
        unittest.TestResult.addError(self, test, err)
        self.stream.writeln('[ ERROR ] %s' % self._GetTestURI(test))

    def addFailure(self, test, err):
        unittest.TestResult.addFailure(self, test, err)
        self.stream.writeln('[ FAILED ] %s' % self._GetTestURI(test))


class GTestTextTestRunner(unittest.TextTestRunner):
    """Test Runner for displaying test results in textual format.

    Results are displayed in conformance with gtest output.
    """

    def __init__(self, verbosity=1):
        unittest.TextTestRunner.__init__(self, stream=sys.stderr,
                                         verbosity=verbosity)

    def _makeResult(self):
        """Return the gtest-style result object used for the run."""
        return _GTestTextTestResult(self.stream, self.descriptions,
                                    self.verbosity)