1# pylint: disable=missing-docstring
2
3"""\
4Functions to expose over the RPC interface.
5
6For all modify* and delete* functions that ask for an 'id' parameter to
7identify the object to operate on, the id may be either
8 * the database row ID
9 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary containing uniquely identifying fields (this option should
   seldom be used)
12
13When specifying foreign key fields (i.e. adding hosts to a label, or adding
14users to an ACL group), the given value may be either the database row ID or the
15name of the object.
16
17All get* functions return lists of dictionaries.  Each dictionary represents one
18object and maps field names to values.
19
20Some examples:
21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
23modify_test('sleeptest', test_type='Client', params=', seconds=60')
24delete_acl_group(1) # delete by ID
25delete_acl_group('Everyone') # delete by name
26acl_group_add_users('Everyone', ['mbligh', 'showard'])
27get_jobs(owner='showard', status='Queued')
28
29See doctests/001_rpc_test.txt for (lots) more examples.
30"""
31
32__author__ = 'showard@google.com (Steve Howard)'
33
34import ast
35import collections
36import contextlib
37import datetime
38import logging
39import os
40import sys
41import warnings
42
43from django.db import connection as db_connection
44from django.db import transaction
45from django.db.models import Count
46from django.db.utils import DatabaseError
47
48import common
49from autotest_lib.client.common_lib import control_data
50from autotest_lib.client.common_lib import error
51from autotest_lib.client.common_lib import global_config
52from autotest_lib.client.common_lib import priorities
53from autotest_lib.client.common_lib.cros import dev_server
54from autotest_lib.frontend.afe import control_file as control_file_lib
55from autotest_lib.frontend.afe import model_attributes
56from autotest_lib.frontend.afe import model_logic
57from autotest_lib.frontend.afe import models
58from autotest_lib.frontend.afe import rpc_utils
59from autotest_lib.frontend.tko import models as tko_models
60from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
61from autotest_lib.server import frontend
62from autotest_lib.server import utils
63from autotest_lib.server.cros import provision
64from autotest_lib.server.cros.dynamic_suite import constants
65from autotest_lib.server.cros.dynamic_suite import control_file_getter
66from autotest_lib.server.cros.dynamic_suite import suite_common
67from autotest_lib.server.cros.dynamic_suite import tools
68from autotest_lib.server.cros.dynamic_suite.suite import Suite
69from autotest_lib.server.lib import status_history
70from autotest_lib.site_utils import job_history
71from autotest_lib.site_utils import server_manager_utils
72from autotest_lib.site_utils import stable_version_utils
73
74
# Short module-level alias for the shared global_config.global_config object.
_CONFIG = global_config.global_config
76
# LabHealthIndicator is an immutable record with four named fields, in this
# order: if_lab_close, available_duts, devserver_health, upcoming_builds.
LabHealthIndicator = collections.namedtuple(
        'LabHealthIndicator',
        'if_lab_close available_duts devserver_health upcoming_builds')
87
# Boolean switches read once, at import time, from the SKYLAB section of the
# global config. Each is requested with a default of False.
RESPECT_STATIC_LABELS = _CONFIG.get_config_value(
        'SKYLAB', 'respect_static_labels', default=False, type=bool)

RESPECT_STATIC_ATTRIBUTES = _CONFIG.get_config_value(
        'SKYLAB', 'respect_static_attributes', default=False, type=bool)
93
94# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
95
96# labels
97
def modify_label(id, **data):
    """Modify a label.

    Labels that have been replaced by a static label cannot be modified
    through this RPC. On the master (i.e. when utils.is_shard() is false) the
    same RPC is also fanned out for the hosts that carry the label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.

    @raises error.UnmodifiableLabelException: If the label is a static label.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        # The message names the operation actually attempted (modify).
        raise error.UnmodifiableLabelException(
                'Failed to modify label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    label_model.update_object(data)

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)
117
118
def delete_label(id):
    """Delete a label.

    Static labels are rejected. On the master the deletion is also fanned
    out for the hosts that carried the label.

    @param id: id or name of a label. More often a label name.

    @raises error.UnmodifiableLabelException: If the label is a static label.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to delete label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    # Capture the hosts carrying the label *before* it is deleted; they are
    # needed afterwards for the fan-out.
    hosts = [models.Host.smart_get(h.id) for h in label_model.host_set.all()]
    label_model.delete()

    if utils.is_shard():
        return

    # Master forwards the RPC to shards
    rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)
141
142
def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True and the exception was
        thrown due to the duplicated label name when adding a label,
        then suppress the exception. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of the new label, or of the already existing label
        when the duplicate-name failure was suppressed.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be thrown up to the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except Exception:
        # Only Exception subclasses are handled here, so KeyboardInterrupt
        # and SystemExit are never swallowed. A bare ``raise`` re-raises the
        # exception being handled together with its original traceback.
        if not ignore_exception_if_exists:
            raise
        label = rpc_utils.get_label(name)
        # If the exception was raised not because of a duplicated "name",
        # the label still does not exist: surface the original failure.
        if label is None:
            raise
    return label.id
171
172
def add_label_to_hosts(id, hosts):
    """Attach a label to the given hosts in the local DB only.

    If the label has been replaced by a static label, the static label of
    the same name is the one attached.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    target = models.Label.smart_get(id)
    if target.is_replaced_by_static():
        target = models.StaticLabel.smart_get(target.name)

    host_models = models.Host.smart_get_bulk(hosts)
    if target.platform:
        models.Host.check_no_platform(host_models)

    # Ensure a host has no more than one board label with it.
    is_board_label = target.name.startswith('board:')
    if is_board_label:
        models.Host.check_board_labels_allowed(host_models, [target.name])

    target.host_set.add(*host_models)
192
193
def _create_label_everywhere(id, hosts):
    """Make sure a label exists locally and on the shards of given hosts.

    ALERT! This method should be run only on master not shards!
    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!

    Helper for label_add_hosts() and host_add_labels(): it holds the
    "look the label up, create it if missing" logic and additionally asks
    the shards servicing the given hosts to create the same label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the label does not exist and id is not a string,
                        so no label can be created from it.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if not isinstance(id, basestring):
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)
        label = models.Label.smart_get(add_label(id))

    # The label must exist on each shard under the same id it has on the
    # master. A shard may already know the label, because labels are only
    # pushed to the shards of hosts they get attached to. Example: adding
    # label L1 to a host on shard S1 leaves L1 on the master and S1 only;
    # adding L1 later to hosts on S1 and S2 finds it present on S1 and
    # missing on S2. Hence ignore_exception_if_exists=True: the "already
    # exists" failure is ignored on the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    label_fields = dict(name=label.name,
                        ignore_exception_if_exists=True,
                        id=label.id,
                        platform=label.platform)
    rpc_utils.fanout_rpc(host_objs, 'add_label', include_hostnames=False,
                         **label_fields)
239
240
@rpc_utils.route_rpc_to_master
def label_add_hosts(id, hosts):
    """Attach the label identified by id to the given hosts.

    This method should be run only on master not shards.
    A missing label is created first, provided `id` is a label name rather
    than an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # Step 1: make sure the label exists (master and relevant shards).
    _create_label_everywhere(id, hosts)

    # Step 2: attach it in the master's DB.
    add_label_to_hosts(id, hosts)

    # Step 3: ask the shards to attach it as well.
    shard_hosts = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(shard_hosts, 'add_label_to_hosts', id=id)
264
265
def remove_label_from_hosts(id, hosts):
    """Detach a label from the given hosts in the local DB only.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts that need to remove the label from.

    @raises error.UnmodifiableLabelException: If the label is a static label.
    """
    host_models = models.Host.smart_get_bulk(hosts)
    target = models.Label.smart_get(id)

    # Static labels cannot be detached through this RPC.
    if target.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to remove label "%s" for hosts "%r" because it is a '
                'static label. Use go/chromeos-skylab-inventory-tools to '
                'modify this label.' % (target.name, hosts))

    target.host_set.remove(*host_models)
281
282
@rpc_utils.route_rpc_to_master
def label_remove_hosts(id, hosts):
    """Detach the label identified by id from the given hosts.

    This method should be run only on master not shards. The label is
    removed locally first and the removal is then fanned out for the hosts.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    # Resolve the hosts up front, before the local removal.
    shard_hosts = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)

    rpc_utils.fanout_rpc(shard_hosts, 'remove_label_from_hosts', id=id)
296
297
def get_labels(exclude_filters=(), **filter_data):
    """\
    Return label information matching the given filters.

    When RESPECT_STATIC_LABELS is set, every matching label that has a
    ReplacedLabel entry is left out and the static label of the same name
    (if it matches the filters too) is reported in its place.

    @param exclude_filters: A sequence of dictionaries of filters.
    @param filter_data: Filters passed to query_objects().

    @returns A sequence of nested dictionaries of label information.
    """
    def _filtered(model):
        # Apply the include filters, then every exclude filter in turn.
        rows = model.query_objects(filter_data)
        for exclude_filter in exclude_filters:
            rows = rows.exclude(**exclude_filter)
        return rows

    labels = _filtered(models.Label)
    if not RESPECT_STATIC_LABELS:
        return rpc_utils.prepare_rows_as_nested_dicts(labels, ())

    static_labels = _filtered(models.StaticLabel)

    non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ())

    # Work out which of the matching labels have been replaced.
    replaced = models.ReplacedLabel.objects.filter(
            label__id__in=[label.id for label in labels])
    replaced_ids = {r.label_id for r in replaced}
    replaced_label_names = {l.name for l in labels if l.id in replaced_ids}

    # Non-replaced regular labels first, then the static stand-ins.
    return_lists = [row for row in non_static_lists
                    if row.get('id') not in replaced_ids]
    return_lists.extend(row for row in static_lists
                        if row.get('name') in replaced_label_names)
    return return_lists
