1# pylint: disable=missing-docstring
2
3"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out.  Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Read from global_config by initialize().
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import base64
22import datetime
23import errno
24import itertools
25import logging
26import re
27import weakref
28
29import google.protobuf.internal.well_known_types as types
30
31from autotest_lib.client.common_lib import global_config, host_protections
32from autotest_lib.client.common_lib import time_utils
33from autotest_lib.client.common_lib import utils
34from autotest_lib.frontend.afe import models, model_attributes
35from autotest_lib.scheduler import drone_manager, email_manager
36from autotest_lib.scheduler import rdb_lib
37from autotest_lib.scheduler import scheduler_config
38from autotest_lib.scheduler import scheduler_lib
39from autotest_lib.server import afe_urls
40from autotest_lib.server.cros import provision
41
42try:
43    from chromite.lib import metrics
44    from chromite.lib import cloud_trace
45except ImportError:
46    metrics = utils.metrics_mock
47    import mock
48    cloud_trace = mock.Mock()
49
50
# Lower-cased HQE statuses that trigger a notification e-mail; filled in by
# initialize().
_notify_email_statuses = []
# AFE base URL used to build links in e-mails; filled in by initialize().
_base_url = None

# Scheduler database connection; filled in by initialize().
_db = None
# Global DroneManager reference; filled in by initialize_globals().
_drone_manager = None

# Read once at import time from the SKYLAB section of the global config.
# Host._get_labels_with_platform() only merges static labels when True.
RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB',
        'respect_static_labels',
        type=bool,
        default=False)
59
60
def initialize():
    """Populate this module's globals from the scheduler configuration.

    Opens the database connection, parses the list of HQE statuses that
    trigger notification e-mails, resolves the AFE base URL (a configured
    value wins over afe_urls.ROOT_URL) and then calls initialize_globals().
    """
    global _db, _notify_email_statuses, _base_url

    _db = scheduler_lib.ConnectionManager().get_connection()

    config = global_config.global_config
    raw_statuses = config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    # Statuses may be separated by whitespace, commas, semicolons or colons;
    # empty tokens produced by adjacent separators are dropped.
    tokens = re.split(r'[\s,;:]', raw_statuses.lower())
    _notify_email_statuses = list(filter(None, tokens))

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    configured_url = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    _base_url = configured_url or afe_urls.ROOT_URL

    initialize_globals()
84
85
def initialize_globals():
    """Bind the module-level _drone_manager to drone_manager.instance()."""
    global _drone_manager
    manager = drone_manager.instance()
    _drone_manager = manager
89
90
def get_job_metadata(job):
    """Build the metadata dictionary describing `job`.

    The result holds the job id, owner, name and parent job id and is meant
    to be stored in the metadata database.

    @param job: A Job object.
    @return: A dict with keys 'job_id', 'owner', 'job_name' and
             'parent_job_id', or an empty dict when `job` is falsy or lacks
             one of those attributes (both cases are logged as errors).
    """
    if job:
        try:
            metadata = dict(job_id=job.id,
                            owner=job.owner,
                            job_name=job.name,
                            parent_job_id=job.parent_job_id)
        except AttributeError as err:
            logging.error('Job has missing attribute: %s', err)
            metadata = {}
        return metadata
    logging.error('Job is None, no metadata returned.')
    return {}
112
113
114class DBError(Exception):
115    """Raised by the DBObject constructor when its select fails."""
116
117
class DBObject(object):
    """A miniature object relational model for the database.

    Subclasses map onto one table through `_table_name` and `_fields`, whose
    first entry must be the `id` column.  Instances loaded from the database
    are cached per (type, id) in a weak-value dictionary, so constructing an
    object for an id that is still alive returns the existing instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.

        Constructor arguments are meant for __init__ only.  They must not be
        forwarded to object.__new__, which rejects extra arguments when
        __new__ is overridden (DeprecationWarning on Python 2.7, TypeError on
        Python 3).
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance is not None:
                return instance
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load this object's fields from `row`, or from the database by `id`.

        At least one of `id` and `row` must be given.

        @param id: Primary key of the row to load.
        @param row: A full row ordered like _fields; avoids the SELECT.
        @param new_record: True for an object not yet in the database.  Such
                objects are not registered in the instance cache and are
                inserted by save().
        @param always_query: When False, an already initialized (cached)
                instance is left untouched instead of being reloaded.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the row whose id is `row_id`; raise DBError if missing."""
        fields = ', '.join(self._fields)
        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # `id` is never updatable through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Reload every field of this object from its database row."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """
        Count rows matching a raw SQL condition.

        @param where: SQL condition inserted verbatim after WHERE; callers
                must not pass untrusted text here.
        @param table: Table to count in; defaults to this object's table.

        @returns The number of matching rows as an int.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """
        Persist `value` into column `field` of this row and mirror it locally.

        No query is issued when the in-memory value already equals `value`.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        Insert this object as a new row if it was created with new_record.

        Every field except `id` is inserted, then `id` is set to the value
        the database assigned.  Objects loaded from the database are left
        untouched.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join(str(key) for key in keys)
            placeholders = ','.join(['%s'] * len(keys))
            values = tuple(getattr(self, key) for key in keys)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, placeholders))
            # Let the driver quote values (None becomes NULL).  Wrapping
            # str(value) in double quotes broke on values containing '"' and
            # sent booleans as the string literals "True"/"False".
            _db.execute(query, values)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row and evict this object from the instance cache."""
        # Key on self.id: the builtin `id` function never matches a cache key.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return `prefix + string`, or `string` unchanged when it is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @param where: SQL condition (without the WHERE keyword).
        @param params: Parameters substituted into the query by the driver.
        @param joins: SQL JOIN clauses.
        @param order_by: SQL ordering (without the ORDER BY keywords).

        @returns The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        fields = []
        for field in cls._fields:
            fields.append('%s.%s' % (cls._table_name, field))

        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
                                             'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        Takes the same arguments as fetch_rows().

        @returns A list with one class instance per row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]
