1# pylint: disable-msg=C0111
2
3# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6"""
7The main job wrapper for the server side.
8
9This is the core infrastructure. Derived from the client side job.py
10
11Copyright Martin J. Bligh, Andy Whitcroft 2007
12"""
13
14import errno
15import fcntl
16import getpass
17import itertools
18import logging
19import os
20import pickle
21import platform
22import re
23import select
24import shutil
25import sys
26import tempfile
27import time
28import traceback
29import uuid
30import warnings
31
32from autotest_lib.client.bin import sysinfo
33from autotest_lib.client.common_lib import base_job
34from autotest_lib.client.common_lib import control_data
35from autotest_lib.client.common_lib import error
36from autotest_lib.client.common_lib import logging_manager
37from autotest_lib.client.common_lib import packages
38from autotest_lib.client.common_lib import utils
39from autotest_lib.server import profilers
40from autotest_lib.server import site_gtest_runner
41from autotest_lib.server import subcommand
42from autotest_lib.server import test
43from autotest_lib.server import utils as server_utils
44from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
45from autotest_lib.server.hosts import abstract_ssh
46from autotest_lib.server.hosts import afe_store
47from autotest_lib.server.hosts import file_store
48from autotest_lib.server.hosts import shadowing_store
49from autotest_lib.server.hosts import factory as host_factory
50from autotest_lib.server.hosts import host_info
51from autotest_lib.server.hosts import ssh_multiplex
52from autotest_lib.tko import models as tko_models
53from autotest_lib.tko import parser_lib
54
55try:
56    from chromite.lib import metrics
57except ImportError:
58    metrics = utils.metrics_mock
59
60
61def _control_segment_path(name):
62    """Get the pathname of the named control segment file."""
63    server_dir = os.path.dirname(os.path.abspath(__file__))
64    return os.path.join(server_dir, "control_segments", name)
65
66
67CLIENT_CONTROL_FILENAME = 'control'
68SERVER_CONTROL_FILENAME = 'control.srv'
69MACHINES_FILENAME = '.machines'
70
71CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
72CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
73CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
74CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
75CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
76VERIFY_CONTROL_FILE = _control_segment_path('verify')
77REPAIR_CONTROL_FILE = _control_segment_path('repair')
78PROVISION_CONTROL_FILE = _control_segment_path('provision')
79VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
80RESET_CONTROL_FILE = _control_segment_path('reset')
81GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
82
83
84def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
85                      host_attributes=None):
86    """Converts a list of machine names to list of dicts.
87
88    TODO(crbug.com/678430): This function temporarily has a side effect of
89    creating files under workdir for backing a FileStore. This side-effect will
90    go away once callers of autoserv start passing in the FileStore.
91
92    @param machine_names: A list of machine names.
93    @param store_dir: A directory to contain store backing files.
94    @param use_shadow_store: If True, we should create a ShadowingStore where
95            actual store is backed by the AFE but we create a backing file to
96            shadow the store. If False, backing file should already exist at:
97            ${store_dir}/${hostname}.store
98    @param in_lab: A boolean indicating whether we're running in lab.
99    @param host_attributes: Optional list of host attributes to add for each
100            host.
101    @returns: A list of dicts. Each dict has the following keys:
102            'hostname': Name of the machine originally in machine_names (str).
103            'afe_host': A frontend.Host object for the machine, or a stub if
104                    in_lab is false.
105            'host_info_store': A host_info.CachingHostInfoStore object to obtain
106                    host information. A stub if in_lab is False.
107            'connection_pool': ssh_multiplex.ConnectionPool instance to share
108                    master ssh connection across control scripts. This is set to
109                    None, and should be overridden for connection sharing.
110    """
111    # See autoserv_parser.parse_args. Only one of in_lab or host_attributes can
112    # be provided.
113    if in_lab and host_attributes:
114        raise error.AutoservError(
115                'in_lab and host_attribute are mutually exclusive.')
116
117    machine_dict_list = []
118    for machine in machine_names:
119        if not in_lab:
120            afe_host = server_utils.EmptyAFEHost()
121            host_info_store = host_info.InMemoryHostInfoStore()
122            if host_attributes is not None:
123                afe_host.attributes.update(host_attributes)
124                info = host_info.HostInfo(attributes=host_attributes)
125                host_info_store.commit(info)
126        elif use_shadow_store:
127            afe_host = _create_afe_host(machine)
128            host_info_store = _create_afe_backed_host_info_store(store_dir,
129                                                                 machine)
130        else:
131            afe_host = server_utils.EmptyAFEHost()
132            host_info_store = _create_file_backed_host_info_store(store_dir,
133                                                                  machine)
134
135        machine_dict_list.append({
136                'hostname' : machine,
137                'afe_host' : afe_host,
138                'host_info_store': host_info_store,
139                'connection_pool': None,
140        })
141
142    return machine_dict_list
143
144
145class status_indenter(base_job.status_indenter):
146    """Provide a simple integer-backed status indenter."""
147    def __init__(self):
148        self._indent = 0
149
150
151    @property
152    def indent(self):
153        return self._indent
154
155
156    def increment(self):
157        self._indent += 1
158
159
160    def decrement(self):
161        self._indent -= 1
162
163
164    def get_context(self):
165        """Returns a context object for use by job.get_record_context."""
166        class context(object):
167            def __init__(self, indenter, indent):
168                self._indenter = indenter
169                self._indent = indent
170            def restore(self):
171                self._indenter._indent = self._indent
172        return context(self, self._indent)
173
174
175class server_job_record_hook(object):
176    """The job.record hook for server job. Used to inject WARN messages from
177    the console or vlm whenever new logs are written, and to echo any logs
178    to INFO level logging. Implemented as a class so that it can use state to
179    block recursive calls, so that the hook can call job.record itself to
180    log WARN messages.
181
182    Depends on job._read_warnings and job._logger.
183    """
184    def __init__(self, job):
185        self._job = job
186        self._being_called = False
187
188
189    def __call__(self, entry):
190        """A wrapper around the 'real' record hook, the _hook method, which
191        prevents recursion. This isn't making any effort to be threadsafe,
192        the intent is to outright block infinite recursion via a
193        job.record->_hook->job.record->_hook->job.record... chain."""
194        if self._being_called:
195            return
196        self._being_called = True
197        try:
198            self._hook(self._job, entry)
199        finally:
200            self._being_called = False
201
202
203    @staticmethod
204    def _hook(job, entry):
205        """The core hook, which can safely call job.record."""
206        entries = []
207        # poll all our warning loggers for new warnings
208        for timestamp, msg in job._read_warnings():
209            warning_entry = base_job.status_log_entry(
210                'WARN', None, None, msg, {}, timestamp=timestamp)
211            entries.append(warning_entry)
212            job.record_entry(warning_entry)
213        # echo rendered versions of all the status logs to info
214        entries.append(entry)
215        for entry in entries:
216            rendered_entry = job._logger.render_entry(entry)
217            logging.info(rendered_entry)
218
219
220class server_job(base_job.base_job):
221    """The server-side concrete implementation of base_job.
222
223    Optional properties provided by this implementation:
224        serverdir
225
226        warning_manager
227        warning_loggers
228    """
229
230    _STATUS_VERSION = 1
231
232    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
233    def __init__(self, control, args, resultdir, label, user, machines,
234                 machine_dict_list,
235                 client=False,
236                 ssh_user=host_factory.DEFAULT_SSH_USER,
237                 ssh_port=host_factory.DEFAULT_SSH_PORT,
238                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
239                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
240                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
241                 group_name='',
242                 tag='', disable_sysinfo=False,
243                 control_filename=SERVER_CONTROL_FILENAME,
244                 parent_job_id=None, in_lab=False,
245                 use_client_trampoline=False):
246        """
247        Create a server side job object.
248
249        @param control: The pathname of the control file.
250        @param args: Passed to the control file.
251        @param resultdir: Where to throw the results.
252        @param label: Description of the job.
253        @param user: Username for the job (email address).
254        @param machines: A list of hostnames of the machines to use for the job.
255        @param machine_dict_list: A list of dicts for each of the machines above
256                as returned by get_machine_dicts.
257        @param client: True if this is a client-side control file.
258        @param ssh_user: The SSH username.  [root]
259        @param ssh_port: The SSH port number.  [22]
260        @param ssh_pass: The SSH passphrase, if needed.
261        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
262                '-vvv', or an empty string if not needed.
263        @param ssh_options: A string giving additional options that will be
264                            included in ssh commands.
265        @param group_name: If supplied, this will be written out as
266                host_group_name in the keyvals file for the parser.
267        @param tag: The job execution tag from the scheduler.  [optional]
268        @param disable_sysinfo: Whether we should disable the sysinfo step of
269                tests for a modest shortening of test time.  [optional]
270        @param control_filename: The filename where the server control file
271                should be written in the results directory.
272        @param parent_job_id: Job ID of the parent job. Default to None if the
273                job does not have a parent job.
274        @param in_lab: Boolean that indicates if this is running in the lab
275                       environment.
276        @param use_client_trampoline: Boolean that indicates whether to
277               use the client trampoline flow.  If this is True, control
278               is interpreted as the name of the client test to run.
279               The client control file will be client_trampoline.  The
280               test name will be passed to client_trampoline, which will
281               install the test package and re-exec the actual test
282               control file.
283        """
284        super(server_job, self).__init__(resultdir=resultdir)
285        self.control = control
286        self._uncollected_log_file = os.path.join(self.resultdir,
287                                                  'uncollected_logs')
288        debugdir = os.path.join(self.resultdir, 'debug')
289        if not os.path.exists(debugdir):
290            os.mkdir(debugdir)
291
292        if user:
293            self.user = user
294        else:
295            self.user = getpass.getuser()
296
297        self.args = args
298        self.label = label
299        self.machines = machines
300        self._client = client
301        self.warning_loggers = set()
302        self.warning_manager = warning_manager()
303        self._ssh_user = ssh_user
304        self._ssh_port = ssh_port
305        self._ssh_pass = ssh_pass
306        self._ssh_verbosity_flag = ssh_verbosity_flag
307        self._ssh_options = ssh_options
308        self.tag = tag
309        self.hosts = set()
310        self.drop_caches = False
311        self.drop_caches_between_iterations = False
312        self._control_filename = control_filename
313        self._disable_sysinfo = disable_sysinfo
314        self._use_client_trampoline = use_client_trampoline
315
316        self.logging = logging_manager.get_logging_manager(
317                manage_stdout_and_stderr=True, redirect_fds=True)
318        subcommand.logging_manager_object = self.logging
319
320        self.sysinfo = sysinfo.sysinfo(self.resultdir)
321        self.profilers = profilers.profilers(self)
322
323        job_data = {'label' : label, 'user' : user,
324                    'hostname' : ','.join(machines),
325                    'drone' : platform.node(),
326                    'status_version' : str(self._STATUS_VERSION),
327                    'job_started' : str(int(time.time()))}
328        # Save parent job id to keyvals, so parser can retrieve the info and
329        # write to tko_jobs record.
330        if parent_job_id:
331            job_data['parent_job_id'] = parent_job_id
332        if group_name:
333            job_data['host_group_name'] = group_name
334
335        # only write these keyvals out on the first job in a resultdir
336        if 'job_started' not in utils.read_keyval(self.resultdir):
337            job_data.update(self._get_job_data())
338            utils.write_keyval(self.resultdir, job_data)
339
340        self.pkgmgr = packages.PackageManager(
341            self.autodir, run_function_dargs={'timeout':600})
342
343        self._register_subcommand_hooks()
344
345        # We no longer parse results as part of the server_job. These arguments
346        # can't be dropped yet because clients haven't all be cleaned up yet.
347        self.num_tests_run = -1
348        self.num_tests_failed = -1
349
350        # set up the status logger
351        self._indenter = status_indenter()
352        self._logger = base_job.status_logger(
353            self, self._indenter, 'status.log', 'status.log',
354            record_hook=server_job_record_hook(self))
355
356        # Initialize a flag to indicate DUT failure during the test, e.g.,
357        # unexpected reboot.
358        self.failed_with_device_error = False
359
360        self._connection_pool = ssh_multiplex.ConnectionPool()
361
362        # List of functions to run after the main job function.
363        self._post_run_hooks = []
364
365        self.parent_job_id = parent_job_id
366        self.in_lab = in_lab
367        self.machine_dict_list = machine_dict_list
368        for machine_dict in self.machine_dict_list:
369            machine_dict['connection_pool'] = self._connection_pool
370
371        # TODO(jrbarnette) The harness attribute is only relevant to
372        # client jobs, but it's required to be present, or we will fail
373        # server job unit tests.  Yes, really.
374        #
375        # TODO(jrbarnette) The utility of the 'harness' attribute even
376        # to client jobs is suspect.  Probably, we should remove it.
377        self.harness = None
378
379        # TODO(ayatane): fast and max_result_size_KB are not set for
380        # client_trampoline jobs.
381        if control and not use_client_trampoline:
382            parsed_control = control_data.parse_control(
383                    control, raise_warnings=False)
384            self.fast = parsed_control.fast
385            self.max_result_size_KB = parsed_control.max_result_size_KB
386        else:
387            self.fast = False
388            # Set the maximum result size to be the default specified in
389            # global config, if the job has no control file associated.
390            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
391
392
393    @classmethod
394    def _find_base_directories(cls):
395        """
396        Determine locations of autodir, clientdir and serverdir. Assumes
397        that this file is located within serverdir and uses __file__ along
398        with relative paths to resolve the location.
399        """
400        serverdir = os.path.abspath(os.path.dirname(__file__))
401        autodir = os.path.normpath(os.path.join(serverdir, '..'))
402        clientdir = os.path.join(autodir, 'client')
403        return autodir, clientdir, serverdir
404
405
406    def _find_resultdir(self, resultdir, *args, **dargs):
407        """
408        Determine the location of resultdir. For server jobs we expect one to
409        always be explicitly passed in to __init__, so just return that.
410        """
411        if resultdir:
412            return os.path.normpath(resultdir)
413        else:
414            return None
415
416
417    def _get_status_logger(self):
418        """Return a reference to the status logger."""
419        return self._logger
420
421
422    @staticmethod
423    def _load_control_file(path):
424        f = open(path)
425        try:
426            control_file = f.read()
427        finally:
428            f.close()
429        return re.sub('\r', '', control_file)
430
431
432    def _register_subcommand_hooks(self):
433        """
434        Register some hooks into the subcommand modules that allow us
435        to properly clean up self.hosts created in forked subprocesses.
436        """
437        def on_fork(cmd):
438            self._existing_hosts_on_fork = set(self.hosts)
439        def on_join(cmd):
440            new_hosts = self.hosts - self._existing_hosts_on_fork
441            for host in new_hosts:
442                host.close()
443        subcommand.subcommand.register_fork_hook(on_fork)
444        subcommand.subcommand.register_join_hook(on_join)
445
446
447    # TODO crbug.com/285395 add a kwargs parameter.
448    def _make_namespace(self):
449        """Create a namespace dictionary to be passed along to control file.
450
451        Creates a namespace argument populated with standard values:
452        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
453        and ssh_options.
454        """
455        namespace = {'machines' : self.machine_dict_list,
456                     'job' : self,
457                     'ssh_user' : self._ssh_user,
458                     'ssh_port' : self._ssh_port,
459                     'ssh_pass' : self._ssh_pass,
460                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
461                     'ssh_options' : self._ssh_options}
462        return namespace
463
464
465    def cleanup(self, labels):
466        """Cleanup machines.
467
468        @param labels: Comma separated job labels, will be used to
469                       determine special task actions.
470        """
471        if not self.machines:
472            raise error.AutoservError('No machines specified to cleanup')
473        if self.resultdir:
474            os.chdir(self.resultdir)
475
476        namespace = self._make_namespace()
477        namespace.update({'job_labels': labels, 'args': ''})
478        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
479
480
481    def verify(self, labels):
482        """Verify machines are all ssh-able.
483
484        @param labels: Comma separated job labels, will be used to
485                       determine special task actions.
486        """
487        if not self.machines:
488            raise error.AutoservError('No machines specified to verify')
489        if self.resultdir:
490            os.chdir(self.resultdir)
491
492        namespace = self._make_namespace()
493        namespace.update({'job_labels': labels, 'args': ''})
494        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
495
496
497    def reset(self, labels):
498        """Reset machines by first cleanup then verify each machine.
499
500        @param labels: Comma separated job labels, will be used to
501                       determine special task actions.
502        """
503        if not self.machines:
504            raise error.AutoservError('No machines specified to reset.')
505        if self.resultdir:
506            os.chdir(self.resultdir)
507
508        namespace = self._make_namespace()
509        namespace.update({'job_labels': labels, 'args': ''})
510        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
511
512
513    def repair(self, labels):
514        """Repair machines.
515
516        @param labels: Comma separated job labels, will be used to
517                       determine special task actions.
518        """
519        if not self.machines:
520            raise error.AutoservError('No machines specified to repair')
521        if self.resultdir:
522            os.chdir(self.resultdir)
523
524        namespace = self._make_namespace()
525        namespace.update({'job_labels': labels, 'args': ''})
526        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
527
528
529    def provision(self, labels):
530        """
531        Provision all hosts to match |labels|.
532
533        @param labels: A comma seperated string of labels to provision the
534                       host to.
535
536        """
537        control = self._load_control_file(PROVISION_CONTROL_FILE)
538        self.run(control=control, job_labels=labels)
539
540
541    def precheck(self):
542        """
543        perform any additional checks in derived classes.
544        """
545        pass
546
547
548    def enable_external_logging(self):
549        """
550        Start or restart external logging mechanism.
551        """
552        pass
553
554
555    def disable_external_logging(self):
556        """
557        Pause or stop external logging mechanism.
558        """
559        pass
560
561
562    def use_external_logging(self):
563        """
564        Return True if external logging should be used.
565        """
566        return False
567
568
569    def _make_parallel_wrapper(self, function, machines, log):
570        """Wrap function as appropriate for calling by parallel_simple."""
571        # machines could be a list of dictionaries, e.g.,
572        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
573        # The dictionary is generated in server_job.__init__, refer to
574        # variable machine_dict_list, then passed in with namespace, see method
575        # server_job._make_namespace.
576        # To compare the machinese to self.machines, which is a list of machine
577        # hostname, we need to convert machines back to a list of hostnames.
578        if (machines and isinstance(machines, list)
579            and isinstance(machines[0], dict)):
580            machines = [m['hostname'] for m in machines]
581        if len(machines) > 1 and log:
582            def wrapper(machine):
583                hostname = server_utils.get_hostname_from_machine(machine)
584                self.push_execution_context(hostname)
585                os.chdir(self.resultdir)
586                machine_data = {'hostname' : hostname,
587                                'status_version' : str(self._STATUS_VERSION)}
588                utils.write_keyval(self.resultdir, machine_data)
589                result = function(machine)
590                return result
591        else:
592            wrapper = function
593        return wrapper
594
595
596    def parallel_simple(self, function, machines, log=True, timeout=None,
597                        return_results=False):
598        """
599        Run 'function' using parallel_simple, with an extra wrapper to handle
600        the necessary setup for continuous parsing, if possible. If continuous
601        parsing is already properly initialized then this should just work.
602
603        @param function: A callable to run in parallel given each machine.
604        @param machines: A list of machine names to be passed one per subcommand
605                invocation of function.
606        @param log: If True, output will be written to output in a subdirectory
607                named after each machine.
608        @param timeout: Seconds after which the function call should timeout.
609        @param return_results: If True instead of an AutoServError being raised
610                on any error a list of the results|exceptions from the function
611                called on each arg is returned.  [default: False]
612
613        @raises error.AutotestError: If any of the functions failed.
614        """
615        wrapper = self._make_parallel_wrapper(function, machines, log)
616        return subcommand.parallel_simple(
617                wrapper, machines,
618                subdir_name_constructor=server_utils.get_hostname_from_machine,
619                log=log, timeout=timeout, return_results=return_results)
620
621
622    def parallel_on_machines(self, function, machines, timeout=None):
623        """
624        @param function: Called in parallel with one machine as its argument.
625        @param machines: A list of machines to call function(machine) on.
626        @param timeout: Seconds after which the function call should timeout.
627
628        @returns A list of machines on which function(machine) returned
629                without raising an exception.
630        """
631        results = self.parallel_simple(function, machines, timeout=timeout,
632                                       return_results=True)
633        success_machines = []
634        for result, machine in itertools.izip(results, machines):
635            if not isinstance(result, Exception):
636                success_machines.append(machine)
637        return success_machines
638
639
640    def record_skipped_test(self, skipped_test, message=None):
641        """Insert a failure record into status.log for this test."""
642        msg = message
643        if msg is None:
644            msg = 'No valid machines found for test %s.' % skipped_test
645        logging.info(msg)
646        self.record('START', None, skipped_test.test_name)
647        self.record('INFO', None, skipped_test.test_name, msg)
648        self.record('END TEST_NA', None, skipped_test.test_name, msg)
649
650
651    def _has_failed_tests(self):
652        """Parse status log for failed tests.
653
654        This checks the current working directory and is intended only for use
655        by the run() method.
656
657        @return boolean
658        """
659        path = os.getcwd()
660
661        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
662        # make code reuse plausible.
663        job_keyval = tko_models.job.read_keyval(path)
664        status_version = job_keyval.get("status_version", 0)
665
666        # parse out the job
667        parser = parser_lib.parser(status_version)
668        job = parser.make_job(path)
669        status_log = os.path.join(path, "status.log")
670        if not os.path.exists(status_log):
671            status_log = os.path.join(path, "status")
672        if not os.path.exists(status_log):
673            logging.warning("! Unable to parse job, no status file")
674            return True
675
676        # parse the status logs
677        status_lines = open(status_log).readlines()
678        parser.start(job)
679        tests = parser.end(status_lines)
680
681        # parser.end can return the same object multiple times, so filter out
682        # dups
683        job.tests = []
684        already_added = set()
685        for test in tests:
686            if test not in already_added:
687                already_added.add(test)
688                job.tests.append(test)
689
690        failed = False
691        for test in job.tests:
692            # The current job is still running and shouldn't count as failed.
693            # The parser will fail to parse the exit status of the job since it
694            # hasn't exited yet (this running right now is the job).
695            failed = failed or (test.status != 'GOOD'
696                                and not _is_current_server_job(test))
697        return failed
698
699
700    def _collect_crashes(self, namespace, collect_crashinfo):
701        """Collect crashes.
702
703        @param namespace: namespace dict.
704        @param collect_crashinfo: whether to collect crashinfo in addition to
705                dumps
706        """
707        if collect_crashinfo:
708            # includes crashdumps
709            crash_control_file = CRASHINFO_CONTROL_FILE
710        else:
711            crash_control_file = CRASHDUMPS_CONTROL_FILE
712        self._execute_code(crash_control_file, namespace)
713
714
715    _USE_TEMP_DIR = object()
716    def run(self, collect_crashdumps=True, namespace={}, control=None,
717            control_file_dir=None, verify_job_repo_url=False,
718            only_collect_crashinfo=False, skip_crash_collection=False,
719            job_labels='', use_packaging=True):
720        # for a normal job, make sure the uncollected logs file exists
721        # for a crashinfo-only run it should already exist, bail out otherwise
722        created_uncollected_logs = False
723        logging.info("I am PID %s", os.getpid())
724        if self.resultdir and not os.path.exists(self._uncollected_log_file):
725            if only_collect_crashinfo:
726                # if this is a crashinfo-only run, and there were no existing
727                # uncollected logs, just bail out early
728                logging.info("No existing uncollected logs, "
729                             "skipping crashinfo collection")
730                return
731            else:
732                log_file = open(self._uncollected_log_file, "w")
733                pickle.dump([], log_file)
734                log_file.close()
735                created_uncollected_logs = True
736
737        # use a copy so changes don't affect the original dictionary
738        namespace = namespace.copy()
739        machines = self.machines
740        if control is None:
741            if self.control is None:
742                control = ''
743            elif self._use_client_trampoline:
744                control = self._load_control_file(
745                        CLIENT_TRAMPOLINE_CONTROL_FILE)
746                # repr of a string is safe for eval.
747                control = (('trampoline_testname = %r\n' % str(self.control))
748                           + control)
749            else:
750                control = self._load_control_file(self.control)
751        if control_file_dir is None:
752            control_file_dir = self.resultdir
753
754        self.aborted = False
755        namespace.update(self._make_namespace())
756        namespace.update({
757                'args': self.args,
758                'job_labels': job_labels,
759                'gtest_runner': site_gtest_runner.gtest_runner(),
760        })
761        test_start_time = int(time.time())
762
763        if self.resultdir:
764            os.chdir(self.resultdir)
765            # touch status.log so that the parser knows a job is running here
766            open(self.get_status_log_path(), 'a').close()
767            self.enable_external_logging()
768
769        collect_crashinfo = True
770        temp_control_file_dir = None
771        try:
772            try:
773                if not self.fast:
774                    with metrics.SecondsTimer(
775                            'chromeos/autotest/job/get_network_stats',
776                            fields = {'stage': 'start'}):
777                        namespace['network_stats_label'] = 'at-start'
778                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
779                                           namespace)
780
781                if only_collect_crashinfo:
782                    return
783
784                # If the verify_job_repo_url option is set but we're unable
785                # to actually verify that the job_repo_url contains the autotest
786                # package, this job will fail.
787                if verify_job_repo_url:
788                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
789                                       namespace)
790                else:
791                    logging.warning('Not checking if job_repo_url contains '
792                                    'autotest packages on %s', machines)
793
794                # determine the dir to write the control files to
795                cfd_specified = (control_file_dir
796                                 and control_file_dir is not self._USE_TEMP_DIR)
797                if cfd_specified:
798                    temp_control_file_dir = None
799                else:
800                    temp_control_file_dir = tempfile.mkdtemp(
801                        suffix='temp_control_file_dir')
802                    control_file_dir = temp_control_file_dir
803                server_control_file = os.path.join(control_file_dir,
804                                                   self._control_filename)
805                client_control_file = os.path.join(control_file_dir,
806                                                   CLIENT_CONTROL_FILENAME)
807                if self._client:
808                    namespace['control'] = control
809                    utils.open_write_close(client_control_file, control)
810                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
811                                    server_control_file)
812                else:
813                    utils.open_write_close(server_control_file, control)
814
815                logging.info("Processing control file")
816                namespace['use_packaging'] = use_packaging
817                self._execute_code(server_control_file, namespace)
818                logging.info("Finished processing control file")
819
820                # If no device error occured, no need to collect crashinfo.
821                collect_crashinfo = self.failed_with_device_error
822            except Exception as e:
823                try:
824                    logging.exception(
825                            'Exception escaped control file, job aborting:')
826                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
827                                    ' ', str(e))
828                    self.record('INFO', None, None, str(e),
829                                {'job_abort_reason': reason})
830                except:
831                    pass # don't let logging exceptions here interfere
832                raise
833        finally:
834            if temp_control_file_dir:
835                # Clean up temp directory used for copies of the control files
836                try:
837                    shutil.rmtree(temp_control_file_dir)
838                except Exception as e:
839                    logging.warning('Could not remove temp directory %s: %s',
840                                 temp_control_file_dir, e)
841
842            if machines and (collect_crashdumps or collect_crashinfo):
843                if skip_crash_collection or self.fast:
844                    logging.info('Skipping crash dump/info collection '
845                                 'as requested.')
846                else:
847                    with metrics.SecondsTimer(
848                            'chromeos/autotest/job/collect_crashinfo'):
849                        namespace['test_start_time'] = test_start_time
850                        # Remove crash files for passing tests.
851                        # TODO(ayatane): Tests that create crash files should be
852                        # reported.
853                        namespace['has_failed_tests'] = self._has_failed_tests()
854                        self._collect_crashes(namespace, collect_crashinfo)
855            self.disable_external_logging()
856            if self._uncollected_log_file and created_uncollected_logs:
857                os.remove(self._uncollected_log_file)
858
859            if not self.fast:
860                with metrics.SecondsTimer(
861                        'chromeos/autotest/job/get_network_stats',
862                        fields = {'stage': 'end'}):
863                    namespace['network_stats_label'] = 'at-end'
864                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
865                                       namespace)
866
867
868    def run_test(self, url, *args, **dargs):
869        """
870        Summon a test object and run it.
871
872        tag
873                tag to add to testname
874        url
875                url of the test to run
876        """
877        if self._disable_sysinfo:
878            dargs['disable_sysinfo'] = True
879
880        group, testname = self.pkgmgr.get_package_name(url, 'test')
881        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
882        outputdir = self._make_test_outputdir(subdir)
883
884        def group_func():
885            try:
886                test.runtest(self, url, tag, args, dargs)
887            except error.TestBaseException as e:
888                self.record(e.exit_status, subdir, testname, str(e))
889                raise
890            except Exception as e:
891                info = str(e) + "\n" + traceback.format_exc()
892                self.record('FAIL', subdir, testname, info)
893                raise
894            else:
895                self.record('GOOD', subdir, testname, 'completed successfully')
896
897        try:
898            result = self._run_group(testname, subdir, group_func)
899        except error.TestBaseException as e:
900            return False
901        else:
902            return True
903
904
905    def _run_group(self, name, subdir, function, *args, **dargs):
906        """Underlying method for running something inside of a group."""
907        result, exc_info = None, None
908        try:
909            self.record('START', subdir, name)
910            result = function(*args, **dargs)
911        except error.TestBaseException as e:
912            self.record("END %s" % e.exit_status, subdir, name)
913            raise
914        except Exception as e:
915            err_msg = str(e) + '\n'
916            err_msg += traceback.format_exc()
917            self.record('END ABORT', subdir, name, err_msg)
918            raise error.JobError(name + ' failed\n' + traceback.format_exc())
919        else:
920            self.record('END GOOD', subdir, name)
921        finally:
922            for hook in self._post_run_hooks:
923                hook()
924
925        return result
926
927
928    def run_group(self, function, *args, **dargs):
929        """\
930        @param function: subroutine to run
931        @returns: (result, exc_info). When the call succeeds, result contains
932                the return value of |function| and exc_info is None. If
933                |function| raises an exception, exc_info contains the tuple
934                returned by sys.exc_info(), and result is None.
935        """
936
937        name = function.__name__
938        # Allow the tag for the group to be specified.
939        tag = dargs.pop('tag', None)
940        if tag:
941            name = tag
942
943        try:
944            result = self._run_group(name, None, function, *args, **dargs)[0]
945        except error.TestBaseException:
946            return None, sys.exc_info()
947        return result, None
948
949
950    def run_op(self, op, op_func, get_kernel_func):
951        """\
952        A specialization of run_group meant specifically for handling
953        management operation. Includes support for capturing the kernel version
954        after the operation.
955
956        Args:
957           op: name of the operation.
958           op_func: a function that carries out the operation (reboot, suspend)
959           get_kernel_func: a function that returns a string
960                            representing the kernel version.
961        """
962        try:
963            self.record('START', None, op)
964            op_func()
965        except Exception as e:
966            err_msg = str(e) + '\n' + traceback.format_exc()
967            self.record('END FAIL', None, op, err_msg)
968            raise
969        else:
970            kernel = get_kernel_func()
971            self.record('END GOOD', None, op,
972                        optional_fields={"kernel": kernel})
973
974
975    def run_control(self, path):
976        """Execute a control file found at path (relative to the autotest
977        path). Intended for executing a control file within a control file,
978        not for running the top-level job control file."""
979        path = os.path.join(self.autodir, path)
980        control_file = self._load_control_file(path)
981        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
982
983
984    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
985        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
986                                   on_every_test)
987
988
989    def add_sysinfo_logfile(self, file, on_every_test=False):
990        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
991
992
993    def _add_sysinfo_loggable(self, loggable, on_every_test):
994        if on_every_test:
995            self.sysinfo.test_loggables.add(loggable)
996        else:
997            self.sysinfo.boot_loggables.add(loggable)
998
999
1000    def _read_warnings(self):
1001        """Poll all the warning loggers and extract any new warnings that have
1002        been logged. If the warnings belong to a category that is currently
1003        disabled, this method will discard them and they will no longer be
1004        retrievable.
1005
1006        Returns a list of (timestamp, message) tuples, where timestamp is an
1007        integer epoch timestamp."""
1008        warnings = []
1009        while True:
1010            # pull in a line of output from every logger that has
1011            # output ready to be read
1012            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
1013            closed_loggers = set()
1014            for logger in loggers:
1015                line = logger.readline()
1016                # record any broken pipes (aka line == empty)
1017                if len(line) == 0:
1018                    closed_loggers.add(logger)
1019                    continue
1020                # parse out the warning
1021                timestamp, msgtype, msg = line.split('\t', 2)
1022                timestamp = int(timestamp)
1023                # if the warning is valid, add it to the results
1024                if self.warning_manager.is_valid(timestamp, msgtype):
1025                    warnings.append((timestamp, msg.strip()))
1026
1027            # stop listening to loggers that are closed
1028            self.warning_loggers -= closed_loggers
1029
1030            # stop if none of the loggers have any output left
1031            if not loggers:
1032                break
1033
1034        # sort into timestamp order
1035        warnings.sort()
1036        return warnings
1037
1038
1039    def _unique_subdirectory(self, base_subdirectory_name):
1040        """Compute a unique results subdirectory based on the given name.
1041
1042        Appends base_subdirectory_name with a number as necessary to find a
1043        directory name that doesn't already exist.
1044        """
1045        subdirectory = base_subdirectory_name
1046        counter = 1
1047        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1048            subdirectory = base_subdirectory_name + '.' + str(counter)
1049            counter += 1
1050        return subdirectory
1051
1052
1053    def get_record_context(self):
1054        """Returns an object representing the current job.record context.
1055
1056        The object returned is an opaque object with a 0-arg restore method
1057        which can be called to restore the job.record context (i.e. indentation)
1058        to the current level. The intention is that it should be used when
1059        something external which generate job.record calls (e.g. an autotest
1060        client) can fail catastrophically and the server job record state
1061        needs to be reset to its original "known good" state.
1062
1063        @return: A context object with a 0-arg restore() method."""
1064        return self._indenter.get_context()
1065
1066
1067    def record_summary(self, status_code, test_name, reason='', attributes=None,
1068                       distinguishing_attributes=(), child_test_ids=None):
1069        """Record a summary test result.
1070
1071        @param status_code: status code string, see
1072                common_lib.log.is_valid_status()
1073        @param test_name: name of the test
1074        @param reason: (optional) string providing detailed reason for test
1075                outcome
1076        @param attributes: (optional) dict of string keyvals to associate with
1077                this result
1078        @param distinguishing_attributes: (optional) list of attribute names
1079                that should be used to distinguish identically-named test
1080                results.  These attributes should be present in the attributes
1081                parameter.  This is used to generate user-friendly subdirectory
1082                names.
1083        @param child_test_ids: (optional) list of test indices for test results
1084                used in generating this result.
1085        """
1086        subdirectory_name_parts = [test_name]
1087        for attribute in distinguishing_attributes:
1088            assert attributes
1089            assert attribute in attributes, '%s not in %s' % (attribute,
1090                                                              attributes)
1091            subdirectory_name_parts.append(attributes[attribute])
1092        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1093
1094        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1095        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1096        os.mkdir(subdirectory_path)
1097
1098        self.record(status_code, subdirectory, test_name,
1099                    status=reason, optional_fields={'is_summary': True})
1100
1101        if attributes:
1102            utils.write_keyval(subdirectory_path, attributes)
1103
1104        if child_test_ids:
1105            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1106            summary_data = {'child_test_ids': ids_string}
1107            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1108                               summary_data)
1109
1110
1111    def add_post_run_hook(self, hook):
1112        """
1113        Registers a hook to run after the main job function.
1114
1115        This provides a mechanism by which tests that perform multiple tests of
1116        their own can write additional top-level results to the TKO status.log
1117        file.
1118
1119        @param hook: Function to invoke (without any args) after the main job
1120            function completes and the job status is logged.
1121        """
1122        self._post_run_hooks.append(hook)
1123
1124
1125    def disable_warnings(self, warning_type):
1126        self.warning_manager.disable_warnings(warning_type)
1127        self.record("INFO", None, None,
1128                    "disabling %s warnings" % warning_type,
1129                    {"warnings.disable": warning_type})
1130
1131
1132    def enable_warnings(self, warning_type):
1133        self.warning_manager.enable_warnings(warning_type)
1134        self.record("INFO", None, None,
1135                    "enabling %s warnings" % warning_type,
1136                    {"warnings.enable": warning_type})
1137
1138
1139    def get_status_log_path(self, subdir=None):
1140        """Return the path to the job status log.
1141
1142        @param subdir - Optional paramter indicating that you want the path
1143            to a subdirectory status log.
1144
1145        @returns The path where the status log should be.
1146        """
1147        if self.resultdir:
1148            if subdir:
1149                return os.path.join(self.resultdir, subdir, "status.log")
1150            else:
1151                return os.path.join(self.resultdir, "status.log")
1152        else:
1153            return None
1154
1155
1156    def _update_uncollected_logs_list(self, update_func):
1157        """Updates the uncollected logs list in a multi-process safe manner.
1158
1159        @param update_func - a function that updates the list of uncollected
1160            logs. Should take one parameter, the list to be updated.
1161        """
1162        # Skip log collection if file _uncollected_log_file does not exist.
1163        if not (self._uncollected_log_file and
1164                os.path.exists(self._uncollected_log_file)):
1165            return
1166        if self._uncollected_log_file:
1167            log_file = open(self._uncollected_log_file, "r+")
1168            fcntl.flock(log_file, fcntl.LOCK_EX)
1169        try:
1170            uncollected_logs = pickle.load(log_file)
1171            update_func(uncollected_logs)
1172            log_file.seek(0)
1173            log_file.truncate()
1174            pickle.dump(uncollected_logs, log_file)
1175            log_file.flush()
1176        finally:
1177            fcntl.flock(log_file, fcntl.LOCK_UN)
1178            log_file.close()
1179
1180
1181    def add_client_log(self, hostname, remote_path, local_path):
1182        """Adds a new set of client logs to the list of uncollected logs,
1183        to allow for future log recovery.
1184
1185        @param host - the hostname of the machine holding the logs
1186        @param remote_path - the directory on the remote machine holding logs
1187        @param local_path - the local directory to copy the logs into
1188        """
1189        def update_func(logs_list):
1190            logs_list.append((hostname, remote_path, local_path))
1191        self._update_uncollected_logs_list(update_func)
1192
1193
1194    def remove_client_log(self, hostname, remote_path, local_path):
1195        """Removes a set of client logs from the list of uncollected logs,
1196        to allow for future log recovery.
1197
1198        @param host - the hostname of the machine holding the logs
1199        @param remote_path - the directory on the remote machine holding logs
1200        @param local_path - the local directory to copy the logs into
1201        """
1202        def update_func(logs_list):
1203            logs_list.remove((hostname, remote_path, local_path))
1204        self._update_uncollected_logs_list(update_func)
1205
1206
1207    def get_client_logs(self):
1208        """Retrieves the list of uncollected logs, if it exists.
1209
1210        @returns A list of (host, remote_path, local_path) tuples. Returns
1211                 an empty list if no uncollected logs file exists.
1212        """
1213        log_exists = (self._uncollected_log_file and
1214                      os.path.exists(self._uncollected_log_file))
1215        if log_exists:
1216            return pickle.load(open(self._uncollected_log_file))
1217        else:
1218            return []
1219
1220
1221    def _fill_server_control_namespace(self, namespace, protect=True):
1222        """
1223        Prepare a namespace to be used when executing server control files.
1224
1225        This sets up the control file API by importing modules and making them
1226        available under the appropriate names within namespace.
1227
1228        For use by _execute_code().
1229
1230        Args:
1231          namespace: The namespace dictionary to fill in.
1232          protect: Boolean.  If True (the default) any operation that would
1233              clobber an existing entry in namespace will cause an error.
1234        Raises:
1235          error.AutoservError: When a name would be clobbered by import.
1236        """
1237        def _import_names(module_name, names=()):
1238            """
1239            Import a module and assign named attributes into namespace.
1240
1241            Args:
1242                module_name: The string module name.
1243                names: A limiting list of names to import from module_name.  If
1244                    empty (the default), all names are imported from the module
1245                    similar to a "from foo.bar import *" statement.
1246            Raises:
1247                error.AutoservError: When a name being imported would clobber
1248                    a name already in namespace.
1249            """
1250            module = __import__(module_name, {}, {}, names)
1251
1252            # No names supplied?  Import * from the lowest level module.
1253            # (Ugh, why do I have to implement this part myself?)
1254            if not names:
1255                for submodule_name in module_name.split('.')[1:]:
1256                    module = getattr(module, submodule_name)
1257                if hasattr(module, '__all__'):
1258                    names = getattr(module, '__all__')
1259                else:
1260                    names = dir(module)
1261
1262            # Install each name into namespace, checking to make sure it
1263            # doesn't override anything that already exists.
1264            for name in names:
1265                # Check for conflicts to help prevent future problems.
1266                if name in namespace and protect:
1267                    if namespace[name] is not getattr(module, name):
1268                        raise error.AutoservError('importing name '
1269                                '%s from %s %r would override %r' %
1270                                (name, module_name, getattr(module, name),
1271                                 namespace[name]))
1272                    else:
1273                        # Encourage cleanliness and the use of __all__ for a
1274                        # more concrete API with less surprises on '*' imports.
1275                        warnings.warn('%s (%r) being imported from %s for use '
1276                                      'in server control files is not the '
1277                                      'first occurrence of that import.' %
1278                                      (name, namespace[name], module_name))
1279
1280                namespace[name] = getattr(module, name)
1281
1282
1283        # This is the equivalent of prepending a bunch of import statements to
1284        # the front of the control script.
1285        namespace.update(os=os, sys=sys, logging=logging)
1286        _import_names('autotest_lib.server',
1287                ('hosts', 'autotest', 'standalone_profiler'))
1288        _import_names('autotest_lib.server.subcommand',
1289                      ('parallel', 'parallel_simple', 'subcommand'))
1290        _import_names('autotest_lib.server.utils',
1291                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
1292        _import_names('autotest_lib.client.common_lib.error')
1293        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
1294
1295        # Inject ourself as the job object into other classes within the API.
1296        # (Yuck, this injection is a gross thing be part of a public API. -gps)
1297        #
1298        # XXX Autotest does not appear to use .job.  Who does?
1299        namespace['autotest'].Autotest.job = self
1300        # server.hosts.base_classes.Host uses .job.
1301        namespace['hosts'].Host.job = self
1302        namespace['hosts'].factory.ssh_user = self._ssh_user
1303        namespace['hosts'].factory.ssh_port = self._ssh_port
1304        namespace['hosts'].factory.ssh_pass = self._ssh_pass
1305        namespace['hosts'].factory.ssh_verbosity_flag = (
1306                self._ssh_verbosity_flag)
1307        namespace['hosts'].factory.ssh_options = self._ssh_options
1308
1309
1310    def _execute_code(self, code_file, namespace, protect=True):
1311        """
1312        Execute code using a copy of namespace as a server control script.
1313
1314        Unless protect_namespace is explicitly set to False, the dict will not
1315        be modified.
1316
1317        Args:
1318          code_file: The filename of the control file to execute.
1319          namespace: A dict containing names to make available during execution.
1320          protect: Boolean.  If True (the default) a copy of the namespace dict
1321              is used during execution to prevent the code from modifying its
1322              contents outside of this function.  If False the raw dict is
1323              passed in and modifications will be allowed.
1324        """
1325        if protect:
1326            namespace = namespace.copy()
1327        self._fill_server_control_namespace(namespace, protect=protect)
1328        # TODO: Simplify and get rid of the special cases for only 1 machine.
1329        if len(self.machines) > 1:
1330            machines_text = '\n'.join(self.machines) + '\n'
1331            # Only rewrite the file if it does not match our machine list.
1332            try:
1333                machines_f = open(MACHINES_FILENAME, 'r')
1334                existing_machines_text = machines_f.read()
1335                machines_f.close()
1336            except EnvironmentError:
1337                existing_machines_text = None
1338            if machines_text != existing_machines_text:
1339                utils.open_write_close(MACHINES_FILENAME, machines_text)
1340        execfile(code_file, namespace, namespace)
1341
1342
1343    def preprocess_client_state(self):
1344        """
1345        Produce a state file for initializing the state of a client job.
1346
1347        Creates a new client state file with all the current server state, as
1348        well as some pre-set client state.
1349
1350        @returns The path of the file the state was written into.
1351        """
1352        # initialize the sysinfo state
1353        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1354
1355        # dump the state out to a tempfile
1356        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1357        os.close(fd)
1358
1359        # write_to_file doesn't need locking, we exclusively own file_path
1360        self._state.write_to_file(file_path)
1361        return file_path
1362
1363
1364    def postprocess_client_state(self, state_path):
1365        """
1366        Update the state of this job with the state from a client job.
1367
1368        Updates the state of the server side of a job with the final state
1369        of a client job that was run. Updates the non-client-specific state,
1370        pulls in some specific bits from the client-specific state, and then
1371        discards the rest. Removes the state file afterwards
1372
1373        @param state_file A path to the state file from the client.
1374        """
1375        # update the on-disk state
1376        try:
1377            self._state.read_from_file(state_path)
1378            os.remove(state_path)
1379        except OSError, e:
1380            # ignore file-not-found errors
1381            if e.errno != errno.ENOENT:
1382                raise
1383            else:
1384                logging.debug('Client state file %s not found', state_path)
1385
1386        # update the sysinfo state
1387        if self._state.has('client', 'sysinfo'):
1388            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1389
1390        # drop all the client-specific state
1391        self._state.discard_namespace('client')
1392
1393
1394    def clear_all_known_hosts(self):
1395        """Clears known hosts files for all AbstractSSHHosts."""
1396        for host in self.hosts:
1397            if isinstance(host, abstract_ssh.AbstractSSHHost):
1398                host.clear_known_hosts()
1399
1400
1401    def close(self):
1402        """Closes this job's operation."""
1403
1404        # Use shallow copy, because host.close() internally discards itself.
1405        for host in list(self.hosts):
1406            host.close()
1407        assert not self.hosts
1408        self._connection_pool.shutdown()
1409
1410
1411    def _get_job_data(self):
1412        """Add custom data to the job keyval info.
1413
1414        When multiple machines are used in a job, change the hostname to
1415        the platform of the first machine instead of machine1,machine2,...  This
1416        makes the job reports easier to read and keeps the tko_machines table from
1417        growing too large.
1418
1419        Returns:
1420            keyval dictionary with new hostname value, or empty dictionary.
1421        """
1422        job_data = {}
1423        # Only modify hostname on multimachine jobs. Assume all host have the same
1424        # platform.
1425        if len(self.machines) > 1:
1426            # Search through machines for first machine with a platform.
1427            for host in self.machines:
1428                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
1429                keyvals = utils.read_keyval(keyval_path)
1430                host_plat = keyvals.get('platform', None)
1431                if not host_plat:
1432                    continue
1433                job_data['hostname'] = host_plat
1434                break
1435        return job_data
1436
1437
1438class warning_manager(object):
1439    """Class for controlling warning logs. Manages the enabling and disabling
1440    of warnings."""
1441    def __init__(self):
1442        # a map of warning types to a list of disabled time intervals
1443        self.disabled_warnings = {}
1444
1445
1446    def is_valid(self, timestamp, warning_type):
1447        """Indicates if a warning (based on the time it occured and its type)
1448        is a valid warning. A warning is considered "invalid" if this type of
1449        warning was marked as "disabled" at the time the warning occured."""
1450        disabled_intervals = self.disabled_warnings.get(warning_type, [])
1451        for start, end in disabled_intervals:
1452            if timestamp >= start and (end is None or timestamp < end):
1453                return False
1454        return True
1455
1456
1457    def disable_warnings(self, warning_type, current_time_func=time.time):
1458        """As of now, disables all further warnings of this type."""
1459        intervals = self.disabled_warnings.setdefault(warning_type, [])
1460        if not intervals or intervals[-1][1] is not None:
1461            intervals.append((int(current_time_func()), None))
1462
1463
1464    def enable_warnings(self, warning_type, current_time_func=time.time):
1465        """As of now, enables all further warnings of this type."""
1466        intervals = self.disabled_warnings.get(warning_type, [])
1467        if intervals and intervals[-1][1] is None:
1468            intervals[-1] = (intervals[-1][0], int(current_time_func()))
1469
1470
1471def _is_current_server_job(test):
1472    """Return True if parsed test is the currently running job.
1473
1474    @param test: test instance from tko parser.
1475    """
1476    return test.testname == 'SERVER_JOB'
1477
1478
1479def _create_afe_host(hostname):
1480    """Create an afe_host object backed by the AFE.
1481
1482    @param hostname: Name of the host for which we want the Host object.
1483    @returns: An object of type frontend.AFE
1484    """
1485    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
1486    hosts = afe.get_hosts(hostname=hostname)
1487    if not hosts:
1488        raise error.AutoservError('No hosts named %s found' % hostname)
1489
1490    return hosts[0]
1491
1492
1493def _create_file_backed_host_info_store(store_dir, hostname):
1494    """Create a CachingHostInfoStore backed by an existing file.
1495
1496    @param store_dir: A directory to contain store backing files.
1497    @param hostname: Name of the host for which we want the store.
1498
1499    @returns: An object of type CachingHostInfoStore.
1500    """
1501    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
1502    if not os.path.isfile(backing_file_path):
1503        raise error.AutoservError(
1504                'Requested FileStore but no backing file at %s'
1505                % backing_file_path
1506        )
1507    return file_store.FileStore(backing_file_path)
1508
1509
1510def _create_afe_backed_host_info_store(store_dir, hostname):
1511    """Create a CachingHostInfoStore backed by the AFE.
1512
1513    @param store_dir: A directory to contain store backing files.
1514    @param hostname: Name of the host for which we want the store.
1515
1516    @returns: An object of type CachingHostInfoStore.
1517    """
1518    primary_store = afe_store.AfeStore(hostname)
1519    try:
1520        primary_store.get(force_refresh=True)
1521    except host_info.StoreError:
1522        raise error.AutoservError(
1523                'Could not obtain HostInfo for hostname %s' % hostname)
1524    # Since the store wasn't initialized external to autoserv, we must
1525    # ensure that the store we create is unique within store_dir.
1526    backing_file_path = os.path.join(
1527            _make_unique_subdir(store_dir),
1528            '%s.store' % hostname,
1529    )
1530    logging.info('Shadowing AFE store with a FileStore at %s',
1531                 backing_file_path)
1532    shadow_store = file_store.FileStore(backing_file_path)
1533    return shadowing_store.ShadowingStore(primary_store, shadow_store)
1534
1535
1536def _make_unique_subdir(workdir):
1537    """Creates a new subdir within workdir and returns the path to it."""
1538    store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
1539    _make_dirs_if_needed(store_dir)
1540    return store_dir
1541
1542
1543def _make_dirs_if_needed(path):
1544    """os.makedirs, but ignores failure because the leaf directory exists"""
1545    try:
1546        os.makedirs(path)
1547    except OSError as e:
1548        if e.errno != errno.EEXIST:
1549            raise
1550