1# pylint: disable=missing-docstring
2
3"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out.  Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses.  each time a single HQE reaches
14        one of these statuses, an email will be sent to the job's email_list.
15        comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime
22import itertools
23import logging
24import re
25import time
26import weakref
27
28from autotest_lib.client.common_lib import global_config, host_protections
29from autotest_lib.client.common_lib import time_utils
30from autotest_lib.client.common_lib import utils
31from autotest_lib.client.common_lib.cros.graphite import autotest_es
32from autotest_lib.frontend.afe import models, model_attributes
33from autotest_lib.scheduler import drone_manager, email_manager
34from autotest_lib.scheduler import rdb_lib
35from autotest_lib.scheduler import scheduler_config
36from autotest_lib.scheduler import scheduler_lib
37from autotest_lib.server import afe_urls
38from autotest_lib.server.cros import provision
39
40try:
41    from chromite.lib import metrics
42except ImportError:
43    metrics = utils.metrics_mock
44
45
46_notify_email_statuses = []
47_base_url = None
48
49_db = None
50_drone_manager = None
51
def initialize():
    """Set up module globals: DB connection, notify statuses, base URL.

    Reads the scheduler section of the global config and finishes by
    calling initialize_globals().
    """
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    global _notify_email_statuses
    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    _notify_email_statuses = [
            status
            for status in re.split(r'[\s,;:]', notify_statuses_list.lower())
            if status]

    global _base_url
    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    _base_url = config_base_url or afe_urls.ROOT_URL

    initialize_globals()
75
76
def initialize_globals():
    """Bind the module-level _drone_manager to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
80
81
def get_job_metadata(job):
    """Build a dictionary describing a job for the metadata database.

    The returned dictionary includes job information like id, name and
    parent job information.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name; empty if
            job is None or lacks an expected attribute.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        metadata = {}
        for key, attr in (('job_id', 'id'),
                          ('owner', 'owner'),
                          ('job_name', 'name'),
                          ('parent_job_id', 'parent_job_id')):
            metadata[key] = getattr(job, attr)
        return metadata
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}
103
104
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits until a deadline has passed, then
    invokes a supplied callback exactly once and finishes.  If the callback
    returns anything, it is assumed to be a new Agent instance and will be
    added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now after which this task will
                call the supplied callback and be done.
        @param callback: A callable invoked once by this task after at least
                delay_seconds has elapsed.  Must return None or a new Agent
                instance.
        @param now_func: A time.time like function.  Default: time.time.
                Injectable for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback
        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent interface requires.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Invoke the callback once the deadline has been reached."""
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """Return True once the callback has fired or the task was aborted."""
        return self.success or self.aborted


    def abort(self):
        """Mark the task aborted; the callback will never fire."""
        self.aborted = True
156
157
class DBError(Exception):
    """Raised by the DBObject constructor when its SELECT finds no row."""
160
161
class DBObject(object):
    """A miniature object relational model for the database.

    Each subclass maps onto one table (_table_name) whose columns are
    listed, in order, in _fields.  Instances are cached per (type, id) so
    repeated lookups of the same row share a single object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load (optional if row given).
        @param row: Pre-fetched row tuple matching _fields (optional).
        @param new_record: True if this object does not yet exist in the DB.
        @param always_query: If False, a cached already-initialized instance
                skips re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the row with the given id.

        @param row_id: Primary key value to look up.
        @raises DBError: If no row with that id exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert that a fetched row has exactly one value per field."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                # Normalize both sides to whole-second strings so that
                # fractional-second noise does not register as a difference.
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updatable via update_field.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row from the database and refresh fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows matching a WHERE clause.

        @param where: SQL WHERE clause body (without the WHERE keyword).
        @param table: Table to count in; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one field both in the database and on this instance.

        No-op when the in-memory value already equals the new value.

        @param field: Field name; must be in _valid_fields (never 'id').
        @param value: New value to store.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """Insert this object into the database if it is a new record.

        On success, self.id is updated to the database-assigned id.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE: values are interpolated, not parameterized; all
                    # callers pass internally-generated values only.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this object's row and evict it from the instance cache."""
        # Bug fix: previously popped (type(self), id) where `id` was the
        # builtin function, so the cache entry was never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix + string, or '' when string is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
