1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8# pylint: disable=missing-docstring 9 10import copy 11from datetime import datetime 12import getpass 13import glob 14import logging 15import os 16import re 17import shutil 18import sys 19import time 20import traceback 21import types 22import weakref 23 24import common 25from autotest_lib.client.bin import client_logging_config 26from autotest_lib.client.bin import harness 27from autotest_lib.client.bin import local_host 28from autotest_lib.client.bin import parallel 29from autotest_lib.client.bin import partition as partition_lib 30from autotest_lib.client.bin import profilers 31from autotest_lib.client.bin import sysinfo 32from autotest_lib.client.bin import test 33from autotest_lib.client.bin import utils 34from autotest_lib.client.common_lib import barrier 35from autotest_lib.client.common_lib import base_job 36from autotest_lib.client.common_lib import packages 37from autotest_lib.client.common_lib import error 38from autotest_lib.client.common_lib import global_config 39from autotest_lib.client.common_lib import logging_manager 40from autotest_lib.client.common_lib import packages 41from autotest_lib.client.cros import cros_logging 42from autotest_lib.client.tools import html_report 43 44GLOBAL_CONFIG = global_config.global_config 45 46LAST_BOOT_TAG = object() 47JOB_PREAMBLE = """ 48from autotest_lib.client.common_lib.error import * 49from autotest_lib.client.bin.utils import * 50""" 51 52 53class StepError(error.AutotestError): 54 pass 55 56class NotAvailableError(error.AutotestError): 57 pass 58 59 60 61def _run_test_complete_on_exit(f): 62 """Decorator for job methods that automatically calls 63 self.harness.run_test_complete when the method exits, if appropriate.""" 64 def wrapped(self, *args, **dargs): 65 try: 66 return f(self, *args, **dargs) 67 finally: 68 if self._logger.global_filename == 'status': 69 self.harness.run_test_complete() 70 if self.drop_caches: 71 utils.drop_caches() 72 wrapped.__name__ = f.__name__ 73 wrapped.__doc__ = f.__doc__ 74 wrapped.__dict__.update(f.__dict__) 75 return wrapped 76 77 78class status_indenter(base_job.status_indenter): 79 """Provide a status indenter that is backed by job._record_prefix.""" 80 def __init__(self, job_): 81 self._job = weakref.proxy(job_) # avoid a circular reference 82 83 84 @property 85 def indent(self): 86 return self._job._record_indent 87 88 89 def increment(self): 90 self._job._record_indent += 1 91 92 93 def decrement(self): 94 self._job._record_indent -= 1 95 96 97class base_client_job(base_job.base_job): 98 """The client-side concrete implementation of base_job. 99 100 Optional properties provided by this implementation: 101 control 102 harness 103 """ 104 105 _WARNING_DISABLE_DELAY = 5 106 107 # _record_indent is a persistent property, but only on the client 108 _job_state = base_job.base_job._job_state 109 _record_indent = _job_state.property_factory( 110 '_state', '_record_indent', 0, namespace='client') 111 _max_disk_usage_rate = _job_state.property_factory( 112 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 113 114 115 def __init__(self, control, options, drop_caches=True): 116 """ 117 Prepare a client side job object. 118 119 @param control: The control file (pathname of). 120 @param options: an object which includes: 121 jobtag: The job tag string (eg "default"). 122 cont: If this is the continuation of this job. 123 harness_type: An alternative server harness. 
                use_external_logging: If true, the enable_external_logging
                        method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches)
        except Exception, err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    @classmethod
    def _parse_args(cls, args):
        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)


    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                           default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir

        return os.path.join(basedir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
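            # A global_filename like 'status.<n>' (set by job.parallel for
            # forked tasks) carries a suffix that is reused as the message tag.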
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook,
            tap_writer=self._tap)


    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
            manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
            GLOBAL_CONFIG.get_config_value('CLIENT',
                                           'drop_caches_between_iterations',
                                           type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()


    def _init_packages(self):
        """
        Perform the packages support initialization.
309 """ 310 self.pkgmgr = packages.PackageManager( 311 self.autodir, run_function_dargs={'timeout':3600}) 312 313 314 def _cleanup_results_dir(self): 315 """Delete everything in resultsdir""" 316 assert os.path.exists(self.resultdir) 317 list_files = glob.glob('%s/*' % self.resultdir) 318 for f in list_files: 319 if os.path.isdir(f): 320 shutil.rmtree(f) 321 elif os.path.isfile(f): 322 os.remove(f) 323 324 325 def _cleanup_debugdir_files(self): 326 """ 327 Delete any leftover debugdir files 328 """ 329 list_files = glob.glob("/tmp/autotest_results_dir.*") 330 for f in list_files: 331 os.remove(f) 332 333 334 def disable_warnings(self, warning_type): 335 self.record("INFO", None, None, 336 "disabling %s warnings" % warning_type, 337 {"warnings.disable": warning_type}) 338 time.sleep(self._WARNING_DISABLE_DELAY) 339 340 341 def enable_warnings(self, warning_type): 342 time.sleep(self._WARNING_DISABLE_DELAY) 343 self.record("INFO", None, None, 344 "enabling %s warnings" % warning_type, 345 {"warnings.enable": warning_type}) 346 347 348 def monitor_disk_usage(self, max_rate): 349 """\ 350 Signal that the job should monitor disk space usage on / 351 and generate a warning if a test uses up disk space at a 352 rate exceeding 'max_rate'. 353 354 Parameters: 355 max_rate - the maximium allowed rate of disk consumption 356 during a test, in MB/hour, or 0 to indicate 357 no limit. 358 """ 359 self._max_disk_usage_rate = max_rate 360 361 362 def control_get(self): 363 return self.control 364 365 366 def control_set(self, control): 367 self.control = os.path.abspath(control) 368 369 370 def harness_select(self, which, harness_args): 371 self.harness = harness.select(which, self, harness_args) 372 373 374 def setup_dirs(self, results_dir, tmp_dir): 375 if not tmp_dir: 376 tmp_dir = os.path.join(self.tmpdir, 'build') 377 if not os.path.exists(tmp_dir): 378 os.mkdir(tmp_dir) 379 if not os.path.isdir(tmp_dir): 380 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 381 raise ValueError(e_msg) 382 383 # We label the first build "build" and then subsequent ones 384 # as "build.2", "build.3", etc. Whilst this is a little bit 385 # inconsistent, 99.9% of jobs will only have one build 386 # (that's not done as kernbench, sparse, or buildtest), 387 # so it works out much cleaner. One of life's compromises. 388 if not results_dir: 389 results_dir = os.path.join(self.resultdir, 'build') 390 i = 2 391 while os.path.exists(results_dir): 392 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 393 i += 1 394 if not os.path.exists(results_dir): 395 os.mkdir(results_dir) 396 397 return (results_dir, tmp_dir) 398 399 400 def barrier(self, *args, **kwds): 401 """Create a barrier object""" 402 return barrier.barrier(*args, **kwds) 403 404 405 def install_pkg(self, name, pkg_type, install_dir): 406 ''' 407 This method is a simple wrapper around the actual package 408 installation method in the Packager class. This is used 409 internally by the profilers, deps and tests code. 410 name : name of the package (ex: sleeptest, dbench etc.) 411 pkg_type : Type of the package (ex: test, dep etc.) 412 install_dir : The directory in which the source is actually 413 untarred into. (ex: client/profilers/<name> for profilers) 414 ''' 415 if self.pkgmgr.repositories: 416 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 417 418 419 def add_repository(self, repo_urls): 420 ''' 421 Adds the repository locations to the job so that packages 422 can be fetched from them when needed. 
        can be fetched from them when needed. The repository list
        needs to be a list of strings.
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)


    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions for run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status


    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        timeout:
                optional timeout (in seconds), recorded with the START entry
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code. The common case of being called by
                # run_test() will never reach this. If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group. If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)


    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match
            the pre-reboot configuration.
700 """ 701 # check to see if any partitions have changed 702 partition_list = partition_lib.get_partition_list(self, 703 exclude_swap=False) 704 mount_info = partition_lib.get_mount_info(partition_list) 705 old_mount_info = self._state.get('client', 'mount_info') 706 if mount_info != old_mount_info: 707 new_entries = mount_info - old_mount_info 708 old_entries = old_mount_info - mount_info 709 description = ("mounted partitions are different after reboot " 710 "(old entries: %s, new entries: %s)" % 711 (old_entries, new_entries)) 712 self._record_reboot_failure(subdir, "reboot.verify_config", 713 description, running_id=running_id) 714 raise error.JobError("Reboot failed: %s" % description) 715 716 # check to see if any CPUs have changed 717 cpu_count = utils.count_cpus() 718 old_count = self._state.get('client', 'cpu_count') 719 if cpu_count != old_count: 720 description = ('Number of CPUs changed after reboot ' 721 '(old count: %d, new count: %d)' % 722 (old_count, cpu_count)) 723 self._record_reboot_failure(subdir, 'reboot.verify_config', 724 description, running_id=running_id) 725 raise error.JobError('Reboot failed: %s' % description) 726 727 728 def partition(self, device, loop_size=0, mountpoint=None): 729 """ 730 Work with a machine partition 731 732 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 733 @param mountpoint: Specify a directory to mount to. If not specified 734 autotest tmp directory will be used. 735 @param loop_size: Size of loopback device (in MB). Defaults to 0. 736 737 @return: A L{client.bin.partition.partition} object 738 """ 739 740 if not mountpoint: 741 mountpoint = self.tmpdir 742 return partition_lib.partition(self, device, loop_size, mountpoint) 743 744 @utils.deprecated 745 def filesystem(self, device, mountpoint=None, loop_size=0): 746 """ Same as partition 747 748 @deprecated: Use partition method instead 749 """ 750 return self.partition(device, loop_size, mountpoint) 751 752 753 def enable_external_logging(self): 754 pass 755 756 757 def disable_external_logging(self): 758 pass 759 760 761 def reboot_setup(self): 762 # save the partition list and mount points, as well as the cpu count 763 partition_list = partition_lib.get_partition_list(self, 764 exclude_swap=False) 765 mount_info = partition_lib.get_mount_info(partition_list) 766 self._state.set('client', 'mount_info', mount_info) 767 self._state.set('client', 'cpu_count', utils.count_cpus()) 768 769 770 def reboot(self): 771 self.reboot_setup() 772 self.harness.run_reboot() 773 774 # HACK: using this as a module sometimes hangs shutdown, so if it's 775 # installed unload it first 776 utils.system("modprobe -r netconsole", ignore_status=True) 777 778 # sync first, so that a sync during shutdown doesn't time out 779 utils.system("sync; sync", ignore_status=True) 780 781 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 782 self.quit() 783 784 785 def noop(self, text): 786 logging.info("job: noop: " + text) 787 788 789 @_run_test_complete_on_exit 790 def parallel(self, *tasklist): 791 """Run tasks in parallel""" 792 793 pids = [] 794 old_log_filename = self._logger.global_filename 795 for i, task in enumerate(tasklist): 796 assert isinstance(task, (tuple, list)) 797 self._logger.global_filename = old_log_filename + (".%d" % i) 798 def task_func(): 799 # stub out _record_indent with a process-local one 800 base_record_indent = self._record_indent 801 proc_local = self._job_state.property_factory( 802 '_state', '_record_indent.%d' % os.getpid(), 803 base_record_indent, namespace='client') 
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending TAP reports, clean up, and exit"""
        # write out TAP reports
        if self._tap.do_tap_report:
            self._tap.write()
            self._tap._write_tap_archive()

        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
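        # Illustrative (hypothetical) legacy form:
        #     job.next_step(['some_step_function', arg1, arg2])
        # versus the usual job.next_step(some_step_function, arg1, arg2).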
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name. Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps. Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added. In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames. This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
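        # A frame is popped only while its parent holds the very same binding
        # for fn_name; stop as soon as the parent lacks fn_name or binds a
        # different object, since then the current (deeper) frame is the one
        # where fn_name was defined.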
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps. If we reboot, we'll simply
        # continue iterating on the next step.
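        # Each pass pops one step from the persistent state before running it,
        # so a job that reboots mid-step resumes with the remaining steps.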
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator


def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
1161 """ 1162 control = os.path.abspath(control) 1163 state = control + '.state' 1164 # Ensure state file is cleaned up before the job starts to run if autotest 1165 # is not running with the --continue flag 1166 if not options.cont and os.path.isfile(state): 1167 logging.debug('Cleaning up previously found state file') 1168 os.remove(state) 1169 1170 # instantiate the job object ready for the control file. 1171 myjob = None 1172 try: 1173 # Check that the control file is valid 1174 if not os.path.exists(control): 1175 raise error.JobError(control + ": control file not found") 1176 1177 # When continuing, the job is complete when there is no 1178 # state file, ensure we don't try and continue. 1179 if options.cont and not os.path.exists(state): 1180 raise error.JobComplete("all done") 1181 1182 myjob = job(control=control, drop_caches=drop_caches, options=options) 1183 1184 # Load in the users control file, may do any one of: 1185 # 1) execute in toto 1186 # 2) define steps, and select the first via next_step() 1187 myjob.step_engine() 1188 1189 except error.JobContinue: 1190 sys.exit(5) 1191 1192 except error.JobComplete: 1193 sys.exit(1) 1194 1195 except error.JobError, instance: 1196 logging.error("JOB ERROR: " + str(instance)) 1197 if myjob: 1198 command = None 1199 if len(instance.args) > 1: 1200 command = instance.args[1] 1201 myjob.record('ABORT', None, command, str(instance)) 1202 myjob.record('END ABORT', None, None, str(instance)) 1203 assert myjob._record_indent == 0 1204 myjob.complete(1) 1205 else: 1206 sys.exit(1) 1207 1208 except Exception, e: 1209 # NOTE: job._run_step_fn and job.step_engine will turn things into 1210 # a JobError for us. If we get here, its likely an autotest bug. 1211 msg = str(e) + '\n' + traceback.format_exc() 1212 logging.critical("JOB ERROR (autotest bug?): " + msg) 1213 if myjob: 1214 myjob.record('END ABORT', None, None, msg) 1215 assert myjob._record_indent == 0 1216 myjob.complete(1) 1217 else: 1218 sys.exit(1) 1219 1220 # If we get here, then we assume the job is complete and good. 1221 myjob.record('END GOOD', None, None) 1222 assert myjob._record_indent == 0 1223 1224 myjob.complete(0) 1225 1226 1227class job(base_client_job): 1228 1229 def __init__(self, *args, **kwargs): 1230 base_client_job.__init__(self, *args, **kwargs) 1231 1232 1233 def run_test(self, url, *args, **dargs): 1234 log_pauser = cros_logging.LogRotationPauser() 1235 passed = False 1236 try: 1237 log_pauser.begin() 1238 passed = base_client_job.run_test(self, url, *args, **dargs) 1239 if not passed: 1240 # Save the VM state immediately after the test failure. 1241 # This is a NOOP if the the test isn't running in a VM or 1242 # if the VM is not properly configured to save state. 1243 _group, testname = self.pkgmgr.get_package_name(url, 'test') 1244 now = datetime.now().strftime('%I:%M:%S.%f') 1245 checkpoint_name = '%s-%s' % (testname, now) 1246 utils.save_vm_state(checkpoint_name) 1247 finally: 1248 log_pauser.end() 1249 return passed 1250 1251 1252 def reboot(self): 1253 self.reboot_setup() 1254 self.harness.run_reboot() 1255 1256 # sync first, so that a sync during shutdown doesn't time out 1257 utils.system('sync; sync', ignore_status=True) 1258 1259 utils.system('reboot </dev/null >/dev/null 2>&1 &') 1260 self.quit() 1261 1262 1263 def require_gcc(self): 1264 return False 1265