1# Copyright 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Provides stubs for os, sys and subprocess for testing
6
7This test allows one to test code that itself uses os, sys, and subprocess.
8"""
9
10import ntpath
11import os
12import posixpath
13import re
14import shlex
15import sys
16
17
18class Override(object):
19
20  _overidden_modules = set()
21
22  def __init__(self, base_module, module_list):
23    self.cloud_storage = None
24    self.open = None
25    self.os = None
26    self.perf_control = None
27    self.raw_input = None
28    self.subprocess = None
29    self.sys = None
30    self.thermal_throttle = None
31    self.logging = None
32    stubs = {'cloud_storage': CloudStorageModuleStub,
33             'open': OpenFunctionStub,
34             'os': OsModuleStub,
35             'perf_control': PerfControlModuleStub,
36             'raw_input': RawInputFunctionStub,
37             'subprocess': SubprocessModuleStub,
38             'sys': SysModuleStub,
39             'thermal_throttle': ThermalThrottleModuleStub,
40             'logging': LoggingStub,
41    }
42    self.adb_commands = None
43    self.os = None
44    self.subprocess = None
45    self.sys = None
46
47    assert base_module not in self._overidden_modules, (
48        '%s is already overridden' % base_module.__name__)
49    self._overidden_modules.add(base_module)
50    self._base_module = base_module
51    self._overrides = {}
52
53    for module_name in module_list:
54      self._overrides[module_name] = getattr(base_module, module_name, None)
55      setattr(self, module_name, stubs[module_name]())
56      setattr(base_module, module_name, getattr(self, module_name))
57
58    if self.os and self.sys:
59      self.os.path.sys = self.sys
60
61  def __del__(self):
62    assert not len(self._overrides)
63
64  def Restore(self):
65    for module_name, original_module in self._overrides.iteritems():
66      if original_module is None:
67        # This will happen when we override built-in functions, like open.
68        # If we don't delete the attribute, we will shadow the built-in
69        # function with an attribute set to None.
70        delattr(self._base_module, module_name)
71      else:
72        setattr(self._base_module, module_name, original_module)
73    self._overrides = {}
74    self._overidden_modules.remove(self._base_module)
75    self._base_module = None
76
77
78class AdbDevice(object):
79
80  def __init__(self):
81    self.has_root = False
82    self.needs_su = False
83    self.shell_command_handlers = {}
84    self.mock_content = []
85    self.system_properties = {}
86    if self.system_properties.get('ro.product.cpu.abi') == None:
87      self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a'
88
89  def HasRoot(self):
90    return self.has_root
91
92  def NeedsSU(self):
93    return self.needs_su
94
95  def RunShellCommand(self, args, **kwargs):
96    del kwargs  # unused
97    if isinstance(args, basestring):
98      args = shlex.split(args)
99    handler = self.shell_command_handlers[args[0]]
100    return handler(args)
101
102  def FileExists(self, _):
103    return False
104
105  def ReadFile(self, device_path, as_root=False):
106    del device_path, as_root  # unused
107    return self.mock_content
108
109  def GetProp(self, property_name):
110    return self.system_properties[property_name]
111
112  def SetProp(self, property_name, property_value):
113    self.system_properties[property_name] = property_value
114
115
116class CloudStorageModuleStub(object):
117  PUBLIC_BUCKET = 'chromium-telemetry'
118  PARTNER_BUCKET = 'chrome-partner-telemetry'
119  INTERNAL_BUCKET = 'chrome-telemetry'
120  BUCKET_ALIASES = {
121    'public': PUBLIC_BUCKET,
122    'partner': PARTNER_BUCKET,
123    'internal': INTERNAL_BUCKET,
124  }
125
126  # These are used to test for CloudStorage errors.
127  INTERNAL_PERMISSION = 2
128  PARTNER_PERMISSION = 1
129  PUBLIC_PERMISSION = 0
130  # Not logged in.
131  CREDENTIALS_ERROR_PERMISSION = -1
132
133  class NotFoundError(Exception):
134    pass
135
136  class CloudStorageError(Exception):
137    pass
138
139  class PermissionError(CloudStorageError):
140    pass
141
142  class CredentialsError(CloudStorageError):
143    pass
144
145  def __init__(self):
146    self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
147                                 CloudStorageModuleStub.PARTNER_BUCKET:{},
148                                 CloudStorageModuleStub.PUBLIC_BUCKET:{}}
149    self.remote_paths = self.default_remote_paths
150    self.local_file_hashes = {}
151    self.local_hash_files = {}
152    self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
153    self.downloaded_files = []
154
155  def SetPermissionLevelForTesting(self, permission_level):
156    self.permission_level = permission_level
157
158  def CheckPermissionLevelForBucket(self, bucket):
159    if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
160      return
161    elif (self.permission_level ==
162          CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
163      raise CloudStorageModuleStub.CredentialsError()
164    elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
165      if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
166        raise CloudStorageModuleStub.PermissionError()
167    elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
168      if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
169        raise CloudStorageModuleStub.PermissionError()
170    elif bucket not in self.remote_paths:
171      raise CloudStorageModuleStub.NotFoundError()
172
173  def SetRemotePathsForTesting(self, remote_path_dict=None):
174    if not remote_path_dict:
175      self.remote_paths = self.default_remote_paths
176      return
177    self.remote_paths = remote_path_dict
178
179  def GetRemotePathsForTesting(self):
180    if not self.remote_paths:
181      self.remote_paths = self.default_remote_paths
182    return self.remote_paths
183
184  # Set a dictionary of data files and their "calculated" hashes.
185  def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
186    self.local_file_hashes = calculated_hash_dictionary
187
188  def GetLocalDataFiles(self):
189    return self.local_file_hashes.keys()
190
191  # Set a dictionary of hash files and the hashes they should contain.
192  def SetHashFileContentsForTesting(self, hash_file_dictionary):
193    self.local_hash_files = hash_file_dictionary
194
195  def GetLocalHashFiles(self):
196    return self.local_hash_files.keys()
197
198  def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
199    self.remote_paths[bucket][remote_path] = new_hash
200
201  def List(self, bucket):
202    if not bucket or not bucket in self.remote_paths:
203      bucket_error = ('Incorrect bucket specified, correct buckets:' +
204                      str(self.remote_paths))
205      raise CloudStorageModuleStub.CloudStorageError(bucket_error)
206    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
207    return list(self.remote_paths[bucket].keys())
208
209  def Exists(self, bucket, remote_path):
210    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
211    return remote_path in self.remote_paths[bucket]
212
213  def Insert(self, bucket, remote_path, local_path):
214    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
215    if not local_path in self.GetLocalDataFiles():
216      file_path_error = 'Local file path does not exist'
217      raise CloudStorageModuleStub.CloudStorageError(file_path_error)
218    self.remote_paths[bucket][remote_path] = (
219      CloudStorageModuleStub.CalculateHash(self, local_path))
220    return remote_path
221
222  def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
223    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
224    if not remote_path in self.remote_paths[bucket]:
225      if only_if_changed:
226        return False
227      raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
228    remote_hash = self.remote_paths[bucket][remote_path]
229    local_hash = self.local_file_hashes[local_path]
230    if only_if_changed and remote_hash == local_hash:
231      return False
232    self.downloaded_files.append(remote_path)
233    self.local_file_hashes[local_path] = remote_hash
234    self.local_hash_files[local_path + '.sha1'] = remote_hash
235    return remote_hash
236
237  def Get(self, bucket, remote_path, local_path):
238    return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
239                                            local_path, False)
240
241  def GetIfChanged(self, local_path, bucket=None):
242    remote_path = os.path.basename(local_path)
243    if bucket:
244      return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
245                                              local_path, True)
246    result = CloudStorageModuleStub.GetHelper(
247        self, self.PUBLIC_BUCKET, remote_path, local_path, True)
248    if not result:
249      result = CloudStorageModuleStub.GetHelper(
250          self, self.PARTNER_BUCKET, remote_path, local_path, True)
251    if not result:
252      result = CloudStorageModuleStub.GetHelper(
253          self, self.INTERNAL_BUCKET, remote_path, local_path, True)
254    return result
255
256  def GetFilesInDirectoryIfChanged(self, directory, bucket):
257    if os.path.dirname(directory) == directory: # If in the root dir.
258      raise ValueError('Trying to serve root directory from HTTP server.')
259    for dirpath, _, filenames in os.walk(directory):
260      for filename in filenames:
261        path, extension = os.path.splitext(
262            os.path.join(dirpath, filename))
263        if extension != '.sha1':
264          continue
265        self.GetIfChanged(path, bucket)
266
267  def CalculateHash(self, file_path):
268    return self.local_file_hashes[file_path]
269
270  def ReadHash(self, hash_path):
271    return self.local_hash_files[hash_path]
272
273
274class LoggingStub(object):
275  def __init__(self):
276    self.warnings = []
277    self.errors = []
278
279  def info(self, msg, *args):
280    pass
281
282  def error(self, msg, *args):
283    self.errors.append(msg % args)
284
285  def warning(self, msg, *args):
286    self.warnings.append(msg % args)
287
288  def warn(self, msg, *args):
289    self.warning(msg, *args)
290
291
292class OpenFunctionStub(object):
293  class FileStub(object):
294    def __init__(self, data):
295      self._data = data
296
297    def __enter__(self):
298      return self
299
300    def __exit__(self, *args):
301      pass
302
303    def read(self, size=None):
304      if size:
305        return self._data[:size]
306      else:
307        return self._data
308
309    def write(self, data):
310      self._data.write(data)
311
312    def close(self):
313      pass
314
315  def __init__(self):
316    self.files = {}
317
318  def __call__(self, name, *args, **kwargs):
319    return OpenFunctionStub.FileStub(self.files[name])
320
321
322class OsModuleStub(object):
323  class OsEnvironModuleStub(object):
324    def get(self, _):
325      return None
326
327  class OsPathModuleStub(object):
328    def __init__(self, sys_module):
329      self.sys = sys_module
330      self.files = []
331      self.dirs = []
332
333    def exists(self, path):
334      return path in self.files
335
336    def isfile(self, path):
337      return path in self.files
338
339    def isdir(self, path):
340      return path in self.dirs
341
342    def join(self, *paths):
343      def IsAbsolutePath(path):
344        if self.sys.platform.startswith('win'):
345          return re.match('[a-zA-Z]:\\\\', path)
346        else:
347          return path.startswith('/')
348
349      # Per Python specification, if any component is an absolute path,
350      # discard previous components.
351      for index, path in reversed(list(enumerate(paths))):
352        if IsAbsolutePath(path):
353          paths = paths[index:]
354          break
355
356      if self.sys.platform.startswith('win'):
357        tmp = os.path.join(*paths)
358        return tmp.replace('/', '\\')
359      else:
360        tmp = os.path.join(*paths)
361        return tmp.replace('\\', '/')
362
363    def basename(self, path):
364      if self.sys.platform.startswith('win'):
365        return ntpath.basename(path)
366      else:
367        return posixpath.basename(path)
368
369    @staticmethod
370    def abspath(path):
371      return os.path.abspath(path)
372
373    @staticmethod
374    def expanduser(path):
375      return os.path.expanduser(path)
376
377    @staticmethod
378    def dirname(path):
379      return os.path.dirname(path)
380
381    @staticmethod
382    def realpath(path):
383      return os.path.realpath(path)
384
385    @staticmethod
386    def split(path):
387      return os.path.split(path)
388
389    @staticmethod
390    def splitext(path):
391      return os.path.splitext(path)
392
393    @staticmethod
394    def splitdrive(path):
395      return os.path.splitdrive(path)
396
397  X_OK = os.X_OK
398
399  sep = os.sep
400  pathsep = os.pathsep
401
402  def __init__(self, sys_module=sys):
403    self.path = OsModuleStub.OsPathModuleStub(sys_module)
404    self.environ = OsModuleStub.OsEnvironModuleStub()
405    self.display = ':0'
406    self.local_app_data = None
407    self.sys_path = None
408    self.program_files = None
409    self.program_files_x86 = None
410    self.devnull = os.devnull
411    self._directory = {}
412
413  def access(self, path, _):
414    return path in self.path.files
415
416  def getenv(self, name, value=None):
417    if name == 'DISPLAY':
418      env = self.display
419    elif name == 'LOCALAPPDATA':
420      env = self.local_app_data
421    elif name == 'PATH':
422      env = self.sys_path
423    elif name == 'PROGRAMFILES':
424      env = self.program_files
425    elif name == 'PROGRAMFILES(X86)':
426      env = self.program_files_x86
427    else:
428      raise NotImplementedError('Unsupported getenv')
429    return env if env else value
430
431  def chdir(self, path):
432    pass
433
434  def walk(self, top):
435    for dir_name in self._directory:
436      yield top, dir_name, self._directory[dir_name]
437
438
439class PerfControlModuleStub(object):
440  class PerfControlStub(object):
441    def __init__(self, adb):
442      pass
443
444  def __init__(self):
445    self.PerfControl = PerfControlModuleStub.PerfControlStub
446
447
448class RawInputFunctionStub(object):
449  def __init__(self):
450    self.input = ''
451
452  def __call__(self, name, *args, **kwargs):
453    return self.input
454
455
456class SubprocessModuleStub(object):
457  class PopenStub(object):
458    def __init__(self):
459      self.communicate_result = ('', '')
460      self.returncode_result = 0
461
462    def __call__(self, args, **kwargs):
463      return self
464
465    def communicate(self):
466      return self.communicate_result
467
468    @property
469    def returncode(self):
470      return self.returncode_result
471
472  def __init__(self):
473    self.Popen = SubprocessModuleStub.PopenStub()
474    self.PIPE = None
475
476  def call(self, *args, **kwargs):
477    pass
478
479
480class SysModuleStub(object):
481  def __init__(self):
482    self.platform = ''
483
484
485class ThermalThrottleModuleStub(object):
486  class ThermalThrottleStub(object):
487    def __init__(self, adb):
488      pass
489
490  def __init__(self):
491    self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
492