# pylint: disable-msg=C0111

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import datetime, itertools, logging, os, re, sys, time, weakref
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros import provision


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]
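
    # For illustration (hypothetical config value): a setting such as
    #     notify_email_statuses: Failed, Completed;Aborted
    # parses to _notify_email_statuses == ['failed', 'completed', 'aborted'].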

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire URL.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner, name and parent job
            id.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}
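

# Example return value of get_job_metadata (illustrative values):
#     {'job_id': 123, 'owner': 'autotest', 'job_name': 'dummy_Pass',
#      'parent_job_id': None}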


class DelayedCallTask(object):
    """
    A task, similar to an AgentTask, that an Agent can run.  It waits for the
    specified amount of time to elapse, then calls the supplied callback once
    and finishes.  If the callback returns anything, it is assumed to be a new
    Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True

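# Minimal usage sketch (hypothetical callback; in the scheduler this task is
# wrapped in an Agent and polled by the dispatcher every tick):
#
#     def _wake_up():
#         return None  # or a new Agent instance to hand to the dispatcher
#
#     task = DelayedCallTask(delay_seconds=30, callback=_wake_up)
#     while not task.is_done():
#         task.poll()  # invokes _wake_up() once end_time has passed
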

class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)

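    # Illustration of the instance cache (hypothetical id): constructing the
    # same row twice yields one shared instance, so updates made through
    # either reference are visible to both:
    #     job_a = Job(id=10)
    #     job_b = Job(id=10)
    #     assert job_a is job_b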

    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]

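    # Example (mirrors block_host() below): persisting a new row built with
    # new_record=True fills in self.id from LAST_INSERT_ID():
    #     block = IneligibleHostQueue(row=[0, job_id, host_id],
    #                                 new_record=True)
    #     block.save()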

    def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list of class instances, one for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
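
    # Example usage (hypothetical filter values):
    #     ready_hosts = Host.fetch(where='status = %s', params=('Ready',),
    #                              order_by='hostname')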


class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')
    _timer = autotest_stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If the two hostnames do not both match this pattern, they
        are simply compared as lower-case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # The 'or' guards against an all-zeros suffix such as 'host0',
            # where lstrip would leave an empty string and int() would raise.
            number_a = int(number_a_str.lstrip('0') or '0')
            number_b = int(number_b_str.lstrip('0') or '0')
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
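
    # Example usage (Python 2 cmp-style sort; ordering matches the docstring
    # example above):
    #     hosts.sort(cmp=Host.cmp_for_sort)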


class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
    _timer = autotest_stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host queue entry.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def record_state(self, type_str, state, value):
        """Record metadata in elasticsearch.

        If ES is configured to use HTTP, then we will time that HTTP request.
        Otherwise, it uses UDP, so we will not need to time it.

        @param type_str: sets the _type field in the elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'status'
        @param value: value of the state, e.g. 'verifying'
        """
        metadata = {
            'time_changed': time.time(),
            state: value,
            'job_id': self.job_id,
        }
        if self.host:
            metadata['hostname'] = self.host.hostname
        autotest_es.post(type_str=type_str, metadata=metadata)


    @_timer.decorate
    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after the last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both of these after we've updated finished_on etc.
        # within _on_complete, or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            if self.job.shard_id is not None:
                # Clear shard_id so the shard client syncs this job back to
                # the master.
                self.job.update_field('shard_id', None)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)


    def _on_complete(self, status):
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if self.started_on:
            self.set_finished_on_now()
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                    in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # Do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host.
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If the hqe is in any of these statuses, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # An agent with a finished task can be left behind. This is added
            # to handle the special case of aborting a hostless job in
            # STARTING status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])
            # If the hqe is still in STARTING status, it may not have been
            # assigned a host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(fix_query)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')
    _timer = autotest_stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None  # caches model instance of owner
        self.update_image_path = None  # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # Work around the fact that the Job owner field is a string, not a
        # foreign key.
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def is_server_job(self):
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        rows = _db.execute("""
                SELECT * FROM afe_host_queue_entries
                WHERE job_id = %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
                return True

        return False


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details.
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*.
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if r[1] != 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None.
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats
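
    # Shape of the returned dict (illustrative values):
    #     {'total_executed': 5, 'total_failed': 1, 'total_passed': 4,
    #      'success_rate': 80.0, 'failed_rows': '...',
    #      'execution_time': '00:12:34'}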


    @_timer.decorate
    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def keyval_dict(self):
        return self.model().keyval_dict()


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _max_hosts_needed_to_run(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to set an upper bound on the threshold.
        @returns The maximum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count


    def is_ready(self):
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)
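
    # Illustration (hypothetical subdirs): with group_name='' and existing
    # execution_subdirs ['group0', 'group3'], this returns 'group4'; with a
    # group_name of 'rack1' and no matching subdirs, it returns 'rack1.group0'.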


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                         host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work to
        # do, then obviously we have no reason to schedule a provision task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False
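
    # Illustration (hypothetical label names): job labels {'board:lumpy',
    # 'cros-version:R33-1.0.0'} against host labels ['board:lumpy'] leave
    # diff == {'cros-version:R33-1.0.0'}; if provision.Provision.acts_on()
    # is true for that label, a provision task is scheduled.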


    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None
        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None
        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries, group_name=''):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job.  As a side effect, the entries are assigned a group name
                (suggested by include_queue_entry) for the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter the Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def schedule_delayed_callback_task(self, queue_entry):
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have
            # aborted while we were waiting or hosts could have disappeared,
            # etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)