333
334
335# hosts
336
def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    """Add a host and return its id.

    @param hostname: hostname of the new host.
    @param status: status value stored on the host.
    @param locked: whether the host is created locked.
    @param lock_reason: reason for locking; required when locked is truthy.
    @param protection: protection value stored on the host.

    @raises model_logic.ValidationError: If locked is set without a reason.
    @return: id of the newly added host.
    """
    missing_reason = locked and not lock_reason
    if missing_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    new_host = models.Host.add_object(hostname=hostname, status=status,
                                      locked=locked, lock_reason=lock_reason,
                                      protection=protection)
    return new_host.id
345
346
@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    Called on the master for a host assigned to a shard, this issues the
    `modify_host_local` RPC to the responsible shard, so e.g. locking a host
    through this function propagates to its shard.
    Called on a shard, the shard just routes the RPC to the master and does
    nothing itself.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host. A truthy
                   'force_modify_locking' entry makes a failed locking
                   validation be logged instead of raised.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        if kwargs.get('force_modify_locking', False):
            logging.exception('The following exception will be ignored and lock '
                              'modification will be enforced. %s', e)
        else:
            raise

    # Stamp the lock time here so that `lock_time` for a host is exactly the
    # same on the master and on a shard.
    wants_lock = kwargs.get('locked', None)
    if wants_lock and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()

    # force_modify_locking is not an internal field in database, so it is
    # left out of what is sent to the shard.
    shard_kwargs = {key: value for key, value in kwargs.items()
                    if key != 'force_modify_locking'}
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **shard_kwargs)

    # Update the local DB **after** RPC fanout is complete.
    # This guarantees that the master state is only updated if the shards were
    # correctly updated.
    # In case the shard update fails mid-flight and the master-shard desync, we
    # always consider the master state to be the source-of-truth, and any
    # (automated) corrective actions will revert the (partial) shard updates.
    host.update_object(kwargs)
389
390
def modify_host_local(id, **kwargs):
    """Apply attribute changes to a host in the local DB only.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    host = models.Host.smart_get(id)
    host.update_object(kwargs)
398
399
@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    Called on the master, any matching host that is assigned to a shard
    additionally triggers a `modify_hosts_local` RPC to that shard.
    Called on a shard, the shard just routes the RPC to the master and does
    nothing itself.

    The filters are always evaluated on the master, never on the shards, so
    when a host's state differs between master and shard the master's view
    decides. Example: a host was synced to Shard 1 and its status was set to
    'Repair Failed' there, while the master still has it as 'Ready'.
    - modify_hosts with host_filter_data={'status': 'Ready'} updates the host
      on both the master and the shard.
    - modify_hosts with host_filter_data={'status': 'Repair failed'} does not
      update the host, because the filter does not match on the master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    # Work on a copy so the caller's dictionary is left untouched.
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Validate every host before changing any data, for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if update_data.get('force_modify_locking', False):
                logging.exception('The following exception will be ignored and '
                                  'lock modification will be enforced. %s', e)
            else:
                raise

        if not host.shard:
            continue
        affected_shard_hostnames.add(host.shard.hostname)
        affected_host_ids.append(host.id)

    # Stamp the lock time here so that `lock_time` for a host is exactly the
    # same on the master and on a shard.
    wants_lock = update_data.get('locked', None)
    if wants_lock and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: the shards receive an id-based filter instead of the original
    # one, because the original filter is only meaningful on the master.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)
458
459
def modify_hosts_local(host_filter_data, update_data):
    """Apply attribute changes to all matching hosts in the local DB only.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    matching_hosts = models.Host.query_objects(host_filter_data)
    for host in matching_hosts:
        host.update_object(update_data)
468
469
def add_labels_to_host(id, labels):
    """Attach labels to a host in the local DB only.

    With RESPECT_STATIC_LABELS set, labels classified as static are added to
    the host's static_labels and the rest to its regular labels.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    if not RESPECT_STATIC_LABELS:
        models.Host.smart_get(id).labels.add(*label_objs)
        return

    static_labels, non_static_labels = models.Host.classify_label_objects(
        label_objs)
    target_host = models.Host.smart_get(id)
    target_host.static_labels.add(*static_labels)
    target_host.labels.add(*non_static_labels)
485
486
@rpc_utils.route_rpc_to_master
def host_add_labels(id, labels):
    """Adds labels to a given host.

    Missing labels are created first (on the master and the host's shard),
    then platform/board constraints are checked, and finally the labels are
    attached locally and the attachment is fanned out for the host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform/board label.
    """
    # Create the labels on the master/shards.
    for label in labels:
        _create_label_everywhere(label, [id])

    label_objs = models.Label.smart_get_bulk(labels)

    platforms = [label.name for label in label_objs if label.platform]
    if len(platforms) > 1:
        raise model_logic.ValidationError(
            {'labels': ('Adding more than one platform: %s' %
                        ', '.join(platforms))})

    host_obj = models.Host.smart_get(id)
    if platforms:
        models.Host.check_no_platform([host_obj])

    # Run the board check on the names of the resolved label objects rather
    # than on the raw `labels` argument: entries of `labels` may be numeric
    # ids, which have no startswith().
    label_names = [label.name for label in label_objs]
    if any(label_name.startswith('board:') for label_name in label_names):
        models.Host.check_board_labels_allowed([host_obj], label_names)
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
                         id=id, labels=labels)
517
518
def remove_labels_from_host(id, labels):
    """Detach labels from a host in the local DB only.

    With RESPECT_STATIC_LABELS set, only non-static labels are removed;
    static ones are left in place and an info message is logged.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    if not RESPECT_STATIC_LABELS:
        models.Host.smart_get(id).labels.remove(*label_objs)
        return

    static_labels, non_static_labels = models.Host.classify_label_objects(
            label_objs)
    target_host = models.Host.smart_get(id)
    target_host.labels.remove(*non_static_labels)
    if not static_labels:
        return

    logging.info('Cannot remove labels "%r" for host "%r" due to they '
                 'are static labels. Use '
                 'go/chromeos-skylab-inventory-tools to modify these '
                 'labels.', static_labels, id)
538
539
@rpc_utils.route_rpc_to_master
def host_remove_labels(id, labels):
    """Detach labels from a host, locally and via fan-out for that host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    # Local removal first, then the fan-out.
    remove_labels_from_host(id, labels)

    target_host = models.Host.smart_get(id)
    rpc_utils.fanout_rpc([target_host], 'remove_labels_from_host', False,
                         id=id, labels=labels)
552
553
def get_host_attribute(attribute, **host_filter_data):
    """
    Return the entries of one named attribute for all matching hosts.

    With RESPECT_STATIC_ATTRIBUTES set, the reported value of an entry is
    overwritten by the host's static attribute of the same name, if any.

    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    hosts = list(rpc_utils.get_host_query((), False, True, host_filter_data))
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')

    # Pair each matching attribute dict with the host it belongs to.
    matches = []
    for host_obj in hosts:
        for attr_obj in host_obj.attribute_list:
            if attr_obj.attribute != attribute:
                continue
            matches.append((attr_obj.get_object_dict(), host_obj))

    if RESPECT_STATIC_ATTRIBUTES:
        for host_attr, host_obj in matches:
            static_attrs = models.StaticHostAttribute.query_objects(
                    {'host_id': host_obj.id, 'attribute': attribute})
            if len(static_attrs) > 0:
                host_attr['value'] = static_attrs[0].value

    host_attr_dicts = [host_attr for host_attr, _ in matches]
    return rpc_utils.prepare_for_serialization(host_attr_dicts)
580
581
@rpc_utils.route_rpc_to_master
def set_host_attribute(attribute, value, **host_filter_data):
    """Set (or delete) an attribute on the matching hosts.

    This RPC is a shim: calls are forwarded to the master, where the actual
    work is delegated to set_host_attribute_impl().

    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    # The body is only expected to execute on the master.
    assert not utils.is_shard()
    set_host_attribute_impl(attribute, value, **host_filter_data)
595
596
def set_host_attribute_impl(attribute, value, **host_filter_data):
    """Set (or delete) an attribute on the matching hosts.

    *** DO NOT CALL THIS RPC from client code ***
    This RPC exists for master-shard communication only.
    Call set_host_attribute instead.

    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    # An empty filter would match every host; disallow such accidental
    # actions on all hosts.
    assert host_filter_data
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)

    if utils.is_shard():
        return

    # Master forwards this RPC to shards.
    rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False,
            attribute=attribute, value=value, **host_filter_data)
619
620
@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    """Delete the host identified by id.

    @param id: identifier handed to models.Host.smart_get().
    """
    doomed_host = models.Host.smart_get(id)
    doomed_host.delete()
