1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Utility classes used by site_server_job.distribute_across_machines().
6
7test_item: extends the basic test tuple to add include/exclude attributes and
8    pre/post actions.
9
10machine_worker: is a thread that manages running tests on a host.  It
11    verifies test are valid for a host using the test attributes from test_item
12    and the host attributes from host_attributes.
13"""
14
15
16import logging, os, Queue
17from autotest_lib.client.common_lib import error, utils
18from autotest_lib.server import autotest, hosts, host_attributes
19
20
21class test_item(object):
22    """Adds machine verification logic to the basic test tuple.
23
24    Tests can either be tuples of the existing form ('testName', {args}) or the
25    extended form ('testname', {args}, {'include': [], 'exclude': [],
26    'attributes': []}) where include and exclude are lists of host attribute
27    labels and attributes is a list of strings. A machine must have all the
28    labels in include and must not have any of the labels in exclude to be valid
29    for the test. Attributes strings can include reboot_before, reboot_after,
30    and server_job.
31    """
32
33    def __init__(self, test_name, test_args, test_attribs=None):
34        """Creates an instance of test_item.
35
36        Args:
37            test_name: string, name of test to execute.
38            test_args: dictionary, arguments to pass into test.
39            test_attribs: Dictionary of test attributes. Valid keys are:
40              include - labels a machine must have to run a test.
41              exclude - labels preventing a machine from running a test.
42              attributes - reboot before/after test, run test as server job.
43        """
44        self.test_name = test_name
45        self.test_args = test_args
46        self.tagged_test_name = test_name
47        if test_args.get('tag'):
48            self.tagged_test_name = test_name + '.' + test_args.get('tag')
49
50        if test_attribs is None:
51            test_attribs = {}
52        self.inc_set = set(test_attribs.get('include', []))
53        self.exc_set = set(test_attribs.get('exclude', []))
54        self.attributes = test_attribs.get('attributes', [])
55
56    def __str__(self):
57        """Return an info string of this test."""
58        params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
59        msg = '%s(%s)' % (self.test_name, params)
60        if self.inc_set:
61            msg += ' include=%s' % [s for s in self.inc_set]
62        if self.exc_set:
63            msg += ' exclude=%s' % [s for s in self.exc_set]
64        if self.attributes:
65            msg += ' attributes=%s' % self.attributes
66        return msg
67
68    def validate(self, machine_attributes):
69        """Check if this test can run on machine with machine_attributes.
70
71        If the test has include attributes, a candidate machine must have all
72        the attributes to be valid.
73
74        If the test has exclude attributes, a candidate machine cannot have any
75        of the attributes to be valid.
76
77        Args:
78            machine_attributes: set, True attributes of candidate machine.
79
80        Returns:
81            True/False if the machine is valid for this test.
82        """
83        if self.inc_set is not None:
84            if not self.inc_set <= machine_attributes:
85                return False
86        if self.exc_set is not None:
87            if self.exc_set & machine_attributes:
88                return False
89        return True
90
91    def run_test(self, client_at, work_dir='.', server_job=None):
92        """Runs the test on the client using autotest.
93
94        Args:
95            client_at: Autotest instance for this host.
96            work_dir: Directory to use for results and log files.
97            server_job: Server_Job instance to use to runs server tests.
98        """
99        if 'reboot_before' in self.attributes:
100            client_at.host.reboot()
101
102        try:
103            if 'server_job' in self.attributes:
104                if 'host' in self.test_args:
105                    self.test_args['host'] = client_at.host
106                if server_job is not None:
107                    logging.info('Running Server_Job=%s', self.test_name)
108                    server_job.run_test(self.test_name, **self.test_args)
109                else:
110                    logging.error('No Server_Job instance provided for test '
111                                  '%s.', self.test_name)
112            else:
113                client_at.run_test(self.test_name, results_dir=work_dir,
114                                   **self.test_args)
115        finally:
116            if 'reboot_after' in self.attributes:
117                client_at.host.reboot()
118
119
120class machine_worker(object):
121    """Worker that runs tests on a remote host machine."""
122
123    def __init__(self, server_job, machine, work_dir, test_queue, queue_lock,
124                 continuous_parsing=False):
125        """Creates an instance of machine_worker to run tests on a remote host.
126
127        Retrieves that host attributes for this machine and creates the set of
128        True attributes to validate against test include/exclude attributes.
129
130        Creates a directory to hold the log files for tests run and writes the
131        hostname and tko parser version into keyvals file.
132
133        Args:
134            server_job: run tests for this server_job.
135            machine: name of remote host.
136            work_dir: directory server job is using.
137            test_queue: queue of tests.
138            queue_lock: lock protecting test_queue.
139            continuous_parsing: bool, enable continuous parsing.
140        """
141        self._server_job = server_job
142        self._test_queue = test_queue
143        self._test_queue_lock = queue_lock
144        self._continuous_parsing = continuous_parsing
145        self._tests_run = 0
146        self._machine = machine
147        self._host = hosts.create_host(self._machine)
148        self._client_at = autotest.Autotest(self._host)
149        client_attributes = host_attributes.host_attributes(machine)
150        self.attribute_set = set(client_attributes.get_attributes())
151        self._results_dir = work_dir
152        if not os.path.exists(self._results_dir):
153            os.makedirs(self._results_dir)
154        machine_data = {'hostname': self._machine,
155                        'status_version': str(1)}
156        utils.write_keyval(self._results_dir, machine_data)
157
158    def __str__(self):
159        attributes = [a for a in self.attribute_set]
160        return '%s attributes=%s' % (self._machine, attributes)
161
162    def get_test(self):
163        """Return a test from the queue to run on this host.
164
165        The test queue can be non-empty, but still not contain a test that is
166        valid for this machine. This function will take exclusive access to
167        the queue via _test_queue_lock and repeatedly pop tests off the queue
168        until finding a valid test or depleting the queue.  In either case if
169        invalid tests have been popped from the queue, they are pushed back
170        onto the queue before returning.
171
172        Returns:
173            test_item, or None if no more tests exist for this machine.
174        """
175        good_test = None
176        skipped_tests = []
177
178        with self._test_queue_lock:
179            while True:
180                try:
181                    canidate_test = self._test_queue.get_nowait()
182                    # Check if test is valid for this machine.
183                    if canidate_test.validate(self.attribute_set):
184                        good_test = canidate_test
185                        break
186                    skipped_tests.append(canidate_test)
187
188                except Queue.Empty:
189                    break
190
191            # Return any skipped tests to the queue.
192            for st in skipped_tests:
193                self._test_queue.put(st)
194
195        return good_test
196
197    def run(self):
198        """Executes tests on the host machine.
199
200        If continuous parsing was requested, start the parser before running
201        tests.
202        """
203        # Modify job.resultdir so that it points to the results directory for
204        # the machine we're working on. Required so that server jobs will write
205        # to the proper location.
206        self._server_job.machines = [self._machine]
207        self._server_job.push_execution_context(self._machine['hostname'])
208        os.chdir(self._server_job.resultdir)
209        if self._continuous_parsing:
210            self._server_job._parse_job += "/" + self._machine['hostname']
211            self._server_job._using_parser = True
212            self._server_job.init_parser()
213
214        while True:
215            active_test = self.get_test()
216            if active_test is None:
217                break
218
219            logging.info('%s running %s', self._machine, active_test)
220            try:
221                active_test.run_test(self._client_at, self._results_dir,
222                                     self._server_job)
223            except error.AutoservError:
224                logging.exception('Autoserv error running "%s".', active_test)
225            except error.AutotestError:
226                logging.exception('Autotest error running  "%s".', active_test)
227            except Exception:
228                logging.exception('Exception running test "%s".', active_test)
229                raise
230            finally:
231                self._test_queue.task_done()
232                self._tests_run += 1
233
234        if self._continuous_parsing:
235            self._server_job.cleanup_parser()
236        logging.info('%s completed %d tests.', self._machine, self._tests_run)
237