1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Rdb server module.
6"""
7
8import logging
9
10import common
11
12from django.core import exceptions as django_exceptions
13from django.db.models import fields
14from django.db.models import Q
15from autotest_lib.frontend.afe import models
16from autotest_lib.scheduler import rdb_cache_manager
17from autotest_lib.scheduler import rdb_hosts
18from autotest_lib.scheduler import rdb_requests
19from autotest_lib.scheduler import rdb_utils
20from autotest_lib.server import utils
21
# chromite is optional here: when it cannot be imported, fall back to the
# stand-in exposed by autotest's server utils (presumably a drop-in mock of
# the same interface -- TODO confirm) so the metrics calls below still resolve.
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# Template for the names of the duration metrics emitted by this module; the
# '%s' is filled in with the name of the timed operation where the timer
# decorators are applied.
_rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
# True when utils.is_shard() reports that this process is not a shard.
_is_master = not utils.is_shard()
30
# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Thin query layer over the host table, covering all hosts.

    Every query is issued through |host_objects|, so a subclass can narrow
    the set of visible hosts by overriding that attribute.
    """

    # Model manager all queries in this class are issued against.
    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Write the given column values to each of the listed hosts.

        @param host_ids: A list of ids of the hosts to update.
        @param kwargs: column=value pairs to set on the matching rows of
            the host database.
        """
        matching_rows = self.host_objects.filter(id__in=host_ids)
        matching_rows.update(**kwargs)


    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Look up the hosts with the given ids.

        @param ids: The ids for which we need host objects.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
        """
        selected = self.host_objects.filter(id__in=ids)
        return selected.order_by('id')


    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Find valid hosts that match the given deps and acls.

        @param deps: A list/frozenset of dependencies (label id) to match.
        @param acls: A list/frozenset of acls, at least one of which must
            coincide with an acl group the chosen host is in.

        @return: A set of matching hosts available.
        """
        # Narrow down in stages: valid hosts, then hosts in one of the
        # requested acl groups, then hosts matching the label ids.
        candidates = self.host_objects.filter(invalid=0).filter(
                Q(aclgroup__id__in=acls))
        labelled = models.Host.get_hosts_with_label_ids(
                list(deps), candidates)
        return set(labelled)
76
77
class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager for requests on un-leased, un-locked hosts.

    Inherits every query from BaseHostQueryManager and only swaps the model
    manager those queries run against.
    """

    # NOTE(review): despite the name, `leased_objects` is presumably the model
    # manager that yields only un-leased, un-locked hosts -- confirm against
    # models.Host.
    host_objects = models.Host.leased_objects
