1# pylint: disable-msg=C0111
2
3# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6"""
7The main job wrapper for the server side.
8
9This is the core infrastructure. Derived from the client side job.py
10
11Copyright Martin J. Bligh, Andy Whitcroft 2007
12"""
13
14import errno
15import fcntl
16import getpass
17import itertools
18import logging
19import os
20import pickle
21import platform
22import re
23import select
24import shutil
25import sys
26import tempfile
27import time
28import traceback
29import warnings
30
31from autotest_lib.client.bin import sysinfo
32from autotest_lib.client.common_lib import base_job
33from autotest_lib.client.common_lib import error
34from autotest_lib.client.common_lib import global_config
35from autotest_lib.client.common_lib import logging_manager
36from autotest_lib.client.common_lib import packages
37from autotest_lib.client.common_lib import utils
38from autotest_lib.server import profilers
39from autotest_lib.server import subcommand
40from autotest_lib.server import test
41from autotest_lib.server import utils as server_utils
42from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
43from autotest_lib.server.hosts import abstract_ssh
44from autotest_lib.server.hosts import afe_store
45from autotest_lib.server.hosts import factory as host_factory
46from autotest_lib.server.hosts import host_info
47from autotest_lib.tko import db as tko_db
48from autotest_lib.tko import models as tko_models
49from autotest_lib.tko import status_lib
50from autotest_lib.tko import parser_lib
51from autotest_lib.tko import utils as tko_utils
52
53
# Boolean read from the 'autoserv' section of the global config (key
# 'incremental_tko_parsing'); defaults to False when the key is not set.
# base_server_job consults it when deciding whether to run the TKO parser
# continuously while the job executes.
INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
        'autoserv', 'incremental_tko_parsing', type=bool, default=False)
56
def _control_segment_path(name):
    """Return the absolute pathname of the control segment file |name|.

    Control segments are looked up in the "control_segments" directory that
    sits next to this module.

    @param name: File name of the control segment, e.g. 'verify'.
    """
    this_module = os.path.abspath(__file__)
    segments_dir = os.path.join(os.path.dirname(this_module),
                                "control_segments")
    return os.path.join(segments_dir, name)
61
62
# File names used by server jobs. The client and server control file names
# are the ones run() writes control files under; how MACHINES_FILENAME is
# used is not visible in this part of the file.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Absolute paths of the control segment files, all resolved relative to the
# control_segments directory next to this module.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
78
# Stub provided by default: it produces no site-specific data.
def _get_site_job_data_dummy(job):
    """Return a new, empty dict of site job data; |job| is ignored."""
    return dict()
82
83
def get_machine_dicts(machine_names, in_lab, host_attributes=None):
    """Build one descriptive dict for every machine name.

    @param machine_names: A list of machine names.
    @param in_lab: A boolean indicating whether we're running in lab.
    @param host_attributes: Optional dict of host attributes; it is merged
            into the attributes of every machine's afe_host.
    @returns: A list of dicts, one per entry of machine_names and in the same
            order. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
    """
    extra_attributes = dict() if host_attributes is None else host_attributes

    def _describe(hostname):
        # The afe_host is created and updated before the info store is built.
        afe_host = _create_afe_host(hostname, in_lab)
        afe_host.attributes.update(extra_attributes)
        return {
                'hostname' : hostname,
                'afe_host' : afe_host,
                'host_info_store': _create_host_info_store(hostname, in_lab),
        }

    return [_describe(name) for name in machine_names]
110
111
class status_indenter(base_job.status_indenter):
    """Status indenter that keeps the indent level in a plain integer."""

    def __init__(self):
        # The indent level starts out at zero.
        self._indent = 0


    @property
    def indent(self):
        """The current indent level."""
        return self._indent


    def increment(self):
        """Raise the indent level by one."""
        self._indent = self._indent + 1


    def decrement(self):
        """Lower the indent level by one."""
        self._indent = self._indent - 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context.

        The returned object remembers the indent level at the time of this
        call; its restore() method sets this indenter back to that level.
        """
        class context(object):
            def __init__(self, owner, level):
                self._indenter = owner
                self._indent = level

            def restore(self):
                self._indenter._indent = self._indent

        saved_level = self._indent
        return context(self, saved_level)
