1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for swarming execution."""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import json
12import logging
13import operator
14import os
15import urllib
16import uuid
17
18from lucifer import autotest
19from skylab_suite import errors
20
21
# Credentials file handed to swarming.py unless the caller overrides it.
DEFAULT_SERVICE_ACCOUNT = (
    '/creds/skylab_swarming_bot/skylab_bot_service_account.json')

SKYLAB_DRONE_POOL = 'ChromeOSSkylab'
SKYLAB_SUITE_POOL = 'ChromeOSSkylab-suite'

# Swarming task states. COMPLETED is refined into the SUCCESS / FAILURE
# variants by get_task_final_state().
TASK_COMPLETED = 'COMPLETED'
TASK_COMPLETED_SUCCESS = 'COMPLETED (SUCCESS)'
TASK_COMPLETED_FAILURE = 'COMPLETED (FAILURE)'
TASK_EXPIRED = 'EXPIRED'
TASK_CANCELED = 'CANCELED'
TASK_TIMEDOUT = 'TIMED_OUT'
TASK_RUNNING = 'RUNNING'
TASK_PENDING = 'PENDING'
TASK_BOT_DIED = 'BOT_DIED'
TASK_NO_RESOURCE = 'NO_RESOURCE'
TASK_KILLED = 'KILLED'

# Terminal states: every state above except RUNNING and PENDING.
TASK_FINISHED_STATUS = [
    TASK_COMPLETED,
    TASK_EXPIRED,
    TASK_CANCELED,
    TASK_TIMEDOUT,
    TASK_BOT_DIED,
    TASK_NO_RESOURCE,
    TASK_KILLED,
]

# Terminal failure states that are worth retrying. CANCELED is deliberately
# absent because a canceled task was aborted on purpose.
TASK_STATUS_TO_RETRY = [
    TASK_EXPIRED,
    TASK_TIMEDOUT,
    TASK_BOT_DIED,
    TASK_NO_RESOURCE,
]

DEFAULT_EXPIRATION_SECS = 10 * 60  # 10 minutes.
DEFAULT_TIMEOUT_SECS = 60 * 60  # 1 hour.
52
# Swarming priorities for skylab hwtest tasks, kept identical to the mapping
# in chromite/lib/constants.py. In swarming a lower number means a higher
# priority; values below 48 are reserved for special tasks, and 255 is the
# largest value allowed.
SKYLAB_HWTEST_PRIORITIES_MAP = {
    'Weekly': 230,
    'CTS': 215,
    'Daily': 200,
    'PostBuild': 170,
    'Default': 140,
    'Build': 110,
    'PFQ': 80,
    'CQ': 50,
    'Super': 49,
}
# (name, priority) pairs ordered by ascending priority number, i.e. from the
# most urgent entry to the least urgent one.
SORTED_SKYLAB_HWTEST_PRIORITY = sorted(
    SKYLAB_HWTEST_PRIORITIES_MAP.items(), key=lambda entry: entry[1])
71
# Status value of a DUT that is ready.
SWARMING_DUT_READY_STATUS = 'ready'

# Stainless log-browser URL; fill in with a mapping holding 'request_id'.
_STAINLESS_LOGS_BROWSER_URL_TEMPLATE = (
    'https://stainless.corp.google.com'
    '/browse/chromeos-autotest-results/swarming-%(request_id)s/'
)
78
def _get_client_path():
    """Return the path of the swarming.py client inside the user's home."""
    home_dir = os.path.expanduser('~')
    client_rel_path = (
        'chromiumos/chromite/third_party/swarming.client/swarming.py')
    return os.path.join(home_dir, client_rel_path)
83
84
def task_dependencies_from_labels(labels):
    """Parse dependencies from autotest labels.

    The labels are translated with skylab_inventory's autotest translation
    and then converted to swarming dimensions. Each dimension whose value is
    a list contributes its single element to the result; dimensions whose
    value is not a list are skipped.

    @param labels: A list of label string.

    @return a dict [key: value] to represent dependencies.

    @raises ValueError: if a dimension carries more than one value.
    """
    translation_autotest = autotest.deps_load(
            'skylab_inventory.translation.autotest')
    translation_swarming = autotest.deps_load(
            'skylab_inventory.translation.swarming')
    dimensions = translation_swarming.labels_to_dimensions(
            translation_autotest.from_autotest_labels(labels))
    dependencies = {}
    for key, values in dimensions.items():
        if not isinstance(values, list):
            continue
        if len(values) > 1:
            # Format as (value, key) so the message reads correctly.
            raise ValueError(
                    'Invalid dependencies: Multiple value %r for key %s'
                    % (values, key))
        # NOTE(review): an empty list raises IndexError here; presumably
        # labels_to_dimensions never emits empty lists -- confirm.
        dependencies[key] = values[0]

    return dependencies
