# pylint: disable-msg=C0111

# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import getpass, os, sys, re, tempfile, time, select, platform
import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job, global_config
from autotest_lib.client.common_lib import error, utils, packages
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.server import test, subcommand, profilers
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.hosts import abstract_ssh, factory as host_factory
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils


INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
        'autoserv', 'incremental_tko_parsing', type=bool, default=False)

def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')

# by default provide a stub that generates no site data
def _get_site_job_data_dummy(job):
    return {}


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)


class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe;
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)


class base_server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir

        num_tests_run
        num_tests_failed

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 test_retry=0, group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, host_attributes=None, in_lab=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines to run the
                job on.
        @param client: True if this is a client-side control file.
        @param parse_job: If supplied, the job execution tag under which the
                results will be passed through to the TKO parser.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                            included in ssh commands.
        @param test_retry: The number of times to retry a test if the test did
                not complete successfully.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param host_attributes: Dict of host attributes passed into autoserv
                                via the command line. If specified here, these
                                attributes will apply to all machines.
        @param in_lab: Boolean that indicates if this is running in the lab
                       environment.
        """
        super(base_server_job, self).__init__(resultdir=resultdir,
                                              test_retry=test_retry)
        path = os.path.dirname(__file__)
        self.test_retry = test_retry
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
                              and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # these components aren't usable on the server
        self.bootloader = None
        self.harness = None

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
        self.machine_dict_list = []
        for machine in self.machines:
            host_attributes = host_attributes or {}
            if self.in_lab:
                host = afe.get_hosts(hostname=machine)[0]
                host_attributes.update(host.attributes)
            self.machine_dict_list.append(
                    {'hostname' : machine,
                     'host_attributes' : host_attributes})
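
    # Illustrative sketch, not executed: a minimal construction of a server
    # job. All values here are hypothetical; in practice autoserv builds this
    # object from its command line.
    #
    #     job = server_job(control='/tmp/control.srv', args='',
    #                      resultdir='/tmp/results', label='example-job',
    #                      user='someone@example.com', machines=['host1'])
    #     job.run()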


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model,
                                       self.parent_job_id)
        else:
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """
        This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)
        """
        if not self._using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self._using_parser = False

    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to a control file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace


    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)


    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)


    def reset(self, labels):
        """Reset machines by first running cleanup and then verify on each
        machine.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)


    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        # machines could be a list of dictionaries, e.g.,
        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in base_server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # base_server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        # Note that the order of hostnames doesn't matter, as is_forking will
        # be True if there is more than one machine.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self._parse_job += "/" + hostname
                self._using_parser = INCREMENTAL_TKO_PARSING
                self.machines = [machine]
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": hostname})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per
                subcommand invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, a list of the results/exceptions from
                the function called on each arg is returned instead of an
                AutoservError being raised on any error.  [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)
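
    # Illustrative sketch, not executed: verifying every machine in parallel.
    # `connect_and_verify` is a hypothetical callable; create_host is assumed
    # to come from the server hosts factory imported at the top of this file.
    #
    #     def connect_and_verify(machine):
    #         host = host_factory.create_host(machine)
    #         host.verify()
    #
    #     job.parallel_simple(connect_and_verify, job.machines)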


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines


    _USE_TEMP_DIR = object()
    def run(self, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({'args' : self.args,
                          'job_labels' : job_labels})
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception, e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if skip_crash_collection:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                elif collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)


    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: url of the test to run.
        @param tag: tag to add to testname (optional, pulled from dargs).

        @returns True if the test ran to completion, False if it failed with
                a TestBaseException. Any other exception is re-raised.
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
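
    # Illustrative sketch, not executed: invoking a test from a server control
    # file. 'example_Test' is a hypothetical test name; keyword arguments are
    # forwarded to the test.
    #
    #     passed = job.run_test('example_Test', iterations=3, tag='smoke')
    #     if not passed:
    #         logging.warning('example_Test failed; see status.log')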


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]
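
    # Illustrative sketch, not executed: wrapping several steps in one
    # START/END group in status.log. `update_firmware` is hypothetical; the
    # optional tag overrides the function name in the log.
    #
    #     def update_firmware():
    #         ...  # several recorded steps
    #
    #     job.run_group(update_firmware, tag='firmware_update')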


    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        management operations. Includes support for capturing the kernel
        version after the operation.

        Args:
           op: name of the operation.
           op_func: a function that carries out the operation (reboot, suspend)
           get_kernel_func: a function that returns a string
                            representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception, e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})
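
    # Illustrative sketch, not executed, assuming `host` is a connected remote
    # Host object: record a reboot together with the kernel it booted into.
    #
    #     job.run_op('reboot', op_func=host.reboot,
    #                get_kernel_func=lambda: host.run('uname -r').stdout.strip())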


    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
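
    # For reference, each warning logger is expected to emit tab-separated
    # lines of the form (values illustrative):
    #
    #     1325376000\tWARN\tsomething looked unhealthy on the DUT
    #
    # i.e. an integer epoch timestamp, a warning type (matched against
    # warning_manager's disabled intervals), and the message text.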


    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e.
        indentation) to the current level. The intention is that it should be
        used when something external that generates job.record calls (e.g. an
        autotest client) can fail catastrophically and the server job record
        state needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()
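
    # Illustrative sketch, not executed: save the record context before work
    # that may die mid-group, and restore the indentation afterwards.
    # `run_client_side_portion` is a hypothetical helper.
    #
    #     context = job.get_record_context()
    #     try:
    #         run_client_side_portion()
    #     finally:
    #         context.restore()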


    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results.  These attributes should be present in the attributes
                parameter.  This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)
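
    # Illustrative sketch, not executed: two identically-named summary results
    # can be distinguished by a 'platform' attribute (values hypothetical).
    #
    #     job.record_summary('GOOD', 'example_Suite',
    #                        attributes={'platform': 'link'},
    #                        distinguishing_attributes=('platform',))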


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        # Skip the update if the _uncollected_log_file does not exist.
        if not (self._uncollected_log_file and
                os.path.exists(self._uncollected_log_file)):
            return
        # Hold an exclusive lock on the file while rewriting it so that
        # concurrent subcommands don't clobber each other's updates.
        log_file = open(self._uncollected_log_file, "r+")
        fcntl.flock(log_file, fcntl.LOCK_EX)
        try:
            uncollected_logs = pickle.load(log_file)
            update_func(uncollected_logs)
            log_file.seek(0)
            log_file.truncate()
            pickle.dump(uncollected_logs, log_file)
            log_file.flush()
        finally:
            fcntl.flock(log_file, fcntl.LOCK_UN)
            log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
                 an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API.
        # -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].TestBed.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options

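    # Illustrative sketch, not executed: once the namespace is filled in, a
    # server control file can use the injected names directly, e.g.
    # (hostname lookup shown is hypothetical):
    #
    #     host = hosts.create_host(machines[0]['hostname'])
    #     result = host.run('uname -r')
    #     logging.info('kernel: %s', result.stdout.strip())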

    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def _parse_status(self, new_line):
        if not self._using_parser:
            return
        new_tests = self.parser.process_lines([new_line])
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path
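
    # Illustrative sketch, not executed: the intended round trip with a client
    # job (the transfer mechanism is elided; paths hypothetical).
    #
    #     state_file = job.preprocess_client_state()
    #     # ... push state_file to the client, run the client job, then
    #     # fetch the client's final state back into state_file ...
    #     job.postprocess_client_state(state_file)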


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path: A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError, e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')


    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))
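
# Illustrative sketch, not executed: warnings of a type are invalid while a
# disable interval is open ('POWER_BAD' and the timestamps are hypothetical).
#
#     manager = warning_manager()
#     manager.disable_warnings('POWER_BAD')    # interval opens at time t0
#     manager.is_valid(t0 + 5, 'POWER_BAD')    # -> False
#     manager.enable_warnings('POWER_BAD')     # interval closes at time t1
#     manager.is_valid(t1 + 5, 'POWER_BAD')    # -> True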


# load up site-specific code for generating site-specific job data
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)


site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)


class server_job(site_server_job):
    pass
