1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Rdb server module.
6"""
7
8import logging
9
10import common
11
12from django.core import exceptions as django_exceptions
13from django.db.models import fields
14from django.db.models import Q
15from autotest_lib.frontend.afe import models
16from autotest_lib.scheduler import rdb_cache_manager
17from autotest_lib.scheduler import rdb_hosts
18from autotest_lib.scheduler import rdb_requests
19from autotest_lib.scheduler import rdb_utils
20from autotest_lib.server import utils
21
# chromite's metrics library may not be importable; when the import raises
# ImportError, fall back to the stand-in provided by
# autotest_lib.server.utils.
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# Format string for the names of the timing metrics recorded by the
# metrics.SecondsTimerDecorator uses in this module; '%s' receives the
# operation name (e.g. 'lease_hosts').
_rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
# True unless utils.is_shard() reports that this process runs on a shard.
_is_master = not utils.is_shard()
30
# Query managers: provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Base manager for host queries that may consider every host.

    Subclasses restrict the candidate hosts by pointing host_objects at a
    different model manager.
    """

    # Model manager that every query in this class starts from.
    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Update columns on a group of hosts with a single query.

        @param host_ids: A list of ids of the hosts to update.
        @param kwargs: Mapping of column name to new value in the host
            database.
        """
        selected = self.host_objects.filter(id__in=host_ids)
        selected.update(**kwargs)


    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Fetch the host objects with the given ids.

        @param ids: The ids for which host objects are needed.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
        """
        selected = self.host_objects.filter(id__in=ids)
        return selected.order_by('id')


    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Find valid hosts that carry every dep and share at least one acl.

        @param deps: A list of label ids; a host must have all of them.
        @param acls: A list of acl ids, at least one of which must coincide
            with an acl group the chosen host is in.

        @return: A set of matching hosts available.
        """
        candidates = self.host_objects.filter(invalid=0)
        # A separate filter() per dep, so a host must match every label.
        for dep in deps:
            candidates = candidates.filter(Q(labels__id=dep))
        candidates = candidates.filter(Q(aclgroup__id__in=acls))
        return set(candidates)
77
78
class AvailableHostQueryManager(BaseHostQueryManager):
    """Host query manager restricted to un-leased, un-locked hosts.

    Every query inherited from BaseHostQueryManager runs against
    models.Host.leased_objects rather than the unrestricted manager.
    """

    host_objects = models.Host.leased_objects
84
85
# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information. Hosts
    (or the exceptions hit while processing a request) are recorded in
    self.response_map, keyed on the request.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        # Until get_response() converts it, each value is either a list of
        # RDBHosts or a list of RDBExceptions, never a mix of both.
        self.response_map = {}


    def update_response_map(self, request, response, append=False):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param request: The request that has reserved these hosts.
        @param response: A non-empty list of responses for the request.
        @param append: Boolean, whether to append the new hosts in |response|
            to an existing response for the same request. Nothing is
            appended if the existing response is a list of exceptions.

        @raises RDBException: If an empty response is added to the map, or if
            the request already has a response and |append| is False.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                     (request, response))
        exist_response = self.response_map.setdefault(request, [])
        if exist_response and not append:
            # Report the response that is already recorded; it is what makes
            # the new one a conflict.
            raise rdb_utils.RDBException('Request %s already has response %s; '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, exist_response))
        if exist_response and append and not isinstance(
                exist_response[0], rdb_hosts.RDBHost):
            # Do not append if existing response contains exception.
            return
        exist_response.extend(response)


    def _check_response_map(self):
        """Verify that we never give the same host to different requests.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        unique_hosts = set()
        for request, response in self.response_map.iteritems():
            # Each value in the response map can only either be a list of
            # RDBHosts or a list of RDBExceptions, not a mix of both, so the
            # first element tells us which kind it is.
            if not isinstance(response[0], rdb_hosts.RDBHost):
                continue
            if any(host in unique_hosts for host in response):
                raise rdb_utils.RDBException(
                        'Assigning the same host to multiple requests. New '
                        'hosts %s, request %s, response_map: %s' %
                        (response, request, self.response_map))
            # Grow the seen-set in place rather than rebuilding it per request.
            unique_hosts.update(response)


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        Each exception is wrapped in an RDBException before being stored.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert every recorded response to its wire format.

        The conversion is done in place on self.response_map, which is then
        returned.

        @return: A dictionary mapping requests to a list of matching host_infos.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        self._check_response_map()
        for request, response in self.response_map.iteritems():
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        Requests for the same host are merged (the last payload wins for a
        conflicting field), and hosts whose merged payloads are equal are
        updated together in one query. If an update's fields don't match
        columns in the database, an exception is recorded in the response map
        for each affected host, and the remaining updates are still applied.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        # NOTE(review): merging calls update() on the first request's payload
        # object, so the caller's payload is modified in place.
        unique_host_requests = {}
        for request in update_requests:
            if unique_host_requests.get(request.host_id):
                unique_host_requests[request.host_id].update(request.payload)
            else:
                unique_host_requests[request.host_id] = request.payload

        # Batch similar payloads so we can do them in one table scan.
        similar_requests = {}
        for host_id, payload in unique_host_requests.iteritems():
            similar_requests.setdefault(payload, []).append(host_id)

        for payload, hosts in similar_requests.iteritems():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = {request.host_id for request in host_requests}

        # This list will not contain available hosts if executed using
        # an AvailableHostQueryManager.
        host_map = dict((host.id, host)
                        for host in self.host_query_manager.get_hosts(host_ids))
        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)