624
625
def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              valid_only=True, include_current_job=False, **filter_data):
    """Return one dictionary of information per matching host.

    Besides the host's own fields each dictionary carries 'acls',
    'attributes', 'labels' and 'platform', plus 'current_job' and
    'current_special_task' when include_current_job is set.

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True.
    @param valid_only: passed through to rpc_utils.get_host_query().
    @param include_current_job: Set to True to include ids of currently running
            job and special task.
    @param filter_data: filters passed through to rpc_utils.get_host_query().
    """
    if exclude_only_if_needed_labels:
        raise error.RPCException('exclude_only_if_needed_labels is deprecated')

    hosts = list(rpc_utils.get_host_query(multiple_labels,
                                          exclude_only_if_needed_labels,
                                          valid_only, filter_data))

    # Attach the related rows to every host object under these attributes.
    relationships = (
            (models.Label, 'label_list'),
            (models.AclGroup, 'acl_list'),
            (models.HostAttribute, 'attribute_list'),
            (models.StaticHostAttribute, 'staticattribute_list'),
    )
    for related_model, list_name in relationships:
        models.Host.objects.populate_relationships(hosts, related_model,
                                                   list_name)

    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = {attr.attribute: attr.value
                                   for attr in host_obj.attribute_list}

        if RESPECT_STATIC_LABELS:
            # Swap in the static label for every label that has been
            # replaced; static labels without an entry in afe_labels never
            # show up because only host_obj.label_list is walked.
            effective_labels = [
                    models.StaticLabel.smart_get(label.name)
                    if label.is_replaced_by_static() else label
                    for label in host_obj.label_list]
        else:
            effective_labels = host_obj.label_list
        host_dict['labels'] = [label.name for label in effective_labels]
        host_dict['platform'] = rpc_utils.find_platform(
                host_obj.hostname, effective_labels)

        if RESPECT_STATIC_ATTRIBUTES:
            # Overwrite attribute with values in afe_static_host_attributes;
            # static attributes without a regular counterpart are not added.
            attributes = host_dict['attributes']
            for static_attr in host_obj.staticattribute_list:
                if static_attr.attribute in attributes:
                    attributes[static_attr.attribute] = static_attr.value

        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if entries:
                host_dict['current_job'] = (
                        entries[0].get_object_dict()['job'])
            tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if tasks:
                # Reported as "<task id>-<lower-cased task name>".
                task_dict = tasks[0].get_object_dict()
                host_dict['current_special_task'] = (
                        '%d-%s' % (task_dict['id'], task_dict['task'].lower()))
        host_dicts.append(host_dict)

    return rpc_utils.prepare_for_serialization(host_dicts)
700
701
def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  valid_only=True, **filter_data):
    """
    Count the hosts that get_hosts() would match; same parameters as
    get_hosts(), except that include_current_job is not accepted.

    @returns The number of matching hosts.
    """
    if exclude_only_if_needed_labels:
        raise error.RPCException('exclude_only_if_needed_labels is deprecated')

    matching_hosts = rpc_utils.get_host_query(multiple_labels,
                                              exclude_only_if_needed_labels,
                                              valid_only, filter_data)
    return len(matching_hosts)
716
717
718# tests
719
def get_tests(**filter_data):
    """Return the tests matching filter_data, prepared for serialization.

    @param filter_data: filters passed to models.Test.list_objects().
    """
    tests = models.Test.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(tests)
723
724
def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Count passed and failed tests over all matching jobs.

    A test counts as passed when its status is 'GOOD'; every other status
    counts as failed. Tests named 'SERVER_JOB' or starting with 'CLIENT_JOB'
    are left out of the counts.

    @param job_name_prefix: Name prefix of the jobs to get the summary
           from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
           matching is case insensitive.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A dict {'passed': <count>, 'failed': <count>}.
    """
    matching_jobs = models.Job.objects.filter(
            name__istartswith=job_name_prefix,
            dependency_labels__name=label_name)
    job_ids = list(matching_jobs.values_list('pk', flat=True))

    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    test_views = tko_models.TestView.objects.filter(afe_job_id__in=job_ids)
    test_views = test_views.exclude(test_name='SERVER_JOB')
    test_views = test_views.exclude(test_name__startswith='CLIENT_JOB')
    # One row per distinct status, carrying the number of tests with it.
    counts = test_views.values('status').annotate(count=Count('status'))

    for row in counts:
        bucket = 'passed' if row['status'] == 'GOOD' else 'failed'
        summary[bucket] += row['count']
    return summary
756
757
758# profilers
759
def add_profiler(name, description=None):
    """Add a profiler and return its id.

    @param name: name of the new profiler.
    @param description: optional description stored with the profiler.
    @return: id of the newly added profiler.
    """
    profiler = models.Profiler.add_object(name=name, description=description)
    return profiler.id
762
763
def modify_profiler(id, **data):
    """\
    Update an existing profiler.

    @param id: Identifier resolved through models.Profiler.smart_get.
    @param data: Field values to apply to the profiler.
    """
    profiler = models.Profiler.smart_get(id)
    profiler.update_object(data)
766
767
def delete_profiler(id):
    """\
    Delete a profiler.

    @param id: Identifier resolved through models.Profiler.smart_get.
    """
    profiler = models.Profiler.smart_get(id)
    profiler.delete()
770
771
def get_profilers(**filter_data):
    """\
    Look up the profilers matching the given filters.

    @param filter_data: Filter keywords handed to
                        models.Profiler.list_objects.

    @returns The matching profiler entries, prepared for serialization.
    """
    matching_profilers = models.Profiler.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(matching_profilers)
775
776
777# users
778
def get_users(**filter_data):
    """\
    Look up the users matching the given filters.

    @param filter_data: Filter keywords handed to models.User.list_objects.

    @returns The matching user entries, prepared for serialization.
    """
    matching_users = models.User.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(matching_users)
782
783
784# acl groups
785
def add_acl_group(name, description=None):
    """\
    Create a new ACL group and add the current user to it.

    @param name: Name of the ACL group.
    @param description: Optional description of the ACL group.

    @returns The id of the newly added ACL group.
    """
    new_group = models.AclGroup.add_object(name=name,
                                           description=description)
    members = new_group.users
    members.add(models.User.current_user())
    return new_group.id
790
791
def modify_acl_group(id, **data):
    """\
    Update an existing ACL group.

    The ACL violation check runs before the update; afterwards
    add_current_user_if_empty() is called on the group.

    @param id: Identifier resolved through models.AclGroup.smart_get.
    @param data: Field values to apply to the ACL group.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    acl_group.update_object(data)
    acl_group.add_current_user_if_empty()
797
798
def acl_group_add_users(id, users):
    """\
    Add users to an ACL group.

    @param id: Identifier resolved through models.AclGroup.smart_get.
    @param users: Users to add, resolved through
                  models.User.smart_get_bulk.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    user_objects = models.User.smart_get_bulk(users)
    acl_group.users.add(*user_objects)
804
805
def acl_group_remove_users(id, users):
    """\
    Remove users from an ACL group.

    After the removal add_current_user_if_empty() is called on the group.

    @param id: Identifier resolved through models.AclGroup.smart_get.
    @param users: Users to remove, resolved through
                  models.User.smart_get_bulk.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    user_objects = models.User.smart_get_bulk(users)
    acl_group.users.remove(*user_objects)
    acl_group.add_current_user_if_empty()
812
813
def acl_group_add_hosts(id, hosts):
    """\
    Add hosts to an ACL group.

    @param id: Identifier resolved through models.AclGroup.smart_get.
    @param hosts: Hosts to add, resolved through
                  models.Host.smart_get_bulk.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    host_objects = models.Host.smart_get_bulk(hosts)
    acl_group.hosts.add(*host_objects)
    # Let the group react to its changed host membership.
    acl_group.on_host_membership_change()
820
821
def acl_group_remove_hosts(id, hosts):
    """\
    Remove hosts from an ACL group.

    @param id: Identifier resolved through models.AclGroup.smart_get.
    @param hosts: Hosts to remove, resolved through
                  models.Host.smart_get_bulk.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    host_objects = models.Host.smart_get_bulk(hosts)
    acl_group.hosts.remove(*host_objects)
    # Let the group react to its changed host membership.
    acl_group.on_host_membership_change()
828
829
def delete_acl_group(id):
    """\
    Delete an ACL group.

    @param id: Identifier resolved through models.AclGroup.smart_get.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.delete()
832
833
def get_acl_groups(**filter_data):
    """\
    Look up the ACL groups matching the given filters.

    Each returned entry is extended with a 'users' list holding the logins
    of the group's users and a 'hosts' list holding the hostnames of the
    group's hosts.

    @param filter_data: Filter keywords handed to
                        models.AclGroup.list_objects.

    @returns The matching ACL group entries, prepared for serialization.
    """
    group_dicts = models.AclGroup.list_objects(filter_data)
    for group_dict in group_dicts:
        # The listed dict only carries plain fields; fetch the model object
        # to reach its user and host relations.
        group = models.AclGroup.objects.get(id=group_dict['id'])

        logins = []
        for member in group.users.all():
            logins.append(member.login)
        group_dict['users'] = logins

        hostnames = []
        for member_host in group.hosts.all():
            hostnames.append(member_host.hostname)
        group_dict['hosts'] = hostnames
    return rpc_utils.prepare_for_serialization(group_dicts)
843
844
845# jobs
846
def generate_control_file(tests=(), profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, db_tests=True,
                          test_source_build=None):
    """
    Generates a client-side control file to run tests.

    When neither tests nor a client control file are given, an empty
    control file description is returned right away.

    @param tests List of tests to run. See db_tests for more information.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param db_tests: if True, the test object can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not (tests or client_control_file):
        # Nothing to generate from.
        return {'control_file': '',
                'is_server': False,
                'synch_count': 1,
                'dependencies': []}

    prepared = rpc_utils.prepare_generate_control_file(tests, profilers,
                                                       db_tests)
    cf_info, test_objects, profiler_objects = prepared
    control_text = control_file_lib.generate_control(
        tests=test_objects,
        profilers=profiler_objects,
        is_server=cf_info['is_server'],
        client_control_file=client_control_file,
        profile_only=profile_only,
        test_source_build=test_source_build)
    cf_info['control_file'] = control_text
    return cf_info