339
340
class IneligibleHostQueue(DBObject):
    """One row of afe_ineligible_host_queues (a job_id/host_id pair)."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = (
        'id',
        'job_id',
        'host_id',
    )
344
345
class AtomicGroup(DBObject):
    """One row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = (
        'id',
        'name',
        'description',
        'max_number_of_machines',
        'invalid',
    )
350
351
class Label(DBObject):
    """One row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = (
        'id', 'name', 'kernel_config', 'platform', 'invalid',
        'only_if_needed', 'atomic_group_id',
    )


    def __repr__(self):
        values = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % values
361
362
class Host(DBObject):
    """One row of the afe_hosts table, plus label lookup and sort helpers."""
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        """Log the transition and persist the new status for this host."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def _get_labels_with_platform(self, non_static_rows, static_rows):
        """Helper function to fetch labels & platform for a host.

        @param non_static_rows: (label_id, label_name, is_platform) rows from
                afe_labels.
        @param static_rows: (label_id, label_name, is_platform) rows from
                afe_static_labels.

        @returns The merged list of rows; just `non_static_rows` unless
                RESPECT_STATIC_LABELS is set.
        """
        if not RESPECT_STATIC_LABELS:
            return non_static_rows

        combined_rows = []
        replaced_labels = _db.execute(
                'SELECT label_id FROM afe_replaced_labels')
        replaced_label_ids = {l[0] for l in replaced_labels}

        # We respect afe_labels more, which means:
        #   * if non-static labels are replaced, we find its replaced static
        #   labels from afe_static_labels by label name.
        #   * if non-static labels are not replaced, we keep it.
        #   * Drop static labels which don't have reference non-static labels.
        static_label_names = set()
        for label_id, label_name, is_platform in non_static_rows:
            if label_id not in replaced_label_ids:
                combined_rows.append((label_id, label_name, is_platform))
            else:
                static_label_names.add(label_name)

        # Only keep static labels who have replaced non-static labels.
        for label_id, label_name, is_platform in static_rows:
            if label_name in static_label_names:
                combined_rows.append((label_id, label_name, is_platform))

        return combined_rows


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).

        Labels come from afe_hosts_labels and afe_static_hosts_labels, merged
        by _get_labels_with_platform().  platform_name is the name of the last
        merged label flagged as a platform, or None if there is none.
        """
        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
                    '%(label_table)s.platform FROM %(label_table)s INNER '
                    'JOIN %(host_label_table)s '
                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
                    'WHERE %(host_label_table)s.host_id = %%s '
                    'ORDER BY %(label_table)s.name')
        static_query = template % {
                'host_label_table': 'afe_static_hosts_labels',
                'label_table': 'afe_static_labels',
                'column': 'staticlabel_id',
        }
        non_static_query = template % {
                'host_label_table': 'afe_hosts_labels',
                'label_table': 'afe_labels',
                'column': 'label_id',
        }
        # The host id is passed as a query parameter, like the other queries
        # in this module.
        non_static_rows = _db.execute(non_static_query, (self.id,))
        static_rows = _db.execute(static_query, (self.id,))

        rows = self._get_labels_with_platform(non_static_rows, static_rows)
        platform = None
        all_labels = []
        for _, label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @staticmethod
    def _cmp(x, y):
        """Three-way comparison returning -1, 0 or 1, like Python 2 cmp()."""
        return (x > y) - (x < y)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This should satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros.  Stripping them first made
            # all-zero suffixes such as 'host0' raise ValueError on int('').
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cls._cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cls._cmp(lower_a, lower_b)
            return result
        else:
            return cls._cmp(lower_a, lower_b)
479
480
481class HostQueueEntry(DBObject):
482    _table_name = 'afe_host_queue_entries'
483    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
484               'active', 'complete', 'deleted', 'execution_subdir',
485               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
486
487    _COMPLETION_COUNT_METRIC = metrics.Counter(
488        'chromeos/autotest/scheduler/hqe_completion_count')
489
490    def __init__(self, id=None, row=None, job_row=None, **kwargs):
491        """
492        @param id: ID field from afe_host_queue_entries table.
493                   Either id or row should be specified for initialization.
494        @param row: The DB row for a particular HostQueueEntry.
495                    Either id or row should be specified for initialization.
496        @param job_row: The DB row for the job of this HostQueueEntry.
497        """
498        assert id or row
499        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
500        self.job = Job(self.job_id, row=job_row)
501
502        if self.host_id:
503            self.host = rdb_lib.get_hosts([self.host_id])[0]
504            self.host.dbg_str = self.get_dbg_str()
505            self.host.metadata = get_job_metadata(self.job)
506        else:
507            self.host = None
508
509
510    @classmethod
511    def clone(cls, template):
512        """
513        Creates a new row using the values from a template instance.
514
515        The new instance will not exist in the database or have a valid
516        id attribute until its save() method is called.
517        """
518        assert isinstance(template, cls)
519        new_row = [getattr(template, field) for field in cls._fields]
520        clone = cls(row=new_row, new_record=True)
521        clone.id = None
522        return clone
523
524
525    @classmethod
526    def fetch(cls, where='', params=(), joins='', order_by=''):
527        """
528        Construct instances of our class based on the given database query.
529
530        @yields One class instance for each row fetched.
531        """
532        # Override the original fetch method to pre-fetch the jobs from the DB
533        # in order to prevent each HQE making separate DB queries.
534        rows = cls.fetch_rows(where=where, params=params, joins=joins,
535                              order_by=order_by)
536        if len(rows) <= 1:
537            return [cls(id=row[0], row=row) for row in rows]
538
539        job_params = ', '.join([str(row[1]) for row in rows])
540        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
541        # Create a Job_id to Job_row match dictionary to match the HQE
542        # to its corresponding job.
543        job_dict = {job_row[0]: job_row for job_row in job_rows}
544        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
545                for row in rows]
546
547
548    def _view_job_url(self):
549        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
550
551
552    def get_labels(self):
553        """
554        Get all labels associated with this host queue entry (either via the
555        meta_host or as a job dependency label).  The labels yielded are not
556        guaranteed to be unique.
557
558        @yields Label instances associated with this host_queue_entry.
559        """
560        if self.meta_host:
561            yield Label(id=self.meta_host, always_query=False)
562        labels = Label.fetch(
563                joins="JOIN afe_jobs_dependency_labels AS deps "
564                      "ON (afe_labels.id = deps.label_id)",
565                where="deps.job_id = %d" % self.job.id)
566        for label in labels:
567            yield label
568
569
570    def set_host(self, host):
571        if host:
572            logging.info('Assigning host %s to entry %s', host.hostname, self)
573            self.update_field('host_id', host.id)
574            self.block_host(host.id)
575        else:
576            logging.info('Releasing host from %s', self)
577            self.unblock_host(self.host.id)
578            self.update_field('host_id', None)
579
580        self.host = host
581
582
583    def block_host(self, host_id):
584        logging.info("creating block %s/%s", self.job.id, host_id)
585        row = [0, self.job.id, host_id]
586        block = IneligibleHostQueue(row=row, new_record=True)
587        block.save()
588
589
590    def unblock_host(self, host_id):
591        logging.info("removing block %s/%s", self.job.id, host_id)
592        blocks = IneligibleHostQueue.fetch(
593            'job_id=%d and host_id=%d' % (self.job.id, host_id))
594        for block in blocks:
595            block.delete()
596
597
598    def set_execution_subdir(self, subdir=None):
599        if subdir is None:
600            assert self.host
601            subdir = self.host.hostname
602        self.update_field('execution_subdir', subdir)
603
604
605    def _get_hostname(self):
606        if self.host:
607            return self.host.hostname
608        return 'no host'
609
610
611    def get_dbg_str(self):
612        """Get a debug string to identify this host.
613
614        @return: A string containing the hqe and job id.
615        """
616        try:
617            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
618        except AttributeError as e:
619            return 'HQE has not been initialized yet: %s' % e
620
621
622    def __str__(self):
623        flags = []
624        if self.active:
625            flags.append('active')
626        if self.complete:
627            flags.append('complete')
628        if self.deleted:
629            flags.append('deleted')
630        if self.aborted:
631            flags.append('aborted')
632        flags_str = ','.join(flags)
633        if flags_str:
634            flags_str = ' [%s]' % flags_str
635        return ("%s and host: %s has status:%s%s" %
636                (self.get_dbg_str(), self._get_hostname(), self.status,
637                 flags_str))
638
639
    def set_status(self, status):
        """
        Move this entry to `status` and apply the resulting side effects.

        Steps, in order:
          * persist `status`, then `active` (status in ACTIVE_STATUSES);
          * for complete statuses, run _on_complete() and the job-complete
            e-mail before persisting `complete`;
          * e-mail the job's email_list if the lower-cased status appears in
            _notify_email_statuses (or 'all' does).

        @param status: The new HQE status string.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        # _notify_email_statuses is stored lower-cased by initialize(), so
        # the match is case-insensitive.  'all' enables every status.
        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