235
236
class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    """


    def __init__(self):
        super(AvailableHostRequestHandler, self).__init__()
        # Restrict lookups to the hosts exposed by AvailableHostQueryManager.
        self.host_query_manager = AvailableHostQueryManager()
        self.cache = rdb_cache_manager.RDBHostCacheManager()
        # Counters reported in the batch_acquire_hosts debug log.
        self.unsatisfied_requests = 0
        self.leased_hosts_count = 0
        # Created per batch by batch_acquire_hosts.
        self.request_accountant = None


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        Hosts whose lease raises RDBException are logged and skipped.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased.
        """
        #TODO(beeps): crbug.com/353183.
        unleased_hosts = set(hosts)
        leased_hosts = set([])
        for host in unleased_hosts:
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)


    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: True if the host, request assignment is valid, else False.

        @raises RDBException: If the request already has a different host_id
            associated with it.
        """
        if request.host_id and request.host_id != host.id:
            # Report both the host being offered and the one the request
            # already names.
            raise rdb_utils.RDBException(
                    'Cannot assign host %s to request: %s, it '
                    'already has one: %s ' % (host.id, request,
                                              request.host_id))

        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (request.acls.intersection(host.acls) or host.invalid)
        label_match = (request.deps.intersection(host.labels) == request.deps)
        return bool(acl_match and label_match)


    @classmethod
    def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
        """Sort hosts in the order of how many preferred deps it has.

        This allows rdb always choose the hosts with the most preferred deps
        for a request. One important use case is including cros-version as
        a preferred dependence. By choosing a host with the same cros-version,
        we can save the time on provisioning it. Note this is not guaranteed
        if preferred_deps contains other labels as well.

        @param hosts: A list of hosts to sort.
        @param preferred_deps: A list of deps that are preferred.

        @return: A list of sorted hosts, most preferred deps first; hosts
            with equal counts keep their relative order.
        """
        # Build the preferred set once instead of once per key evaluation.
        preferred = set(preferred_deps)
        return sorted(
                hosts,
                key=lambda host: len(preferred.intersection(host.labels)),
                reverse=True)


    @rdb_cache_manager.memoize_hosts
    def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
                       **kwargs):
        """Acquire hosts for a group of similar requests.

        Find and acquire hosts that can satisfy a group of requests.
        1. If the caching decorator doesn't pass in a list of matching hosts
           via the MEMOIZE_KEY this method will directly check the database for
           matching hosts.
        2. If all matching hosts are not leased for this request, the remaining
           hosts are returned to the caching decorator, to place in the cache.

        @param request: The request for hosts.
        @param hosts_required: Number of hosts required to satisfy request.
        @param is_acquire_min_duts: Boolean. Indicate whether this is to
                                    acquire minimum required duts, only used
                                    for stats purpose.

        @return: The list of excess matching hosts.
        """
        hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
        if not hosts:
            hosts = self.host_query_manager.find_hosts(
                            request.deps, request.acls)

        # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
        # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
        # --used this request---used by earlier request----------unused--------
        hosts = self._sort_hosts_by_preferred_deps(
                hosts, request.preferred_deps)
        attempt_lease_hosts = min(len(hosts), hosts_required)
        leased_host_count = 0
        if attempt_lease_hosts:
            leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
            if leased_hosts:
                self.update_response_map(request, leased_hosts, append=True)

            # [:attempt_leased_hosts] - leased_hosts will include hosts that
            # failed leasing, most likely because they're already leased, so
            # don't cache them again.
            leased_host_count = len(leased_hosts)
            failed_leasing = attempt_lease_hosts - leased_host_count
            if failed_leasing > 0:
                # For the sake of simplicity this calculation assumes that
                # leasing only fails if there's a stale cached host already
                # leased by a previous request, ergo, we can only get here
                # through a cache hit.
                line_length = len(hosts)
                self.cache.stale_entries.append(
                        (float(failed_leasing)/line_length) * 100)
            self.leased_hosts_count += leased_host_count
        if is_acquire_min_duts:
            self.request_accountant.record_acquire_min_duts(
                    request, hosts_required, leased_host_count)
        self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
        # Cache the unleased matching hosts against the request.
        return hosts[attempt_lease_hosts:]


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of hosts
        that match the parameters of a request. Each acquired host is added
        to the response_map dictionary as an RDBServerHostWrapper.

        @param host_requests: A list of requests to acquire hosts.
        """
        distinct_requests = 0

        logging.debug('Processing %s host acquisition requests',
                      len(host_requests))

        self.request_accountant = rdb_utils.RequestAccountant(host_requests)
        # First pass tries to satisfy min_duts for each suite.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_min_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=True)
            distinct_requests += 1

        # Second pass tries to allocate duts to the rest unsatisfied requests.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=False)

        self.cache.record_stats()
        logging.debug('Host acquisition stats: distinct requests: %s, leased '
                      'hosts: %s, unsatisfied requests: %s', distinct_requests,
                      self.leased_hosts_count, self.unsatisfied_requests)


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #        a. Same deps/acls, different priority:
        #           Since we sort the requests based on priority, the
        #           higher priority request r1, will lease h1. The
        #           validation of r2, h1 will fail because of the r1 lease.
        #        b. Different deps/acls, only one of which matches the host:
        #           The matching request will lease h1. The other host
        #           pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() returns a new list, so deleting entries below is safe.
        for request in sorted(self.response_map.keys(),
                key=lambda request: request.priority, reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            # Job-shard is 1:1 mapping. Because a job can only belongs
            # to one shard, or belongs to master, we disallow frontend job
            # that spans hosts on and off shards or across multiple shards,
            # which would otherwise break the 1:1 mapping.
            # As such, on master, if a request asks for multiple hosts and
            # if any host is found on shard, we assume other requested hosts
            # would also be on the same shard.  We can safely drop this request.
            ignore_request = _is_master and any(
                    host.shard_id for host in hosts)
            if (not ignore_request and
                    (self.valid_host_assignment(request, hosts[0]) and
                        self.lease_hosts(hosts))):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])