896
897
def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            is_cloning=False, cheets_build=None, **kwargs):
    """\
    Create and enqueue a job.

    When both image and hostless are set, the job is created through
    create_suite_job; otherwise it is created through create_job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed in the dut. Default to None.
    @param hostless: if true, create a hostless job. Default to False.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param is_cloning: True if creating a cloning job.
    @param cheets_build: ChromeOS Android build  to be installed in the dut.
                         Default to None. Cheets build will not be updated.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job. An optional 'args' entry holds a list of
                  'key=value' strings which are forwarded as test_args.

    @returns The created Job id number.

    @raises model_logic.ValidationError if the control file is empty or if
            an entry of 'args' is not of the form key=value.
    """
    test_args = {}
    # args' format is: ['disable_sysinfo=False', 'fast=True', ...]
    for arg in kwargs.get('args') or ():
        # Split on the first '=' only, so that a value may itself contain
        # '=' (e.g. 'url=http://host/?a=b') without being truncated.
        key, separator, value = arg.partition('=')
        if not separator:
            raise model_logic.ValidationError({
                    'args' : "Invalid test arg '%s', expected key=value"
                             % arg})
        test_args[key] = value

    if is_cloning:
        logging.info('Start to clone a new job')
        # When cloning a job, hosts and meta_hosts should not exist together,
        # which would cause host-scheduler to schedule two hqe jobs to one host
        # at the same time, and crash itself. Clear meta_hosts for this case.
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        # A hostless job with an image is a suite job: collect every build
        # that was requested, keyed by its provision prefix.
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if cheets_build:
            builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, test_args=test_args, **kwargs)

    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, test_args=test_args, **kwargs)