108
109
def make_logdog_annotation_url():
    """Build a new, unique LogDog annotation URL.

    Uniqueness comes from a random uuid4 hex string embedded in the path.

    @return 'logdog://<server>/chromeos/skylab/<uuid>/+/annotations', or ''
        if get_logdog_server() returns an empty value.

    @raises errors.DroneEnvironmentError: propagated from get_logdog_server()
        when LOGDOG_SERVER is not set.
    """
    server = get_logdog_server()
    if server:
        return ('logdog://%s/chromeos/skylab/%s/+/annotations'
                % (server, uuid.uuid4().hex))
    return ''
121
122
def get_swarming_server():
    """Read the swarming server from the SWARMING_SERVER env variable.

    @return the value of SWARMING_SERVER.

    @raises errors.DroneEnvironmentError: if SWARMING_SERVER is not set.
    """
    server = os.environ.get('SWARMING_SERVER')
    if server is None:
        raise errors.DroneEnvironmentError(
                'SWARMING_SERVER environment variable not set'
        )
    return server
131
132
def get_logdog_server():
    """Return the LogDog server for the current environment.

    The server is read from the LOGDOG_SERVER environment variable. An empty
    string is returned only when the variable is set to an empty value; an
    unset variable is an error rather than an empty result.

    @return the value of LOGDOG_SERVER.

    @raises errors.DroneEnvironmentError: if LOGDOG_SERVER is not set.
    """
    try:
        return os.environ['LOGDOG_SERVER']
    except KeyError:
        raise errors.DroneEnvironmentError(
                'LOGDOG_SERVER environment variable not set'
        )
145
146
def _namedtuple_to_dict(value):
    """Recursively converts a namedtuple to a dict.

    Fields that are namedtuples are converted recursively. Fields that are
    lists or tuples become lists in which namedtuple elements are converted;
    deeper nesting (e.g. a list of lists) is left untouched.

    Args:
      value: a namedtuple object.

    Returns:
      A dict object with the same value.
    """
    def _convert_field(field):
        # Test for _asdict first: a namedtuple is also a tuple.
        if hasattr(field, '_asdict'):
            return _namedtuple_to_dict(field)
        if isinstance(field, (list, tuple)):
            return [_namedtuple_to_dict(elem) if hasattr(elem, '_asdict')
                    else elem
                    for elem in field]
        return field

    # Build a new dict instead of mutating one while iterating it; items()
    # works on both Python 2 and 3 (iteritems() does not exist on 3).
    return {key: _convert_field(field)
            for key, field in value._asdict().items()}
170
171
def get_task_link(task_id):
    """Return the swarming UI link for a task.

    @param task_id: A string swarming task id.

    @return '<swarming server>/user/task/<task_id>'.

    @raises errors.DroneEnvironmentError: if SWARMING_SERVER is not set.
    """
    # get_swarming_server() raises a clear error when SWARMING_SERVER is
    # missing; os.environ.get() would silently build a 'None/user/task/...'
    # link instead.
    return '%s/user/task/%s' % (get_swarming_server(), task_id)
174
175
def get_stainless_logs_link(request_id):
    """Return the Stainless log-browser URL for a swarming request.

    @param request_id: The swarming request id substituted into the URL.
    """
    url_params = {'request_id': request_id}
    return _STAINLESS_LOGS_BROWSER_URL_TEMPLATE % url_params
181
def get_task_final_state(task):
    """Return the final state of a swarming task.

    A COMPLETED state is refined into TASK_COMPLETED_FAILURE or
    TASK_COMPLETED_SUCCESS depending on the task's 'failure' field. Any
    other state is returned unchanged.

    @param task: the json output of a swarming task fetched by API tasks.list.
    """
    current_state = task['state']
    if current_state != TASK_COMPLETED:
        return current_state
    if task['failure']:
        return TASK_COMPLETED_FAILURE
    return TASK_COMPLETED_SUCCESS