366
367
class IneligibleHostQueue(DBObject):
    # Records a blocked (job, host) pairing; rows are created by
    # HostQueueEntry.block_host and removed by HostQueueEntry.unblock_host.
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
371
372
class AtomicGroup(DBObject):
    # ORM wrapper for the afe_atomic_groups table.
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
377
378
class Label(DBObject):
    # ORM wrapper for the afe_labels table.
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group id."""
        details = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % details
388
389
class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        """Update this host's status column, logging the transition.

        @param status: The new status string.
        """
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames of the form <letters-and-dashes><digits>.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros; the previous
            # int(number_str.lstrip('0')) raised ValueError for all-zero
            # digit strings such as 'host0' or 'host000' (int('') fails).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
462
463
class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    # Counts completed HQEs; incremented with status/board/pool fields in
    # _on_complete.
    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, **kwargs):
        """
        @param id: Primary key of the queue entry row (optional if row given).
        @param row: Pre-fetched row tuple for this entry (optional).
        @param kwargs: Passed through to DBObject.__init__.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            # Hosts come from the rdb; attach debug string and job metadata
            # for downstream logging/metadata recording.
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None
484
485
486    @classmethod
487    def clone(cls, template):
488        """
489        Creates a new row using the values from a template instance.
490
491        The new instance will not exist in the database or have a valid
492        id attribute until its save() method is called.
493        """
494        assert isinstance(template, cls)
495        new_row = [getattr(template, field) for field in cls._fields]
496        clone = cls(row=new_row, new_record=True)
497        clone.id = None
498        return clone
499
500
501    def _view_job_url(self):
502        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
503
504
505    def get_labels(self):
506        """
507        Get all labels associated with this host queue entry (either via the
508        meta_host or as a job dependency label).  The labels yielded are not
509        guaranteed to be unique.
510
511        @yields Label instances associated with this host_queue_entry.
512        """
513        if self.meta_host:
514            yield Label(id=self.meta_host, always_query=False)
515        labels = Label.fetch(
516                joins="JOIN afe_jobs_dependency_labels AS deps "
517                      "ON (afe_labels.id = deps.label_id)",
518                where="deps.job_id = %d" % self.job.id)
519        for label in labels:
520            yield label
521
522
523    def set_host(self, host):
524        if host:
525            logging.info('Assigning host %s to entry %s', host.hostname, self)
526            self.update_field('host_id', host.id)
527            self.block_host(host.id)
528        else:
529            logging.info('Releasing host from %s', self)
530            self.unblock_host(self.host.id)
531            self.update_field('host_id', None)
532
533        self.host = host
534
535
536    def block_host(self, host_id):
537        logging.info("creating block %s/%s", self.job.id, host_id)
538        row = [0, self.job.id, host_id]
539        block = IneligibleHostQueue(row=row, new_record=True)
540        block.save()
541
542
543    def unblock_host(self, host_id):
544        logging.info("removing block %s/%s", self.job.id, host_id)
545        blocks = IneligibleHostQueue.fetch(
546            'job_id=%d and host_id=%d' % (self.job.id, host_id))
547        for block in blocks:
548            block.delete()
549
550
551    def set_execution_subdir(self, subdir=None):
552        if subdir is None:
553            assert self.host
554            subdir = self.host.hostname
555        self.update_field('execution_subdir', subdir)
556
557
558    def _get_hostname(self):
559        if self.host:
560            return self.host.hostname
561        return 'no host'
562
563
    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id, or a placeholder
                message if the id attributes have not been set yet.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            # id/job_id may be unset if called before DBObject.__init__
            # finishes populating fields.
            return 'HQE has not been initialized yet: %s' % e