140
141
class server_job_record_hook(object):
    """The job.record hook for server jobs.

    Whenever a status log entry is written, the hook drains the pending
    warnings from job._read_warnings(), records each one as a WARN entry,
    and then echoes the rendered form of every entry to INFO level logging
    and hands it to job._parse_status. It is a class rather than a plain
    function so that it can keep a flag that blocks recursive calls, which
    lets the hook itself record WARN entries on the job.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        # True only while a call to the hook is in progress.
        self._being_called = False


    def __call__(self, entry):
        """Run the 'real' record hook, _hook, unless it is already running.

        This makes no effort to be threadsafe; the only goal is to break a
        job.record->_hook->job.record->_hook->... chain before it recurses
        forever.
        """
        if not self._being_called:
            self._being_called = True
            try:
                self._hook(self._job, entry)
            finally:
                # Always re-arm the hook, even if _hook raised.
                self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        # poll all our warning loggers for new warnings
        pending = []
        for timestamp, msg in job._read_warnings():
            warning = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            pending.append(warning)
            job.record_entry(warning)
        # echo rendered versions of all the status logs to info; the entry
        # that triggered the hook comes after the warnings
        for item in pending + [entry]:
            rendered = job._logger.render_entry(item)
            logging.info(rendered)
            job._parse_status(rendered)
186
187
188class base_server_job(base_job.base_job):
189    """The server-side concrete implementation of base_job.
190
191    Optional properties provided by this implementation:
192        serverdir
193
194        num_tests_run
195        num_tests_failed
196
197        warning_manager
198        warning_loggers
199    """
200
201    _STATUS_VERSION = 1
202
203    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
204    def __init__(self, control, args, resultdir, label, user, machines,
205                 client=False, parse_job='',
206                 ssh_user=host_factory.DEFAULT_SSH_USER,
207                 ssh_port=host_factory.DEFAULT_SSH_PORT,
208                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
209                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
210                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
211                 test_retry=0, group_name='',
212                 tag='', disable_sysinfo=False,
213                 control_filename=SERVER_CONTROL_FILENAME,
214                 parent_job_id=None, host_attributes=None, in_lab=False):
215        """
216        Create a server side job object.
217
218        @param control: The pathname of the control file.
219        @param args: Passed to the control file.
220        @param resultdir: Where to throw the results.
221        @param label: Description of the job.
222        @param user: Username for the job (email address).
223        @param client: True if this is a client-side control file.
224        @param parse_job: string, if supplied it is the job execution tag that
225                the results will be passed through to the TKO parser with.
226        @param ssh_user: The SSH username.  [root]
227        @param ssh_port: The SSH port number.  [22]
228        @param ssh_pass: The SSH passphrase, if needed.
229        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
230                '-vvv', or an empty string if not needed.
231        @param ssh_options: A string giving additional options that will be
232                            included in ssh commands.
233        @param test_retry: The number of times to retry a test if the test did
234                not complete successfully.
235        @param group_name: If supplied, this will be written out as
236                host_group_name in the keyvals file for the parser.
237        @param tag: The job execution tag from the scheduler.  [optional]
238        @param disable_sysinfo: Whether we should disable the sysinfo step of
239                tests for a modest shortening of test time.  [optional]
240        @param control_filename: The filename where the server control file
241                should be written in the results directory.
242        @param parent_job_id: Job ID of the parent job. Default to None if the
243                job does not have a parent job.
244        @param host_attributes: Dict of host attributes passed into autoserv
245                                via the command line. If specified here, these
246                                attributes will apply to all machines.
247        @param in_lab: Boolean that indicates if this is running in the lab
248                       environment.
249        """
250        super(base_server_job, self).__init__(resultdir=resultdir,
251                                              test_retry=test_retry)
252        path = os.path.dirname(__file__)
253        self.test_retry = test_retry
254        self.control = control
255        self._uncollected_log_file = os.path.join(self.resultdir,
256                                                  'uncollected_logs')
257        debugdir = os.path.join(self.resultdir, 'debug')
258        if not os.path.exists(debugdir):
259            os.mkdir(debugdir)
260
261        if user:
262            self.user = user
263        else:
264            self.user = getpass.getuser()
265
266        self.args = args
267        self.label = label
268        self.machines = machines
269        self._client = client
270        self.warning_loggers = set()
271        self.warning_manager = warning_manager()
272        self._ssh_user = ssh_user
273        self._ssh_port = ssh_port
274        self._ssh_pass = ssh_pass
275        self._ssh_verbosity_flag = ssh_verbosity_flag
276        self._ssh_options = ssh_options
277        self.tag = tag
278        self.hosts = set()
279        self.drop_caches = False
280        self.drop_caches_between_iterations = False
281        self._control_filename = control_filename
282        self._disable_sysinfo = disable_sysinfo
283
284        self.logging = logging_manager.get_logging_manager(
285                manage_stdout_and_stderr=True, redirect_fds=True)
286        subcommand.logging_manager_object = self.logging
287
288        self.sysinfo = sysinfo.sysinfo(self.resultdir)
289        self.profilers = profilers.profilers(self)
290
291        job_data = {'label' : label, 'user' : user,
292                    'hostname' : ','.join(machines),
293                    'drone' : platform.node(),
294                    'status_version' : str(self._STATUS_VERSION),
295                    'job_started' : str(int(time.time()))}
296        # Save parent job id to keyvals, so parser can retrieve the info and
297        # write to tko_jobs record.
298        if parent_job_id:
299            job_data['parent_job_id'] = parent_job_id
300        if group_name:
301            job_data['host_group_name'] = group_name
302
303        # only write these keyvals out on the first job in a resultdir
304        if 'job_started' not in utils.read_keyval(self.resultdir):
305            job_data.update(get_site_job_data(self))
306            utils.write_keyval(self.resultdir, job_data)
307
308        self._parse_job = parse_job
309        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
310                              and len(machines) <= 1)
311        self.pkgmgr = packages.PackageManager(
312            self.autodir, run_function_dargs={'timeout':600})
313        self.num_tests_run = 0
314        self.num_tests_failed = 0
315
316        self._register_subcommand_hooks()
317
318        # set up the status logger
319        self._indenter = status_indenter()
320        self._logger = base_job.status_logger(
321            self, self._indenter, 'status.log', 'status.log',
322            record_hook=server_job_record_hook(self))
323
324        # Initialize a flag to indicate DUT failure during the test, e.g.,
325        # unexpected reboot.
326        self.failed_with_device_error = False
327
328        self.parent_job_id = parent_job_id
329        self.in_lab = in_lab
330        self.machine_dict_list = get_machine_dicts(
331                self.machines, self.in_lab, host_attributes)
332
333        # TODO(jrbarnette) The harness attribute is only relevant to
334        # client jobs, but it's required to be present, or we will fail
335        # server job unit tests.  Yes, really.
336        #
337        # TODO(jrbarnette) The utility of the 'harness' attribute even
338        # to client jobs is suspect.  Probably, we should remove it.
339        self.harness = None
340
341
342    @classmethod
343    def _find_base_directories(cls):
344        """
345        Determine locations of autodir, clientdir and serverdir. Assumes
346        that this file is located within serverdir and uses __file__ along
347        with relative paths to resolve the location.
348        """
349        serverdir = os.path.abspath(os.path.dirname(__file__))
350        autodir = os.path.normpath(os.path.join(serverdir, '..'))
351        clientdir = os.path.join(autodir, 'client')
352        return autodir, clientdir, serverdir
353
354
355    def _find_resultdir(self, resultdir, *args, **dargs):
356        """
357        Determine the location of resultdir. For server jobs we expect one to
358        always be explicitly passed in to __init__, so just return that.
359        """
360        if resultdir:
361            return os.path.normpath(resultdir)
362        else:
363            return None
364
365
366    def _get_status_logger(self):
367        """Return a reference to the status logger."""
368        return self._logger
369
370
371    @staticmethod
372    def _load_control_file(path):
373        f = open(path)
374        try:
375            control_file = f.read()
376        finally:
377            f.close()
378        return re.sub('\r', '', control_file)
379
380
381    def _register_subcommand_hooks(self):
382        """
383        Register some hooks into the subcommand modules that allow us
384        to properly clean up self.hosts created in forked subprocesses.
385        """
386        def on_fork(cmd):
387            self._existing_hosts_on_fork = set(self.hosts)
388        def on_join(cmd):
389            new_hosts = self.hosts - self._existing_hosts_on_fork
390            for host in new_hosts:
391                host.close()
392        subcommand.subcommand.register_fork_hook(on_fork)
393        subcommand.subcommand.register_join_hook(on_join)
394
395
396    def init_parser(self):
397        """
398        Start the continuous parsing of self.resultdir. This sets up
399        the database connection and inserts the basic job object into
400        the database if necessary.
401        """
402        if not self._using_parser:
403            return
404        # redirect parser debugging to .parse.log
405        parse_log = os.path.join(self.resultdir, '.parse.log')
406        parse_log = open(parse_log, 'w', 0)
407        tko_utils.redirect_parser_debugging(parse_log)
408        # create a job model object and set up the db
409        self.results_db = tko_db.db(autocommit=True)
410        self.parser = parser_lib.parser(self._STATUS_VERSION)
411        self.job_model = self.parser.make_job(self.resultdir)
412        self.parser.start(self.job_model)
413        # check if a job already exists in the db and insert it if
414        # it does not
415        job_idx = self.results_db.find_job(self._parse_job)
416        if job_idx is None:
417            self.results_db.insert_job(self._parse_job, self.job_model,
418                                       self.parent_job_id)
419        else:
420            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
421            self.job_model.index = job_idx
422            self.job_model.machine_idx = machine_idx
423
424
425    def cleanup_parser(self):
426        """
427        This should be called after the server job is finished
428        to carry out any remaining cleanup (e.g. flushing any
429        remaining test results to the results db)
430        """
431        if not self._using_parser:
432            return
433        final_tests = self.parser.end()
434        for test in final_tests:
435            self.__insert_test(test)
436        self._using_parser = False
437
438    # TODO crbug.com/285395 add a kwargs parameter.
439    def _make_namespace(self):
440        """Create a namespace dictionary to be passed along to control file.
441
442        Creates a namespace argument populated with standard values:
443        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
444        and ssh_options.
445        """
446        namespace = {'machines' : self.machine_dict_list,
447                     'job' : self,
448                     'ssh_user' : self._ssh_user,
449                     'ssh_port' : self._ssh_port,
450                     'ssh_pass' : self._ssh_pass,
451                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
452                     'ssh_options' : self._ssh_options}
453        return namespace
454
455
456    def cleanup(self, labels):
457        """Cleanup machines.
458
459        @param labels: Comma separated job labels, will be used to
460                       determine special task actions.
461        """
462        if not self.machines:
463            raise error.AutoservError('No machines specified to cleanup')
464        if self.resultdir:
465            os.chdir(self.resultdir)
466
467        namespace = self._make_namespace()
468        namespace.update({'job_labels': labels, 'args': ''})
469        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
470
471
472    def verify(self, labels):
473        """Verify machines are all ssh-able.
474
475        @param labels: Comma separated job labels, will be used to
476                       determine special task actions.
477        """
478        if not self.machines:
479            raise error.AutoservError('No machines specified to verify')
480        if self.resultdir:
481            os.chdir(self.resultdir)
482
483        namespace = self._make_namespace()
484        namespace.update({'job_labels': labels, 'args': ''})
485        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
486
487
488    def reset(self, labels):
489        """Reset machines by first cleanup then verify each machine.
490
491        @param labels: Comma separated job labels, will be used to
492                       determine special task actions.
493        """
494        if not self.machines:
495            raise error.AutoservError('No machines specified to reset.')
496        if self.resultdir:
497            os.chdir(self.resultdir)
498
499        namespace = self._make_namespace()
500        namespace.update({'job_labels': labels, 'args': ''})
501        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
502
503
504    def repair(self, labels):
505        """Repair machines.
506
507        @param labels: Comma separated job labels, will be used to
508                       determine special task actions.
509        """
510        if not self.machines:
511            raise error.AutoservError('No machines specified to repair')
512        if self.resultdir:
513            os.chdir(self.resultdir)
514
515        namespace = self._make_namespace()
516        namespace.update({'job_labels': labels, 'args': ''})
517        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
518
519
520    def provision(self, labels):
521        """
522        Provision all hosts to match |labels|.
523
524        @param labels: A comma seperated string of labels to provision the
525                       host to.
526
527        """
528        control = self._load_control_file(PROVISION_CONTROL_FILE)
529        self.run(control=control, job_labels=labels)
530
531
532    def precheck(self):
533        """
534        perform any additional checks in derived classes.
535        """
536        pass
537
538
539    def enable_external_logging(self):
540        """
541        Start or restart external logging mechanism.
542        """
543        pass
544
545
546    def disable_external_logging(self):
547        """
548        Pause or stop external logging mechanism.
549        """
550        pass
551
552
553    def use_external_logging(self):
554        """
555        Return True if external logging should be used.
556        """
557        return False
558
559
560    def _make_parallel_wrapper(self, function, machines, log):
561        """Wrap function as appropriate for calling by parallel_simple."""
562        # machines could be a list of dictionaries, e.g.,
563        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
564        # The dictionary is generated in base_server_job.__init__, refer to
565        # variable machine_dict_list, then passed in with namespace, see method
566        # base_server_job._make_namespace.
567        # To compare the machinese to self.machines, which is a list of machine
568        # hostname, we need to convert machines back to a list of hostnames.
569        # Note that the order of hostnames doesn't matter, as is_forking will be
570        # True if there are more than one machine.
571        if (machines and isinstance(machines, list)
572            and isinstance(machines[0], dict)):
573            machines = [m['hostname'] for m in machines]
574        is_forking = not (len(machines) == 1 and self.machines == machines)
575        if self._parse_job and is_forking and log:
576            def wrapper(machine):
577                hostname = server_utils.get_hostname_from_machine(machine)
578                self._parse_job += "/" + hostname
579                self._using_parser = INCREMENTAL_TKO_PARSING
580                self.machines = [machine]
581                self.push_execution_context(hostname)
582                os.chdir(self.resultdir)
583                utils.write_keyval(self.resultdir, {"hostname": hostname})
584                self.init_parser()
585                result = function(machine)
586                self.cleanup_parser()
587                return result
588        elif len(machines) > 1 and log:
589            def wrapper(machine):
590                hostname = server_utils.get_hostname_from_machine(machine)
591                self.push_execution_context(hostname)
592                os.chdir(self.resultdir)
593                machine_data = {'hostname' : hostname,
594                                'status_version' : str(self._STATUS_VERSION)}
595                utils.write_keyval(self.resultdir, machine_data)
596                result = function(machine)
597                return result
598        else:
599            wrapper = function
600        return wrapper
601
602
603    def parallel_simple(self, function, machines, log=True, timeout=None,
604                        return_results=False):
605        """
606        Run 'function' using parallel_simple, with an extra wrapper to handle
607        the necessary setup for continuous parsing, if possible. If continuous
608        parsing is already properly initialized then this should just work.
609
610        @param function: A callable to run in parallel given each machine.
611        @param machines: A list of machine names to be passed one per subcommand
612                invocation of function.
613        @param log: If True, output will be written to output in a subdirectory
614                named after each machine.
615        @param timeout: Seconds after which the function call should timeout.
616        @param return_results: If True instead of an AutoServError being raised
617                on any error a list of the results|exceptions from the function
618                called on each arg is returned.  [default: False]
619
620        @raises error.AutotestError: If any of the functions failed.
621        """
622        wrapper = self._make_parallel_wrapper(function, machines, log)
623        return subcommand.parallel_simple(wrapper, machines,
624                                          log=log, timeout=timeout,
625                                          return_results=return_results)
626
627
628    def parallel_on_machines(self, function, machines, timeout=None):
629        """
630        @param function: Called in parallel with one machine as its argument.
631        @param machines: A list of machines to call function(machine) on.
632        @param timeout: Seconds after which the function call should timeout.
633
634        @returns A list of machines on which function(machine) returned
635                without raising an exception.
636        """
637        results = self.parallel_simple(function, machines, timeout=timeout,
638                                       return_results=True)
639        success_machines = []
640        for result, machine in itertools.izip(results, machines):
641            if not isinstance(result, Exception):
642                success_machines.append(machine)
643        return success_machines
644
645
646    def _has_failed_tests(self):
647        """Parse status log for failed tests.
648
649        This checks the current working directory and is intended only for use
650        by the run() method.
651
652        @return boolean
653        """
654        path = os.getcwd()
655
656        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
657        # make code reuse plausible.
658        job_keyval = tko_models.job.read_keyval(path)
659        status_version = job_keyval.get("status_version", 0)
660
661        # parse out the job
662        parser = parser_lib.parser(status_version)
663        job = parser.make_job(path)
664        status_log = os.path.join(path, "status.log")
665        if not os.path.exists(status_log):
666            status_log = os.path.join(path, "status")
667        if not os.path.exists(status_log):
668            logging.warning("! Unable to parse job, no status file")
669            return True
670
671        # parse the status logs
672        status_lines = open(status_log).readlines()
673        parser.start(job)
674        tests = parser.end(status_lines)
675
676        # parser.end can return the same object multiple times, so filter out
677        # dups
678        job.tests = []
679        already_added = set()
680        for test in tests:
681            if test not in already_added:
682                already_added.add(test)
683                job.tests.append(test)
684
685        failed = False
686        for test in job.tests:
687            # The current job is still running and shouldn't count as failed.
688            # The parser will fail to parse the exit status of the job since it
689            # hasn't exited yet (this running right now is the job).
690            failed = failed or (test.status != 'GOOD'
691                                and not _is_current_server_job(test))
692        return failed
693
694
695    def _collect_crashes(self, namespace, collect_crashinfo):
696        """Collect crashes.
697
698        @param namespace: namespace dict.
699        @param collect_crashinfo: whether to collect crashinfo in addition to
700                dumps
701        """
702        if collect_crashinfo:
703            # includes crashdumps
704            crash_control_file = CRASHINFO_CONTROL_FILE
705        else:
706            crash_control_file = CRASHDUMPS_CONTROL_FILE
707        self._execute_code(crash_control_file, namespace)
708
709
710    _USE_TEMP_DIR = object()
711    def run(self, install_before=False, install_after=False,
712            collect_crashdumps=True, namespace={}, control=None,
713            control_file_dir=None, verify_job_repo_url=False,
714            only_collect_crashinfo=False, skip_crash_collection=False,
715            job_labels='', use_packaging=True):
716        # for a normal job, make sure the uncollected logs file exists
717        # for a crashinfo-only run it should already exist, bail out otherwise
718        created_uncollected_logs = False
719        logging.info("I am PID %s", os.getpid())
720        if self.resultdir and not os.path.exists(self._uncollected_log_file):
721            if only_collect_crashinfo:
722                # if this is a crashinfo-only run, and there were no existing
723                # uncollected logs, just bail out early
724                logging.info("No existing uncollected logs, "
725                             "skipping crashinfo collection")
726                return
727            else:
728                log_file = open(self._uncollected_log_file, "w")
729                pickle.dump([], log_file)
730                log_file.close()
731                created_uncollected_logs = True
732
733        # use a copy so changes don't affect the original dictionary
734        namespace = namespace.copy()
735        machines = self.machines
736        if control is None:
737            if self.control is None:
738                control = ''
739            else:
740                control = self._load_control_file(self.control)
741        if control_file_dir is None:
742            control_file_dir = self.resultdir
743
744        self.aborted = False
745        namespace.update(self._make_namespace())
746        namespace.update({'args' : self.args,
747                          'job_labels' : job_labels})
748        test_start_time = int(time.time())
749
750        if self.resultdir:
751            os.chdir(self.resultdir)
752            # touch status.log so that the parser knows a job is running here
753            open(self.get_status_log_path(), 'a').close()
754            self.enable_external_logging()
755
756        collect_crashinfo = True
757        temp_control_file_dir = None
758        try:
759            try:
760                namespace['network_stats_label'] = 'at-start'
761                self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)
762                if install_before and machines:
763                    self._execute_code(INSTALL_CONTROL_FILE, namespace)
764
765                if only_collect_crashinfo:
766                    return
767
768                # If the verify_job_repo_url option is set but we're unable
769                # to actually verify that the job_repo_url contains the autotest
770                # package, this job will fail.
771                if verify_job_repo_url:
772                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
773                                       namespace)
774                else:
775                    logging.warning('Not checking if job_repo_url contains '
776                                    'autotest packages on %s', machines)
777
778                # determine the dir to write the control files to
779                cfd_specified = (control_file_dir
780                                 and control_file_dir is not self._USE_TEMP_DIR)
781                if cfd_specified:
782                    temp_control_file_dir = None
783                else:
784                    temp_control_file_dir = tempfile.mkdtemp(
785                        suffix='temp_control_file_dir')
786                    control_file_dir = temp_control_file_dir
787                server_control_file = os.path.join(control_file_dir,
788                                                   self._control_filename)
789                client_control_file = os.path.join(control_file_dir,
790                                                   CLIENT_CONTROL_FILENAME)
791                if self._client:
792                    namespace['control'] = control
793                    utils.open_write_close(client_control_file, control)
794                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
795                                    server_control_file)
796                else:
797                    utils.open_write_close(server_control_file, control)
798
799                logging.info("Processing control file")
800                namespace['use_packaging'] = use_packaging
801                self._execute_code(server_control_file, namespace)
802                logging.info("Finished processing control file")
803
                # If no device error occurred, no need to collect crashinfo.
805                collect_crashinfo = self.failed_with_device_error
806            except Exception, e:
807                try:
808                    logging.exception(
809                            'Exception escaped control file, job aborting:')
810                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
811                                    ' ', str(e))
812                    self.record('INFO', None, None, str(e),
813                                {'job_abort_reason': reason})
814                except:
815                    pass # don't let logging exceptions here interfere
816                raise
817        finally:
818            if temp_control_file_dir:
819                # Clean up temp directory used for copies of the control files
820                try:
821                    shutil.rmtree(temp_control_file_dir)
822                except Exception, e:
823                    logging.warning('Could not remove temp directory %s: %s',
824                                 temp_control_file_dir, e)
825
826            if machines and (collect_crashdumps or collect_crashinfo):
827                if skip_crash_collection:
828                    logging.info('Skipping crash dump/info collection '
829                                 'as requested.')
830                else:
831                    namespace['test_start_time'] = test_start_time
832                    # Remove crash files for passing tests.
833                    # TODO(ayatane): Tests that create crash files should be
834                    # reported.
835                    namespace['has_failed_tests'] = self._has_failed_tests()
836                    self._collect_crashes(namespace, collect_crashinfo)
837            self.disable_external_logging()
838            if self._uncollected_log_file and created_uncollected_logs:
839                os.remove(self._uncollected_log_file)
840            if install_after and machines:
841                self._execute_code(INSTALL_CONTROL_FILE, namespace)
842            namespace['network_stats_label'] = 'at-end'
843            self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)
844
845
846    def run_test(self, url, *args, **dargs):
847        """
848        Summon a test object and run it.
849
850        tag
851                tag to add to testname
852        url
853                url of the test to run
854        """
855        if self._disable_sysinfo:
856            dargs['disable_sysinfo'] = True
857
858        group, testname = self.pkgmgr.get_package_name(url, 'test')
859        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
860        outputdir = self._make_test_outputdir(subdir)
861
862        def group_func():
863            try:
864                test.runtest(self, url, tag, args, dargs)
865            except error.TestBaseException, e:
866                self.record(e.exit_status, subdir, testname, str(e))
867                raise
868            except Exception, e:
869                info = str(e) + "\n" + traceback.format_exc()
870                self.record('FAIL', subdir, testname, info)
871                raise
872            else:
873                self.record('GOOD', subdir, testname, 'completed successfully')
874
875        result, exc_info = self._run_group(testname, subdir, group_func)
876        if exc_info and isinstance(exc_info[1], error.TestBaseException):
877            return False
878        elif exc_info:
879            raise exc_info[0], exc_info[1], exc_info[2]
880        else:
881            return True
882
883
884    def _run_group(self, name, subdir, function, *args, **dargs):
885        """\
886        Underlying method for running something inside of a group.
887        """
888        result, exc_info = None, None
889        try:
890            self.record('START', subdir, name)
891            result = function(*args, **dargs)
892        except error.TestBaseException, e:
893            self.record("END %s" % e.exit_status, subdir, name)
894            exc_info = sys.exc_info()
895        except Exception, e:
896            err_msg = str(e) + '\n'
897            err_msg += traceback.format_exc()
898            self.record('END ABORT', subdir, name, err_msg)
899            raise error.JobError(name + ' failed\n' + traceback.format_exc())
900        else:
901            self.record('END GOOD', subdir, name)
902
903        return result, exc_info
904
905
906    def run_group(self, function, *args, **dargs):
907        """\
908        function:
909                subroutine to run
910        *args:
911                arguments for the function
912        """
913
914        name = function.__name__
915
916        # Allow the tag for the group to be specified.
917        tag = dargs.pop('tag', None)
918        if tag:
919            name = tag
920
921        return self._run_group(name, None, function, *args, **dargs)[0]
922
923
924    def run_op(self, op, op_func, get_kernel_func):
925        """\
926        A specialization of run_group meant specifically for handling
927        management operation. Includes support for capturing the kernel version
928        after the operation.
929
930        Args:
931           op: name of the operation.
932           op_func: a function that carries out the operation (reboot, suspend)
933           get_kernel_func: a function that returns a string
934                            representing the kernel version.
935        """
936        try:
937            self.record('START', None, op)
938            op_func()
939        except Exception, e:
940            err_msg = str(e) + '\n' + traceback.format_exc()
941            self.record('END FAIL', None, op, err_msg)
942            raise
943        else:
944            kernel = get_kernel_func()
945            self.record('END GOOD', None, op,
946                        optional_fields={"kernel": kernel})
947
948
949    def run_control(self, path):
950        """Execute a control file found at path (relative to the autotest
951        path). Intended for executing a control file within a control file,
952        not for running the top-level job control file."""
953        path = os.path.join(self.autodir, path)
954        control_file = self._load_control_file(path)
955        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
956
957
958    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
959        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
960                                   on_every_test)
961
962
963    def add_sysinfo_logfile(self, file, on_every_test=False):
964        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
965
966
967    def _add_sysinfo_loggable(self, loggable, on_every_test):
968        if on_every_test:
969            self.sysinfo.test_loggables.add(loggable)
970        else:
971            self.sysinfo.boot_loggables.add(loggable)
972
973
974    def _read_warnings(self):
975        """Poll all the warning loggers and extract any new warnings that have
976        been logged. If the warnings belong to a category that is currently
977        disabled, this method will discard them and they will no longer be
978        retrievable.
979
980        Returns a list of (timestamp, message) tuples, where timestamp is an
981        integer epoch timestamp."""
982        warnings = []
983        while True:
984            # pull in a line of output from every logger that has
985            # output ready to be read
986            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
987            closed_loggers = set()
988            for logger in loggers:
989                line = logger.readline()
990                # record any broken pipes (aka line == empty)
991                if len(line) == 0:
992                    closed_loggers.add(logger)
993                    continue
994                # parse out the warning
995                timestamp, msgtype, msg = line.split('\t', 2)
996                timestamp = int(timestamp)
997                # if the warning is valid, add it to the results
998                if self.warning_manager.is_valid(timestamp, msgtype):
999                    warnings.append((timestamp, msg.strip()))
1000
1001            # stop listening to loggers that are closed
1002            self.warning_loggers -= closed_loggers
1003
1004            # stop if none of the loggers have any output left
1005            if not loggers:
1006                break
1007
1008        # sort into timestamp order
1009        warnings.sort()
1010        return warnings
1011
1012
1013    def _unique_subdirectory(self, base_subdirectory_name):
1014        """Compute a unique results subdirectory based on the given name.
1015
1016        Appends base_subdirectory_name with a number as necessary to find a
1017        directory name that doesn't already exist.
1018        """
1019        subdirectory = base_subdirectory_name
1020        counter = 1
1021        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1022            subdirectory = base_subdirectory_name + '.' + str(counter)
1023            counter += 1
1024        return subdirectory
1025
1026
1027    def get_record_context(self):
1028        """Returns an object representing the current job.record context.
1029
1030        The object returned is an opaque object with a 0-arg restore method
1031        which can be called to restore the job.record context (i.e. indentation)
1032        to the current level. The intention is that it should be used when
1033        something external which generate job.record calls (e.g. an autotest
1034        client) can fail catastrophically and the server job record state
1035        needs to be reset to its original "known good" state.
1036
1037        @return: A context object with a 0-arg restore() method."""
1038        return self._indenter.get_context()
1039
1040
1041    def record_summary(self, status_code, test_name, reason='', attributes=None,
1042                       distinguishing_attributes=(), child_test_ids=None):
1043        """Record a summary test result.
1044
1045        @param status_code: status code string, see
1046                common_lib.log.is_valid_status()
1047        @param test_name: name of the test
1048        @param reason: (optional) string providing detailed reason for test
1049                outcome
1050        @param attributes: (optional) dict of string keyvals to associate with
1051                this result
1052        @param distinguishing_attributes: (optional) list of attribute names
1053                that should be used to distinguish identically-named test
1054                results.  These attributes should be present in the attributes
1055                parameter.  This is used to generate user-friendly subdirectory
1056                names.
1057        @param child_test_ids: (optional) list of test indices for test results
1058                used in generating this result.
1059        """
1060        subdirectory_name_parts = [test_name]
1061        for attribute in distinguishing_attributes:
1062            assert attributes
1063            assert attribute in attributes, '%s not in %s' % (attribute,
1064                                                              attributes)
1065            subdirectory_name_parts.append(attributes[attribute])
1066        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1067
1068        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1069        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1070        os.mkdir(subdirectory_path)
1071
1072        self.record(status_code, subdirectory, test_name,
1073                    status=reason, optional_fields={'is_summary': True})
1074
1075        if attributes:
1076            utils.write_keyval(subdirectory_path, attributes)
1077
1078        if child_test_ids:
1079            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1080            summary_data = {'child_test_ids': ids_string}
1081            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1082                               summary_data)
1083
1084
1085    def disable_warnings(self, warning_type):
1086        self.warning_manager.disable_warnings(warning_type)
1087        self.record("INFO", None, None,
1088                    "disabling %s warnings" % warning_type,
1089                    {"warnings.disable": warning_type})
1090
1091
1092    def enable_warnings(self, warning_type):
1093        self.warning_manager.enable_warnings(warning_type)
1094        self.record("INFO", None, None,
1095                    "enabling %s warnings" % warning_type,
1096                    {"warnings.enable": warning_type})
1097
1098
1099    def get_status_log_path(self, subdir=None):
1100        """Return the path to the job status log.
1101
1102        @param subdir - Optional paramter indicating that you want the path
1103            to a subdirectory status log.
1104
1105        @returns The path where the status log should be.
1106        """
1107        if self.resultdir:
1108            if subdir:
1109                return os.path.join(self.resultdir, subdir, "status.log")
1110            else:
1111                return os.path.join(self.resultdir, "status.log")
1112        else:
1113            return None
1114
1115
1116    def _update_uncollected_logs_list(self, update_func):
1117        """Updates the uncollected logs list in a multi-process safe manner.
1118
1119        @param update_func - a function that updates the list of uncollected
1120            logs. Should take one parameter, the list to be updated.
1121        """
1122        # Skip log collection if file _uncollected_log_file does not exist.
1123        if not (self._uncollected_log_file and
1124                os.path.exists(self._uncollected_log_file)):
1125            return
1126        if self._uncollected_log_file:
1127            log_file = open(self._uncollected_log_file, "r+")
1128            fcntl.flock(log_file, fcntl.LOCK_EX)
1129        try:
1130            uncollected_logs = pickle.load(log_file)
1131            update_func(uncollected_logs)
1132            log_file.seek(0)
1133            log_file.truncate()
1134            pickle.dump(uncollected_logs, log_file)
1135            log_file.flush()
1136        finally:
1137            fcntl.flock(log_file, fcntl.LOCK_UN)
1138            log_file.close()
1139
1140
1141    def add_client_log(self, hostname, remote_path, local_path):
1142        """Adds a new set of client logs to the list of uncollected logs,
1143        to allow for future log recovery.
1144
1145        @param host - the hostname of the machine holding the logs
1146        @param remote_path - the directory on the remote machine holding logs
1147        @param local_path - the local directory to copy the logs into
1148        """
1149        def update_func(logs_list):
1150            logs_list.append((hostname, remote_path, local_path))
1151        self._update_uncollected_logs_list(update_func)
1152
1153
1154    def remove_client_log(self, hostname, remote_path, local_path):
1155        """Removes a set of client logs from the list of uncollected logs,
1156        to allow for future log recovery.
1157
1158        @param host - the hostname of the machine holding the logs
1159        @param remote_path - the directory on the remote machine holding logs
1160        @param local_path - the local directory to copy the logs into
1161        """
1162        def update_func(logs_list):
1163            logs_list.remove((hostname, remote_path, local_path))
1164        self._update_uncollected_logs_list(update_func)
1165
1166
1167    def get_client_logs(self):
1168        """Retrieves the list of uncollected logs, if it exists.
1169
1170        @returns A list of (host, remote_path, local_path) tuples. Returns
1171                 an empty list if no uncollected logs file exists.
1172        """
1173        log_exists = (self._uncollected_log_file and
1174                      os.path.exists(self._uncollected_log_file))
1175        if log_exists:
1176            return pickle.load(open(self._uncollected_log_file))
1177        else:
1178            return []
1179
1180
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        Besides filling namespace, this also assigns this job (and its ssh
        settings) onto classes/modules reachable through the imported
        'autotest' and 'hosts' entries; see the end of the function.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) an import that would
              replace an existing entry in namespace with a different object
              causes an error; re-importing the very same object only emits
              a warning.  If False, existing entries are overwritten.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            # names doubles as the fromlist argument of __import__.
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                # Walk from the object __import__ returned down the dotted
                # path to reach the module that module_name actually names.
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                # Prefer the module's declared public API; otherwise take
                # everything dir() reports.
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    # A different object under the same name is a real
                    # conflict; the identical object is only a duplicate.
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        # Note: these three are set with a plain update(), so they bypass the
        # clobber check done by _import_names.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        # No names given: behaves like "from ...error import *".
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].TestBed.job = self
        # Hand this job's ssh settings to the hosts factory module.
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options
1269
1270
1271    def _execute_code(self, code_file, namespace, protect=True):
1272        """
1273        Execute code using a copy of namespace as a server control script.
1274
1275        Unless protect_namespace is explicitly set to False, the dict will not
1276        be modified.
1277
1278        Args:
1279          code_file: The filename of the control file to execute.
1280          namespace: A dict containing names to make available during execution.
1281          protect: Boolean.  If True (the default) a copy of the namespace dict
1282              is used during execution to prevent the code from modifying its
1283              contents outside of this function.  If False the raw dict is
1284              passed in and modifications will be allowed.
1285        """
1286        if protect:
1287            namespace = namespace.copy()
1288        self._fill_server_control_namespace(namespace, protect=protect)
1289        # TODO: Simplify and get rid of the special cases for only 1 machine.
1290        if len(self.machines) > 1:
1291            machines_text = '\n'.join(self.machines) + '\n'
1292            # Only rewrite the file if it does not match our machine list.
1293            try:
1294                machines_f = open(MACHINES_FILENAME, 'r')
1295                existing_machines_text = machines_f.read()
1296                machines_f.close()
1297            except EnvironmentError:
1298                existing_machines_text = None
1299            if machines_text != existing_machines_text:
1300                utils.open_write_close(MACHINES_FILENAME, machines_text)
1301        execfile(code_file, namespace, namespace)
1302
1303
1304    def _parse_status(self, new_line):
1305        if not self._using_parser:
1306            return
1307        new_tests = self.parser.process_lines([new_line])
1308        for test in new_tests:
1309            self.__insert_test(test)
1310
1311
1312    def __insert_test(self, test):
1313        """
1314        An internal method to insert a new test result into the
1315        database. This method will not raise an exception, even if an
1316        error occurs during the insert, to avoid failing a test
1317        simply because of unexpected database issues."""
1318        self.num_tests_run += 1
1319        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
1320            self.num_tests_failed += 1
1321        try:
1322            self.results_db.insert_test(self.job_model, test)
1323        except Exception:
1324            msg = ("WARNING: An unexpected error occured while "
1325                   "inserting test results into the database. "
1326                   "Ignoring error.\n" + traceback.format_exc())
1327            print >> sys.stderr, msg
1328
1329
1330    def preprocess_client_state(self):
1331        """
1332        Produce a state file for initializing the state of a client job.
1333
1334        Creates a new client state file with all the current server state, as
1335        well as some pre-set client state.
1336
1337        @returns The path of the file the state was written into.
1338        """
1339        # initialize the sysinfo state
1340        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1341
1342        # dump the state out to a tempfile
1343        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1344        os.close(fd)
1345
1346        # write_to_file doesn't need locking, we exclusively own file_path
1347        self._state.write_to_file(file_path)
1348        return file_path
1349
1350
1351    def postprocess_client_state(self, state_path):
1352        """
1353        Update the state of this job with the state from a client job.
1354
1355        Updates the state of the server side of a job with the final state
1356        of a client job that was run. Updates the non-client-specific state,
1357        pulls in some specific bits from the client-specific state, and then
1358        discards the rest. Removes the state file afterwards
1359
1360        @param state_file A path to the state file from the client.
1361        """
1362        # update the on-disk state
1363        try:
1364            self._state.read_from_file(state_path)
1365            os.remove(state_path)
1366        except OSError, e:
1367            # ignore file-not-found errors
1368            if e.errno != errno.ENOENT:
1369                raise
1370            else:
1371                logging.debug('Client state file %s not found', state_path)
1372
1373        # update the sysinfo state
1374        if self._state.has('client', 'sysinfo'):
1375            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1376
1377        # drop all the client-specific state
1378        self._state.discard_namespace('client')
1379
1380
1381    def clear_all_known_hosts(self):
1382        """Clears known hosts files for all AbstractSSHHosts."""
1383        for host in self.hosts:
1384            if isinstance(host, abstract_ssh.AbstractSSHHost):
1385                host.clear_known_hosts()
1386
1387
class warning_manager(object):
    """Tracks, per warning type, the time intervals during which warnings of
    that type are disabled."""
    def __init__(self):
        # Maps a warning type to a list of (start, end) disabled intervals.
        # end is None while the interval is still open.
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Tell whether a warning of warning_type that occurred at timestamp
        should be kept. A warning is "invalid" if timestamp falls inside one
        of the type's disabled intervals (start inclusive, end exclusive; an
        open interval has no end)."""
        intervals = self.disabled_warnings.get(warning_type, [])
        return not any(
                timestamp >= start and (end is None or timestamp < end)
                for start, end in intervals)


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type.

        Opens a new interval at int(current_time_func()) unless the most
        recent interval for this type is still open."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        already_disabled = bool(intervals) and intervals[-1][1] is None
        if not already_disabled:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type.

        Closes the most recent interval at int(current_time_func()) if it is
        still open; otherwise does nothing."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if not intervals:
            return
        start, end = intervals[-1]
        if end is None:
            intervals[-1] = (start, int(current_time_func()))