193
194
def get_task_dut_name(task_dimensions):
    """Return the name of the DUT that runs this task.

    @param task_dimensions: a list of dict, e.g. [{'key': k, 'value': v}, ...]

    @return the first value of the first 'dut_name' dimension, or '' when no
        such dimension exists.
    """
    dut_names = (dim['value'][0] for dim in task_dimensions
                 if dim['key'] == 'dut_name')
    return next(dut_names, '')
205
def bot_available(bot):
    """Tell whether a bot can take tasks.

    @param bot: A dict describing a bot, i.e. an element of the list returned
        by |query_bots_list|.

    @return False if the bot is dead or quarantined, otherwise True.
    """
    return not bot['is_dead'] and not bot['quarantined']
215
216
class Client(object):
    """Wrapper for interacting with swarming client."""

    # TODO(akeshet): Drop auth_json_path argument and use the same
    # SWARMING_CREDS envvar that is used to select creds for skylab tool.
    def __init__(self, auth_json_path=DEFAULT_SERVICE_ACCOUNT):
        """Create a client.

        @param auth_json_path: Path passed to swarming.py via
            --auth-service-account-json. A false value omits that flag.
        """
        self._auth_json_path = auth_json_path

    def _run_query(self, api_path):
        """Run `swarming.py query <api_path>` and decode its JSON output.

        @param api_path: The swarming API path, including any query string.

        @return the decoded JSON output of the command.
        """
        swarming_cmd = self.get_basic_swarming_cmd('query') + [api_path]
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
        return json.loads(result.output)

    def query_task_by_tags(self, tags):
        """Get tasks for given tags.

        @param tags: A dict of tags for swarming tasks.

        @return a list, which contains all tasks queried by the given tags.
        """
        conditions = [('tags', '%s:%s' % (k, v)) for k, v in tags.items()]
        query = 'tasks/list?%s' % urllib.urlencode(conditions)
        return self._run_query(query).get('items', [])

    def query_task_by_id(self, task_id):
        """Get task for given id.

        @param task_id: A string to indicate a swarming task id.

        @return a dict, which contains the task with the given task_id.
        """
        return self._run_query('task/%s/result' % task_id)

    def abort_task(self, task_id):
        """Abort a swarming task by its id.

        A failing cancel command is logged and ignored.

        @param task_id: A string swarming task id.
        """
        basic_swarming_cmd = self.get_basic_swarming_cmd('cancel')
        swarming_cmd = basic_swarming_cmd + ['--kill-running', task_id]
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        try:
            cros_build_lib.RunCommand(swarming_cmd, log_output=True)
        except cros_build_lib.RunCommandError:
            logging.error('Task %s probably already gone, skip canceling it.',
                          task_id)

    def query_bots_list(self, dimensions):
        """Get bots list for given requirements.

        @param dimensions: A dict of dimensions for swarming bots.

        @return a list of bot dicts.
        """
        conditions = [('dimensions', '%s:%s' % (k, v))
                      for k, v in dimensions.items()]
        query = 'bots/list?%s' % urllib.urlencode(conditions)
        return self._run_query(query).get('items', [])

    def get_child_tasks(self, parent_task_id):
        """Get the child tasks based on a parent swarming task id.

        @param parent_task_id: The parent swarming task id.

        @return a list of dicts, each dict refers to the whole stats of a task,
            keys include 'name', 'bot_dimensions', 'tags', 'bot_id', 'state',
            etc. An empty list if the parent has no child tasks.
        """
        timeout_util = autotest.chromite_load('timeout_util')
        with timeout_util.Timeout(60):
            child_tasks = self._run_query(
                    'tasks/list?tags=parent_task_id:%s' % parent_task_id)
        # The response may omit 'items' when nothing matches; default to an
        # empty list like the other query methods instead of raising KeyError.
        return child_tasks.get('items', [])

    def get_basic_swarming_cmd(self, command):
        """Build the common swarming.py command line for a subcommand.

        @param command: The swarming.py subcommand, e.g. 'query' or 'cancel'.

        @return a list: client path, subcommand, '--swarming <server>', plus
            '--auth-service-account-json <path>' when an auth path is set.

        @raises errors.DroneEnvironmentError: if SWARMING_SERVER is not set.
        """
        cmd = [_get_client_path(), command, '--swarming', get_swarming_server()]
        if self._auth_json_path:
            cmd += ['--auth-service-account-json', self._auth_json_path]
        return cmd
307
308