573
574
575    def __str__(self):
576        flags = []
577        if self.active:
578            flags.append('active')
579        if self.complete:
580            flags.append('complete')
581        if self.deleted:
582            flags.append('deleted')
583        if self.aborted:
584            flags.append('aborted')
585        flags_str = ','.join(flags)
586        if flags_str:
587            flags_str = ' [%s]' % flags_str
588        return ("%s and host: %s has status:%s%s" %
589                (self.get_dbg_str(), self._get_hostname(), self.status,
590                 flags_str))
591
592
593    def record_state(self, type_str, state, value):
594        """Record metadata in elasticsearch.
595
596        If ES configured to use http, then we will time that http request.
597        Otherwise, it uses UDP, so we will not need to time it.
598
599        @param type_str: sets the _type field in elasticsearch db.
600        @param state: string representing what state we are recording,
601                      e.g. 'status'
602        @param value: value of the state, e.g. 'verifying'
603        """
604        metadata = {
605            'time_changed': time.time(),
606             state: value,
607            'job_id': self.job_id,
608        }
609        if self.host:
610            metadata['hostname'] = self.host.hostname
611        autotest_es.post(type_str=type_str, metadata=metadata)
612
613
    def set_status(self, status):
        """Transition this entry to a new status.

        Updates the status, active and complete columns, runs completion
        hooks and sends any configured notification e-mails.

        @param status: New status string (a models.HostQueueEntry status).
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)
647
648
649    def _on_complete(self, status):
650        metric_fields = {'status': status.lower()}
651        if self.host:
652            metric_fields['board'] = self.host.board or ''
653            if len(self.host.pools) == 1:
654                metric_fields['pool'] = self.host.pools[0]
655            else:
656                metric_fields['pool'] = 'MULTIPLE'
657        else:
658            metric_fields['board'] = 'NO_HOST'
659            metric_fields['pool'] = 'NO_HOST'
660        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
661        if status is not models.HostQueueEntry.Status.ABORTED:
662            self.job.stop_if_necessary()
663        if self.started_on:
664            self.set_finished_on_now()
665        if self.job.shard_id is not None:
666            # If shard_id is None, the job will be synced back to the master
667            self.job.update_field('shard_id', None)
668        if not self.execution_subdir:
669            return
670        # unregister any possible pidfiles associated with this queue entry
671        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
672            pidfile_id = _drone_manager.get_pidfile_id_from(
673                    self.execution_path(), pidfile_name=pidfile_name)
674            _drone_manager.unregister_pidfile(pidfile_id)
675
676
677    def _get_status_email_contents(self, status, summary=None, hostname=None):
678        """
679        Gather info for the status notification e-mails.
680
681        If needed, we could start using the Django templating engine to create
682        the subject and the e-mail body, but that doesn't seem necessary right
683        now.
684
685        @param status: Job status text. Mandatory.
686        @param summary: Job summary text. Optional.
687        @param hostname: A hostname for the job. Optional.
688
689        @return: Tuple (subject, body) for the notification e-mail.
690        """
691        job_stats = Job(id=self.job.id).get_execution_details()
692
693        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
694                   (self.job.id, self.job.name, status))
695
696        if hostname is not None:
697            subject += '| Hostname: %s ' % hostname
698
699        if status not in ["1 Failed", "Failed"]:
700            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']
701
702        body =  "Job ID: %s\n" % self.job.id
703        body += "Job name: %s\n" % self.job.name
704        if hostname is not None:
705            body += "Host: %s\n" % hostname
706        if summary is not None:
707            body += "Summary: %s\n" % summary
708        body += "Status: %s\n" % status
709        body += "Results interface URL: %s\n" % self._view_job_url()
710        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
711        if int(job_stats['total_executed']) > 0:
712            body += "User tests executed: %s\n" % job_stats['total_executed']
713            body += "User tests passed: %s\n" % job_stats['total_passed']
714            body += "User tests failed: %s\n" % job_stats['total_failed']
715            body += ("User tests success rate: %.2f %%\n" %
716                     job_stats['success_rate'])
717
718        if job_stats['failed_rows']:
719            body += "Failures:\n"
720            body += job_stats['failed_rows']
721
722        return subject, body
723
724
725    def _email_on_status(self, status):
726        hostname = self._get_hostname()
727        subject, body = self._get_status_email_contents(status, None, hostname)
728        email_manager.manager.send_email(self.job.email_list, subject, body)
729
730
731    def _email_on_job_complete(self):
732        if not self.job.is_finished():
733            return
734
735        summary = []
736        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
737        for queue_entry in hosts_queue:
738            summary.append("Host: %s Status: %s" %
739                                (queue_entry._get_hostname(),
740                                 queue_entry.status))
741
742        summary = "\n".join(summary)
743        status_counts = models.Job.objects.get_status_counts(
744                [self.job.id])[self.job.id]
745        status = ', '.join('%d %s' % (count, status) for status, count
746                    in status_counts.iteritems())
747
748        subject, body = self._get_status_email_contents(status, summary, None)
749        email_manager.manager.send_email(self.job.email_list, subject, body)
750
751
    def schedule_pre_job_tasks(self):
        """Log scheduling details and kick off the job's pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()