962
963
@rpc_utils.route_rpc_to_master
def create_job(
        name,
        priority,
        control_file,
        control_type,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=False,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        image=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None,
        test_args=None,
        **kwargs):
    """\
    Create and enqueue a job.

    Test args, if any, are injected into the control file, and the label
    derived from the image, if any, is appended to the dependencies before
    the job is handed to rpc_utils.create_job_common.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry DEPRECATED
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param test_args A dict of args passed to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if test_args:
        control_file = tools.inject_vars(test_args, control_file)
    if image:
        # The image is requested through an extra dependency label.
        dependencies += (provision.image_version_to_label(image),)

    # test_retry, image and kwargs are deliberately not forwarded.
    job_options = dict(
            name=name,
            priority=priority,
            control_type=control_type,
            control_file=control_file,
            hosts=hosts,
            meta_hosts=meta_hosts,
            one_time_hosts=one_time_hosts,
            synch_count=synch_count,
            is_template=is_template,
            timeout=timeout,
            timeout_mins=timeout_mins,
            max_runtime_mins=max_runtime_mins,
            run_verify=run_verify,
            email_list=email_list,
            dependencies=dependencies,
            reboot_before=reboot_before,
            reboot_after=reboot_after,
            parse_failed_repair=parse_failed_repair,
            hostless=hostless,
            keyvals=keyvals,
            drone_set=drone_set,
            parent_job_id=parent_job_id,
            run_reset=run_reset,
            require_ssp=require_ssp)
    return rpc_utils.create_job_common(**job_options)
1067
1068
def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @param filter_data: Filter keywords selecting the host queue entries.

    @return: A list of dictionaries, each contains information
             about an aborted HQE ('HostQueueEntry', 'Job' and 'Job name').
    """
    # Entries are not abortable when they
    #   1. have already completed (whether or not they were aborted), or
    #   2. have already been aborted (but may not have completed).
    abortable = models.HostQueueEntry.query_objects(filter_data).filter(
            complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(abortable)
    entries = list(abortable.select_related())
    rpc_utils.check_abort_synchronous_jobs(entries)

    models.HostQueueEntry.abort_host_queue_entries(entries)

    aborted_info = []
    for entry in entries:
        aborted_info.append({'HostQueueEntry': entry.id,
                             'Job': entry.job_id,
                             'Job name': entry.job.name})
    return aborted_info
1090
1091
def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.

    Only tasks that are currently active are aborted.

    @param filter_data: Filter keywords selecting the special tasks.
    """
    active_tasks = models.SpecialTask.query_objects(filter_data).filter(
            is_active=True)
    for active_task in active_tasks:
        active_task.abort()
1100
1101
def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @param task: The special task to schedule on every host.
    @param hosts: The host objects to schedule the task for.

    @returns A sorted list of hostnames that a special task was created for.

    @raises ValueError if any of the hosts is on a shard and this server is
            not a shard itself.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    sharded_hosts = rpc_utils.bucket_hosts_by_shard(hosts)
    if sharded_hosts:
        if not utils.is_shard():
            raise ValueError('The following hosts are on shards, please '
                             'follow the link to the shards and create jobs '
                             'there instead. %s.' % sharded_hosts)

    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)

    hostnames = [host.hostname for host in hosts]
    hostnames.sort()
    return hostnames
1117
1118
def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to corresponding shards.

    For master, when special tasks are fired on hosts that are sharded,
    forward the RPC to corresponding shards.

    For shard, create special task records in local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    all_hosts = models.Host.query_objects(filter_data)
    hosts_by_shard = rpc_utils.bucket_hosts_by_shard(all_hosts)

    # On the master, split off the hosts that live on a shard: their RPC is
    # forwarded to the shard with an additional hostname__in filter, and only
    # the remaining hosts get a local SpecialTask.
    local_hosts = all_hosts
    if hosts_by_shard and not utils.is_shard():
        local_hosts = [host for host in all_hosts if host.shard is None]
        for shard, shard_hostnames in hosts_by_shard.iteritems():
            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = dict(filter_data, hostname__in=shard_hostnames)
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it, if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, local_hosts)
1168
1169
def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @param filter_data: Filter keywords selecting the hosts.

    @returns A list of hostnames that a verify task was created for.
    """
    verify_task = models.SpecialTask.Task.VERIFY
    return _forward_special_tasks_on_hosts(verify_task, 'reverify_hosts',
                                           **filter_data)
1178
1179
def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @param filter_data: Filter keywords selecting the hosts.

    @returns A list of hostnames that a repair task was created for.
    """
    repair_task = models.SpecialTask.Task.REPAIR
    return _forward_special_tasks_on_hosts(repair_task, 'repair_hosts',
                                           **filter_data)
1188
1189
def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Look up the jobs matching the given filters.

    Each returned job carries its dependencies as a comma-separated string
    of label names and its keyvals as a dict.

    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    status_filters = rpc_utils.extra_job_status_filters(
            not_yet_run, running, finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(
            status_filters, suite, sub, standalone)

    jobs = list(models.Job.query_objects(filter_data))
    # Attach the related labels and keyvals to the job objects up front.
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval,
                                              'keyvals')

    job_dicts = []
    for job in jobs:
        job_dict = job.get_object_dict()
        label_names = [label.name for label in job.dependencies]
        job_dict['dependencies'] = ','.join(label_names)
        keyval_map = {}
        for keyval in job.keyvals:
            keyval_map[keyval.key] = keyval.value
        job_dict['keyvals'] = keyval_map
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)
1226
1227
def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    Count the jobs matching the given filters.

    See get_jobs() for documentation of extra filter parameters.
    """
    status_filters = rpc_utils.extra_job_status_filters(
            not_yet_run, running, finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(
            status_filters, suite, sub, standalone)
    return models.Job.query_count(filter_data)
1242
1243
def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' field.

    'status_counts' field is a dictionary mapping status strings to the number
    of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.

    'result_counts' field is piped to tko's rpc_interface and has the return
    format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    job_ids = [job['id'] for job in jobs]
    status_counts_by_job = models.Job.objects.get_status_counts(job_ids)
    for job in jobs:
        job_id = job['id']
        job['status_counts'] = status_counts_by_job[job_id]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                afe_job_id=job_id)
    return rpc_utils.prepare_for_serialization(jobs)
1264
1265
def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.

    @param id: Database id of the job to clone.
    @param preserve_metahosts: Passed through to rpc_utils.get_job_info.
    @param queue_entry_filter_data: Passed through to
                                    rpc_utils.get_job_info.

    @returns A dict with the keys 'job', 'meta_host_counts', 'hosts',
             'hostless' and 'drone_set', prepared for serialization.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for db_host in job_info['hosts']:
        host_dict = get_hosts(id=db_host.id)[0]
        # Everything but the platform label counts as "other" labels.
        remaining_labels = host_dict['labels']
        platform = host_dict['platform']
        if platform:
            remaining_labels.remove(platform)
        host_dict['other_labels'] = ', '.join(remaining_labels)
        host_dicts.append(host_dict)

    host_dicts.extend(
            {'hostname': one_time_host.hostname,
             'id': one_time_host.id,
             'platform': '(one-time host)',
             'locked_text': ''}
            for one_time_host in job_info['one_time_hosts'])

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = {}
    for label, count in job_info['meta_host_counts'].iteritems():
        meta_host_counts[label.name] = count

    info = {'job': job.get_object_dict(),
            'meta_host_counts': meta_host_counts,
            'hosts': host_dicts}
    info['job']['dependencies'] = job_info['dependencies']
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    image = _get_image_for_job(job, job_info['hostless'])
    if image:
        info['job']['image'] = image

    return rpc_utils.prepare_for_serialization(info)
1307
1308
def _get_image_for_job(job, hostless):
    """Gets the image used for a job.

    Gets the image used for an AFE job from the job's keyvals 'build' or
    'builds'. If that fails, and the job is a hostless job, tries to
    get the image from its control file attributes 'build' or 'builds'.

    Lookup is best-effort: a 'builds' keyval or a control file that cannot
    be parsed is logged and skipped instead of failing the caller.

    TODO(ntang): Needs to handle FAFT with two builds for ro/rw.

    @param job      An AFE job object.
    @param hostless Boolean indicating whether the job is hostless.

    @returns The image build used for the job, or a false value if none
             could be determined.
    """
    keyvals = job.keyval_dict()
    image = keyvals.get('build')
    if not image:
        value = keyvals.get('builds')
        builds = None
        if isinstance(value, dict):
            builds = value
        elif isinstance(value, basestring):
            # The keyval may hold the string representation of a dict.
            try:
                builds = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                logging.warning('Failed to parse builds keyval for job: %s',
                                job.name)
        # Only a dict can be asked for the cros-version entry.
        if builds and isinstance(builds, dict):
            image = builds.get('cros-version')
    if not image and hostless and job.control_file:
        try:
            control_obj = control_data.parse_control_string(
                    job.control_file)
            if hasattr(control_obj, 'build'):
                image = getattr(control_obj, 'build')
            if not image and hasattr(control_obj, 'builds'):
                builds = getattr(control_obj, 'builds')
                image = builds.get('cros-version')
        except Exception:
            # Deliberately broad, but no longer swallows KeyboardInterrupt
            # or SystemExit the way a bare except did.
            logging.warning('Failed to parse control file for job: %s',
                            job.name)
    return image
1347
1348
def get_host_queue_entries_by_insert_time(
    insert_time_after=None, insert_time_before=None, **filter_data):
    """Like get_host_queue_entries, but using the insert index table.

    The time bounds are translated into 'id__gte' / 'id__lte' filters by
    means of models.HostQueueEntryStartTimes. Bounds on the id that are
    already present in filter_data are kept when they are tighter.

    @param insert_time_after: A lower bound on insert_time
    @param insert_time_before: An upper bound on insert_time
    @param filter_data: Further filter keywords for the query.
    @returns A sequence of nested dictionaries of host and job information.
    """
    assert not (insert_time_after is None and insert_time_before is None), \
      ('Caller to get_host_queue_entries_by_insert_time must provide either'
       ' insert_time_after or insert_time_before.')

    # Lower bound: take the index row with the largest insert time that is
    # not after insert_time_after ('-insert_time' sorts descending).
    if insert_time_after:
        newest_first = models.HostQueueEntryStartTimes.objects.filter(
            insert_time__lte=insert_time_after).order_by('-insert_time')
        try:
            lower_bound = newest_first[0].highest_hqe_id
        except IndexError:
            # No index row qualifies; leave the filter untouched.
            pass
        else:
            if 'id__gte' in filter_data:
                lower_bound = max(lower_bound, filter_data['id__gte'])
            filter_data['id__gte'] = lower_bound

    # Upper bound: take the index row with the smallest insert time that is
    # not before insert_time_before.
    if insert_time_before:
        oldest_first = models.HostQueueEntryStartTimes.objects.filter(
            insert_time__gte=insert_time_before).order_by('insert_time')
        try:
            upper_bound = oldest_first[0].highest_hqe_id
        except IndexError:
            # No index row qualifies; leave the filter untouched.
            pass
        else:
            if 'id__lte' in filter_data:
                upper_bound = min(upper_bound, filter_data['id__lte'])
            filter_data['id__lte'] = upper_bound

    entries = models.HostQueueEntry.query_objects(filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(entries, ('host', 'job'))
1389
1390
def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Look up the host queue entries matching the given filters.

    @param start_time: Injected into the filter as 'started_on__gte'.
    @param end_time: Injected into the filter as 'started_on__lte'.
    @param filter_data: Further filter keywords for the query.

    @returns A sequence of nested dictionaries of host and job information.
    """
    filter_data = rpc_utils.inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data)
    entries = models.HostQueueEntry.query_objects(filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(entries, ('host', 'job'))
1403
1404
def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries matching the given filters.

    @param start_time: Injected into the filter as 'started_on__gte'.
    @param end_time: Injected into the filter as 'started_on__lte'.
    @param filter_data: Further filter keywords for the query.

    @returns The number of matching host queue entries.
    """
    filter_data = rpc_utils.inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data)
    return models.HostQueueEntry.query_count(filter_data)
1415
1416
def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter data
    that are complete.

    @param filter_data: Filter keywords selecting the host queue entries.

    @returns The fraction as a float, or 1 if no entry matches at all.
    """
    entries = models.HostQueueEntry.query_objects(filter_data)
    num_complete = entries.filter(complete=True).count()
    num_total = entries.count()
    if num_total != 0:
        return float(num_complete) / num_total
    # Nothing matched: report everything as complete.
    return 1
1428
1429
1430# special tasks
1431
def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Query the special tasks table for tasks matching the given
    `filter_data`, and return a list of the results.  No attempt is
    made to forward the call to shards; the buck will stop here.
    The caller is expected to know the target shard for such reasons
    as:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    tasks = models.SpecialTask.query_objects(filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(tasks,
                                                  ('host', 'queue_entry'))
1452
1453
def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by `host_id` and matching the given `filter_data`.
    Return a list of the results.  If the host is assigned to a
    shard, forward this call to that shard.

    @param host_id      Id in the database of the target host.
    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if host.shard:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)
    return get_special_tasks(host_id=host_id, **filter_data)
1479
1480
def get_num_special_tasks(**kwargs):
    """Count the special tasks in the local database matching a filter.

    Only the local special tasks table is consulted; the call is never
    forwarded to a shard.

    @param kwargs    Filter keywords to pass to the underlying database query.

    @return The value of `models.SpecialTask.query_count()` for `kwargs`.

    """
    task_filter = kwargs
    return models.SpecialTask.query_count(task_filter)
1492
1493
def get_host_num_special_tasks(host, **kwargs):
    """Count the special tasks that ran on one host.

    Counts the entries in the special tasks table for the host given by
    'host' that match the given 'kwargs'.  If the host is assigned to a
    shard, the count is requested from that shard; otherwise it is taken
    from the local database via `get_num_special_tasks()`.

    @param host      id or name of a host. More often a hostname.
    @param kwargs    Filter keywords to pass to the underlying database query.

    @return The number of matching special tasks.

    """
    # Passing False here retrieves the host data even if the host is in
    # an invalid state.
    target_host = models.Host.smart_get(host, False)
    owning_shard = target_host.shard
    if owning_shard:
        remote_afe = frontend.AFE(server=owning_shard.hostname)
        return remote_afe.run('get_num_special_tasks', host=host, **kwargs)
    return get_num_special_tasks(host=host, **kwargs)
1513
1514
def get_status_task(host_id, end_time):
    """Look up a host's "status task" on the local shard.

    The status task is a completed special task that tells whether the
    host was working or broken when the task completed: a successful
    task indicates a working host, a failed task a broken one.

    The call is never forwarded to a shard; the server receiving it
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    matching_tasks = status_history.get_status_task(host_id, end_time)
    serialized = rpc_utils.prepare_rows_as_nested_dicts(
            matching_tasks, ('host', 'queue_entry'))
    if not serialized:
        return None
    # Only the first entry is reported to the caller.
    return serialized[0]
1540
1541
def get_host_status_task(host_id, end_time):
    """Look up a host's "status task" on whichever server owns the host.

    A host with no shard is answered locally by `get_status_task()`;
    otherwise the same call is forwarded to the host's owning shard.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    target_host = models.Host.smart_get(host_id)
    owning_shard = target_host.shard
    if owning_shard:
        # AFE methods post-process their return values into objects
        # that aren't JSON-serializable, so call AFE.run() to get the
        # raw, serializable output from the shard.
        remote_afe = frontend.AFE(server=owning_shard.hostname)
        return remote_afe.run('get_status_task',
                              host_id=host_id, end_time=end_time)
    return get_status_task(host_id, end_time)
1568
1569
def get_host_diagnosis_interval(host_id, end_time, success):
    """Find the most recent "diagnosis interval" for a host.

    A "diagnosis interval" is a start and end time bracketing a change
    of the host from "working" to "broken", or vice versa.  It starts
    at the starting time of the last status task with the old status
    and ends at the finish time of the first status task with the new
    status.

    The interval returned is the most recent one prior to `end_time`
    whose starting status matches `success`: a successful status task
    if `success` is true, a failed one otherwise.

    The lookup runs locally when the host has no shard or when this
    server is itself a shard; otherwise it is forwarded to the host's
    shard.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    target_host = models.Host.smart_get(host_id)
    # utils.is_shard() is only consulted when the host has a shard.
    must_forward = target_host.shard and not utils.is_shard()
    if must_forward:
        remote_afe = frontend.AFE(server=target_host.shard.hostname)
        return remote_afe.get_host_diagnosis_interval(
                host_id, end_time, success)
    return status_history.get_diagnosis_interval(
            host_id, end_time, success)
1604
1605
1606# support for host detail view
1607
def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    Return one host's queue entries and special tasks as a single list.

    @param host: Identifies the host.  Used both as the `host` filter
            value for the queries and as the host argument of
            get_host_special_tasks().
    @param query_start: Index of the first interleaved entry to return.
            None (the default) starts from the first entry.
    @param query_limit: Maximum number of interleaved entries to return.
            None (the default) means no limit.
    @param start_time: Lower time bound passed to the underlying queries;
            applied to special tasks as `time_started__gte`.
    @param end_time: Upper time bound passed to the underlying queries;
            applied to special tasks as `time_started__lte`.

    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    # Each underlying query has to fetch enough rows to cover the whole
    # requested window, because the first `query_start` rows are only
    # sliced off after interleaving.  query_start defaults to None, so
    # treat that as an offset of 0 instead of adding None to an integer.
    total_limit = None
    if query_limit is not None:
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    # Apply the caller's paging window to the combined list.
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)
1639
1640
def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    """
    Count one host's queue entries and special tasks together.

    @param host: Host to count entries for; used as the `host` filter value.
    @param start_time: Optional lower time bound injected into both filters.
    @param end_time: Optional upper time bound injected into both filters.

    @returns the number of matching HostQueueEntries plus the number of
            matching SpecialTasks.
    """
    base_filter = {'host': host}
    hqe_filter, special_task_filter = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    base_filter, start_time, end_time))

    num_queue_entries = models.HostQueueEntry.query_count(hqe_filter)
    # The special-task filter is expanded into keyword arguments, so its
    # 'host' entry supplies get_host_num_special_tasks()'s host parameter.
    num_special_tasks = get_host_num_special_tasks(**special_task_filter)
    return num_queue_entries + num_special_tasks