83
84
# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information.
    Responses are accumulated in |response_map|, a dictionary keyed on the
    request whose values are lists of either hosts or RDBExceptions.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        self.response_map = {}


    def update_response_map(self, request, response, append=False):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param response: A non-empty list of responses for the request.
        @param request: The request that has reserved these hosts.
        @param append: Boolean, whether to append new hosts in
                       |response| for existing request.
                       Will not append if existing response is
                       a list of exceptions.

        @raises RDBException: If an empty value is added to the map, or if
            the request already has a response and |append| is False.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                     (request, response))
        exist_response = self.response_map.setdefault(request, [])
        if exist_response and not append:
            # NOTE(review): the message prints the new |response|, not the
            # one already recorded for the request.
            raise rdb_utils.RDBException('Request %s already has response %s '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, response))
        # Past the check above a non-empty existing response implies
        # append=True. Do not append if it holds exceptions instead of hosts.
        if exist_response and not isinstance(
                exist_response[0], rdb_hosts.RDBHost):
            return
        exist_response.extend(response)


    def _check_response_map(self):
        """Verify that we never give the same host to different requests.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        unique_hosts = set()
        for request, response in self.response_map.items():
            # Each value in the response map can only either be a list of
            # RDBHosts or a list of RDBExceptions, not a mix of both, so
            # inspecting the first element is enough.
            if not isinstance(response[0], rdb_hosts.RDBHost):
                continue
            if any(host in unique_hosts for host in response):
                raise rdb_utils.RDBException(
                        'Assigning the same host to multiple requests. New '
                        'hosts %s, request %s, response_map: %s' %
                        (response, request, self.response_map))
            # Grow the set in place rather than rebuilding it per request.
            unique_hosts.update(response)


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert every recorded response to its wire format.

        Each value of the response_map is replaced, in place, by the list of
        wire_format() results of its elements.

        @return: A dictionary mapping requests to a list of matching host_infos.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        self._check_response_map()
        # Iterate over a snapshot since the values are reassigned in the loop.
        for request, response in list(self.response_map.items()):
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        # NOTE(review): the merge below mutates the payload object of the
        # first request seen for a host in place.
        unique_host_requests = {}
        for request in update_requests:
            if unique_host_requests.get(request.host_id):
                unique_host_requests[request.host_id].update(request.payload)
            else:
                unique_host_requests[request.host_id] = request.payload

        # Batch similar payloads so we can do them in one table scan. This
        # requires the payloads to be hashable.
        similar_requests = {}
        for host_id, payload in unique_host_requests.items():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.items():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = set(request.host_id for request in host_requests)

        # Which hosts come back depends on the query manager in use: with a
        # manager restricted to available hosts, requested hosts that are not
        # available are simply absent from the result.
        host_map = {}
        for host in self.host_query_manager.get_hosts(host_ids):
            host_map[host.id] = host

        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)
234
235
class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    """


    def __init__(self):
        # Sets up the same attributes as the base handler, but with a query
        # manager restricted to available hosts, plus the host cache and the
        # counters reported by batch_acquire_hosts.
        self.host_query_manager = AvailableHostQueryManager()
        self.cache = rdb_cache_manager.RDBHostCacheManager()
        self.response_map = {}
        self.unsatisfied_requests = 0
        self.leased_hosts_count = 0
        # Populated by batch_acquire_hosts before any host is acquired.
        self.request_accountant = None


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        Hosts that raise an RDBException while leasing are logged and left
        out of the result; duplicates in |hosts| are leased only once.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased, in no particular order.
        """
        #TODO(beeps): crbug.com/353183.
        leased_hosts = set()
        for host in set(hosts):
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)


    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: True if the host, request assignment is valid, False
            otherwise.

        @raises RDBException: If the request already has another host_ids
            associated with it.
        """
        if request.host_id and request.host_id != host.id:
            raise rdb_utils.RDBException(
                    'Cannot assign a different host for request: %s, it '
                    'already has one: %s ' % (request, host.id))

        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (request.acls.intersection(host.acls) or host.invalid)
        label_match = (request.deps.intersection(host.labels) == request.deps)
        # Normalize to a real boolean instead of leaking a set or an int.
        return bool(acl_match and label_match)


    @classmethod
    def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
        """Sort hosts in the order of how many preferred deps it has.

        This allows rdb always choose the hosts with the most preferred deps
        for a request. One important use case is including cros-version as
        a preferred dependence. By choosing a host with the same cros-version,
        we can save the time on provisioning it. Note this is not guaranteed
        if preferred_deps contains other labels as well.

        @param hosts: A list of hosts to sort.
        @param preferred_deps: A list of deps that are preferred.

        @return: A new list of the hosts, those with the most preferred deps
            first. Hosts with equally many keep their relative order.

        """
        if not hosts:
            return []
        # Build the set once instead of once per host inside the sort key.
        preferred = set(preferred_deps)
        return sorted(
                hosts,
                key=lambda host: len(preferred & set(host.labels)),
                reverse=True)


    @rdb_cache_manager.memoize_hosts
    def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
                       **kwargs):
        """Acquire hosts for a group of similar requests.

        Find and acquire hosts that can satisfy a group of requests.
        1. If the caching decorator doesn't pass in a list of matching hosts
           via the MEMOIZE_KEY this method will directly check the database for
           matching hosts.
        2. If all matching hosts are not leased for this request, the remaining
           hosts are returned to the caching decorator, to place in the cache.

        Successfully leased hosts are appended to the response_map entry of
        the request, and the leased/unsatisfied counters are updated.

        @param hosts_required: Number of hosts required to satisfy request.
        @param request: The request for hosts.
        @param is_acquire_min_duts: Boolean. Indicate whether this is to
                                    acquire minimum required duts, only used
                                    for stats purpose.
        @param kwargs: May carry candidate hosts under
                       rdb_cache_manager.MEMOIZE_KEY.

        @return: The list of excess matching hosts.
        """
        hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
        if not hosts:
            hosts = self.host_query_manager.find_hosts(
                            request.deps, request.acls)

        # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
        # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
        # --used this request---used by earlier request----------unused--------
        hosts = self._sort_hosts_by_preferred_deps(
                hosts, request.preferred_deps)
        attempt_lease_hosts = min(len(hosts), hosts_required)
        leased_host_count = 0
        if attempt_lease_hosts:
            leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
            if leased_hosts:
                self.update_response_map(request, leased_hosts, append=True)

            # [:attempt_leased_hosts] - leased_hosts will include hosts that
            # failed leasing, most likely because they're already leased, so
            # don't cache them again.
            leased_host_count = len(leased_hosts)
            failed_leasing = attempt_lease_hosts - leased_host_count
            if failed_leasing > 0:
                # For the sake of simplicity this calculation assumes that
                # leasing only fails if there's a stale cached host already
                # leased by a previous request, ergo, we can only get here
                # through a cache hit.
                line_length = len(hosts)
                self.cache.stale_entries.append(
                        (float(failed_leasing)/line_length) * 100)
            self.leased_hosts_count += leased_host_count
        if is_acquire_min_duts:
            self.request_accountant.record_acquire_min_duts(
                    request, hosts_required, leased_host_count)
        self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
        # Cache the unleased matching hosts against the request.
        return hosts[attempt_lease_hosts:]


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of hosts
        that match the parameters of a request. Each acquired host is added
        to the response_map dictionary as an RDBServerHostWrapper.

        @param host_requests: A list of requests to acquire hosts.
        """
        distinct_requests = 0

        logging.debug('Processing %s host acquisition requests',
                      len(host_requests))
        metrics.Gauge('chromeos/autotest/scheduler/pending_host_acq_requests'
                      ).set(len(host_requests))

        self.request_accountant = rdb_utils.RequestAccountant(host_requests)
        # First pass tries to satisfy min_duts for each suite.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_min_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=True)
            distinct_requests += 1

        # Second pass tries to allocate duts to the rest unsatisfied requests.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=False)

        self.cache.record_stats()
        logging.debug('Host acquisition stats: distinct requests: %s, leased '
                      'hosts: %s, unsatisfied requests: %s', distinct_requests,
                      self.leased_hosts_count, self.unsatisfied_requests)


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #        a. Same deps/acls, different priority:
        #           Since we sort the requests based on priority, the
        #           higher priority request r1, will lease h1. The
        #           validation of r2, h1 will fail because of the r1 lease.
        #        b. Different deps/acls, only one of which matches the host:
        #           The matching request will lease h1. The other host
        #           pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() works on a copy of the keys, so entries can be deleted from
        # the response_map while looping.
        for request in sorted(self.response_map.keys(),
                key=lambda request: request.priority, reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            # Job-shard is 1:1 mapping. Because a job can only belongs
            # to one shard, or belongs to master, we disallow frontend job
            # that spans hosts on and off shards or across multiple shards,
            # which would otherwise break the 1:1 mapping.
            # As such, on master, if a request asks for multiple hosts and
            # if any host is found on shard, we assume other requested hosts
            # would also be on the same shard.  We can safely drop this request.
            ignore_request = _is_master and any(
                    host.shard_id for host in hosts)
            if (not ignore_request and
                    (self.valid_host_assignment(request, hosts[0]) and
                        self.lease_hosts(hosts))):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])
474
475
# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher, the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Look up host information for the requested hosts.

    Uses a BaseHostRequestHandler, so the leased state of the hosts is left
    untouched.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    handler = BaseHostRequestHandler()
    handler.batch_get_hosts(host_requests)
    response = handler.get_response()
    return response
490
491
def update_hosts(update_requests):
    """Apply a batch of updates to the host tables.

    @param update_requests: A list of updates to host tables
        as defined in UpdateHostRequest.
    @return: The response map of the handler that processed the updates.
    """
    handler = BaseHostRequestHandler()
    handler.update_hosts(update_requests)
    response = handler.get_response()
    return response
501
502
def rdb_host_request_dispatcher(host_requests):
    """Dispatcher for all host acquisition queries.

    @param host_requests: A list of requests for acquiring hosts, as defined in
        AcquireHostRequest.
    @return: A dictionary mapping each request to a list of hosts, or
        an empty list if none could satisfy the request. Eg:
        {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    # Validation requests are made by a job scheduled against a specific host
    # (eg: through the frontend) and only require the rdb to match the
    # parameters of the host against the request. Acquisition requests are
    # made by jobs that need hosts (eg: suites) and the rdb needs to find
    # hosts matching the parameters of the request.
    pinned_requests, open_requests = [], []
    for request in host_requests:
        # A request that names a host_id only needs validation.
        bucket = pinned_requests if request.host_id else open_requests
        bucket.append(request)

    handler = AvailableHostRequestHandler()
    handler.batch_validate_hosts(pinned_requests)
    handler.batch_acquire_hosts(open_requests)
    return handler.get_response()
530