758
759
    def _do_schedule_pre_job_tasks(self):
        # Delegate to the job, which knows which pre-job tasks to schedule.
        self.job.schedule_pre_job_tasks(queue_entry=self)
762
763
764    def requeue(self):
765        assert self.host
766        self.set_status(models.HostQueueEntry.Status.QUEUED)
767        self.update_field('started_on', None)
768        self.update_field('finished_on', None)
769        # verify/cleanup failure sets the execution subdir, so reset it here
770        self.set_execution_subdir('')
771        if self.meta_host:
772            self.set_host(None)
773
774
    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None if not aborted.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of when this entry was aborted, or None if not aborted.
        self._load_abort_info()
        return self._aborted_on
785
786
787    def _load_abort_info(self):
788        """ Fetch info about who aborted the job. """
789        if hasattr(self, "_aborted_by"):
790            return
791        rows = _db.execute("""
792                SELECT afe_users.login,
793                        afe_aborted_host_queue_entries.aborted_on
794                FROM afe_aborted_host_queue_entries
795                INNER JOIN afe_users
796                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
797                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
798                """, (self.id,))
799        if rows:
800            self._aborted_by, self._aborted_on = rows[0]
801        else:
802            self._aborted_by = self._aborted_on = None
803
804
805    def on_pending(self):
806        """
807        Called when an entry in a synchronous job has passed verify.  If the
808        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
809        them in PENDING.
810        """
811        self.set_status(models.HostQueueEntry.Status.PENDING)
812        self.host.set_status(models.Host.Status.PENDING)
813
814        # Some debug code here: sends an email if an asynchronous job does not
815        # immediately enter Starting.
816        # TODO: Remove this once we figure out why asynchronous jobs are getting
817        # stuck in Pending.
818        self.job.run_if_ready(queue_entry=self)
819        if (self.job.synch_count == 1 and
820                self.status == models.HostQueueEntry.Status.PENDING):
821            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
822            message = 'Asynchronous job stuck in Pending'
823            email_manager.manager.enqueue_notify_email(subject, message)
824
825
    def abort(self, dispatcher):
        """Abort this queue entry, cleaning up its host as appropriate.

        The action depends on the entry's current status; post-job statuses
        (gathering/parsing/archiving) are left alone so their tasks can
        finish and mark the entry themselves.

        @param dispatcher: Dispatcher used to look up agents for this entry.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            # Abort mid-verify/reset: schedule a cleanup to leave the host
            # in a sane state.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            # Abort mid-provision: the host may be half-imaged, so schedule
            # a repair rather than a cleanup.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
864
865
866    def execution_tag(self):
867        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
868                               'complete!=1 AND execution_subdir="" AND '
869                               'status!="Queued";')
870        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
871                                 'status="Aborted" WHERE id=%s;')
872        try:
873            assert self.execution_subdir
874        except AssertionError:
875            # TODO(scottz): Remove temporary fix/info gathering pathway for
876            # crosbug.com/31595 once issue is root caused.
877            logging.error('No execution_subdir for host queue id:%s.', self.id)
878            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
879            for row in _db.execute(SQL_SUSPECT_ENTRIES):
880                logging.error(row)
881            logging.error('====DB DEBUG====\n')
882            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
883            logging.error('EXECUTING: %s', fix_query)
884            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
885            raise AssertionError(('self.execution_subdir not found. '
886                                  'See log for details.'))
887
888        return "%s/%s" % (self.job.tag(), self.execution_subdir)
889
890
    def execution_path(self):
        """Return the results path for this entry (same as execution_tag)."""
        return self.execution_tag()
893
894
    def set_started_on_now(self):
        """Set this entry's started_on field to the current local time."""
        self.update_field('started_on', datetime.datetime.now())