1651
1652
1653# other
1654
def echo(data=""):
    """\
    Hand the argument straight back to the caller.

    Meant as a basic smoke test that RPC calls can successfully be made.

    @param data: Value to send back; defaults to the empty string.

    @returns `data`, unchanged.
    """
    return data
1661
1662
def get_motd():
    """\
    Fetch the server's message of the day.

    @returns the message of the day as a string, as provided by
            rpc_utils.get_motd().
    """
    motd = rpc_utils.get_motd()
    return motd
1668
1669
def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible.  The keys are:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    max_schedulable_priority: priorities.Priority.DEFAULT.
    users: List of all users, sorted by login.
    labels: Labels whose names do not start with 'cros-version',
            'fw-version', 'fwrw-version' or 'fwro-version', sorted by
            descending platform and then name.
    tests: List of all tests, sorted by name.
    profilers: List of all profilers, sorted by name.
    current_user: The current user's object dict, prepared for
            serialization.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_mins_default: The default job timeout length in minutes.
    job_max_runtime_mins_default: models.Job.DEFAULT_MAX_RUNTIME_MINS.
    parse_failed_repair_default: Default value for the parse_failed_repair job
            option, as a bool.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    drone_sets_enabled: Value of models.DroneSet.drone_sets_enabled().
    drone_sets: Drone set names; the default drone set name comes first,
            followed by the remaining names in sorted order.
    status_dictionary: A mapping from one word job status names to a more
            informative description.
    wmatrix_url: Value of rpc_utils.get_wmatrix_url().
    stainless_url: Value of rpc_utils.get_stainless_url().
    is_moblab: Value of utils.is_moblab(), as a bool.
    """
    # The default drone set always leads the list; the others follow sorted.
    default_set_name = models.DroneSet.default_drone_set_name()
    other_sets = models.DroneSet.objects.exclude(name=default_set_name)
    drone_sets = [default_set_name]
    drone_sets.extend(sorted(drone_set.name for drone_set in other_sets))

    # Version labels are left out of the label list.
    excluded_label_prefixes = ('cros-version', 'fw-version',
                               'fwrw-version', 'fwro-version')
    label_exclude_filters = [{'name__startswith': prefix}
                             for prefix in excluded_label_prefixes]

    status_descriptions = {
        "Aborted": "Aborted",
        "Verifying": "Verifying Host",
        "Provisioning": "Provisioning Host",
        "Pending": "Waiting on other hosts",
        "Running": "Running autoserv",
        "Completed": "Autoserv completed",
        "Failed": "Failed to complete",
        "Queued": "Queued",
        "Starting": "Next in host's queue",
        "Stopped": "Other host(s) failed verify",
        "Parsing": "Awaiting parse of final results",
        "Gathering": "Gathering log files",
        "Waiting": "Waiting for scheduler action",
        "Archiving": "Archiving results",
        "Resetting": "Resetting hosts",
    }

    # Entries are listed (and therefore evaluated) in a fixed order.
    static_data = {
        'priorities': priorities.Priority.choices(),
        'default_priority': 'Default',
        'max_schedulable_priority': priorities.Priority.DEFAULT,
        'users': get_users(sort_by=['login']),
        'labels': get_labels(label_exclude_filters,
                             sort_by=['-platform', 'name']),
        'tests': get_tests(sort_by=['name']),
        'profilers': get_profilers(sort_by=['name']),
        'current_user': rpc_utils.prepare_for_serialization(
            models.User.current_user().get_object_dict()),
        'host_statuses': sorted(models.Host.Status.names),
        'job_statuses': sorted(models.HostQueueEntry.Status.names),
        'job_timeout_mins_default': models.Job.DEFAULT_TIMEOUT_MINS,
        'job_max_runtime_mins_default': models.Job.DEFAULT_MAX_RUNTIME_MINS,
        'parse_failed_repair_default': bool(
            models.Job.DEFAULT_PARSE_FAILED_REPAIR),
        'reboot_before_options': model_attributes.RebootBefore.names,
        'reboot_after_options': model_attributes.RebootAfter.names,
        'motd': rpc_utils.get_motd(),
        'drone_sets_enabled': models.DroneSet.drone_sets_enabled(),
        'drone_sets': drone_sets,
        'status_dictionary': status_descriptions,
        'wmatrix_url': rpc_utils.get_wmatrix_url(),
        'stainless_url': rpc_utils.get_stainless_url(),
        'is_moblab': bool(utils.is_moblab()),
    }
    return static_data
1753
1754
def get_server_time():
    """Report the server's current time, to the minute.

    @returns The value of datetime.datetime.now() formatted as
             'YYYY-MM-DD HH:MM'.
    """
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d %H:%M")
1757
1758
def ping_db():
    """Simple connection test to db.

    Tries to obtain a cursor from the database connection.

    @returns A one-element list: [True] if the cursor could be obtained,
             [False] if a DatabaseError was raised.
    """
    try:
        db_connection.cursor()
    except DatabaseError:
        reachable = False
    else:
        reachable = True
    return [reachable]
1766
1767
def get_hosts_by_attribute(attribute, value):
    """
    Get the list of valid hosts that share the same host attribute value.

    Hosts whose `invalid` field is non-zero are never returned.  When
    RESPECT_STATIC_ATTRIBUTES is set, entries in the static host attribute
    table are taken into account as well (see the comments below).

    @param attribute: String of the host attribute to check.
    @param value: String of the value that is shared between hosts.

    @returns List of hostnames that all have the same host attribute and
             value.
    """
    attribute_rows = models.HostAttribute.query_objects(
            {'attribute': attribute, 'value': value})

    if not RESPECT_STATIC_ATTRIBUTES:
        return [row.host.hostname
                for row in attribute_rows if row.host.invalid == 0]

    hostnames = set()

    # First pass, add hosts that:
    #     * Are not marked invalid.
    #     * Exist in afe_host_attribute with the given attribute and value.
    #     * Either have no afe_static_host_attribute entry for the
    #       attribute, or whose first such entry has the same given value.
    for row in attribute_rows:
        if row.host.invalid != 0:
            continue

        static_values = [
            static_row.value
            for static_row in models.StaticHostAttribute.query_objects(
                    {'host_id': row.host.id, 'attribute': attribute})]
        if not static_values or static_values[0] == value:
            hostnames.add(row.host.hostname)

    # Second pass, add hosts that:
    #     * Are not marked invalid.
    #     * Exist in afe_static_host_attribute with the given attribute
    #       and value.
    # There is no need to check whether each static attribute has its
    # corresponding entry in afe_host_attribute since it is ensured in
    # inventory sync.
    static_matches = models.StaticHostAttribute.query_objects(
            {'attribute': attribute, 'value': value})
    hostnames.update(row.host.hostname for row in static_matches
                     if row.host.invalid == 0)

    return list(hostnames)
1815
1816
def _get_control_file_by_suite(suite_name):
    """Read a suite's control file from the local file system.

    The control file is looked up under the directory configured as
    `drone_installation_directory` in the SCHEDULER config section.

    @param suite_name: Suite name as string.
    @returns: Control file contents as string.
    """
    install_dir = _CONFIG.get_config_value('SCHEDULER',
                                           'drone_installation_directory')
    fs_getter = control_file_getter.FileSystemGetter([install_dir])
    return fs_getter.get_control_file_contents_by_name(suite_name)
1827
1828
@rpc_utils.route_rpc_to_master
def create_suite_job(
        name='',
        board='',
        pool='',
        child_dependencies=(),
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs):
    """
    Create a job to run a test suite on the given device with the given image.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: the kind of device to run the tests on. If empty, it is
                  derived from the CrOS build found in `builds`.
    @param builds: the builds to install e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
                   Defaults to an empty dict.
    @param test_source_build: Build that contains the server-side test code.
                              Unless run_prod_code is set, it is resolved
                              through Suite.get_test_source_build().
    @param pool: Specify the pool of machines to use for scheduling
            purposes.
    @param child_dependencies: (optional) list of additional dependency labels
            (strings) that will be added as dependency labels to child jobs.
    @param control_file: the control file of the job. Ignored when
                         run_prod_code is True; looked up from the build
                         artifacts when empty.
    @param check_hosts: require appropriate live hosts to exist in the lab.
    @param num: Deprecated and ignored; passing a value only triggers a
                warning.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional dict of arguments which will be parsed by the
                       suite control file. Used by control.test_that_wrapper
                       to determine which tests to run. Its entries are
                       injected into the control file after the standard
                       variables, so they override any of the same name.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes. Defaults to `timeout` hours.
    @param suite_min_duts: Integer. Scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got minimum machines it needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod aka the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number of
                          minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be inject to control file.
    @param test_args: A dict of args passed all the way to each individual test
                      that will be actually run.
    @param kwargs: extra keyword args. NOT USED.

    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
    @raises NoControlFileList: if we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: if the control file exists on the server, but
                              can't be read.

    @return: the job ID of the suite; -1 on error.
    """
    if num is not None:
        warnings.warn('num is deprecated for create_suite_job')
    del num

    if builds is None:
        builds = {}

    # Default test source build to CrOS build if it's not specified and
    # run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
                builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = suite_common.canonicalize_suite_name(name)
    if run_prod_code:
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        ds, keyvals = suite_common.stage_build_artifacts(
                test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set, use the
        # first build in the builds list for the suite job name.
        # list() keeps this working on Python 3, where dict.values()
        # returns a view that cannot be indexed.
        name = '%s-%s' % (list(builds.values())[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    # Both limits fall back to `timeout`, which is expressed in hours.
    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    # Prod code always uses the suite control file installed on this server,
    # replacing any control file the caller passed in.
    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied so look it up from the build artifacts.
        control_file = suite_common.get_control_file_by_build(
                test_source_build, ds, suite_name)

    # A cloned job's control file goes through tools.remove_injection()
    # first (presumably to drop the variables injected for the original
    # job) before the fresh variables are injected below.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    if suite_args is None:
        suite_args = dict()

    # Variables injected into the control file (builds, board, etc.).
    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage image inside suite
        # control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'child_dependencies': child_dependencies,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    inject_dict.update(suite_args)
    control_file = tools.inject_vars(inject_dict, control_file)

    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)
2012
2013
def get_job_history(**filter_data):
    """Get history of the job, including the special tasks executed for the job

    @param filter_data: filter for the call, should at least include
                        {'job_id': [job id]}; only `job_id` is read.
    @returns: The job's history, prepared for serialization, including
              information such as the hosts that ran the job and the
              special tasks executed before and after the job.
    @raises KeyError: if `job_id` is missing from filter_data.
    """
    job_id = filter_data['job_id']
    history = job_history.get_job_info(job_id).get_history()
    return rpc_utils.prepare_for_serialization(history)
2026
2027
def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
    """Deprecated.

    The RPC is kept only so that callers get an explicit error; all
    arguments are ignored.

    @raises ValueError: always.
    """
    message = ('get_host_history rpc is deprecated '
               'and no longer implemented.')
    raise ValueError(message)
2032
2033
def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
                    known_host_ids=(), known_host_statuses=()):
    """Receive updates for job statuses from shards and assign hosts and jobs.

    The records uploaded by the shard are persisted, the statuses the
    shard reports for its known hosts are synced into the local host
    records, and the records the shard should receive are looked up and
    returned.

    @param shard_hostname: Hostname of the calling shard
    @param jobs: Jobs in serialized form that should be updated with newer
                 status from a shard.
    @param hqes: Hostqueueentries in serialized form that should be updated with
                 newer status from a shard. Note that for every hostqueueentry
                 the corresponding job must be in jobs.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.
    @param known_host_statuses: List of statuses of hosts the shard already has;
                                must be the same length as known_host_ids.

    @returns: Serialized representations of hosts, jobs, suite job keyvals
              and their dependencies to be inserted into a shard's database,
              as a dict with keys 'hosts', 'jobs', 'suite_keyvals' and
              'incorrect_host_ids'.
    """
    # The following alternatives to sending host and job ids in every heartbeat
    # have been considered:
    # 1. Sending the highest known job and host ids. This would work for jobs:
    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
    #    particular shard during a heartbeat, it never will be assigned to this
    #    shard later.
    #    This is not true for hosts though: A host that is leased won't be sent
    #    to the shard now, but might be sent in a future heartbeat. This means
    #    sometimes hosts should be transferred that have a lower id than the
    #    maximum host id the shard knows.
    # 2. Send the number of jobs/hosts the shard knows to the master in each
    #    heartbeat. Compare these to the number of records that already have
    #    the shard_id set to this shard. In the normal case, they should match.
    #    In case they don't, resend all entities of that type.
    #    This would work well for hosts, because there aren't that many.
    #    Resending all jobs is quite a big overhead though.
    #    Also, this approach might run into edge cases when entities are
    #    ever deleted.
    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
    #    Using two different approaches isn't consistent and might cause
    #    confusion. Also the issues with the case of deletions might still
    #    occur.
    #
    # The overhead of sending all job and host ids in every heartbeat is low:
    # At peaks one board has about 1200 created but unfinished jobs.
    # See the numbers here: http://goo.gl/gQCGWH
    # Assuming that job id's have 6 digits and that json serialization takes a
    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    # A NOT IN query with 5000 ids took about 30ms in tests made.
    # These numbers seem low enough to outweigh the disadvantages of the
    # solutions described above.
    shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
    rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)

    # Ids and statuses are parallel sequences; save a host only when the
    # status reported by the shard differs from the stored one.
    assert len(known_host_ids) == len(known_host_statuses)
    for position, host_id in enumerate(known_host_ids):
        reported_status = known_host_statuses[position]
        host_model = models.Host.objects.get(pk=host_id)
        if host_model.status == reported_status:
            continue
        host_model.status = reported_status
        host_model.save()

    hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
            shard_obj, known_job_ids=known_job_ids,
            known_host_ids=known_host_ids)

    serialized_hosts = [host.serialize() for host in hosts]
    serialized_jobs = [job.serialize() for job in jobs]
    serialized_keyvals = [kv.serialize() for kv in suite_keyvals]
    incorrect_host_ids = [int(i) for i in inc_ids]
    return {
        'hosts': serialized_hosts,
        'jobs': serialized_jobs,
        'suite_keyvals': serialized_keyvals,
        'incorrect_host_ids': incorrect_host_ids,
    }
2101
2102
def get_shards(**filter_data):
    """Return the shards matching `filter_data` (all shards if no filter).

    @param filter_data: Filter keywords to pass to the underlying
                        database query.

    @returns A sequence of nested dictionaries of shard information; each
             dictionary also gets a 'labels' entry listing the names of
             the shard's labels.
    """
    shard_rows = models.Shard.query_objects(filter_data)
    shard_dicts = rpc_utils.prepare_rows_as_nested_dicts(shard_rows, ())
    for shard_dict, shard_row in zip(shard_dicts, shard_rows):
        label_names = []
        for label in shard_row.labels.all():
            label_names.append(label.name)
        shard_dict['labels'] = label_names

    return shard_dicts
2114
2115
def _assign_board_to_shard_precheck(labels):
    """Validate board labels before they are added to a shard.

    Each label must pass three checks, in order: it is in the
    `board:` format, it exists, and its board is not assigned to a shard
    yet.

    @param labels: Board labels separated by comma. An empty value is
                   accepted and yields no labels.

    @raises error.RPCException: If label provided doesn't start with `board:`
            or board has been added to shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: A list of label models that ready to be added to shard.
    """
    if not labels:
        # Allow creation of label-less shards (labels='' would otherwise
        # fail the checks below).
        return []

    validated_labels = []
    for label_name in labels.split(','):
        # The board label has to be in the correct format.
        if not label_name.startswith('board:'):
            raise error.RPCException('Sharding only supports `board:.*` label.')
        # The board label has to exist; smart_get throws an exception
        # if it does not.
        board_label = models.Label.smart_get(label_name)
        # The board must not have been sharded already.
        try:
            owning_shard = models.Shard.objects.get(labels=board_label)
        except models.Shard.DoesNotExist:
            # board is not on any shard, so it's valid.
            validated_labels.append(board_label)
        else:
            raise error.RPCException(
                    '%s is already on shard %s' % (board_label,
                                                   owning_shard.hostname))
    return validated_labels
2153
2154
def add_shard(hostname, labels):
    """Create a new shard and attach the given board labels to it.

    The labels are validated before the shard is created, so an invalid
    label leaves no shard behind.

    @param hostname: The hostname of the shard to be added; needs to be unique.
    @param labels: Board labels separated by comma. Jobs of one of the labels
                   will be assigned to the shard.

    @raises error.RPCException: If label provided doesn't start with `board:` or
            board has been added to shard already.
    @raises model_logic.ValidationError: If a shard with the given hostname
            already exist.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the added shard.
    """
    board_labels = _assign_board_to_shard_precheck(labels)
    new_shard = models.Shard.add_object(hostname=hostname)
    for board_label in board_labels:
        new_shard.labels.add(board_label)
    return new_shard.id
2175
2176
def add_board_to_shard(hostname, labels):
    """Attach additional board labels to an existing shard.

    The labels are validated with _assign_board_to_shard_precheck before the
    shard is looked up by hostname.

    @param hostname: The hostname of the shard to be changed.
    @param labels: Board labels separated by comma.

    @raises error.RPCException: If label provided doesn't start with `board:` or
            board has been added to shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
    @raises models.Shard.DoesNotExist: If no shard has the given hostname.

    @returns: The id of the changed shard.
    """
    board_labels = _assign_board_to_shard_precheck(labels)
    target_shard = models.Shard.objects.get(hostname=hostname)
    for board_label in board_labels:
        target_shard.labels.add(board_label)
    return target_shard.id
2194
2195
2196# Remove board RPCs are rare, so we can afford to make them a bit more
2197# expensive (by performing in a transaction) in order to guarantee
2198# atomicity.
2199# TODO(akeshet): If we ever update to newer version of django, we need to
2200# migrate to transaction.atomic instead of commit_on_success
@transaction.commit_on_success
def remove_board_from_shard(hostname, label):
    """Remove a board label from the given shard and release its hosts.

    The label is detached from the shard, then every host carrying that label
    (or the static label of the same name, when the label is replaced by a
    static label) has its shard assignment cleared.

    @param hostname: The hostname of the shard to be changed.
    @param label: Board label to remove.

    @raises error.RPCException: If the label is not assigned to the shard.
    @raises models.Shard.DoesNotExist: If no shard has the given hostname.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    shard = models.Shard.objects.get(hostname=hostname)
    label = models.Label.smart_get(label)
    if label not in shard.labels.all():
        raise error.RPCException(
                'Cannot remove label from shard that does not belong to it.')

    shard.labels.remove(label)
    if label.is_replaced_by_static():
        static_label = models.StaticLabel.smart_get(label.name)
        hosts = models.Host.objects.filter(static_labels__in=[static_label])
    else:
        hosts = models.Host.objects.filter(labels__in=[label])
    hosts.update(shard=None)
    # Return the id as documented, matching add_board_to_shard.
    return shard.id
