1# Copyright 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Provides stubs for os, sys and subprocess for testing 6 7This test allows one to test code that itself uses os, sys, and subprocess. 8""" 9 10import ntpath 11import os 12import posixpath 13import re 14import shlex 15import sys 16 17 18class Override(object): 19 20 _overidden_modules = set() 21 22 def __init__(self, base_module, module_list): 23 self.cloud_storage = None 24 self.open = None 25 self.os = None 26 self.perf_control = None 27 self.raw_input = None 28 self.subprocess = None 29 self.sys = None 30 self.thermal_throttle = None 31 self.logging = None 32 stubs = {'cloud_storage': CloudStorageModuleStub, 33 'open': OpenFunctionStub, 34 'os': OsModuleStub, 35 'perf_control': PerfControlModuleStub, 36 'raw_input': RawInputFunctionStub, 37 'subprocess': SubprocessModuleStub, 38 'sys': SysModuleStub, 39 'thermal_throttle': ThermalThrottleModuleStub, 40 'logging': LoggingStub, 41 } 42 self.adb_commands = None 43 self.os = None 44 self.subprocess = None 45 self.sys = None 46 47 assert base_module not in self._overidden_modules, ( 48 '%s is already overridden' % base_module.__name__) 49 self._overidden_modules.add(base_module) 50 self._base_module = base_module 51 self._overrides = {} 52 53 for module_name in module_list: 54 self._overrides[module_name] = getattr(base_module, module_name, None) 55 setattr(self, module_name, stubs[module_name]()) 56 setattr(base_module, module_name, getattr(self, module_name)) 57 58 if self.os and self.sys: 59 self.os.path.sys = self.sys 60 61 def __del__(self): 62 assert not len(self._overrides) 63 64 def Restore(self): 65 for module_name, original_module in self._overrides.iteritems(): 66 if original_module is None: 67 # This will happen when we override built-in functions, like open. 68 # If we don't delete the attribute, we will shadow the built-in 69 # function with an attribute set to None. 70 delattr(self._base_module, module_name) 71 else: 72 setattr(self._base_module, module_name, original_module) 73 self._overrides = {} 74 self._overidden_modules.remove(self._base_module) 75 self._base_module = None 76 77 78class AdbDevice(object): 79 80 def __init__(self): 81 self.has_root = False 82 self.needs_su = False 83 self.shell_command_handlers = {} 84 self.mock_content = [] 85 self.system_properties = {} 86 if self.system_properties.get('ro.product.cpu.abi') == None: 87 self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a' 88 89 def HasRoot(self): 90 return self.has_root 91 92 def NeedsSU(self): 93 return self.needs_su 94 95 def RunShellCommand(self, args, **kwargs): 96 del kwargs # unused 97 if isinstance(args, basestring): 98 args = shlex.split(args) 99 handler = self.shell_command_handlers[args[0]] 100 return handler(args) 101 102 def FileExists(self, _): 103 return False 104 105 def ReadFile(self, device_path, as_root=False): 106 del device_path, as_root # unused 107 return self.mock_content 108 109 def GetProp(self, property_name): 110 return self.system_properties[property_name] 111 112 def SetProp(self, property_name, property_value): 113 self.system_properties[property_name] = property_value 114 115 116class CloudStorageModuleStub(object): 117 PUBLIC_BUCKET = 'chromium-telemetry' 118 PARTNER_BUCKET = 'chrome-partner-telemetry' 119 INTERNAL_BUCKET = 'chrome-telemetry' 120 BUCKET_ALIASES = { 121 'public': PUBLIC_BUCKET, 122 'partner': PARTNER_BUCKET, 123 'internal': INTERNAL_BUCKET, 124 } 125 126 # These are used to test for CloudStorage errors. 127 INTERNAL_PERMISSION = 2 128 PARTNER_PERMISSION = 1 129 PUBLIC_PERMISSION = 0 130 # Not logged in. 131 CREDENTIALS_ERROR_PERMISSION = -1 132 133 class NotFoundError(Exception): 134 pass 135 136 class CloudStorageError(Exception): 137 pass 138 139 class PermissionError(CloudStorageError): 140 pass 141 142 class CredentialsError(CloudStorageError): 143 pass 144 145 def __init__(self): 146 self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{}, 147 CloudStorageModuleStub.PARTNER_BUCKET:{}, 148 CloudStorageModuleStub.PUBLIC_BUCKET:{}} 149 self.remote_paths = self.default_remote_paths 150 self.local_file_hashes = {} 151 self.local_hash_files = {} 152 self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION 153 self.downloaded_files = [] 154 155 def SetPermissionLevelForTesting(self, permission_level): 156 self.permission_level = permission_level 157 158 def CheckPermissionLevelForBucket(self, bucket): 159 if bucket == CloudStorageModuleStub.PUBLIC_BUCKET: 160 return 161 elif (self.permission_level == 162 CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION): 163 raise CloudStorageModuleStub.CredentialsError() 164 elif bucket == CloudStorageModuleStub.PARTNER_BUCKET: 165 if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION: 166 raise CloudStorageModuleStub.PermissionError() 167 elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET: 168 if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION: 169 raise CloudStorageModuleStub.PermissionError() 170 elif bucket not in self.remote_paths: 171 raise CloudStorageModuleStub.NotFoundError() 172 173 def SetRemotePathsForTesting(self, remote_path_dict=None): 174 if not remote_path_dict: 175 self.remote_paths = self.default_remote_paths 176 return 177 self.remote_paths = remote_path_dict 178 179 def GetRemotePathsForTesting(self): 180 if not self.remote_paths: 181 self.remote_paths = self.default_remote_paths 182 return self.remote_paths 183 184 # Set a dictionary of data files and their "calculated" hashes. 185 def SetCalculatedHashesForTesting(self, calculated_hash_dictionary): 186 self.local_file_hashes = calculated_hash_dictionary 187 188 def GetLocalDataFiles(self): 189 return self.local_file_hashes.keys() 190 191 # Set a dictionary of hash files and the hashes they should contain. 192 def SetHashFileContentsForTesting(self, hash_file_dictionary): 193 self.local_hash_files = hash_file_dictionary 194 195 def GetLocalHashFiles(self): 196 return self.local_hash_files.keys() 197 198 def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash): 199 self.remote_paths[bucket][remote_path] = new_hash 200 201 def List(self, bucket): 202 if not bucket or not bucket in self.remote_paths: 203 bucket_error = ('Incorrect bucket specified, correct buckets:' + 204 str(self.remote_paths)) 205 raise CloudStorageModuleStub.CloudStorageError(bucket_error) 206 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 207 return list(self.remote_paths[bucket].keys()) 208 209 def Exists(self, bucket, remote_path): 210 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 211 return remote_path in self.remote_paths[bucket] 212 213 def Insert(self, bucket, remote_path, local_path): 214 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 215 if not local_path in self.GetLocalDataFiles(): 216 file_path_error = 'Local file path does not exist' 217 raise CloudStorageModuleStub.CloudStorageError(file_path_error) 218 self.remote_paths[bucket][remote_path] = ( 219 CloudStorageModuleStub.CalculateHash(self, local_path)) 220 return remote_path 221 222 def GetHelper(self, bucket, remote_path, local_path, only_if_changed): 223 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 224 if not remote_path in self.remote_paths[bucket]: 225 if only_if_changed: 226 return False 227 raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.') 228 remote_hash = self.remote_paths[bucket][remote_path] 229 local_hash = self.local_file_hashes[local_path] 230 if only_if_changed and remote_hash == local_hash: 231 return False 232 self.downloaded_files.append(remote_path) 233 self.local_file_hashes[local_path] = remote_hash 234 self.local_hash_files[local_path + '.sha1'] = remote_hash 235 return remote_hash 236 237 def Get(self, bucket, remote_path, local_path): 238 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, 239 local_path, False) 240 241 def GetIfChanged(self, local_path, bucket=None): 242 remote_path = os.path.basename(local_path) 243 if bucket: 244 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, 245 local_path, True) 246 result = CloudStorageModuleStub.GetHelper( 247 self, self.PUBLIC_BUCKET, remote_path, local_path, True) 248 if not result: 249 result = CloudStorageModuleStub.GetHelper( 250 self, self.PARTNER_BUCKET, remote_path, local_path, True) 251 if not result: 252 result = CloudStorageModuleStub.GetHelper( 253 self, self.INTERNAL_BUCKET, remote_path, local_path, True) 254 return result 255 256 def GetFilesInDirectoryIfChanged(self, directory, bucket): 257 if os.path.dirname(directory) == directory: # If in the root dir. 258 raise ValueError('Trying to serve root directory from HTTP server.') 259 for dirpath, _, filenames in os.walk(directory): 260 for filename in filenames: 261 path, extension = os.path.splitext( 262 os.path.join(dirpath, filename)) 263 if extension != '.sha1': 264 continue 265 self.GetIfChanged(path, bucket) 266 267 def CalculateHash(self, file_path): 268 return self.local_file_hashes[file_path] 269 270 def ReadHash(self, hash_path): 271 return self.local_hash_files[hash_path] 272 273 274class LoggingStub(object): 275 def __init__(self): 276 self.warnings = [] 277 self.errors = [] 278 279 def info(self, msg, *args): 280 pass 281 282 def error(self, msg, *args): 283 self.errors.append(msg % args) 284 285 def warning(self, msg, *args): 286 self.warnings.append(msg % args) 287 288 def warn(self, msg, *args): 289 self.warning(msg, *args) 290 291 292class OpenFunctionStub(object): 293 class FileStub(object): 294 def __init__(self, data): 295 self._data = data 296 297 def __enter__(self): 298 return self 299 300 def __exit__(self, *args): 301 pass 302 303 def read(self, size=None): 304 if size: 305 return self._data[:size] 306 else: 307 return self._data 308 309 def write(self, data): 310 self._data.write(data) 311 312 def close(self): 313 pass 314 315 def __init__(self): 316 self.files = {} 317 318 def __call__(self, name, *args, **kwargs): 319 return OpenFunctionStub.FileStub(self.files[name]) 320 321 322class OsModuleStub(object): 323 class OsEnvironModuleStub(object): 324 def get(self, _): 325 return None 326 327 class OsPathModuleStub(object): 328 def __init__(self, sys_module): 329 self.sys = sys_module 330 self.files = [] 331 self.dirs = [] 332 333 def exists(self, path): 334 return path in self.files 335 336 def isfile(self, path): 337 return path in self.files 338 339 def isdir(self, path): 340 return path in self.dirs 341 342 def join(self, *paths): 343 def IsAbsolutePath(path): 344 if self.sys.platform.startswith('win'): 345 return re.match('[a-zA-Z]:\\\\', path) 346 else: 347 return path.startswith('/') 348 349 # Per Python specification, if any component is an absolute path, 350 # discard previous components. 351 for index, path in reversed(list(enumerate(paths))): 352 if IsAbsolutePath(path): 353 paths = paths[index:] 354 break 355 356 if self.sys.platform.startswith('win'): 357 tmp = os.path.join(*paths) 358 return tmp.replace('/', '\\') 359 else: 360 tmp = os.path.join(*paths) 361 return tmp.replace('\\', '/') 362 363 def basename(self, path): 364 if self.sys.platform.startswith('win'): 365 return ntpath.basename(path) 366 else: 367 return posixpath.basename(path) 368 369 @staticmethod 370 def abspath(path): 371 return os.path.abspath(path) 372 373 @staticmethod 374 def expanduser(path): 375 return os.path.expanduser(path) 376 377 @staticmethod 378 def dirname(path): 379 return os.path.dirname(path) 380 381 @staticmethod 382 def realpath(path): 383 return os.path.realpath(path) 384 385 @staticmethod 386 def split(path): 387 return os.path.split(path) 388 389 @staticmethod 390 def splitext(path): 391 return os.path.splitext(path) 392 393 @staticmethod 394 def splitdrive(path): 395 return os.path.splitdrive(path) 396 397 X_OK = os.X_OK 398 399 sep = os.sep 400 pathsep = os.pathsep 401 402 def __init__(self, sys_module=sys): 403 self.path = OsModuleStub.OsPathModuleStub(sys_module) 404 self.environ = OsModuleStub.OsEnvironModuleStub() 405 self.display = ':0' 406 self.local_app_data = None 407 self.sys_path = None 408 self.program_files = None 409 self.program_files_x86 = None 410 self.devnull = os.devnull 411 self._directory = {} 412 413 def access(self, path, _): 414 return path in self.path.files 415 416 def getenv(self, name, value=None): 417 if name == 'DISPLAY': 418 env = self.display 419 elif name == 'LOCALAPPDATA': 420 env = self.local_app_data 421 elif name == 'PATH': 422 env = self.sys_path 423 elif name == 'PROGRAMFILES': 424 env = self.program_files 425 elif name == 'PROGRAMFILES(X86)': 426 env = self.program_files_x86 427 else: 428 raise NotImplementedError('Unsupported getenv') 429 return env if env else value 430 431 def chdir(self, path): 432 pass 433 434 def walk(self, top): 435 for dir_name in self._directory: 436 yield top, dir_name, self._directory[dir_name] 437 438 439class PerfControlModuleStub(object): 440 class PerfControlStub(object): 441 def __init__(self, adb): 442 pass 443 444 def __init__(self): 445 self.PerfControl = PerfControlModuleStub.PerfControlStub 446 447 448class RawInputFunctionStub(object): 449 def __init__(self): 450 self.input = '' 451 452 def __call__(self, name, *args, **kwargs): 453 return self.input 454 455 456class SubprocessModuleStub(object): 457 class PopenStub(object): 458 def __init__(self): 459 self.communicate_result = ('', '') 460 self.returncode_result = 0 461 462 def __call__(self, args, **kwargs): 463 return self 464 465 def communicate(self): 466 return self.communicate_result 467 468 @property 469 def returncode(self): 470 return self.returncode_result 471 472 def __init__(self): 473 self.Popen = SubprocessModuleStub.PopenStub() 474 self.PIPE = None 475 476 def call(self, *args, **kwargs): 477 pass 478 479 480class SysModuleStub(object): 481 def __init__(self): 482 self.platform = '' 483 484 485class ThermalThrottleModuleStub(object): 486 class ThermalThrottleStub(object): 487 def __init__(self, adb): 488 pass 489 490 def __init__(self): 491 self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub 492