897
898
    def set_finished_on_now(self):
        """Set this entry's finished_on field to the current local time."""
        self.update_field('finished_on', datetime.datetime.now())
901
902
903    def is_hostless(self):
904        return (self.host_id is None
905                and self.meta_host is None)
906
907
class Job(DBObject):
    """In-memory scheduler representation of a row in the afe_jobs table.

    Overlaps the Django models.Job in basic functionality but carries
    scheduler-specific logic: choosing host groups, scheduling pre-job
    special tasks, and kicking queue entries into Starting.
    """

    # DBObject configuration: the backing table and its columns, in the
    # order they appear in query results.
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
921
922    def __init__(self, id=None, row=None, **kwargs):
923        assert id or row
924        super(Job, self).__init__(id=id, row=row, **kwargs)
925        self._owner_model = None # caches model instance of owner
926        self.update_image_path = None # path of OS image to install
927
928
929    def model(self):
930        return models.Job.objects.get(id=self.id)
931
932
933    def owner_model(self):
934        # work around the fact that the Job owner field is a string, not a
935        # foreign key
936        if not self._owner_model:
937            self._owner_model = models.User.objects.get(login=self.owner)
938        return self._owner_model
939
940
941    def tag(self):
942        return "%s-%s" % (self.id, self.owner)
943
944
945    def is_image_update_job(self):
946        """
947        Discover if the current job requires an OS update.
948
949        @return: True/False if OS should be updated before job is run.
950        """
951        # All image update jobs have the parameterized_job_id set.
952        if not self.parameterized_job_id:
953            return False
954
955        # Retrieve the ID of the ParameterizedJob this job is an instance of.
956        rows = _db.execute("""
957                SELECT test_id
958                FROM afe_parameterized_jobs
959                WHERE id = %s
960                """, (self.parameterized_job_id,))
961        if not rows:
962            return False
963        test_id = rows[0][0]
964
965        # Retrieve the ID of the known autoupdate_ParameterizedJob.
966        rows = _db.execute("""
967                SELECT id
968                FROM afe_autotests
969                WHERE name = 'autoupdate_ParameterizedJob'
970                """)
971        if not rows:
972            return False
973        update_id = rows[0][0]
974
975        # If the IDs are the same we've found an image update job.
976        if test_id == update_id:
977            # Finally, get the path to the OS image to install.
978            rows = _db.execute("""
979                    SELECT parameter_value
980                    FROM afe_parameterized_job_parameters
981                    WHERE parameterized_job_id = %s
982                    """, (self.parameterized_job_id,))
983            if rows:
984                # Save the path in update_image_path to use later as a command
985                # line parameter to autoserv.
986                self.update_image_path = rows[0][0]
987                return True
988
989        return False
990
991
992    def get_execution_details(self):
993        """
994        Get test execution details for this job.
995
996        @return: Dictionary with test execution details
997        """
998        def _find_test_jobs(rows):
999            """
1000            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1001            Those are autotest 'internal job' tests, so they should not be
1002            counted when evaluating the test stats.
1003
1004            @param rows: List of rows (matrix) with database results.
1005            """
1006            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1007            n_test_jobs = 0
1008            for r in rows:
1009                test_name = r[0]
1010                if job_test_pattern.match(test_name):
1011                    n_test_jobs += 1
1012
1013            return n_test_jobs
1014
1015        stats = {}
1016
1017        rows = _db.execute("""
1018                SELECT t.test, s.word, t.reason
1019                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1020                WHERE t.job_idx = j.job_idx
1021                AND s.status_idx = t.status
1022                AND j.afe_job_id = %s
1023                ORDER BY t.reason
1024                """ % self.id)
1025
1026        failed_rows = [r for r in rows if not r[1] == 'GOOD']
1027
1028        n_test_jobs = _find_test_jobs(rows)
1029        n_test_jobs_failed = _find_test_jobs(failed_rows)
1030
1031        total_executed = len(rows) - n_test_jobs
1032        total_failed = len(failed_rows) - n_test_jobs_failed
1033
1034        if total_executed > 0:
1035            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1036        else:
1037            success_rate = 0
1038
1039        stats['total_executed'] = total_executed
1040        stats['total_failed'] = total_failed
1041        stats['total_passed'] = total_executed - total_failed
1042        stats['success_rate'] = success_rate
1043
1044        status_header = ("Test Name", "Status", "Reason")
1045        if failed_rows:
1046            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1047                                                          status_header)
1048        else:
1049            stats['failed_rows'] = ''
1050
1051        time_row = _db.execute("""
1052                   SELECT started_time, finished_time
1053                   FROM tko_jobs
1054                   WHERE afe_job_id = %s
1055                   """ % self.id)
1056
1057        if time_row:
1058            t_begin, t_end = time_row[0]
1059            try:
1060                delta = t_end - t_begin
1061                minutes, seconds = divmod(delta.seconds, 60)
1062                hours, minutes = divmod(minutes, 60)
1063                stats['execution_time'] = ("%02d:%02d:%02d" %
1064                                           (hours, minutes, seconds))
1065            # One of t_end or t_begin are None
1066            except TypeError:
1067                stats['execution_time'] = '(could not determine)'
1068        else:
1069            stats['execution_time'] = '(none)'
1070
1071        return stats
1072
1073
    def keyval_dict(self):
        """Return the keyval dictionary from the backing Django Job model."""
        return self.model().keyval_dict()