1419
1420
def _is_current_server_job(test):
    """Tell whether a parsed test entry stands for the server job itself.

    @param test: test instance from tko parser.
    @returns True if the entry's testname is 'SERVER_JOB'.
    """
    parsed_name = test.testname
    return parsed_name == 'SERVER_JOB'
1427
1428
def _create_afe_host(hostname, in_lab):
    """Create a real or stub frontend.Host object.

    @param hostname: Name of the host for which we want the Host object.
    @param in_lab: (bool) whether we have access to the AFE.
    @returns: The first host the AFE reports for hostname, or a
            server_utils.EmptyAFEHost when not in_lab.
    @raises error.AutoservError: if the AFE knows no host by that name.
    """
    if in_lab:
        afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
        matches = afe.get_hosts(hostname=hostname)
        if matches:
            return matches[0]
        raise error.AutoservError('No hosts named %s found' % hostname)

    return server_utils.EmptyAFEHost()
1445
1446
def _create_host_info_store(hostname, in_lab):
    """Create a real or stub host info store.

    @param hostname: Name of the host for which we want the store.
    @param in_lab: (bool) whether we have access to the AFE.
    @returns: An afe_store.AfeStore that has been refreshed once, or a
            host_info.InMemoryHostInfoStore when not in_lab.
    @raises error.AutoservError: if the initial refresh of the AfeStore
            fails with host_info.StoreError.
    """
    if in_lab:
        store = afe_store.AfeStore(hostname)
        # Force one refresh up front so an unreachable or unknown host is
        # reported here rather than at first use.
        try:
            store.get(force_refresh=True)
        except host_info.StoreError:
            raise error.AutoservError(
                    'Could not obtain HostInfo for hostname %s' % hostname)
        return store

    return host_info.InMemoryHostInfoStore()
1464
1465
# load up site-specific code for generating site-specific job data
# NOTE(review): presumably this resolves get_site_job_data from the optional
# autotest_lib.server.site_server_job module and falls back to
# _get_site_job_data_dummy when it is absent -- confirm against
# utils.import_site_function.
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)


# Same lookup for the job class: the site module's site_server_job class,
# with base_server_job passed as the last argument.
# NOTE(review): presumably base_server_job is the fallback/base when no site
# class exists -- confirm against utils.import_site_class.
site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)
1475
1476
class server_job(site_server_job):
    """The server job class; adds nothing of its own to site_server_job."""
    pass
1479