2224
2225
def delete_shard(hostname):
    """Delete a shard and reclaim all resources from it.

    This claims back all hosts and jobs assigned to the shard, clears the
    shard's labels and finally deletes the shard itself.

    @param hostname: The hostname of the shard to delete.
    """
    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)

    # Remove shard information.
    models.Host.objects.filter(shard=shard).update(shard=None)

    # Note: The original job-cleanup query was performed with django call
    #   models.Job.objects.filter(shard=shard).update(shard=None)
    #
    # But that started becoming unreliable due to the large size of afe_jobs.
    #
    # We don't need atomicity here, so the new cleanup method is iterative, in
    # chunks of 100k jobs.
    QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s '
             'LIMIT 100000')
    try:
        with contextlib.closing(db_connection.cursor()) as cursor:
            assert shard.id is not None
            clear_jobs = True
            while clear_jobs:
                # Bind the id as a query parameter instead of formatting it
                # into the SQL text.
                cursor.execute(QUERY, [shard.id])
                # rowcount reports how many rows the UPDATE touched. An
                # UPDATE has no result set to fetch, so fetchone() cannot
                # signal that more rows remain; keep going until a pass
                # changes nothing.
                clear_jobs = cursor.rowcount > 0
    # Unit tests use sqlite backend instead of MySQL. sqlite does not support
    # UPDATE ... LIMIT, so fall back to the old behavior.
    except DatabaseError as e:
        if 'syntax error' in str(e):
            models.Job.objects.filter(shard=shard).update(shard=None)
        else:
            raise

    shard.labels.clear()
    shard.delete()
