1# pylint: disable-msg=C0111
2
3# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6"""
7The main job wrapper for the server side.
8
9This is the core infrastructure. Derived from the client side job.py
10
11Copyright Martin J. Bligh, Andy Whitcroft 2007
12"""
13
14import errno
15import fcntl
16import getpass
17import itertools
18import logging
19import os
20import pickle
21import platform
22import re
23import select
24import shutil
25import sys
26import tempfile
27import time
28import traceback
29import uuid
30import warnings
31
32from autotest_lib.client.bin import sysinfo
33from autotest_lib.client.common_lib import base_job
34from autotest_lib.client.common_lib import control_data
35from autotest_lib.client.common_lib import error
36from autotest_lib.client.common_lib import global_config
37from autotest_lib.client.common_lib import logging_manager
38from autotest_lib.client.common_lib import packages
39from autotest_lib.client.common_lib import utils
40from autotest_lib.server import profilers
41from autotest_lib.server import site_gtest_runner
42from autotest_lib.server import subcommand
43from autotest_lib.server import test
44from autotest_lib.server import utils as server_utils
45from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
46from autotest_lib.server.hosts import abstract_ssh
47from autotest_lib.server.hosts import afe_store
48from autotest_lib.server.hosts import file_store
49from autotest_lib.server.hosts import shadowing_store
50from autotest_lib.server.hosts import factory as host_factory
51from autotest_lib.server.hosts import host_info
52from autotest_lib.server.hosts import ssh_multiplex
53from autotest_lib.tko import db as tko_db
54from autotest_lib.tko import models as tko_models
55from autotest_lib.tko import status_lib
56from autotest_lib.tko import parser_lib
57from autotest_lib.tko import utils as tko_utils
58
59try:
60    from chromite.lib import metrics
61except ImportError:
62    metrics = utils.metrics_mock
63
64
65INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
66        'autoserv', 'incremental_tko_parsing', type=bool, default=False)
67
68def _control_segment_path(name):
69    """Get the pathname of the named control segment file."""
70    server_dir = os.path.dirname(os.path.abspath(__file__))
71    return os.path.join(server_dir, "control_segments", name)
72
73
74CLIENT_CONTROL_FILENAME = 'control'
75SERVER_CONTROL_FILENAME = 'control.srv'
76MACHINES_FILENAME = '.machines'
77
78CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
79CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
80CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
81INSTALL_CONTROL_FILE = _control_segment_path('install')
82CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
83VERIFY_CONTROL_FILE = _control_segment_path('verify')
84REPAIR_CONTROL_FILE = _control_segment_path('repair')
85PROVISION_CONTROL_FILE = _control_segment_path('provision')
86VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
87RESET_CONTROL_FILE = _control_segment_path('reset')
88GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
89
90
91def get_machine_dicts(machine_names, workdir, in_lab, host_attributes=None,
92                      connection_pool=None):
93    """Converts a list of machine names to list of dicts.
94
95    TODO(crbug.com/678430): This function temporarily has a side effect of
96    creating files under workdir for backing a FileStore. This side-effect will
97    go away once callers of autoserv start passing in the FileStore.
98
99    @param machine_names: A list of machine names.
100    @param workdir: A directory where any on-disk files related to the machines
101            may be created.
102    @param in_lab: A boolean indicating whether we're running in lab.
103    @param host_attributes: Optional list of host attributes to add for each
104            host.
105    @param connection_pool: ssh_multiplex.ConnectionPool instance to share
106            master connections across control scripts.
107    @returns: A list of dicts. Each dict has the following keys:
108            'hostname': Name of the machine originally in machine_names (str).
109            'afe_host': A frontend.Host object for the machine, or a stub if
110                    in_lab is false.
111            'host_info_store': A host_info.CachingHostInfoStore object to obtain
112                    host information. A stub if in_lab is False.
113            'connection_pool': ssh_multiplex.ConnectionPool instance to share
114                    master ssh connection across control scripts.
115    """
116    machine_dict_list = []
117    for machine in machine_names:
118        # See autoserv_parser.parse_args. Only one of in_lab or host_attributes
119        # can be provided.
120        if not in_lab:
121            afe_host = server_utils.EmptyAFEHost()
122            host_info_store = host_info.InMemoryHostInfoStore()
123            if host_attributes is not None:
124                afe_host.attributes.update(host_attributes)
125                info = host_info.HostInfo(attributes=host_attributes)
126                host_info_store.commit(info)
127        elif host_attributes:
128            raise error.AutoservError(
129                    'in_lab and host_attribute are mutually exclusive. '
130                    'Obtained in_lab:%s, host_attributes:%s'
131                    % (in_lab, host_attributes))
132        else:
133            afe_host = _create_afe_host(machine)
134            host_info_store = _create_host_info_store(machine, workdir)
135
136        machine_dict_list.append({
137                'hostname' : machine,
138                'afe_host' : afe_host,
139                'host_info_store': host_info_store,
140                'connection_pool': connection_pool,
141        })
142
143    return machine_dict_list
144
145
146class status_indenter(base_job.status_indenter):
147    """Provide a simple integer-backed status indenter."""
148    def __init__(self):
149        self._indent = 0
150
151
152    @property
153    def indent(self):
154        return self._indent
155
156
157    def increment(self):
158        self._indent += 1
159
160
161    def decrement(self):
162        self._indent -= 1
163
164
165    def get_context(self):
166        """Returns a context object for use by job.get_record_context."""
167        class context(object):
168            def __init__(self, indenter, indent):
169                self._indenter = indenter
170                self._indent = indent
171            def restore(self):
172                self._indenter._indent = self._indent
173        return context(self, self._indent)
174
175
176class server_job_record_hook(object):
177    """The job.record hook for server job. Used to inject WARN messages from
178    the console or vlm whenever new logs are written, and to echo any logs
179    to INFO level logging. Implemented as a class so that it can use state to
180    block recursive calls, so that the hook can call job.record itself to
181    log WARN messages.
182
183    Depends on job._read_warnings and job._logger.
184    """
185    def __init__(self, job):
186        self._job = job
187        self._being_called = False
188
189
190    def __call__(self, entry):
191        """A wrapper around the 'real' record hook, the _hook method, which
192        prevents recursion. This isn't making any effort to be threadsafe,
193        the intent is to outright block infinite recursion via a
194        job.record->_hook->job.record->_hook->job.record... chain."""
195        if self._being_called:
196            return
197        self._being_called = True
198        try:
199            self._hook(self._job, entry)
200        finally:
201            self._being_called = False
202
203
204    @staticmethod
205    def _hook(job, entry):
206        """The core hook, which can safely call job.record."""
207        entries = []
208        # poll all our warning loggers for new warnings
209        for timestamp, msg in job._read_warnings():
210            warning_entry = base_job.status_log_entry(
211                'WARN', None, None, msg, {}, timestamp=timestamp)
212            entries.append(warning_entry)
213            job.record_entry(warning_entry)
214        # echo rendered versions of all the status logs to info
215        entries.append(entry)
216        for entry in entries:
217            rendered_entry = job._logger.render_entry(entry)
218            logging.info(rendered_entry)
219            job._parse_status(rendered_entry)
220
221
222class server_job(base_job.base_job):
223    """The server-side concrete implementation of base_job.
224
225    Optional properties provided by this implementation:
226        serverdir
227
228        num_tests_run
229        num_tests_failed
230
231        warning_manager
232        warning_loggers
233    """
234
235    _STATUS_VERSION = 1
236
237    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
238    def __init__(self, control, args, resultdir, label, user, machines,
239                 client=False, parse_job='',
240                 ssh_user=host_factory.DEFAULT_SSH_USER,
241                 ssh_port=host_factory.DEFAULT_SSH_PORT,
242                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
243                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
244                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
245                 test_retry=0, group_name='',
246                 tag='', disable_sysinfo=False,
247                 control_filename=SERVER_CONTROL_FILENAME,
248                 parent_job_id=None, host_attributes=None, in_lab=False):
249        """
250        Create a server side job object.
251
252        @param control: The pathname of the control file.
253        @param args: Passed to the control file.
254        @param resultdir: Where to throw the results.
255        @param label: Description of the job.
256        @param user: Username for the job (email address).
257        @param client: True if this is a client-side control file.
258        @param parse_job: string, if supplied it is the job execution tag that
259                the results will be passed through to the TKO parser with.
260        @param ssh_user: The SSH username.  [root]
261        @param ssh_port: The SSH port number.  [22]
262        @param ssh_pass: The SSH passphrase, if needed.
263        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
264                '-vvv', or an empty string if not needed.
265        @param ssh_options: A string giving additional options that will be
266                            included in ssh commands.
267        @param test_retry: The number of times to retry a test if the test did
268                not complete successfully.
269        @param group_name: If supplied, this will be written out as
270                host_group_name in the keyvals file for the parser.
271        @param tag: The job execution tag from the scheduler.  [optional]
272        @param disable_sysinfo: Whether we should disable the sysinfo step of
273                tests for a modest shortening of test time.  [optional]
274        @param control_filename: The filename where the server control file
275                should be written in the results directory.
276        @param parent_job_id: Job ID of the parent job. Default to None if the
277                job does not have a parent job.
278        @param host_attributes: Dict of host attributes passed into autoserv
279                                via the command line. If specified here, these
280                                attributes will apply to all machines.
281        @param in_lab: Boolean that indicates if this is running in the lab
282                       environment.
283        """
284        super(server_job, self).__init__(resultdir=resultdir,
285                                         test_retry=test_retry)
286        self.test_retry = test_retry
287        self.control = control
288        self._uncollected_log_file = os.path.join(self.resultdir,
289                                                  'uncollected_logs')
290        debugdir = os.path.join(self.resultdir, 'debug')
291        if not os.path.exists(debugdir):
292            os.mkdir(debugdir)
293
294        if user:
295            self.user = user
296        else:
297            self.user = getpass.getuser()
298
299        self.args = args
300        self.label = label
301        self.machines = machines
302        self._client = client
303        self.warning_loggers = set()
304        self.warning_manager = warning_manager()
305        self._ssh_user = ssh_user
306        self._ssh_port = ssh_port
307        self._ssh_pass = ssh_pass
308        self._ssh_verbosity_flag = ssh_verbosity_flag
309        self._ssh_options = ssh_options
310        self.tag = tag
311        self.hosts = set()
312        self.drop_caches = False
313        self.drop_caches_between_iterations = False
314        self._control_filename = control_filename
315        self._disable_sysinfo = disable_sysinfo
316
317        self.logging = logging_manager.get_logging_manager(
318                manage_stdout_and_stderr=True, redirect_fds=True)
319        subcommand.logging_manager_object = self.logging
320
321        self.sysinfo = sysinfo.sysinfo(self.resultdir)
322        self.profilers = profilers.profilers(self)
323
324        job_data = {'label' : label, 'user' : user,
325                    'hostname' : ','.join(machines),
326                    'drone' : platform.node(),
327                    'status_version' : str(self._STATUS_VERSION),
328                    'job_started' : str(int(time.time()))}
329        # Save parent job id to keyvals, so parser can retrieve the info and
330        # write to tko_jobs record.
331        if parent_job_id:
332            job_data['parent_job_id'] = parent_job_id
333        if group_name:
334            job_data['host_group_name'] = group_name
335
336        # only write these keyvals out on the first job in a resultdir
337        if 'job_started' not in utils.read_keyval(self.resultdir):
338            job_data.update(self._get_job_data())
339            utils.write_keyval(self.resultdir, job_data)
340
341        self._parse_job = parse_job
342        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
343                              and len(machines) <= 1)
344        self.pkgmgr = packages.PackageManager(
345            self.autodir, run_function_dargs={'timeout':600})
346        self.num_tests_run = 0
347        self.num_tests_failed = 0
348
349        self._register_subcommand_hooks()
350
351        # set up the status logger
352        self._indenter = status_indenter()
353        self._logger = base_job.status_logger(
354            self, self._indenter, 'status.log', 'status.log',
355            record_hook=server_job_record_hook(self))
356
357        # Initialize a flag to indicate DUT failure during the test, e.g.,
358        # unexpected reboot.
359        self.failed_with_device_error = False
360
361        self._connection_pool = ssh_multiplex.ConnectionPool()
362
363        self.parent_job_id = parent_job_id
364        self.in_lab = in_lab
365        self.machine_dict_list = get_machine_dicts(
366                self.machines, self.resultdir, self.in_lab, host_attributes,
367                self._connection_pool)
368
369        # TODO(jrbarnette) The harness attribute is only relevant to
370        # client jobs, but it's required to be present, or we will fail
371        # server job unit tests.  Yes, really.
372        #
373        # TODO(jrbarnette) The utility of the 'harness' attribute even
374        # to client jobs is suspect.  Probably, we should remove it.
375        self.harness = None
376
377        if control:
378            parsed_control = control_data.parse_control(
379                    control, raise_warnings=False)
380            self.fast = parsed_control.fast
381            self.max_result_size_KB = parsed_control.max_result_size_KB
382        else:
383            self.fast = False
384            # Set the maximum result size to be the default specified in
385            # global config, if the job has no control file associated.
386            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
387
388
389    @classmethod
390    def _find_base_directories(cls):
391        """
392        Determine locations of autodir, clientdir and serverdir. Assumes
393        that this file is located within serverdir and uses __file__ along
394        with relative paths to resolve the location.
395        """
396        serverdir = os.path.abspath(os.path.dirname(__file__))
397        autodir = os.path.normpath(os.path.join(serverdir, '..'))
398        clientdir = os.path.join(autodir, 'client')
399        return autodir, clientdir, serverdir
400
401
402    def _find_resultdir(self, resultdir, *args, **dargs):
403        """
404        Determine the location of resultdir. For server jobs we expect one to
405        always be explicitly passed in to __init__, so just return that.
406        """
407        if resultdir:
408            return os.path.normpath(resultdir)
409        else:
410            return None
411
412
413    def _get_status_logger(self):
414        """Return a reference to the status logger."""
415        return self._logger
416
417
418    @staticmethod
419    def _load_control_file(path):
420        f = open(path)
421        try:
422            control_file = f.read()
423        finally:
424            f.close()
425        return re.sub('\r', '', control_file)
426
427
428    def _register_subcommand_hooks(self):
429        """
430        Register some hooks into the subcommand modules that allow us
431        to properly clean up self.hosts created in forked subprocesses.
432        """
433        def on_fork(cmd):
434            self._existing_hosts_on_fork = set(self.hosts)
435        def on_join(cmd):
436            new_hosts = self.hosts - self._existing_hosts_on_fork
437            for host in new_hosts:
438                host.close()
439        subcommand.subcommand.register_fork_hook(on_fork)
440        subcommand.subcommand.register_join_hook(on_join)
441
442
443    def init_parser(self):
444        """
445        Start the continuous parsing of self.resultdir. This sets up
446        the database connection and inserts the basic job object into
447        the database if necessary.
448        """
449        if self.fast and not self._using_parser:
450            self.parser = parser_lib.parser(self._STATUS_VERSION)
451            self.job_model = self.parser.make_job(self.resultdir)
452            self.parser.start(self.job_model)
453            return
454
455        if not self._using_parser:
456            return
457
458        # redirect parser debugging to .parse.log
459        parse_log = os.path.join(self.resultdir, '.parse.log')
460        parse_log = open(parse_log, 'w', 0)
461        tko_utils.redirect_parser_debugging(parse_log)
462        # create a job model object and set up the db
463        self.results_db = tko_db.db(autocommit=True)
464        self.parser = parser_lib.parser(self._STATUS_VERSION)
465        self.job_model = self.parser.make_job(self.resultdir)
466        self.parser.start(self.job_model)
467        # check if a job already exists in the db and insert it if
468        # it does not
469        job_idx = self.results_db.find_job(self._parse_job)
470        if job_idx is None:
471            self.results_db.insert_job(self._parse_job, self.job_model,
472                                       self.parent_job_id)
473        else:
474            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
475            self.job_model.index = job_idx
476            self.job_model.machine_idx = machine_idx
477
478
479    def cleanup_parser(self):
480        """
481        This should be called after the server job is finished
482        to carry out any remaining cleanup (e.g. flushing any
483        remaining test results to the results db)
484        """
485        if self.fast and not self._using_parser:
486            final_tests = self.parser.end()
487            for test in final_tests:
488                if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
489                    self.num_tests_failed += 1
490            return
491
492        if not self._using_parser:
493            return
494
495        final_tests = self.parser.end()
496        for test in final_tests:
497            self.__insert_test(test)
498        self._using_parser = False
499
500    # TODO crbug.com/285395 add a kwargs parameter.
501    def _make_namespace(self):
502        """Create a namespace dictionary to be passed along to control file.
503
504        Creates a namespace argument populated with standard values:
505        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
506        and ssh_options.
507        """
508        namespace = {'machines' : self.machine_dict_list,
509                     'job' : self,
510                     'ssh_user' : self._ssh_user,
511                     'ssh_port' : self._ssh_port,
512                     'ssh_pass' : self._ssh_pass,
513                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
514                     'ssh_options' : self._ssh_options}
515        return namespace
516
517
518    def cleanup(self, labels):
519        """Cleanup machines.
520
521        @param labels: Comma separated job labels, will be used to
522                       determine special task actions.
523        """
524        if not self.machines:
525            raise error.AutoservError('No machines specified to cleanup')
526        if self.resultdir:
527            os.chdir(self.resultdir)
528
529        namespace = self._make_namespace()
530        namespace.update({'job_labels': labels, 'args': ''})
531        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
532
533
534    def verify(self, labels):
535        """Verify machines are all ssh-able.
536
537        @param labels: Comma separated job labels, will be used to
538                       determine special task actions.
539        """
540        if not self.machines:
541            raise error.AutoservError('No machines specified to verify')
542        if self.resultdir:
543            os.chdir(self.resultdir)
544
545        namespace = self._make_namespace()
546        namespace.update({'job_labels': labels, 'args': ''})
547        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
548
549
550    def reset(self, labels):
551        """Reset machines by first cleanup then verify each machine.
552
553        @param labels: Comma separated job labels, will be used to
554                       determine special task actions.
555        """
556        if not self.machines:
557            raise error.AutoservError('No machines specified to reset.')
558        if self.resultdir:
559            os.chdir(self.resultdir)
560
561        namespace = self._make_namespace()
562        namespace.update({'job_labels': labels, 'args': ''})
563        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
564
565
566    def repair(self, labels):
567        """Repair machines.
568
569        @param labels: Comma separated job labels, will be used to
570                       determine special task actions.
571        """
572        if not self.machines:
573            raise error.AutoservError('No machines specified to repair')
574        if self.resultdir:
575            os.chdir(self.resultdir)
576
577        namespace = self._make_namespace()
578        namespace.update({'job_labels': labels, 'args': ''})
579        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
580
581
582    def provision(self, labels):
583        """
584        Provision all hosts to match |labels|.
585
586        @param labels: A comma seperated string of labels to provision the
587                       host to.
588
589        """
590        control = self._load_control_file(PROVISION_CONTROL_FILE)
591        self.run(control=control, job_labels=labels)
592
593
594    def precheck(self):
595        """
596        perform any additional checks in derived classes.
597        """
598        pass
599
600
601    def enable_external_logging(self):
602        """
603        Start or restart external logging mechanism.
604        """
605        pass
606
607
608    def disable_external_logging(self):
609        """
610        Pause or stop external logging mechanism.
611        """
612        pass
613
614
615    def use_external_logging(self):
616        """
617        Return True if external logging should be used.
618        """
619        return False
620
621
622    def _make_parallel_wrapper(self, function, machines, log):
623        """Wrap function as appropriate for calling by parallel_simple."""
624        # machines could be a list of dictionaries, e.g.,
625        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
626        # The dictionary is generated in server_job.__init__, refer to
627        # variable machine_dict_list, then passed in with namespace, see method
628        # server_job._make_namespace.
629        # To compare the machinese to self.machines, which is a list of machine
630        # hostname, we need to convert machines back to a list of hostnames.
631        # Note that the order of hostnames doesn't matter, as is_forking will be
632        # True if there are more than one machine.
633        if (machines and isinstance(machines, list)
634            and isinstance(machines[0], dict)):
635            machines = [m['hostname'] for m in machines]
636        is_forking = not (len(machines) == 1 and self.machines == machines)
637        if self._parse_job and is_forking and log:
638            def wrapper(machine):
639                hostname = server_utils.get_hostname_from_machine(machine)
640                self._parse_job += "/" + hostname
641                self._using_parser = INCREMENTAL_TKO_PARSING
642                self.machines = [machine]
643                self.push_execution_context(hostname)
644                os.chdir(self.resultdir)
645                utils.write_keyval(self.resultdir, {"hostname": hostname})
646                self.init_parser()
647                result = function(machine)
648                self.cleanup_parser()
649                return result
650        elif len(machines) > 1 and log:
651            def wrapper(machine):
652                hostname = server_utils.get_hostname_from_machine(machine)
653                self.push_execution_context(hostname)
654                os.chdir(self.resultdir)
655                machine_data = {'hostname' : hostname,
656                                'status_version' : str(self._STATUS_VERSION)}
657                utils.write_keyval(self.resultdir, machine_data)
658                result = function(machine)
659                return result
660        else:
661            wrapper = function
662        return wrapper
663
664
665    def parallel_simple(self, function, machines, log=True, timeout=None,
666                        return_results=False):
667        """
668        Run 'function' using parallel_simple, with an extra wrapper to handle
669        the necessary setup for continuous parsing, if possible. If continuous
670        parsing is already properly initialized then this should just work.
671
672        @param function: A callable to run in parallel given each machine.
673        @param machines: A list of machine names to be passed one per subcommand
674                invocation of function.
675        @param log: If True, output will be written to output in a subdirectory
676                named after each machine.
677        @param timeout: Seconds after which the function call should timeout.
678        @param return_results: If True instead of an AutoServError being raised
679                on any error a list of the results|exceptions from the function
680                called on each arg is returned.  [default: False]
681
682        @raises error.AutotestError: If any of the functions failed.
683        """
684        wrapper = self._make_parallel_wrapper(function, machines, log)
685        return subcommand.parallel_simple(
686                wrapper, machines,
687                subdir_name_constructor=server_utils.get_hostname_from_machine,
688                log=log, timeout=timeout, return_results=return_results)
689
690
691    def parallel_on_machines(self, function, machines, timeout=None):
692        """
693        @param function: Called in parallel with one machine as its argument.
694        @param machines: A list of machines to call function(machine) on.
695        @param timeout: Seconds after which the function call should timeout.
696
697        @returns A list of machines on which function(machine) returned
698                without raising an exception.
699        """
700        results = self.parallel_simple(function, machines, timeout=timeout,
701                                       return_results=True)
702        success_machines = []
703        for result, machine in itertools.izip(results, machines):
704            if not isinstance(result, Exception):
705                success_machines.append(machine)
706        return success_machines
707
708
709    def record_skipped_test(self, skipped_test, message=None):
710        """Insert a failure record into status.log for this test."""
711        msg = message
712        if msg is None:
713            msg = 'No valid machines found for test %s.' % skipped_test
714        logging.info(msg)
715        self.record('START', None, skipped_test.test_name)
716        self.record('INFO', None, skipped_test.test_name, msg)
717        self.record('END TEST_NA', None, skipped_test.test_name, msg)
718
719
720    def _has_failed_tests(self):
721        """Parse status log for failed tests.
722
723        This checks the current working directory and is intended only for use
724        by the run() method.
725
726        @return boolean
727        """
728        path = os.getcwd()
729
730        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
731        # make code reuse plausible.
732        job_keyval = tko_models.job.read_keyval(path)
733        status_version = job_keyval.get("status_version", 0)
734
735        # parse out the job
736        parser = parser_lib.parser(status_version)
737        job = parser.make_job(path)
738        status_log = os.path.join(path, "status.log")
739        if not os.path.exists(status_log):
740            status_log = os.path.join(path, "status")
741        if not os.path.exists(status_log):
742            logging.warning("! Unable to parse job, no status file")
743            return True
744
745        # parse the status logs
746        status_lines = open(status_log).readlines()
747        parser.start(job)
748        tests = parser.end(status_lines)
749
750        # parser.end can return the same object multiple times, so filter out
751        # dups
752        job.tests = []
753        already_added = set()
754        for test in tests:
755            if test not in already_added:
756                already_added.add(test)
757                job.tests.append(test)
758
759        failed = False
760        for test in job.tests:
761            # The current job is still running and shouldn't count as failed.
762            # The parser will fail to parse the exit status of the job since it
763            # hasn't exited yet (this running right now is the job).
764            failed = failed or (test.status != 'GOOD'
765                                and not _is_current_server_job(test))
766        return failed
767
768
769    def _collect_crashes(self, namespace, collect_crashinfo):
770        """Collect crashes.
771
772        @param namespace: namespace dict.
773        @param collect_crashinfo: whether to collect crashinfo in addition to
774                dumps
775        """
776        if collect_crashinfo:
777            # includes crashdumps
778            crash_control_file = CRASHINFO_CONTROL_FILE
779        else:
780            crash_control_file = CRASHDUMPS_CONTROL_FILE
781        self._execute_code(crash_control_file, namespace)
782
783
784    _USE_TEMP_DIR = object()
785    def run(self, install_before=False, install_after=False,
786            collect_crashdumps=True, namespace={}, control=None,
787            control_file_dir=None, verify_job_repo_url=False,
788            only_collect_crashinfo=False, skip_crash_collection=False,
789            job_labels='', use_packaging=True):
790        # for a normal job, make sure the uncollected logs file exists
791        # for a crashinfo-only run it should already exist, bail out otherwise
792        created_uncollected_logs = False
793        logging.info("I am PID %s", os.getpid())
794        if self.resultdir and not os.path.exists(self._uncollected_log_file):
795            if only_collect_crashinfo:
796                # if this is a crashinfo-only run, and there were no existing
797                # uncollected logs, just bail out early
798                logging.info("No existing uncollected logs, "
799                             "skipping crashinfo collection")
800                return
801            else:
802                log_file = open(self._uncollected_log_file, "w")
803                pickle.dump([], log_file)
804                log_file.close()
805                created_uncollected_logs = True
806
807        # use a copy so changes don't affect the original dictionary
808        namespace = namespace.copy()
809        machines = self.machines
810        if control is None:
811            if self.control is None:
812                control = ''
813            else:
814                control = self._load_control_file(self.control)
815        if control_file_dir is None:
816            control_file_dir = self.resultdir
817
818        self.aborted = False
819        namespace.update(self._make_namespace())
820        namespace.update({
821                'args': self.args,
822                'job_labels': job_labels,
823                'gtest_runner': site_gtest_runner.gtest_runner(),
824        })
825        test_start_time = int(time.time())
826
827        if self.resultdir:
828            os.chdir(self.resultdir)
829            # touch status.log so that the parser knows a job is running here
830            open(self.get_status_log_path(), 'a').close()
831            self.enable_external_logging()
832
833        collect_crashinfo = True
834        temp_control_file_dir = None
835        try:
836            try:
837                if not self.fast:
838                    with metrics.SecondsTimer(
839                            'chromeos/autotest/job/get_network_stats',
840                            fields = {'stage': 'start'}):
841                        namespace['network_stats_label'] = 'at-start'
842                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
843                                           namespace)
844
845                if install_before and machines:
846                    self._execute_code(INSTALL_CONTROL_FILE, namespace)
847
848                if only_collect_crashinfo:
849                    return
850
851                # If the verify_job_repo_url option is set but we're unable
852                # to actually verify that the job_repo_url contains the autotest
853                # package, this job will fail.
854                if verify_job_repo_url:
855                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
856                                       namespace)
857                else:
858                    logging.warning('Not checking if job_repo_url contains '
859                                    'autotest packages on %s', machines)
860
861                # determine the dir to write the control files to
862                cfd_specified = (control_file_dir
863                                 and control_file_dir is not self._USE_TEMP_DIR)
864                if cfd_specified:
865                    temp_control_file_dir = None
866                else:
867                    temp_control_file_dir = tempfile.mkdtemp(
868                        suffix='temp_control_file_dir')
869                    control_file_dir = temp_control_file_dir
870                server_control_file = os.path.join(control_file_dir,
871                                                   self._control_filename)
872                client_control_file = os.path.join(control_file_dir,
873                                                   CLIENT_CONTROL_FILENAME)
874                if self._client:
875                    namespace['control'] = control
876                    utils.open_write_close(client_control_file, control)
877                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
878                                    server_control_file)
879                else:
880                    utils.open_write_close(server_control_file, control)
881
882                logging.info("Processing control file")
883                namespace['use_packaging'] = use_packaging
884                self._execute_code(server_control_file, namespace)
885                logging.info("Finished processing control file")
886
887                # If no device error occured, no need to collect crashinfo.
888                collect_crashinfo = self.failed_with_device_error
889            except Exception as e:
890                try:
891                    # Add num_tests_failed if any extra exceptions are raised
892                    # outside _execute_code().
893                    self.num_tests_failed += 1
894                    logging.exception(
895                            'Exception escaped control file, job aborting:')
896                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
897                                    ' ', str(e))
898                    self.record('INFO', None, None, str(e),
899                                {'job_abort_reason': reason})
900                except:
901                    pass # don't let logging exceptions here interfere
902                raise
903        finally:
904            if temp_control_file_dir:
905                # Clean up temp directory used for copies of the control files
906                try:
907                    shutil.rmtree(temp_control_file_dir)
908                except Exception as e:
909                    logging.warning('Could not remove temp directory %s: %s',
910                                 temp_control_file_dir, e)
911
912            if machines and (collect_crashdumps or collect_crashinfo):
913                if skip_crash_collection or self.fast:
914                    logging.info('Skipping crash dump/info collection '
915                                 'as requested.')
916                else:
917                    with metrics.SecondsTimer(
918                            'chromeos/autotest/job/collect_crashinfo'):
919                        namespace['test_start_time'] = test_start_time
920                        # Remove crash files for passing tests.
921                        # TODO(ayatane): Tests that create crash files should be
922                        # reported.
923                        namespace['has_failed_tests'] = self._has_failed_tests()
924                        self._collect_crashes(namespace, collect_crashinfo)
925            self.disable_external_logging()
926            if self._uncollected_log_file and created_uncollected_logs:
927                os.remove(self._uncollected_log_file)
928            if install_after and machines:
929                self._execute_code(INSTALL_CONTROL_FILE, namespace)
930
931            if not self.fast:
932                with metrics.SecondsTimer(
933                        'chromeos/autotest/job/get_network_stats',
934                        fields = {'stage': 'end'}):
935                    namespace['network_stats_label'] = 'at-end'
936                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
937                                       namespace)
938
939
940    def run_test(self, url, *args, **dargs):
941        """
942        Summon a test object and run it.
943
944        tag
945                tag to add to testname
946        url
947                url of the test to run
948        """
949        if self._disable_sysinfo:
950            dargs['disable_sysinfo'] = True
951
952        group, testname = self.pkgmgr.get_package_name(url, 'test')
953        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
954        outputdir = self._make_test_outputdir(subdir)
955
956        def group_func():
957            try:
958                test.runtest(self, url, tag, args, dargs)
959            except error.TestBaseException as e:
960                self.record(e.exit_status, subdir, testname, str(e))
961                raise
962            except Exception as e:
963                info = str(e) + "\n" + traceback.format_exc()
964                self.record('FAIL', subdir, testname, info)
965                raise
966            else:
967                self.record('GOOD', subdir, testname, 'completed successfully')
968
969        try:
970            result = self._run_group(testname, subdir, group_func)
971        except error.TestBaseException as e:
972            return False
973        else:
974            return True
975
976
977    def _run_group(self, name, subdir, function, *args, **dargs):
978        """Underlying method for running something inside of a group."""
979        result, exc_info = None, None
980        try:
981            self.record('START', subdir, name)
982            result = function(*args, **dargs)
983        except error.TestBaseException as e:
984            self.record("END %s" % e.exit_status, subdir, name)
985            raise
986        except Exception as e:
987            err_msg = str(e) + '\n'
988            err_msg += traceback.format_exc()
989            self.record('END ABORT', subdir, name, err_msg)
990            raise error.JobError(name + ' failed\n' + traceback.format_exc())
991        else:
992            self.record('END GOOD', subdir, name)
993
994        return result
995
996
997    def run_group(self, function, *args, **dargs):
998        """\
999        @param function: subroutine to run
1000        @returns: (result, exc_info). When the call succeeds, result contains
1001                the return value of |function| and exc_info is None. If
1002                |function| raises an exception, exc_info contains the tuple
1003                returned by sys.exc_info(), and result is None.
1004        """
1005
1006        name = function.__name__
1007        # Allow the tag for the group to be specified.
1008        tag = dargs.pop('tag', None)
1009        if tag:
1010            name = tag
1011
1012        try:
1013            result = self._run_group(name, None, function, *args, **dargs)[0]
1014        except error.TestBaseException:
1015            return None, sys.exc_info()
1016        return result, None
1017
1018
1019    def run_op(self, op, op_func, get_kernel_func):
1020        """\
1021        A specialization of run_group meant specifically for handling
1022        management operation. Includes support for capturing the kernel version
1023        after the operation.
1024
1025        Args:
1026           op: name of the operation.
1027           op_func: a function that carries out the operation (reboot, suspend)
1028           get_kernel_func: a function that returns a string
1029                            representing the kernel version.
1030        """
1031        try:
1032            self.record('START', None, op)
1033            op_func()
1034        except Exception as e:
1035            err_msg = str(e) + '\n' + traceback.format_exc()
1036            self.record('END FAIL', None, op, err_msg)
1037            raise
1038        else:
1039            kernel = get_kernel_func()
1040            self.record('END GOOD', None, op,
1041                        optional_fields={"kernel": kernel})
1042
1043
1044    def run_control(self, path):
1045        """Execute a control file found at path (relative to the autotest
1046        path). Intended for executing a control file within a control file,
1047        not for running the top-level job control file."""
1048        path = os.path.join(self.autodir, path)
1049        control_file = self._load_control_file(path)
1050        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
1051
1052
1053    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1054        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1055                                   on_every_test)
1056
1057
1058    def add_sysinfo_logfile(self, file, on_every_test=False):
1059        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1060
1061
1062    def _add_sysinfo_loggable(self, loggable, on_every_test):
1063        if on_every_test:
1064            self.sysinfo.test_loggables.add(loggable)
1065        else:
1066            self.sysinfo.boot_loggables.add(loggable)
1067
1068
1069    def _read_warnings(self):
1070        """Poll all the warning loggers and extract any new warnings that have
1071        been logged. If the warnings belong to a category that is currently
1072        disabled, this method will discard them and they will no longer be
1073        retrievable.
1074
1075        Returns a list of (timestamp, message) tuples, where timestamp is an
1076        integer epoch timestamp."""
1077        warnings = []
1078        while True:
1079            # pull in a line of output from every logger that has
1080            # output ready to be read
1081            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
1082            closed_loggers = set()
1083            for logger in loggers:
1084                line = logger.readline()
1085                # record any broken pipes (aka line == empty)
1086                if len(line) == 0:
1087                    closed_loggers.add(logger)
1088                    continue
1089                # parse out the warning
1090                timestamp, msgtype, msg = line.split('\t', 2)
1091                timestamp = int(timestamp)
1092                # if the warning is valid, add it to the results
1093                if self.warning_manager.is_valid(timestamp, msgtype):
1094                    warnings.append((timestamp, msg.strip()))
1095
1096            # stop listening to loggers that are closed
1097            self.warning_loggers -= closed_loggers
1098
1099            # stop if none of the loggers have any output left
1100            if not loggers:
1101                break
1102
1103        # sort into timestamp order
1104        warnings.sort()
1105        return warnings
1106
1107
1108    def _unique_subdirectory(self, base_subdirectory_name):
1109        """Compute a unique results subdirectory based on the given name.
1110
1111        Appends base_subdirectory_name with a number as necessary to find a
1112        directory name that doesn't already exist.
1113        """
1114        subdirectory = base_subdirectory_name
1115        counter = 1
1116        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1117            subdirectory = base_subdirectory_name + '.' + str(counter)
1118            counter += 1
1119        return subdirectory
1120
1121
1122    def get_record_context(self):
1123        """Returns an object representing the current job.record context.
1124
1125        The object returned is an opaque object with a 0-arg restore method
1126        which can be called to restore the job.record context (i.e. indentation)
1127        to the current level. The intention is that it should be used when
1128        something external which generate job.record calls (e.g. an autotest
1129        client) can fail catastrophically and the server job record state
1130        needs to be reset to its original "known good" state.
1131
1132        @return: A context object with a 0-arg restore() method."""
1133        return self._indenter.get_context()
1134
1135
1136    def record_summary(self, status_code, test_name, reason='', attributes=None,
1137                       distinguishing_attributes=(), child_test_ids=None):
1138        """Record a summary test result.
1139
1140        @param status_code: status code string, see
1141                common_lib.log.is_valid_status()
1142        @param test_name: name of the test
1143        @param reason: (optional) string providing detailed reason for test
1144                outcome
1145        @param attributes: (optional) dict of string keyvals to associate with
1146                this result
1147        @param distinguishing_attributes: (optional) list of attribute names
1148                that should be used to distinguish identically-named test
1149                results.  These attributes should be present in the attributes
1150                parameter.  This is used to generate user-friendly subdirectory
1151                names.
1152        @param child_test_ids: (optional) list of test indices for test results
1153                used in generating this result.
1154        """
1155        subdirectory_name_parts = [test_name]
1156        for attribute in distinguishing_attributes:
1157            assert attributes
1158            assert attribute in attributes, '%s not in %s' % (attribute,
1159                                                              attributes)
1160            subdirectory_name_parts.append(attributes[attribute])
1161        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1162
1163        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1164        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1165        os.mkdir(subdirectory_path)
1166
1167        self.record(status_code, subdirectory, test_name,
1168                    status=reason, optional_fields={'is_summary': True})
1169
1170        if attributes:
1171            utils.write_keyval(subdirectory_path, attributes)
1172
1173        if child_test_ids:
1174            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1175            summary_data = {'child_test_ids': ids_string}
1176            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1177                               summary_data)
1178
1179
1180    def disable_warnings(self, warning_type):
1181        self.warning_manager.disable_warnings(warning_type)
1182        self.record("INFO", None, None,
1183                    "disabling %s warnings" % warning_type,
1184                    {"warnings.disable": warning_type})
1185
1186
1187    def enable_warnings(self, warning_type):
1188        self.warning_manager.enable_warnings(warning_type)
1189        self.record("INFO", None, None,
1190                    "enabling %s warnings" % warning_type,
1191                    {"warnings.enable": warning_type})
1192
1193
1194    def get_status_log_path(self, subdir=None):
1195        """Return the path to the job status log.
1196
1197        @param subdir - Optional paramter indicating that you want the path
1198            to a subdirectory status log.
1199
1200        @returns The path where the status log should be.
1201        """
1202        if self.resultdir:
1203            if subdir:
1204                return os.path.join(self.resultdir, subdir, "status.log")
1205            else:
1206                return os.path.join(self.resultdir, "status.log")
1207        else:
1208            return None
1209
1210
1211    def _update_uncollected_logs_list(self, update_func):
1212        """Updates the uncollected logs list in a multi-process safe manner.
1213
1214        @param update_func - a function that updates the list of uncollected
1215            logs. Should take one parameter, the list to be updated.
1216        """
1217        # Skip log collection if file _uncollected_log_file does not exist.
1218        if not (self._uncollected_log_file and
1219                os.path.exists(self._uncollected_log_file)):
1220            return
1221        if self._uncollected_log_file:
1222            log_file = open(self._uncollected_log_file, "r+")
1223            fcntl.flock(log_file, fcntl.LOCK_EX)
1224        try:
1225            uncollected_logs = pickle.load(log_file)
1226            update_func(uncollected_logs)
1227            log_file.seek(0)
1228            log_file.truncate()
1229            pickle.dump(uncollected_logs, log_file)
1230            log_file.flush()
1231        finally:
1232            fcntl.flock(log_file, fcntl.LOCK_UN)
1233            log_file.close()
1234
1235
1236    def add_client_log(self, hostname, remote_path, local_path):
1237        """Adds a new set of client logs to the list of uncollected logs,
1238        to allow for future log recovery.
1239
1240        @param host - the hostname of the machine holding the logs
1241        @param remote_path - the directory on the remote machine holding logs
1242        @param local_path - the local directory to copy the logs into
1243        """
1244        def update_func(logs_list):
1245            logs_list.append((hostname, remote_path, local_path))
1246        self._update_uncollected_logs_list(update_func)
1247
1248
1249    def remove_client_log(self, hostname, remote_path, local_path):
1250        """Removes a set of client logs from the list of uncollected logs,
1251        to allow for future log recovery.
1252
1253        @param host - the hostname of the machine holding the logs
1254        @param remote_path - the directory on the remote machine holding logs
1255        @param local_path - the local directory to copy the logs into
1256        """
1257        def update_func(logs_list):
1258            logs_list.remove((hostname, remote_path, local_path))
1259        self._update_uncollected_logs_list(update_func)
1260
1261
1262    def get_client_logs(self):
1263        """Retrieves the list of uncollected logs, if it exists.
1264
1265        @returns A list of (host, remote_path, local_path) tuples. Returns
1266                 an empty list if no uncollected logs file exists.
1267        """
1268        log_exists = (self._uncollected_log_file and
1269                      os.path.exists(self._uncollected_log_file))
1270        if log_exists:
1271            return pickle.load(open(self._uncollected_log_file))
1272        else:
1273            return []
1274
1275
1276    def _fill_server_control_namespace(self, namespace, protect=True):
1277        """
1278        Prepare a namespace to be used when executing server control files.
1279
1280        This sets up the control file API by importing modules and making them
1281        available under the appropriate names within namespace.
1282
1283        For use by _execute_code().
1284
1285        Args:
1286          namespace: The namespace dictionary to fill in.
1287          protect: Boolean.  If True (the default) any operation that would
1288              clobber an existing entry in namespace will cause an error.
1289        Raises:
1290          error.AutoservError: When a name would be clobbered by import.
1291        """
1292        def _import_names(module_name, names=()):
1293            """
1294            Import a module and assign named attributes into namespace.
1295
1296            Args:
1297                module_name: The string module name.
1298                names: A limiting list of names to import from module_name.  If
1299                    empty (the default), all names are imported from the module
1300                    similar to a "from foo.bar import *" statement.
1301            Raises:
1302                error.AutoservError: When a name being imported would clobber
1303                    a name already in namespace.
1304            """
1305            module = __import__(module_name, {}, {}, names)
1306
1307            # No names supplied?  Import * from the lowest level module.
1308            # (Ugh, why do I have to implement this part myself?)
1309            if not names:
1310                for submodule_name in module_name.split('.')[1:]:
1311                    module = getattr(module, submodule_name)
1312                if hasattr(module, '__all__'):
1313                    names = getattr(module, '__all__')
1314                else:
1315                    names = dir(module)
1316
1317            # Install each name into namespace, checking to make sure it
1318            # doesn't override anything that already exists.
1319            for name in names:
1320                # Check for conflicts to help prevent future problems.
1321                if name in namespace and protect:
1322                    if namespace[name] is not getattr(module, name):
1323                        raise error.AutoservError('importing name '
1324                                '%s from %s %r would override %r' %
1325                                (name, module_name, getattr(module, name),
1326                                 namespace[name]))
1327                    else:
1328                        # Encourage cleanliness and the use of __all__ for a
1329                        # more concrete API with less surprises on '*' imports.
1330                        warnings.warn('%s (%r) being imported from %s for use '
1331                                      'in server control files is not the '
1332                                      'first occurrence of that import.' %
1333                                      (name, namespace[name], module_name))
1334
1335                namespace[name] = getattr(module, name)
1336
1337
1338        # This is the equivalent of prepending a bunch of import statements to
1339        # the front of the control script.
1340        namespace.update(os=os, sys=sys, logging=logging)
1341        _import_names('autotest_lib.server',
1342                ('hosts', 'autotest', 'standalone_profiler'))
1343        _import_names('autotest_lib.server.subcommand',
1344                      ('parallel', 'parallel_simple', 'subcommand'))
1345        _import_names('autotest_lib.server.utils',
1346                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
1347        _import_names('autotest_lib.client.common_lib.error')
1348        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
1349
1350        # Inject ourself as the job object into other classes within the API.
1351        # (Yuck, this injection is a gross thing be part of a public API. -gps)
1352        #
1353        # XXX Autotest does not appear to use .job.  Who does?
1354        namespace['autotest'].Autotest.job = self
1355        # server.hosts.base_classes.Host uses .job.
1356        namespace['hosts'].Host.job = self
1357        namespace['hosts'].TestBed.job = self
1358        namespace['hosts'].factory.ssh_user = self._ssh_user
1359        namespace['hosts'].factory.ssh_port = self._ssh_port
1360        namespace['hosts'].factory.ssh_pass = self._ssh_pass
1361        namespace['hosts'].factory.ssh_verbosity_flag = (
1362                self._ssh_verbosity_flag)
1363        namespace['hosts'].factory.ssh_options = self._ssh_options
1364
1365
1366    def _execute_code(self, code_file, namespace, protect=True):
1367        """
1368        Execute code using a copy of namespace as a server control script.
1369
1370        Unless protect_namespace is explicitly set to False, the dict will not
1371        be modified.
1372
1373        Args:
1374          code_file: The filename of the control file to execute.
1375          namespace: A dict containing names to make available during execution.
1376          protect: Boolean.  If True (the default) a copy of the namespace dict
1377              is used during execution to prevent the code from modifying its
1378              contents outside of this function.  If False the raw dict is
1379              passed in and modifications will be allowed.
1380        """
1381        if protect:
1382            namespace = namespace.copy()
1383        self._fill_server_control_namespace(namespace, protect=protect)
1384        # TODO: Simplify and get rid of the special cases for only 1 machine.
1385        if len(self.machines) > 1:
1386            machines_text = '\n'.join(self.machines) + '\n'
1387            # Only rewrite the file if it does not match our machine list.
1388            try:
1389                machines_f = open(MACHINES_FILENAME, 'r')
1390                existing_machines_text = machines_f.read()
1391                machines_f.close()
1392            except EnvironmentError:
1393                existing_machines_text = None
1394            if machines_text != existing_machines_text:
1395                utils.open_write_close(MACHINES_FILENAME, machines_text)
1396        execfile(code_file, namespace, namespace)
1397
1398
1399    def _parse_status(self, new_line):
1400        if self.fast and not self._using_parser:
1401            logging.info('Parsing lines in fast mode')
1402            new_tests = self.parser.process_lines([new_line])
1403            for test in new_tests:
1404                if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
1405                    self.num_tests_failed += 1
1406
1407        if not self._using_parser:
1408            return
1409
1410        new_tests = self.parser.process_lines([new_line])
1411        for test in new_tests:
1412            self.__insert_test(test)
1413
1414
1415    def __insert_test(self, test):
1416        """
1417        An internal method to insert a new test result into the
1418        database. This method will not raise an exception, even if an
1419        error occurs during the insert, to avoid failing a test
1420        simply because of unexpected database issues."""
1421        self.num_tests_run += 1
1422        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
1423            self.num_tests_failed += 1
1424        try:
1425            self.results_db.insert_test(self.job_model, test)
1426        except Exception:
1427            msg = ("WARNING: An unexpected error occured while "
1428                   "inserting test results into the database. "
1429                   "Ignoring error.\n" + traceback.format_exc())
1430            print >> sys.stderr, msg
1431
1432
1433    def preprocess_client_state(self):
1434        """
1435        Produce a state file for initializing the state of a client job.
1436
1437        Creates a new client state file with all the current server state, as
1438        well as some pre-set client state.
1439
1440        @returns The path of the file the state was written into.
1441        """
1442        # initialize the sysinfo state
1443        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1444
1445        # dump the state out to a tempfile
1446        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1447        os.close(fd)
1448
1449        # write_to_file doesn't need locking, we exclusively own file_path
1450        self._state.write_to_file(file_path)
1451        return file_path
1452
1453
1454    def postprocess_client_state(self, state_path):
1455        """
1456        Update the state of this job with the state from a client job.
1457
1458        Updates the state of the server side of a job with the final state
1459        of a client job that was run. Updates the non-client-specific state,
1460        pulls in some specific bits from the client-specific state, and then
1461        discards the rest. Removes the state file afterwards
1462
1463        @param state_file A path to the state file from the client.
1464        """
1465        # update the on-disk state
1466        try:
1467            self._state.read_from_file(state_path)
1468            os.remove(state_path)
1469        except OSError, e:
1470            # ignore file-not-found errors
1471            if e.errno != errno.ENOENT:
1472                raise
1473            else:
1474                logging.debug('Client state file %s not found', state_path)
1475
1476        # update the sysinfo state
1477        if self._state.has('client', 'sysinfo'):
1478            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1479
1480        # drop all the client-specific state
1481        self._state.discard_namespace('client')
1482
1483
1484    def clear_all_known_hosts(self):
1485        """Clears known hosts files for all AbstractSSHHosts."""
1486        for host in self.hosts:
1487            if isinstance(host, abstract_ssh.AbstractSSHHost):
1488                host.clear_known_hosts()
1489
1490
1491    def close(self):
1492        """Closes this job's operation."""
1493
1494        # Use shallow copy, because host.close() internally discards itself.
1495        for host in list(self.hosts):
1496            host.close()
1497        assert not self.hosts
1498        self._connection_pool.shutdown()
1499
1500
1501    def _get_job_data(self):
1502        """Add custom data to the job keyval info.
1503
1504        When multiple machines are used in a job, change the hostname to
1505        the platform of the first machine instead of machine1,machine2,...  This
1506        makes the job reports easier to read and keeps the tko_machines table from
1507        growing too large.
1508
1509        Returns:
1510            keyval dictionary with new hostname value, or empty dictionary.
1511        """
1512        job_data = {}
1513        # Only modify hostname on multimachine jobs. Assume all host have the same
1514        # platform.
1515        if len(self.machines) > 1:
1516            # Search through machines for first machine with a platform.
1517            for host in self.machines:
1518                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
1519                keyvals = utils.read_keyval(keyval_path)
1520                host_plat = keyvals.get('platform', None)
1521                if not host_plat:
1522                    continue
1523                job_data['hostname'] = host_plat
1524                break
1525        return job_data
1526
1527
1528class warning_manager(object):
1529    """Class for controlling warning logs. Manages the enabling and disabling
1530    of warnings."""
1531    def __init__(self):
1532        # a map of warning types to a list of disabled time intervals
1533        self.disabled_warnings = {}
1534
1535
1536    def is_valid(self, timestamp, warning_type):
1537        """Indicates if a warning (based on the time it occured and its type)
1538        is a valid warning. A warning is considered "invalid" if this type of
1539        warning was marked as "disabled" at the time the warning occured."""
1540        disabled_intervals = self.disabled_warnings.get(warning_type, [])
1541        for start, end in disabled_intervals:
1542            if timestamp >= start and (end is None or timestamp < end):
1543                return False
1544        return True
1545
1546
1547    def disable_warnings(self, warning_type, current_time_func=time.time):
1548        """As of now, disables all further warnings of this type."""
1549        intervals = self.disabled_warnings.setdefault(warning_type, [])
1550        if not intervals or intervals[-1][1] is not None:
1551            intervals.append((int(current_time_func()), None))
1552
1553
1554    def enable_warnings(self, warning_type, current_time_func=time.time):
1555        """As of now, enables all further warnings of this type."""
1556        intervals = self.disabled_warnings.get(warning_type, [])
1557        if intervals and intervals[-1][1] is None:
1558            intervals[-1] = (intervals[-1][0], int(current_time_func()))
1559
1560
1561def _is_current_server_job(test):
1562    """Return True if parsed test is the currently running job.
1563
1564    @param test: test instance from tko parser.
1565    """
1566    return test.testname == 'SERVER_JOB'
1567
1568
1569def _create_afe_host(hostname):
1570    """Create an afe_host object backed by the AFE.
1571
1572    @param hostname: Name of the host for which we want the Host object.
1573    @returns: An object of type frontend.AFE
1574    """
1575    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
1576    hosts = afe.get_hosts(hostname=hostname)
1577    if not hosts:
1578        raise error.AutoservError('No hosts named %s found' % hostname)
1579
1580    return hosts[0]
1581
1582
1583def _create_host_info_store(hostname, workdir):
1584    """Create a CachingHostInfo store backed by the AFE.
1585
1586    @param hostname: Name of the host for which we want the store.
1587    @param workdir: A directory where any on-disk files related to the machines
1588            may be created.
1589    @returns: An object of type shadowing_store.ShadowingStore
1590    """
1591    primary_store = afe_store.AfeStore(hostname)
1592    try:
1593        primary_store.get(force_refresh=True)
1594    except host_info.StoreError:
1595        raise error.AutoservError('Could not obtain HostInfo for hostname %s' %
1596                                  hostname)
1597    backing_file_path = _file_store_unique_file_path(workdir)
1598    logging.info('Shadowing AFE store with a FileStore at %s',
1599                 backing_file_path)
1600    shadow_store = file_store.FileStore(backing_file_path)
1601    return shadowing_store.ShadowingStore(primary_store, shadow_store)
1602
1603
1604def _file_store_unique_file_path(workdir):
1605    """Returns a unique filepath for the on-disk FileStore.
1606
1607    Also makes sure that the workdir exists.
1608
1609    @param: Top level working directory.
1610    """
1611    store_dir = os.path.join(workdir, 'host_info_store')
1612    _make_dirs_if_needed(store_dir)
1613    file_path = os.path.join(store_dir, 'store_%s' % uuid.uuid4())
1614    return file_path
1615
1616
1617def _make_dirs_if_needed(path):
1618    """os.makedirs, but ignores failure because the leaf directory exists"""
1619    try:
1620        os.makedirs(path)
1621    except OSError as e:
1622        if e.errno != errno.EEXIST:
1623            raise
1624