#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes.
"""

import collections
import logging

import common

from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import scheduler_lib


_job_timer = autotest_stats.Timer('scheduler.job_query_manager')
class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A where-clause fragment that matches only hostless queue entries
    # (no host, no metahost).
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    @_job_timer.decorate
    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority.
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend, the owner of the job would have chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, which
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with a metahost can utilize any host that
          satisfies the metahost criterion. This means that if we had
          scheduled 4 before 3, there is a good chance that a job which
          could've used another host will now use the host assigned to a
          metahost-less job. Given the availability of machines in
          pool:suites, this almost guarantees starvation for jobs scheduled
          through the frontend.
        - Scheduling 4 before 3 also has its pros, however, since a suite
          has the concept of a timeout, whereas users can wait. If we hit the
          process count on the drone, a suite can time out waiting on the
          test, but a user job generally has a much longer timeout, and
          relatively harmless consequences.
        The current ordering was chosen because it is more likely that we
        will run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
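        # Note on the sort: in MySQL, ISNULL(col) evaluates to 0 when col is
        # set and 1 when it is NULL, so ascending order places entries that
        # already have a host (cases 2 and 3) before metahost-only entries
        # (case 4), with hostless entries (case 5) last.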
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Don't execute jobs that should be executed by a shard in the global
        # scheduler.
        # This won't prevent the shard scheduler from running this, as the
        # shard db doesn't have an entry in afe_shards_labels.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted AND afe_shards_labels.id IS NULL')

        # TODO(jakobjuelich, beeps): Optimize this query. Details:
        # Compressed output of EXPLAIN <query>:
        # +------------------------+--------+-------------------------+-------+
        # | table                  | type   | key                     | rows  |
        # +------------------------+--------+-------------------------+-------+
        # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
        # | afe_shards_labels      | ref    | shard_label_id_fk       |     1 |
        # | afe_jobs               | eq_ref | PRIMARY                 |     1 |
        # +------------------------+--------+-------------------------+-------+
        # This shows that the first part of the query fetches a lot of rows
        # that are then filtered. The joins are comparably fast: there's
        # usually just one shard mapping, or none, and it can be answered
        # fully using an index (shard_label_id_fk); a similar thing applies
        # to the job.
        #
        # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
        # it might be more efficient to filter on the meta_host first, instead
        # of the status.
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
            joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
                   'LEFT JOIN afe_shards_labels ON ('
                   'meta_host=afe_shards_labels.label_id)'),
            where=query, order_by=sort_order))


    @_job_timer.decorate
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, verify, reset and finally provision.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                               'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
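        # Sorting by index into this list means, for example, that a queued
        # REPAIR (index 0) always precedes a queued VERIFY (index 2). Python's
        # sorted() is stable, so tasks of the same type keep their original
        # queryset order.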
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and stand-alone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # stand-alone special task can share a host with an active hqe; an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
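        # most_common() yields (host_id, count) pairs in descending count
        # order; any host appearing more than once is shared by at least
        # two of the active entries collected above.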
        host_counts = collections.Counter(
                hqe_hosts + special_task_hosts).most_common()
        multiple_hosts = [host_id for host_id, count in host_counts
                          if count > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                        'id', 'job_id', 'host_id'))


    @_job_timer.decorate
    def get_suite_host_assignment(self):
        """A helper method to get how many hosts each suite is holding.

        @return: Two dictionaries (suite_host_num, hosts_to_suites).
                 suite_host_num maps a suite job id to the number of hosts
                 held by its child jobs.
                 hosts_to_suites contains the hosts currently held by
                 any suite, and maps each host id to its parent_job_id.
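
                 For example, if the child jobs of suite job 10 currently
                 hold hosts 1 and 2, this returns ({10: 2}, {1: 10, 2: 10}).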
        """
        query = models.HostQueueEntry.objects.filter(
                host_id__isnull=False, complete=0, active=1,
                job__parent_job_id__isnull=False)
        suite_host_num = {}
        hosts_to_suites = {}
        for hqe in query:
            host_id = hqe.host_id
            parent_job_id = hqe.job.parent_job_id
            count = suite_host_num.get(parent_job_id, 0)
            suite_host_num[parent_job_id] = count + 1
            hosts_to_suites[host_id] = parent_job_id
        return suite_host_num, hosts_to_suites


    @_job_timer.decorate
    def get_min_duts_of_suites(self, suite_job_ids):
        """Load the suite_min_duts job keyval for a set of suites.

        @param suite_job_ids: A set of suite job ids.

        @return: A dictionary where the key is a suite job id and
                 the value is the value of 'suite_min_duts'.
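
                 For example, {10: 4} means suite job 10 requested a
                 minimum of 4 DUTs via its 'suite_min_duts' keyval.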
        """
        query = models.JobKeyval.objects.filter(
                job_id__in=suite_job_ids,
                key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
        return dict((keyval.job_id, int(keyval.value)) for keyval in query)


_host_timer = autotest_stats.Timer('scheduler.host_query_manager')
class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        Acquires its own connection to the database containing the
        afe_hosts table via the scheduler's ConnectionManager.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        """Convert (left_id, right_id) rows into a dict of id to id-sets.

        For example, the rows ((1, 10), (1, 20)) become {1: set([10, 20])},
        or {10: set([1]), 20: set([1])} when flip=True.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_sql_id_list(self, id_list):
        """Render a list of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a two-column query for id_list and build an id-set dict.

        The query must contain a single '%s' placeholder, which is
        replaced by the comma-separated id list.
        """
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    @_host_timer.decorate
    def _get_ready_hosts(self):
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
            where="NOT afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @_host_timer.decorate
    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_host_acls(self, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    @_host_timer.decorate
    def _get_label_hosts(self, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def find_unused_healty_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')
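        # The LEFT JOIN combined with the "active_hqe.host_id IS NULL"
        # filter in the WHERE clause below is a standard SQL anti-join:
        # only hosts with no matching active queue entry survive.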

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. When the host is locked 2. When an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks AS new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
            joins='%s %s' % (hqe_join, special_task_join),
            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                  "AND NOT afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                          "OR afe_hosts.status = 'Ready')")


    @_host_timer.decorate
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts matched by kwargs.

        @param leased_value: The True/False value to assign to the leased
            column of the matched hosts.
        @param kwargs: The args to use in finding matching hosts.
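
            For example, set_leased(True, id__in=[1, 2]) marks the hosts
            with ids 1 and 2 as leased.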
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)


    @_host_timer.decorate
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label ids, i.e. {job_id: [label_id]}.
        @return: A dict mapping an integer label id to a scheduler model label
            object, i.e. {label_id: label_object}.
        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_string_label_list = ','.join([str(x) for x in job_label_set])
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    @_host_timer.decorate
    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
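        # Populate every cache needed for host acquisition in one pass, so
        # later lookups hit memory instead of issuing per-HQE queries.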
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)