671
672
673    def _on_complete(self, status):
674        metric_fields = {'status': status.lower()}
675        if self.host:
676            metric_fields['board'] = self.host.board or ''
677            if len(self.host.pools) == 1:
678                metric_fields['pool'] = self.host.pools[0]
679            else:
680                metric_fields['pool'] = 'MULTIPLE'
681        else:
682            metric_fields['board'] = 'NO_HOST'
683            metric_fields['pool'] = 'NO_HOST'
684        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
685        if status is not models.HostQueueEntry.Status.ABORTED:
686            self.job.stop_if_necessary()
687        if self.started_on:
688            self.set_finished_on_now()
689            self._log_trace()
690        if self.job.shard_id is not None:
691            # If shard_id is None, the job will be synced back to the master
692            self.job.update_field('shard_id', None)
693        if not self.execution_subdir:
694            return
695        # unregister any possible pidfiles associated with this queue entry
696        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
697            pidfile_id = _drone_manager.get_pidfile_id_from(
698                    self.execution_path(), pidfile_name=pidfile_name)
699            _drone_manager.unregister_pidfile(pidfile_id)
700
701    def _log_trace(self):
702        """Emits a Cloud Trace span for the HQE's duration."""
703        if self.started_on and self.finished_on:
704            span = cloud_trace.Span('HQE', spanId='0',
705                                    traceId=hqe_trace_id(self.id))
706            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
707            span.startTime = types.Timestamp()
708            span.startTime.FromDatetime(self.started_on)
709            span.endTime = types.Timestamp()
710            span.endTime.FromDatetime(self.finished_on)
711            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
712            # safety during tests, so this should be caught within LogSpan.
713            try:
714                cloud_trace.LogSpan(span)
715            except IOError as e:
716                if e.errno == errno.ENOENT:
717                    logging.warning('Error writing to cloud trace results '
718                                    'directory: %s', e)
719
720
721    def _get_status_email_contents(self, status, summary=None, hostname=None):
722        """
723        Gather info for the status notification e-mails.
724
725        If needed, we could start using the Django templating engine to create
726        the subject and the e-mail body, but that doesn't seem necessary right
727        now.
728
729        @param status: Job status text. Mandatory.
730        @param summary: Job summary text. Optional.
731        @param hostname: A hostname for the job. Optional.
732
733        @return: Tuple (subject, body) for the notification e-mail.
734        """
735        job_stats = Job(id=self.job.id).get_execution_details()
736
737        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
738                   (self.job.id, self.job.name, status))
739
740        if hostname is not None:
741            subject += '| Hostname: %s ' % hostname
742
743        if status not in ["1 Failed", "Failed"]:
744            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']
745
746        body =  "Job ID: %s\n" % self.job.id
747        body += "Job name: %s\n" % self.job.name
748        if hostname is not None:
749            body += "Host: %s\n" % hostname
750        if summary is not None:
751            body += "Summary: %s\n" % summary
752        body += "Status: %s\n" % status
753        body += "Results interface URL: %s\n" % self._view_job_url()
754        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
755        if int(job_stats['total_executed']) > 0:
756            body += "User tests executed: %s\n" % job_stats['total_executed']
757            body += "User tests passed: %s\n" % job_stats['total_passed']
758            body += "User tests failed: %s\n" % job_stats['total_failed']
759            body += ("User tests success rate: %.2f %%\n" %
760                     job_stats['success_rate'])
761
762        if job_stats['failed_rows']:
763            body += "Failures:\n"
764            body += job_stats['failed_rows']
765
766        return subject, body
767
768
769    def _email_on_status(self, status):
770        hostname = self._get_hostname()
771        subject, body = self._get_status_email_contents(status, None, hostname)
772        email_manager.manager.send_email(self.job.email_list, subject, body)
773
774
775    def _email_on_job_complete(self):
776        if not self.job.is_finished():
777            return
778
779        summary = []
780        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
781        for queue_entry in hosts_queue:
782            summary.append("Host: %s Status: %s" %
783                                (queue_entry._get_hostname(),
784                                 queue_entry.status))
785
786        summary = "\n".join(summary)
787        status_counts = models.Job.objects.get_status_counts(
788                [self.job.id])[self.job.id]
789        status = ', '.join('%d %s' % (count, status) for status, count
790                    in status_counts.iteritems())
791
792        subject, body = self._get_status_email_contents(status, summary, None)
793        email_manager.manager.send_email(self.job.email_list, subject, body)
794
795
796    def schedule_pre_job_tasks(self):
797        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
798                     self.job.name, self.meta_host, self.atomic_group_id,
799                     self.job.id, self.id, self.host.hostname, self.status)
800
801        self._do_schedule_pre_job_tasks()
802
803
804    def _do_schedule_pre_job_tasks(self):
805        self.job.schedule_pre_job_tasks(queue_entry=self)
806
807
808    def requeue(self):
809        assert self.host
810        self.set_status(models.HostQueueEntry.Status.QUEUED)
811        self.update_field('started_on', None)
812        self.update_field('finished_on', None)
813        # verify/cleanup failure sets the execution subdir, so reset it here
814        self.set_execution_subdir('')
815        if self.meta_host:
816            self.set_host(None)
817
818
819    @property
820    def aborted_by(self):
821        self._load_abort_info()
822        return self._aborted_by
823
824
825    @property
826    def aborted_on(self):
827        self._load_abort_info()
828        return self._aborted_on
829
830
831    def _load_abort_info(self):
832        """ Fetch info about who aborted the job. """
833        if hasattr(self, "_aborted_by"):
834            return
835        rows = _db.execute("""
836                SELECT afe_users.login,
837                        afe_aborted_host_queue_entries.aborted_on
838                FROM afe_aborted_host_queue_entries
839                INNER JOIN afe_users
840                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
841                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
842                """, (self.id,))
843        if rows:
844            self._aborted_by, self._aborted_on = rows[0]
845        else:
846            self._aborted_by = self._aborted_on = None
847
848
849    def on_pending(self):
850        """
851        Called when an entry in a synchronous job has passed verify.  If the
852        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
853        them in PENDING.
854        """
855        self.set_status(models.HostQueueEntry.Status.PENDING)
856        if not self.host:
857            raise scheduler_lib.NoHostIdError(
858                    'Failed to recover a job whose host_queue_entry_id=%r due'
859                    ' to no host_id.'
860                    % self.id)
861        self.host.set_status(models.Host.Status.PENDING)
862
863        # Some debug code here: sends an email if an asynchronous job does not
864        # immediately enter Starting.
865        # TODO: Remove this once we figure out why asynchronous jobs are getting
866        # stuck in Pending.
867        self.job.run_if_ready(queue_entry=self)
868        if (self.job.synch_count == 1 and
869                self.status == models.HostQueueEntry.Status.PENDING):
870            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
871            message = 'Asynchronous job stuck in Pending'
872            email_manager.manager.enqueue_notify_email(subject, message)
873
874
875    def abort(self, dispatcher):
876        assert self.aborted and not self.complete
877
878        Status = models.HostQueueEntry.Status
879        if self.status in {Status.GATHERING, Status.PARSING}:
880            # do nothing; post-job tasks will finish and then mark this entry
881            # with status "Aborted" and take care of the host
882            return
883
884        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
885            # If hqe is in any of these status, it should not have any
886            # unfinished agent before it can be aborted.
887            agents = dispatcher.get_agents_for_entry(self)
888            # Agent with finished task can be left behind. This is added to
889            # handle the special case of aborting hostless job in STARTING
890            # status, in which the agent has only a HostlessQueueTask
891            # associated. The finished HostlessQueueTask will be cleaned up in
892            # the next tick, so it's safe to leave the agent there. Without
893            # filtering out finished agent, HQE abort won't be able to proceed.
894            assert all([agent.is_done() for agent in agents])
895
896        if self.host:
897            if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
898                self.host.set_status(models.Host.Status.READY)
899            elif self.status in {Status.VERIFYING, Status.RESETTING}:
900                models.SpecialTask.objects.create(
901                        task=models.SpecialTask.Task.CLEANUP,
902                        host=models.Host.objects.get(id=self.host.id),
903                        requested_by=self.job.owner_model())
904            elif self.status == Status.PROVISIONING:
905                models.SpecialTask.objects.create(
906                        task=models.SpecialTask.Task.REPAIR,
907                        host=models.Host.objects.get(id=self.host.id),
908                        requested_by=self.job.owner_model())
909
910        self.set_status(Status.ABORTED)
911
912
913    def execution_tag(self):
914        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
915                               'complete!=1 AND execution_subdir="" AND '
916                               'status!="Queued";')
917        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
918                                 'status="Aborted" WHERE id=%s;')
919        try:
920            assert self.execution_subdir
921        except AssertionError:
922            # TODO(scottz): Remove temporary fix/info gathering pathway for
923            # crosbug.com/31595 once issue is root caused.
924            logging.error('No execution_subdir for host queue id:%s.', self.id)
925            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
926            for row in _db.execute(SQL_SUSPECT_ENTRIES):
927                logging.error(row)
928            logging.error('====DB DEBUG====\n')
929            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
930            logging.error('EXECUTING: %s', fix_query)
931            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
932            raise AssertionError(('self.execution_subdir not found. '
933                                  'See log for details.'))
934
935        return "%s/%s" % (self.job.tag(), self.execution_subdir)
936
937
938    def execution_path(self):
939        return self.execution_tag()
940
941
942    def set_started_on_now(self):
943        self.update_field('started_on', datetime.datetime.now())
944
945
946    def set_finished_on_now(self):
947        self.update_field('finished_on', datetime.datetime.now())
948
949
950    def is_hostless(self):
951        return (self.host_id is None
952                and self.meta_host is None)
953
954
def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 ('485145') and concatenates it with the
    lowercase hex representation of the HQE's id.

    @param hqe_id: The HostQueueEntry's id (a non-negative integer).

    Returns:
        A trace id (in hex format) as a native str, e.g. '485145ff' for 255.
    """
    prefix = base64.b16encode(b'HQE')
    if not isinstance(prefix, str):
        # Python 3: b16encode returns bytes; keep the result a native str.
        prefix = prefix.decode('ascii')
    # '%x' instead of hex(...)[2:]: under Python 2, hex() of a long (ids read
    # from the DB may be longs) ends in 'L', which would put a non-hex
    # character into the trace id.
    return prefix + '%x' % hqe_id
967
968
969class Job(DBObject):
970    _table_name = 'afe_jobs'
971    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
972               'control_type', 'created_on', 'synch_count', 'timeout',
973               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
974               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
975               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
976               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
977               'require_ssp')
978
    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs in case a delay was running when
    # the scheduler was restarted and no more hosts ever successfully exit
    # Verify.
982
983    def __init__(self, id=None, row=None, **kwargs):
984        assert id or row
985        super(Job, self).__init__(id=id, row=row, **kwargs)
986        self._owner_model = None # caches model instance of owner
987        self.update_image_path = None # path of OS image to install
988
989
990    def model(self):
991        return models.Job.objects.get(id=self.id)
992
993
994    def owner_model(self):
995        # work around the fact that the Job owner field is a string, not a
996        # foreign key
997        if not self._owner_model:
998            self._owner_model = models.User.objects.get(login=self.owner)
999        return self._owner_model
1000
1001
1002    def tag(self):
1003        return "%s-%s" % (self.id, self.owner)
1004
1005
1006    def get_execution_details(self):
1007        """
1008        Get test execution details for this job.
1009
1010        @return: Dictionary with test execution details
1011        """
1012        def _find_test_jobs(rows):
1013            """
1014            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1015            Those are autotest 'internal job' tests, so they should not be
1016            counted when evaluating the test stats.
1017
1018            @param rows: List of rows (matrix) with database results.
1019            """
1020            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1021            n_test_jobs = 0
1022            for r in rows:
1023                test_name = r[0]
1024                if job_test_pattern.match(test_name):
1025                    n_test_jobs += 1
1026
1027            return n_test_jobs
1028
1029        stats = {}
1030
1031        rows = _db.execute("""
1032                SELECT t.test, s.word, t.reason
1033                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1034                WHERE t.job_idx = j.job_idx
1035                AND s.status_idx = t.status
1036                AND j.afe_job_id = %s
1037                ORDER BY t.reason
1038                """ % self.id)
1039
1040        failed_rows = [r for r in rows if not r[1] == 'GOOD']
1041
1042        n_test_jobs = _find_test_jobs(rows)
1043        n_test_jobs_failed = _find_test_jobs(failed_rows)
1044
1045        total_executed = len(rows) - n_test_jobs
1046        total_failed = len(failed_rows) - n_test_jobs_failed
1047
1048        if total_executed > 0:
1049            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1050        else:
1051            success_rate = 0
1052
1053        stats['total_executed'] = total_executed
1054        stats['total_failed'] = total_failed
1055        stats['total_passed'] = total_executed - total_failed
1056        stats['success_rate'] = success_rate
1057
1058        status_header = ("Test Name", "Status", "Reason")
1059        if failed_rows:
1060            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1061                                                          status_header)
1062        else:
1063            stats['failed_rows'] = ''
1064
1065        time_row = _db.execute("""
1066                   SELECT started_time, finished_time
1067                   FROM tko_jobs
1068                   WHERE afe_job_id = %s
1069                   """ % self.id)
1070
1071        if time_row:
1072            t_begin, t_end = time_row[0]
1073            try:
1074                delta = t_end - t_begin
1075                minutes, seconds = divmod(delta.seconds, 60)
1076                hours, minutes = divmod(minutes, 60)
1077                stats['execution_time'] = ("%02d:%02d:%02d" %
1078                                           (hours, minutes, seconds))
1079            # One of t_end or t_begin are None
1080            except TypeError:
1081                stats['execution_time'] = '(could not determine)'
1082        else:
1083            stats['execution_time'] = '(none)'
1084
1085        return stats
1086
1087
1088    def keyval_dict(self):
1089        return self.model().keyval_dict()
1090
1091
1092    def _pending_count(self):
1093        """The number of HostQueueEntries for this job in the Pending state."""
1094        pending_entries = models.HostQueueEntry.objects.filter(
1095                job=self.id, status=models.HostQueueEntry.Status.PENDING)
1096        return pending_entries.count()
1097
1098
1099    def is_ready(self):
1100        pending_count = self._pending_count()
1101        ready = (pending_count >= self.synch_count)
1102
1103        if not ready:
1104            logging.info(
1105                    'Job %s not ready: %s pending, %s required ',
1106                    self, pending_count, self.synch_count)
1107
1108        return ready
1109
1110
1111    def num_machines(self, clause = None):
1112        sql = "job_id=%s" % self.id
1113        if clause:
1114            sql += " AND (%s)" % clause
1115        return self.count(sql, table='afe_host_queue_entries')
1116
1117
1118    def num_queued(self):
1119        return self.num_machines('not complete')
1120
1121
1122    def num_active(self):
1123        return self.num_machines('active')
1124
1125
1126    def num_complete(self):
1127        return self.num_machines('complete')
1128
1129
1130    def is_finished(self):
1131        return self.num_complete() == self.num_machines()
1132
1133
1134    def _not_yet_run_entries(self, include_active=True):
1135        if include_active:
1136          statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
1137        else:
1138          statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
1139        return models.HostQueueEntry.objects.filter(job=self.id,
1140                                                    status__in=statuses)
1141
1142
1143    def _stop_all_entries(self):
1144        """Stops the job's inactive pre-job HQEs."""
1145        entries_to_stop = self._not_yet_run_entries(
1146            include_active=False)
1147        for child_entry in entries_to_stop:
1148            assert not child_entry.complete, (
1149                '%s status=%s, active=%s, complete=%s' %
1150                (child_entry.id, child_entry.status, child_entry.active,
1151                 child_entry.complete))
1152            if child_entry.status == models.HostQueueEntry.Status.PENDING:
1153                child_entry.host.status = models.Host.Status.READY
1154                child_entry.host.save()
1155            child_entry.status = models.HostQueueEntry.Status.STOPPED
1156            child_entry.save()
1157
1158
1159    def stop_if_necessary(self):
1160        not_yet_run = self._not_yet_run_entries()
1161        if not_yet_run.count() < self.synch_count:
1162            self._stop_all_entries()
1163
1164
1165    def _next_group_name(self):
1166        """@returns a directory name to use for the next host group results."""
1167        group_count_re = re.compile(r'group(\d+)')
1168        query = models.HostQueueEntry.objects.filter(
1169            job=self.id).values('execution_subdir').distinct()
1170        subdirs = (entry['execution_subdir'] for entry in query)
1171        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1172        ids = [int(match.group(1)) for match in group_matches if match]
1173        if ids:
1174            next_id = max(ids) + 1
1175        else:
1176            next_id = 0
1177        return 'group%d' % (next_id,)
1178
1179
1180    def get_group_entries(self, queue_entry_from_group):
1181        """
1182        @param queue_entry_from_group: A HostQueueEntry instance to find other
1183                group entries on this job for.
1184
1185        @returns A list of HostQueueEntry objects all executing this job as
1186                part of the same group as the one supplied (having the same
1187                execution_subdir).
1188        """
1189        execution_subdir = queue_entry_from_group.execution_subdir
1190        return list(HostQueueEntry.fetch(
1191            where='job_id=%s AND execution_subdir=%s',
1192            params=(self.id, execution_subdir)))
1193
1194
1195    def _should_run_cleanup(self, queue_entry):
1196        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1197            return True
1198        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1199            return queue_entry.host.dirty
1200        return False
1201
1202
1203    def _should_run_verify(self, queue_entry):
1204        do_not_verify = (queue_entry.host.protection ==
1205                         host_protections.Protection.DO_NOT_VERIFY)
1206        if do_not_verify:
1207            return False
1208        # If RebootBefore is set to NEVER, then we won't run reset because
1209        # we can't cleanup, so we need to weaken a Reset into a Verify.
1210        weaker_reset = (self.run_reset and
1211                self.reboot_before == model_attributes.RebootBefore.NEVER)
1212        return self.run_verify or weaker_reset
1213
1214
1215    def _should_run_reset(self, queue_entry):
1216        can_verify = (queue_entry.host.protection !=
1217                         host_protections.Protection.DO_NOT_VERIFY)
1218        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1219        return (can_reboot and can_verify
1220                and (self.run_reset
1221                     or (self._should_run_cleanup(queue_entry)
1222                         and self._should_run_verify(queue_entry))))
1223
1224
1225    def _should_run_provision(self, queue_entry):
1226        """
1227        Determine if the queue_entry needs to have a provision task run before
1228        it to provision queue_entry.host.
1229
1230        @param queue_entry: The host queue entry in question.
1231        @returns: True if we should schedule a provision task, False otherwise.
1232
1233        """
1234        # If we get to this point, it means that the scheduler has already
1235        # vetted that all the unprovisionable labels match, so we can just
1236        # find all labels on the job that aren't on the host to get the list
1237        # of what we need to provision.  (See the scheduling logic in
1238        # host_scheduler.py:is_host_eligable_for_job() where we discard all
1239        # actionable labels when assigning jobs to hosts.)
1240        job_labels = {x.name for x in queue_entry.get_labels()}
1241        # Skip provision if `skip_provision` is listed in the job labels.
1242        if provision.SKIP_PROVISION in job_labels:
1243            return False
1244        _, host_labels = queue_entry.host.platform_and_labels()
1245        # If there are any labels on the job that are not on the host and they
1246        # are labels that provisioning knows how to change, then that means
1247        # there is provisioning work to do.  If there's no provisioning work to
1248        # do, then obviously we have no reason to schedule a provision task!
1249        diff = job_labels - set(host_labels)
1250        if any([provision.Provision.acts_on(x) for x in diff]):
1251            return True
1252        return False
1253
1254
1255    def _queue_special_task(self, queue_entry, task):
1256        """
1257        Create a special task and associate it with a host queue entry.
1258
1259        @param queue_entry: The queue entry this special task should be
1260                            associated with.
1261        @param task: One of the members of the enum models.SpecialTask.Task.
1262        @returns: None
1263
1264        """
1265        models.SpecialTask.objects.create(
1266                host=models.Host.objects.get(id=queue_entry.host_id),
1267                queue_entry=queue_entry, task=task)
1268
1269
1270    def schedule_pre_job_tasks(self, queue_entry):
1271        """
1272        Queue all of the special tasks that need to be run before a host
1273        queue entry may run.
1274
1275        If no special taskes need to be scheduled, then |on_pending| will be
1276        called directly.
1277
1278        @returns None
1279
1280        """
1281        task_queued = False
1282        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1283
1284        if self._should_run_provision(queue_entry):
1285            self._queue_special_task(hqe_model,
1286                                     models.SpecialTask.Task.PROVISION)
1287            task_queued = True
1288        elif self._should_run_reset(queue_entry):
1289            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
1290            task_queued = True
1291        else:
1292            if self._should_run_cleanup(queue_entry):
1293                self._queue_special_task(hqe_model,
1294                                         models.SpecialTask.Task.CLEANUP)
1295                task_queued = True
1296            if self._should_run_verify(queue_entry):
1297                self._queue_special_task(hqe_model,
1298                                         models.SpecialTask.Task.VERIFY)
1299                task_queued = True
1300
1301        if not task_queued:
1302            queue_entry.on_pending()
1303
1304
1305    def _assign_new_group(self, queue_entries):
1306        if len(queue_entries) == 1:
1307            group_subdir_name = queue_entries[0].host.hostname
1308        else:
1309            group_subdir_name = self._next_group_name()
1310            logging.info('Running synchronous job %d hosts %s as %s',
1311                self.id, [entry.host.hostname for entry in queue_entries],
1312                group_subdir_name)
1313
1314        for queue_entry in queue_entries:
1315            queue_entry.set_execution_subdir(group_subdir_name)
1316
1317
1318    def _choose_group_to_run(self, include_queue_entry):
1319        """
1320        @returns A tuple containing a list of HostQueueEntry instances to be
1321                used to run this Job, a string group name to suggest giving
1322                to this job in the results database.
1323        """
1324        chosen_entries = [include_queue_entry]
1325        num_entries_wanted = self.synch_count
1326        num_entries_wanted -= len(chosen_entries)
1327
1328        if num_entries_wanted > 0:
1329            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
1330            pending_entries = list(HostQueueEntry.fetch(
1331                     where=where_clause,
1332                     params=(self.id, include_queue_entry.id)))
1333
1334            # Sort the chosen hosts by hostname before slicing.
1335            def cmp_queue_entries_by_hostname(entry_a, entry_b):
1336                return Host.cmp_for_sort(entry_a.host, entry_b.host)
1337            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
1338            chosen_entries += pending_entries[:num_entries_wanted]
1339
1340        # Sanity check.  We'll only ever be called if this can be met.
1341        if len(chosen_entries) < self.synch_count:
1342            message = ('job %s got less than %s chosen entries: %s' % (
1343                    self.id, self.synch_count, chosen_entries))
1344            logging.error(message)
1345            email_manager.manager.enqueue_notify_email(
1346                    'Job not started, too few chosen entries', message)
1347            return []
1348
1349        self._assign_new_group(chosen_entries)
1350        return chosen_entries
1351
1352
1353    def run_if_ready(self, queue_entry):
1354        """
1355        Run this job by kicking its HQEs into status='Starting' if enough
1356        hosts are ready for it to run.
1357
1358        Cleans up by kicking HQEs into status='Stopped' if this Job is not
1359        ready to run.
1360        """
1361        if not self.is_ready():
1362            self.stop_if_necessary()
1363        else:
1364            self.run(queue_entry)
1365
1366
1367    def request_abort(self):
1368        """Request that this Job be aborted on the next scheduler cycle."""
1369        self.model().abort()
1370
1371
1372    def run(self, queue_entry):
1373        """
1374        @param queue_entry: The HostQueueEntry instance calling this method.
1375        """
1376        queue_entries = self._choose_group_to_run(queue_entry)
1377        if queue_entries:
1378            self._finish_run(queue_entries)
1379
1380
1381    def _finish_run(self, queue_entries):
1382        for queue_entry in queue_entries:
1383            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1384
1385
1386    def __str__(self):
1387        return '%s-%s' % (self.id, self.owner)
1388