1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Utilities for PyAuto."""
6
7from __future__ import print_function
8
9import httplib
10import logging
11import os
12import shutil
13import socket
14import sys
15import tempfile
16import unittest
17import urlparse
18import zipfile
19
20
class ExistingPathReplacer(object):
  """Facilitates backing up a given path (file or dir).

  Often you want to manipulate a directory or file for testing but don't want to
  meddle with the existing contents.  This class lets you make a backup, and
  reinstate the backup when done.  A backup is made in an adjacent directory,
  so you need to make sure you have write permissions to the parent directory.

  Works seamlessly in cases where the requested path already exists, or not.

  Automatically reinstates the backed up path (if any) when object is deleted.
  """
  _path = ''
  _path_type = 'dir'
  _backup_dir = None  # dir to which existing content is backed up
  _backup_basename = ''
  # Becomes True only once any pre-existing content has been moved out of the
  # way, i.e. whatever lives at _path from then on was created by this object
  # and may be deleted during cleanup.
  _owns_path = False

  def __init__(self, path, path_type='dir'):
    """Initialize the object, making backups if necessary.

    Args:
      path: the requested path to file or directory
      path_type: path type. Options: 'file', 'dir'. Default: 'dir'

    Raises:
      AssertionError: if |path_type| is invalid, or |path| already exists but
          is not of the requested type.  The existing path is left untouched.
    """
    assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type
    self._path_type = path_type
    self._path = path
    if os.path.exists(self._path):
      if 'dir' == self._path_type:
        assert os.path.isdir(self._path), '%s is not a directory' % self._path
      else:
        assert os.path.isfile(self._path), '%s is not a file' % self._path
      # take a backup
      self._backup_basename = os.path.basename(self._path)
      # A bare relative name has an empty dirname; back up next to it in cwd.
      parent_dir = os.path.dirname(self._path) or os.curdir
      self._backup_dir = tempfile.mkdtemp(dir=parent_dir,
                                          prefix='bkp-' + self._backup_basename)
      logging.info('Backing up %s in %s', self._path, self._backup_dir)
      shutil.move(self._path,
                  os.path.join(self._backup_dir, self._backup_basename))
    # Any original content is now safely out of the way.  Had anything above
    # raised, __del__ must not delete what is still sitting at |path|.
    self._owns_path = True
    self._CreateRequestedPath()

  def __del__(self):
    """Cleanup. Reinstate backup."""
    if self._owns_path:
      self._owns_path = False
      self._CleanupRequestedPath()
    if self._backup_dir:  # Reinstate, if backed up.
      from_path = os.path.join(self._backup_dir, self._backup_basename)
      # The backup is missing if __init__ failed before the move completed.
      if os.path.exists(from_path):
        logging.info('Reinstating backup from %s to %s', from_path, self._path)
        shutil.move(from_path, self._path)
    self._RemoveBackupDir()
    self._backup_dir = None

  def _CreateRequestedPath(self):
    """Create an empty file or directory (per path_type) at the path."""
    # Create intermediate dirs if needed.  An empty dirname means the path is
    # relative to the current directory, which already exists.
    parent_dir = os.path.dirname(self._path)
    if parent_dir and not os.path.exists(parent_dir):
      os.makedirs(parent_dir)
    if 'dir' == self._path_type:
      os.mkdir(self._path)
    else:
      open(self._path, 'w').close()

  def _CleanupRequestedPath(self):
    """Delete whatever currently exists at the requested path."""
    if os.path.exists(self._path):
      if os.path.isdir(self._path):
        shutil.rmtree(self._path, ignore_errors=True)
      else:
        os.remove(self._path)

  def _RemoveBackupDir(self):
    """Delete the temporary backup directory, if one was created."""
    if self._backup_dir and os.path.isdir(self._backup_dir):
      shutil.rmtree(self._backup_dir, ignore_errors=True)
89
90
def RemovePath(path):
  """Remove the given path (file, dir, or symlink) on a best-effort basis.

  Failures are deliberately ignored: a path that is missing or cannot be
  removed does not raise.

  Args:
    path: path to the file, directory or symbolic link to remove.
  """
  # os.path.isdir() follows symlinks, but shutil.rmtree() refuses to operate on
  # a symlink to a directory; with ignore_errors=True that refusal is silent
  # and the link would be left behind.  Remove links like plain files instead,
  # which also leaves the link's target untouched.
  if os.path.isdir(path) and not os.path.islink(path):
    shutil.rmtree(path, ignore_errors=True)
    return
  try:
    os.remove(path)
  except OSError:
    pass
100
101
def UnzipFilenameToDir(filename, dir):
  """Unzip |filename| to directory |dir|.

  |dir| is created if it does not exist (its parent must exist).  The
  process-wide current working directory is not changed.

  Args:
    filename: path to the zip archive.
    dir: directory into which the archive members are extracted.
  """
  zf = zipfile.ZipFile(filename)
  try:
    if not os.path.isdir(dir):
      os.mkdir(dir)
    # Extract files.
    for info in zf.infolist():
      name = info.filename
      target = os.path.join(dir, name)
      if name.endswith('/'):  # dir
        if not os.path.isdir(target):
          os.makedirs(target)
      else:  # file
        parent_dir = os.path.dirname(target)
        # parent_dir is empty for a top-level member extracted into a
        # relative, empty-named location; there is nothing to create then.
        if parent_dir and not os.path.isdir(parent_dir):
          os.makedirs(parent_dir)
        with open(target, 'wb') as out:
          out.write(zf.read(name))
      # Set permissions. Permission info in external_attr is shifted 16 bits.
      # Archives without Unix permission info carry 0 there; applying that
      # would make the extracted entry inaccessible, so keep the defaults.
      mode = (info.external_attr >> 16) & 0o7777
      if mode:
        os.chmod(target, mode)
  finally:
    zf.close()
128
129
def GetCurrentPlatform():
  """Translate sys.platform into a short platform name.

  Returns:
    'mac' when sys.platform is 'darwin', 'win' when it is 'win32', and
    'linux' when it starts with 'linux'.

  Raises:
    RuntimeError: if sys.platform matches none of the above.
  """
  platform_id = sys.platform
  exact_names = {'darwin': 'mac', 'win32': 'win'}
  if platform_id in exact_names:
    return exact_names[platform_id]
  if not platform_id.startswith('linux'):
    raise RuntimeError('Unknown platform')
  return 'linux'
143
144
def PrintPerfResult(graph_name, series_name, data_point, units,
                    show_on_waterfall=False):
  """Prints a line to stdout that is specially formatted for the perf bots.

  The line has the form '<*>RESULT <graph>: <series>= <data> <units>', where
  the leading '*' is present only when |show_on_waterfall| is true and all
  spaces are stripped from the string form of |data_point|.  stdout is flushed
  after printing.

  Args:
    graph_name: String name for the graph on which to plot the data.
    series_name: String name for the series (line on the graph) associated with
                 the data.  This is also the string displayed on the waterfall
                 if |show_on_waterfall| is True.
    data_point: Numeric data value to plot on the graph for the current build.
                This can be a single value or an array of values.  If an array,
                the graph will plot the average of the values, along with error
                bars.
    units: The string unit of measurement for the given |data_point|.
    show_on_waterfall: Whether or not to display this result directly on the
                       buildbot waterfall itself (in the buildbot step running
                       this test on the waterfall page, not the stdio page).
                       Any truthy value enables it.
  """
  # Evaluate the flag for truthiness rather than using it as a list index, so
  # that truthy values other than True/1 do not raise.
  waterfall_indicator = '*' if show_on_waterfall else ''
  print('%sRESULT %s: %s= %s %s' % (
      waterfall_indicator, graph_name, series_name,
      str(data_point).replace(' ', ''), units))
  sys.stdout.flush()
168
169
def Shard(ilist, shard_index, num_shards):
  """Shard a given list and return the group at index |shard_index|.

  Every shard gets len(ilist) // num_shards consecutive items, except the last
  shard, which additionally takes whatever remains.

  Args:
    ilist: input list
    shard_index: 0-based sharding index
    num_shards: shard count

  Returns:
    The slice of |ilist| belonging to shard |shard_index|.
  """
  # Floor division keeps the slice bounds integral even when '/' means true
  # division.
  chunk_size = len(ilist) // num_shards
  chunk_start = shard_index * chunk_size
  if shard_index == num_shards - 1:  # Exhaust the remainder in the last shard.
    chunk_end = len(ilist)
  else:
    chunk_end = chunk_start + chunk_size
  return ilist[chunk_start:chunk_end]
185
186
def WaitForDomElement(pyauto, driver, xpath):
  """Block until an element matching |xpath| shows up, then fetch it.

  Args:
    pyauto: an instance of pyauto.PyUITest.
    driver: an instance of chrome driver or a web element.
    xpath: the xpath of the element to wait for.

  Returns:
    Whatever driver.find_element_by_xpath(xpath) returns.

  Note: the outcome of the wait itself is not checked, so if the element never
  appeared the final lookup is still attempted (presumably raising the
  driver's NoSuchElementException -- confirm against the driver in use).
  """
  def _ElementPresent():
    matches = driver.find_elements_by_xpath(xpath)
    return len(matches) > 0

  pyauto.WaitUntil(_ElementPresent)
  element = driver.find_element_by_xpath(xpath)
  return element
201
202
def DoesUrlExist(url):
  """Determines whether a resource exists at the given URL.

  Issues a HEAD request (over TLS for https URLs) and follows permanent (301)
  and temporary (302) redirects, up to a fixed limit so that redirect loops
  terminate.  Connection-level and HTTP protocol errors count as "does not
  exist" rather than being raised.

  Args:
    url: URL to be verified.

  Returns:
    True if the final response status is 200, otherwise False.
  """
  max_redirects = 10
  for _ in range(max_redirects + 1):
    parsed = urlparse.urlsplit(url)
    if not parsed.netloc:
      return False  # Nothing to connect to.
    if parsed.scheme == 'https':
      connection_class = httplib.HTTPSConnection
    else:
      connection_class = httplib.HTTPConnection
    # An empty request target is invalid HTTP; the query is part of the
    # resource's identity, so keep it.
    target = parsed.path or '/'
    if parsed.query:
      target += '?' + parsed.query

    conn = None
    try:
      conn = connection_class(parsed.netloc)
      conn.request('HEAD', target)
      response = conn.getresponse()
      status = response.status
      location = response.getheader('location')
    except (socket.gaierror, socket.error, httplib.HTTPException):
      return False
    finally:
      # conn stays None when the constructor itself raised (e.g. bad port).
      if conn is not None:
        conn.close()

    # Follow both permanent (301) and temporary (302) redirects.
    if status not in (301, 302):
      return status == 200
    if not location:
      return False  # Redirect without a destination.
    # Location may be relative; resolve it against the URL just requested.
    url = urlparse.urljoin(url, location)
  return False  # Too many redirects (most likely a loop).
225
226
227class _GTestTextTestResult(unittest._TextTestResult):
228  """A test result class that can print formatted text results to a stream.
229
230  Results printed in conformance with gtest output format, like:
231  [ RUN        ] autofill.AutofillTest.testAutofillInvalid: "test desc."
232  [         OK ] autofill.AutofillTest.testAutofillInvalid
233  [ RUN        ] autofill.AutofillTest.testFillProfile: "test desc."
234  [         OK ] autofill.AutofillTest.testFillProfile
235  [ RUN        ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
236  [         OK ] autofill.AutofillTest.testFillProfileCrazyCharacters
237  """
238
239  def __init__(self, stream, descriptions, verbosity):
240    unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
241
242  def _GetTestURI(self, test):
243    return '%s.%s' % (unittest._strclass(test.__class__), test._testMethodName)
244
245  def getDescription(self, test):
246    return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())
247
248  def startTest(self, test):
249    unittest.TestResult.startTest(self, test)
250    self.stream.writeln('[ RUN        ] %s' % self.getDescription(test))
251
252  def addSuccess(self, test):
253    unittest.TestResult.addSuccess(self, test)
254    self.stream.writeln('[         OK ] %s' % self._GetTestURI(test))
255
256  def addError(self, test, err):
257    unittest.TestResult.addError(self, test, err)
258    self.stream.writeln('[      ERROR ] %s' % self._GetTestURI(test))
259
260  def addFailure(self, test, err):
261    unittest.TestResult.addFailure(self, test, err)
262    self.stream.writeln('[     FAILED ] %s' % self._GetTestURI(test))
263
264
class GTestTextTestRunner(unittest.TextTestRunner):
  """Text-mode test runner that reports results the way gtest does.

  Output goes to sys.stderr; the per-test lines are produced by
  _GTestTextTestResult.
  """

  def __init__(self, verbosity=1):
    """Set up the runner to write to sys.stderr.

    Args:
      verbosity: verbosity level handed to unittest.TextTestRunner.
    """
    base_init = unittest.TextTestRunner.__init__
    base_init(self, verbosity=verbosity, stream=sys.stderr)

  def _makeResult(self):
    """Create the gtest-style result object for this runner."""
    result = _GTestTextTestResult(self.stream, self.descriptions,
                                  self.verbosity)
    return result
277