473
474
# Request dispatchers: create the appropriate request handler and send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher; the easiest way to achieve this is to return the response_map
# attribute of the request handler after making the appropriate requests.
def get_hosts(host_requests):
    """Look up the hosts named by a batch of requests, without leasing them.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    handler = BaseHostRequestHandler()
    handler.batch_get_hosts(host_requests)
    response = handler.get_response()
    return response
489
490
def update_hosts(update_requests):
    """Apply a batch of updates to the host tables.

    @param update_requests: A list of updates to host tables
        as defined in UpdateHostRequest.
    @return: The handler's response map, as returned by get_response(); it
        only holds entries for update requests that recorded an exception.
    """
    handler = BaseHostRequestHandler()
    handler.update_hosts(update_requests)
    response = handler.get_response()
    return response
500
501
def rdb_host_request_dispatcher(host_requests):
    """Dispatch a batch of host acquisition queries.

    Requests that already name a host (a truthy request.host_id) are
    validated against that host; every other request has hosts found and
    leased for it.

    @param host_requests: A list of requests for acquiring hosts, as defined in
        AcquireHostRequest.
    @return: A dictionary mapping each satisfied request to a list of hosts;
        requests that could not be satisfied have no entry. Eg:
        {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    # Validation requests are made by a job scheduled against a specific host
    # (eg: through the frontend) and only require the rdb to match the
    # parameters of the host against the request. Acquisition requests are
    # made by jobs that need hosts (eg: suites), and the rdb needs to find
    # hosts matching the parameters of the request.
    validation_requests, require_hosts_requests = [], []
    for request in host_requests:
        bucket = (validation_requests if request.host_id
                  else require_hosts_requests)
        bucket.append(request)

    handler = AvailableHostRequestHandler()
    handler.batch_validate_hosts(validation_requests)
    handler.batch_acquire_hosts(require_hosts_requests)
    return handler.get_response()
529