1076
1077
1078    def _pending_count(self):
1079        """The number of HostQueueEntries for this job in the Pending state."""
1080        pending_entries = models.HostQueueEntry.objects.filter(
1081                job=self.id, status=models.HostQueueEntry.Status.PENDING)
1082        return pending_entries.count()
1083
1084
1085    def is_ready(self):
1086        pending_count = self._pending_count()
1087        ready = (pending_count >= self.synch_count)
1088
1089        if not ready:
1090            logging.info(
1091                    'Job %s not ready: %s pending, %s required ',
1092                    self, pending_count, self.synch_count)
1093
1094        return ready
1095
1096
1097    def num_machines(self, clause = None):
1098        sql = "job_id=%s" % self.id
1099        if clause:
1100            sql += " AND (%s)" % clause
1101        return self.count(sql, table='afe_host_queue_entries')
1102
1103
    def num_queued(self):
        """Count this job's entries that have not yet completed."""
        return self.num_machines('not complete')
1106
1107
    def num_active(self):
        """Count this job's currently active entries."""
        return self.num_machines('active')
1110
1111
    def num_complete(self):
        """Count this job's completed entries."""
        return self.num_machines('complete')
1114
1115
    def is_finished(self):
        """Check whether every entry of this job is complete."""
        return self.num_complete() == self.num_machines()