2262
2263
def get_servers(hostname=None, role=None, status=None):
    """Look up servers in the server database and return their details.

    @param hostname: FQDN of the server.
    @param role: Name of the server role, e.g., drone, scheduler. Default to
                 None to match any role.
    @param status: Status of the server, e.g., primary, backup, repair_required.
                   Default to None to match any server status.

    @raises error.RPCException: If server database is not used.
    @return: A list holding the get_details() result of every server returned
             by server_manager_utils.get_servers for the given filters.
    """
    if server_manager_utils.use_server_db():
        matching_servers = server_manager_utils.get_servers(
                hostname=hostname, role=role, status=status)
        return [server.get_details() for server in matching_servers]

    raise error.RPCException('Server database is not enabled. Please try '
                             'retrieve servers from global config.')
2282
2283
@rpc_utils.route_rpc_to_master
def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
    """Look up the stable version for the given board.

    @param board: Name of the board.
    @param android: Unused legacy parameter; a truthy value trips the
            assertion below.  It is kept only for the sake of clients on old
            branches that still pass the parameter.  TODO(jrbarnette) Remove
            this completely once R68 drops off stable.

    @return: Stable version of the given board. Return global configure value
             of CROS.stable_cros_version if stable_versions table does not have
             entry of board DEFAULT.
    """
    legacy_android_requested = bool(android)
    assert not legacy_android_requested, (
            'get_stable_version no longer supports `android`.')
    stable_version = stable_version_utils.get(board=board)
    return stable_version
2300
2301
@rpc_utils.route_rpc_to_master
def get_all_stable_versions():
    """Get stable versions for all boards.

    Thin pass-through to stable_version_utils.get_all().

    @return: A dictionary of board:version.
    """
    return stable_version_utils.get_all()
2309
2310
@rpc_utils.route_rpc_to_master
def set_stable_version(version, board=stable_version_utils.DEFAULT):
    """Modify stable version for the given board.

    Thin pass-through to stable_version_utils.set(); nothing is returned.

    @param version: The new value of stable version for given board.
    @param board: Name of the board, default to value `DEFAULT`.
    """
    stable_version_utils.set(version=version, board=board)
2319
2320
@rpc_utils.route_rpc_to_master
def delete_stable_version(board):
    """Delete the stable version entry for the given board.

    Delete a stable version entry in afe_stable_versions table for a given
    board, so default stable version will be used.

    Thin pass-through to stable_version_utils.delete(); nothing is returned.

    @param board: Name of the board.
    """
    stable_version_utils.delete(board=board)
2331
2332
def get_tests_by_build(build, ignore_invalid_tests=True):
    """Get the tests that are available for the specified build.

    @param build: unique name by which to refer to the image.
    @param ignore_invalid_tests: if True, a control file that fails to parse
            is logged and left out of the result; if False, the parse error
            is re-raised.

    @return: A list of all tests that are in the build specified, sorted by
             name and passed through rpc_utils.prepare_for_serialization.
    """
    # Collect the control files specified in this build
    cfile_getter = control_file_lib._initialize_control_file_getter(build)
    controls_in_batch = suite_common.ENABLE_CONTROLS_IN_BATCH
    if controls_in_batch:
        control_file_info_list = cfile_getter.get_suite_info()
        control_file_list = control_file_info_list.keys()
    else:
        control_file_info_list = None
        control_file_list = cfile_getter.get_control_file_list()

    # The keys list represents attributes in the control_obj that
    # are required by the AFE
    keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
            'test_category', 'test_class', 'dependencies', 'run_verify',
            'sync_count', 'job_retries', 'path']

    test_objects = []
    for control_file_path in control_file_list:
        # Read and parse the control file
        if controls_in_batch:
            control_file = control_file_info_list[control_file_path]
        else:
            control_file = cfile_getter.get_control_file_contents(
                    control_file_path)
        try:
            control_obj = control_data.parse_control_string(control_file)
        except Exception:
            logging.info('Failed to parse control file: %s', control_file_path)
            if not ignore_invalid_tests:
                raise
            # Skip the unparsable file. Falling through would either hit an
            # unbound control_obj or reuse the one parsed for the previous
            # control file.
            continue

        # Extract the values needed for the AFE from the control_obj.
        test_object = {}
        for key in keys:
            test_object[key] = getattr(control_obj, key, '')

        # Unfortunately, the AFE expects different key-names for certain
        # values, these must be corrected to avoid the risk of tests
        # being omitted by the AFE.
        # The 'id' is an additional value used in the AFE; it counts the
        # tests collected so far, in control file list order.
        # The control_data parsing does not reference 'run_reset', but it
        # is also used in the AFE and defaults to True.
        test_object['id'] = len(test_objects)
        test_object['run_reset'] = True
        test_object['description'] = test_object.get('doc', '')
        test_object['test_time'] = test_object.get('time', 0)

        # TODO(crbug.com/873716) DEPRECATED. Remove entirely.
        test_object['test_retry'] = 0

        # Fix the test name to be consistent with the current presentation
        # of test names in the AFE: the directory name, followed by the
        # dot-separated suffixes of the control file name joined with ':'.
        testpath, subname = os.path.split(control_file_path)
        testname = os.path.basename(testpath)
        subname = subname.split('.')[1:]
        if subname:
            testname = '%s:%s' % (testname, ':'.join(subname))

        test_object['name'] = testname

        # Correct the test path as parse_control_string sets an empty string.
        test_object['path'] = control_file_path

        test_objects.append(test_object)

    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
    return rpc_utils.prepare_for_serialization(test_objects)
2409
2410
@rpc_utils.route_rpc_to_master
def get_lab_health_indicators(board=None):
    """Get the health indicators for the whole lab.

    The indicators are described as:
    1. lab is closed or not.
    2. Available DUTs list for a given board.
    3. Devserver capacity.
    4. When is the next major DUT utilization (e.g. CQ is coming in 3 minutes).

    NOTE: as written this is a stub. `board` is never read and the returned
    LabHealthIndicator is built from four None values.

    @param board: if board is specified, a list of available DUTs is meant to
        be returned for it. Currently unused.

    @returns: A LabHealthIndicator constructed with four None values.
    """
    return LabHealthIndicator(None, None, None, None)
2427