1118
1119
1120    def _not_yet_run_entries(self, include_active=True):
1121        if include_active:
1122          statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
1123        else:
1124          statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
1125        return models.HostQueueEntry.objects.filter(job=self.id,
1126                                                    status__in=statuses)
1127
1128
1129    def _stop_all_entries(self):
1130        """Stops the job's inactive pre-job HQEs."""
1131        entries_to_stop = self._not_yet_run_entries(
1132            include_active=False)
1133        for child_entry in entries_to_stop:
1134            assert not child_entry.complete, (
1135                '%s status=%s, active=%s, complete=%s' %
1136                (child_entry.id, child_entry.status, child_entry.active,
1137                 child_entry.complete))
1138            if child_entry.status == models.HostQueueEntry.Status.PENDING:
1139                child_entry.host.status = models.Host.Status.READY
1140                child_entry.host.save()
1141            child_entry.status = models.HostQueueEntry.Status.STOPPED
1142            child_entry.save()
1143
1144
1145    def stop_if_necessary(self):
1146        not_yet_run = self._not_yet_run_entries()
1147        if not_yet_run.count() < self.synch_count:
1148            self._stop_all_entries()
1149
1150
1151    def _next_group_name(self):
1152        """@returns a directory name to use for the next host group results."""
1153        group_name = ''
1154        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1155        query = models.HostQueueEntry.objects.filter(
1156            job=self.id).values('execution_subdir').distinct()
1157        subdirs = (entry['execution_subdir'] for entry in query)
1158        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1159        ids = [int(match.group(1)) for match in group_matches if match]
1160        if ids:
1161            next_id = max(ids) + 1
1162        else:
1163            next_id = 0
1164        return '%sgroup%d' % (group_name, next_id)
1165
1166
1167    def get_group_entries(self, queue_entry_from_group):
1168        """
1169        @param queue_entry_from_group: A HostQueueEntry instance to find other
1170                group entries on this job for.
1171
1172        @returns A list of HostQueueEntry objects all executing this job as
1173                part of the same group as the one supplied (having the same
1174                execution_subdir).
1175        """
1176        execution_subdir = queue_entry_from_group.execution_subdir
1177        return list(HostQueueEntry.fetch(
1178            where='job_id=%s AND execution_subdir=%s',
1179            params=(self.id, execution_subdir)))
1180
1181
1182    def _should_run_cleanup(self, queue_entry):
1183        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1184            return True
1185        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1186            return queue_entry.host.dirty
1187        return False
1188
1189
1190    def _should_run_verify(self, queue_entry):
1191        do_not_verify = (queue_entry.host.protection ==
1192                         host_protections.Protection.DO_NOT_VERIFY)
1193        if do_not_verify:
1194            return False
1195        # If RebootBefore is set to NEVER, then we won't run reset because
1196        # we can't cleanup, so we need to weaken a Reset into a Verify.
1197        weaker_reset = (self.run_reset and
1198                self.reboot_before == model_attributes.RebootBefore.NEVER)
1199        return self.run_verify or weaker_reset
1200
1201
1202    def _should_run_reset(self, queue_entry):
1203        can_verify = (queue_entry.host.protection !=
1204                         host_protections.Protection.DO_NOT_VERIFY)
1205        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1206        return (can_reboot and can_verify and (self.run_reset or
1207                (self._should_run_cleanup(queue_entry) and
1208                 self._should_run_verify(queue_entry))))
1209
1210
1211    def _should_run_provision(self, queue_entry):
1212        """
1213        Determine if the queue_entry needs to have a provision task run before
1214        it to provision queue_entry.host.
1215
1216        @param queue_entry: The host queue entry in question.
1217        @returns: True if we should schedule a provision task, False otherwise.
1218
1219        """
1220        # If we get to this point, it means that the scheduler has already
1221        # vetted that all the unprovisionable labels match, so we can just
1222        # find all labels on the job that aren't on the host to get the list
1223        # of what we need to provision.  (See the scheduling logic in
1224        # host_scheduler.py:is_host_eligable_for_job() where we discard all
1225        # actionable labels when assigning jobs to hosts.)
1226        job_labels = {x.name for x in queue_entry.get_labels()}
1227        # Skip provision if `skip_provision` is listed in the job labels.
1228        if provision.SKIP_PROVISION in job_labels:
1229            return False
1230        _, host_labels = queue_entry.host.platform_and_labels()
1231        # If there are any labels on the job that are not on the host and they
1232        # are labels that provisioning knows how to change, then that means
1233        # there is provisioning work to do.  If there's no provisioning work to
1234        # do, then obviously we have no reason to schedule a provision task!
1235        diff = job_labels - set(host_labels)
1236        if any([provision.Provision.acts_on(x) for x in diff]):
1237            return True
1238        return False
1239
1240
1241    def _queue_special_task(self, queue_entry, task):
1242        """
1243        Create a special task and associate it with a host queue entry.
1244
1245        @param queue_entry: The queue entry this special task should be
1246                            associated with.
1247        @param task: One of the members of the enum models.SpecialTask.Task.
1248        @returns: None
1249
1250        """
1251        models.SpecialTask.objects.create(
1252                host=models.Host.objects.get(id=queue_entry.host_id),
1253                queue_entry=queue_entry, task=task)
1254
1255
1256    def schedule_pre_job_tasks(self, queue_entry):
1257        """
1258        Queue all of the special tasks that need to be run before a host
1259        queue entry may run.
1260
1261        If no special taskes need to be scheduled, then |on_pending| will be
1262        called directly.
1263
1264        @returns None
1265
1266        """
1267        task_queued = False
1268        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1269
1270        if self._should_run_provision(queue_entry):
1271            self._queue_special_task(hqe_model,
1272                                     models.SpecialTask.Task.PROVISION)
1273            task_queued = True
1274        elif self._should_run_reset(queue_entry):
1275            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
1276            task_queued = True
1277        else:
1278            if self._should_run_cleanup(queue_entry):
1279                self._queue_special_task(hqe_model,
1280                                         models.SpecialTask.Task.CLEANUP)
1281                task_queued = True
1282            if self._should_run_verify(queue_entry):
1283                self._queue_special_task(hqe_model,
1284                                         models.SpecialTask.Task.VERIFY)
1285                task_queued = True
1286
1287        if not task_queued:
1288            queue_entry.on_pending()
1289
1290
1291    def _assign_new_group(self, queue_entries):
1292        if len(queue_entries) == 1:
1293            group_subdir_name = queue_entries[0].host.hostname
1294        else:
1295            group_subdir_name = self._next_group_name()
1296            logging.info('Running synchronous job %d hosts %s as %s',
1297                self.id, [entry.host.hostname for entry in queue_entries],
1298                group_subdir_name)
1299
1300        for queue_entry in queue_entries:
1301            queue_entry.set_execution_subdir(group_subdir_name)
1302
1303
1304    def _choose_group_to_run(self, include_queue_entry):
1305        """
1306        @returns A tuple containing a list of HostQueueEntry instances to be
1307                used to run this Job, a string group name to suggest giving
1308                to this job in the results database.
1309        """
1310        chosen_entries = [include_queue_entry]
1311        num_entries_wanted = self.synch_count
1312        num_entries_wanted -= len(chosen_entries)
1313
1314        if num_entries_wanted > 0:
1315            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
1316            pending_entries = list(HostQueueEntry.fetch(
1317                     where=where_clause,
1318                     params=(self.id, include_queue_entry.id)))
1319
1320            # Sort the chosen hosts by hostname before slicing.
1321            def cmp_queue_entries_by_hostname(entry_a, entry_b):
1322                return Host.cmp_for_sort(entry_a.host, entry_b.host)
1323            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
1324            chosen_entries += pending_entries[:num_entries_wanted]
1325
1326        # Sanity check.  We'll only ever be called if this can be met.
1327        if len(chosen_entries) < self.synch_count:
1328            message = ('job %s got less than %s chosen entries: %s' % (
1329                    self.id, self.synch_count, chosen_entries))
1330            logging.error(message)
1331            email_manager.manager.enqueue_notify_email(
1332                    'Job not started, too few chosen entries', message)
1333            return []
1334
1335        self._assign_new_group(chosen_entries)
1336        return chosen_entries
1337
1338
1339    def run_if_ready(self, queue_entry):
1340        """
1341        Run this job by kicking its HQEs into status='Starting' if enough
1342        hosts are ready for it to run.
1343
1344        Cleans up by kicking HQEs into status='Stopped' if this Job is not
1345        ready to run.
1346        """
1347        if not self.is_ready():
1348            self.stop_if_necessary()
1349        else:
1350            self.run(queue_entry)
1351
1352
1353    def request_abort(self):
1354        """Request that this Job be aborted on the next scheduler cycle."""
1355        self.model().abort()
1356
1357
1358    def run(self, queue_entry):
1359        """
1360        @param queue_entry: The HostQueueEntry instance calling this method.
1361        """
1362        queue_entries = self._choose_group_to_run(queue_entry)
1363        if queue_entries:
1364            self._finish_run(queue_entries)
1365
1366
1367    def _finish_run(self, queue_entries):
1368        for queue_entry in queue_entries:
1369            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1370
1371
1372    def __str__(self):
1373        return '%s-%s' % (self.id, self.owner)
1374