# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Rdb server module.
"""

import copy
import logging

import common

from django.core import exceptions as django_exceptions
from django.db.models import fields
from django.db.models import Q
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import rdb_cache_manager
from autotest_lib.scheduler import rdb_hosts
from autotest_lib.scheduler import rdb_requests
from autotest_lib.scheduler import rdb_utils
from autotest_lib.server import utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
_is_master = not utils.is_shard()

# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Base manager for host queries on all hosts.
    """

    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Update fields on a set of hosts.

        @param host_ids: A list of ids of hosts to update.
        @param kwargs: A key value dictionary corresponding to column, value
            in the host database.
        """
        self.host_objects.filter(id__in=host_ids).update(**kwargs)


    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Get host objects for the given ids.

        @param ids: The ids for which we need host objects.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
        """
        return self.host_objects.filter(id__in=ids).order_by('id')


    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Finds valid hosts matching deps, acls.

        @param deps: A list of dependencies to match.
        @param acls: A list of acls, at least one of which must coincide with
            an acl group the chosen host is in.

        @return: A set of matching hosts available.
        """
        hosts_available = self.host_objects.filter(invalid=0)
        # Each dependency gets its own chained filter() so that a host must
        # carry every requested label; the acls are a single "any of" filter.
        queries = [Q(labels__id=dep) for dep in deps]
        queries += [Q(aclgroup__id__in=acls)]
        for query in queries:
            hosts_available = hosts_available.filter(query)
        return set(hosts_available)


class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager for requests on un-leased, un-locked hosts.
    """

    # NOTE(review): despite its name, this manager is presumably expected to
    # return only unleased, unlocked hosts -- confirm against models.Host.
    host_objects = models.Host.leased_objects


# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information.
    """

    def __init__(self):
        """Create a handler with an unrestricted query manager."""
        self.host_query_manager = BaseHostQueryManager()
        self.response_map = {}


    def update_response_map(self, request, response, append=False):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param response: A response for the request.
        @param request: The request that has reserved these hosts.
        @param append: Boolean, whether to append new hosts in
                       |response| for existing request.
                       Will not append if existing response is
                       a list of exceptions.

        @raises RDBException: If an empty value is added to the map, or if
                the request already has a response and append is False.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                     (request, response))
        exist_response = self.response_map.setdefault(request, [])
        if exist_response and not append:
            # Report the response that is already recorded, since that is
            # what the message claims to show.
            raise rdb_utils.RDBException('Request %s already has response %s '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, exist_response))
        if exist_response and append and not isinstance(
                exist_response[0], rdb_hosts.RDBHost):
            # Do not append if existing response contains exception.
            return
        exist_response.extend(response)


    def _check_response_map(self):
        """Verify that we never give the same host to different requests.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        unique_hosts = set()
        for request, response in self.response_map.items():
            # Each value in the response map can only either be a list of
            # RDBHosts or a list of RDBExceptions, not a mix of both.
            if not isinstance(response[0], rdb_hosts.RDBHost):
                continue
            if any(host in unique_hosts for host in response):
                raise rdb_utils.RDBException(
                        'Assigning the same host to multiple requests. New '
                        'hosts %s, request %s, response_map: %s' %
                        (response, request, self.response_map))
            unique_hosts.update(response)


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert all RDBServerHostWrapper objects to host info dictionaries.

        @return: A dictionary mapping requests to a list of matching host_infos.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        self._check_response_map()
        # Only values of existing keys are replaced, so the dictionary does
        # not change size while it is being iterated.
        for request, response in self.response_map.items():
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        Payloads of several requests for the same host are merged, payloads
        that end up identical are batched into one database update, and
        update failures are recorded in the response map.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        # Merging happens on a private copy: payloads are used as dictionary
        # keys below, so the payload object owned by the caller's request
        # must never be mutated in place.
        unique_host_requests = {}
        for request in update_requests:
            merged_payload = unique_host_requests.get(request.host_id)
            if merged_payload:
                merged_payload.update(request.payload)
            else:
                unique_host_requests[request.host_id] = copy.copy(
                        request.payload)

        # Batch similar payloads so we can do them in one table scan.
        similar_requests = {}
        for host_id, payload in unique_host_requests.items():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.items():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    # NOTE(review): when payloads of several requests for one
                    # host were merged, the rebuilt request carries the merged
                    # payload -- confirm the client can still match it.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = set(request.host_id for request in host_requests)
        host_map = {}

        # This list will not contain available hosts if executed using
        # an AvailableHostQueryManager.
        for host in self.host_query_manager.get_hosts(host_ids):
            host_map[host.id] = host
        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)


class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    """


    def __init__(self):
        """Create a handler restricted to available hosts, with a host cache."""
        self.host_query_manager = AvailableHostQueryManager()
        self.cache = rdb_cache_manager.RDBHostCacheManager()
        self.response_map = {}
        # Counters reported by batch_acquire_hosts for debugging.
        self.unsatisfied_requests = 0
        self.leased_hosts_count = 0
        # Created per batch in batch_acquire_hosts.
        self.request_accountant = None


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased.
        """
        #TODO(beeps): crbug.com/353183.
        unleased_hosts = set(hosts)
        leased_hosts = set()
        for host in unleased_hosts:
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)


    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: A truthy value if the host, request assignment is valid,
            a falsy value otherwise.

        @raises RDBException: If the request already has another host_id
                associated with it.
        """
        if request.host_id and request.host_id != host.id:
            raise rdb_utils.RDBException(
                    'Cannot assign a different host (%s) for request: %s, it '
                    'already has one: %s ' %
                    (host.id, request, request.host_id))

        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (request.acls.intersection(host.acls) or host.invalid)
        label_match = (request.deps.intersection(host.labels) == request.deps)
        return acl_match and label_match


    @classmethod
    def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
        """Sort hosts in the order of how many preferred deps it has.

        This allows rdb always choose the hosts with the most preferred deps
        for a request. One important use case is including cros-version as
        a preferred dependence. By choosing a host with the same cros-version,
        we can save the time on provisioning it. Note this is not guaranteed
        if preferred_deps contains other labels as well.

        @param hosts: A list of hosts to sort.
        @param preferred_deps: A list of deps that are preferred.

        @return: A list of sorted hosts.

        """
        hosts = sorted(
                hosts,
                key=lambda host: len(set(preferred_deps) & set(host.labels)),
                reverse=True)
        return hosts


    @rdb_cache_manager.memoize_hosts
    def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
                       **kwargs):
        """Acquire hosts for a group of similar requests.

        Find and acquire hosts that can satisfy a group of requests.
        1. If the caching decorator doesn't pass in a list of matching hosts
           via the MEMOIZE_KEY this method will directly check the database for
           matching hosts.
        2. If all matching hosts are not leased for this request, the remaining
           hosts are returned to the caching decorator, to place in the cache.

        @param hosts_required: Number of hosts required to satisfy request.
        @param request: The request for hosts.
        @param is_acquire_min_duts: Boolean. Indicate whether this is to
                                    acquire minimum required duts, only used
                                    for stats purpose.

        @return: The list of excess matching hosts.
        """
        hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
        if not hosts:
            hosts = self.host_query_manager.find_hosts(
                            request.deps, request.acls)

        # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
        # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
        # --used this request---used by earlier request----------unused--------
        hosts = self._sort_hosts_by_preferred_deps(
                hosts, request.preferred_deps)
        attempt_lease_hosts = min(len(hosts), hosts_required)
        leased_host_count = 0
        if attempt_lease_hosts:
            leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
            if leased_hosts:
                self.update_response_map(request, leased_hosts, append=True)

            # [:attempt_leased_hosts] - leased_hosts will include hosts that
            # failed leasing, most likely because they're already leased, so
            # don't cache them again.
            leased_host_count = len(leased_hosts)
            failed_leasing = attempt_lease_hosts - leased_host_count
            if failed_leasing > 0:
                # For the sake of simplicity this calculation assumes that
                # leasing only fails if there's a stale cached host already
                # leased by a previous request, ergo, we can only get here
                # through a cache hit.
                line_length = len(hosts)
                self.cache.stale_entries.append(
                        (float(failed_leasing)/line_length) * 100)
            self.leased_hosts_count += leased_host_count
        if is_acquire_min_duts:
            self.request_accountant.record_acquire_min_duts(
                    request, hosts_required, leased_host_count)
        self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
        # Cache the unleased matching hosts against the request.
        return hosts[attempt_lease_hosts:]


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of hosts
        that match the parameters of a request. Each acquired host is added
        to the response_map dictionary as an RDBServerHostWrapper.

        @param host_requests: A list of requests to acquire hosts.
        """
        distinct_requests = 0

        logging.debug('Processing %s host acquisition requests',
                      len(host_requests))

        self.request_accountant = rdb_utils.RequestAccountant(host_requests)
        # First pass tries to satisfy min_duts for each suite.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_min_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=True)
            distinct_requests += 1

        # Second pass tries to allocate duts to the rest unsatisfied requests.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=False)

        self.cache.record_stats()
        logging.debug('Host acquisition stats: distinct requests: %s, leased '
                      'hosts: %s, unsatisfied requests: %s', distinct_requests,
                      self.leased_hosts_count, self.unsatisfied_requests)


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #        a. Same deps/acls, different priority:
        #           Since we sort the requests based on priority, the
        #           higher priority request r1, will lease h1. The
        #           validation of r2, h1 will fail because of the r1 lease.
        #        b. Different deps/acls, only one of which matches the host:
        #           The matching request will lease h1. The other host
        #           pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() snapshots the keys, so entries can be deleted in the loop.
        for request in sorted(self.response_map.keys(),
                key=lambda request: request.priority, reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            # Job-shard is 1:1 mapping. Because a job can only belongs
            # to one shard, or belongs to master, we disallow frontend job
            # that spans hosts on and off shards or across multiple shards,
            # which would otherwise break the 1:1 mapping.
            # As such, on master, if a request asks for multiple hosts and
            # if any host is found on shard, we assume other requested hosts
            # would also be on the same shard.  We can safely drop this request.
            ignore_request = _is_master and any(
                    [host.shard_id for host in hosts])
            if (not ignore_request and
                    (self.valid_host_assignment(request, hosts[0]) and
                        self.lease_hosts(hosts))):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])


# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher, the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Get host information about the requested hosts.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    rdb_handler = BaseHostRequestHandler()
    rdb_handler.batch_get_hosts(host_requests)
    return rdb_handler.get_response()


def update_hosts(update_requests):
    """Update hosts.

    @param update_requests: A list of updates to host tables
        as defined in UpdateHostRequest.
    @return: The handler's response map, which holds an entry for each
        update that raised an exception.
    """
    rdb_handler = BaseHostRequestHandler()
    rdb_handler.update_hosts(update_requests)
    return rdb_handler.get_response()


def rdb_host_request_dispatcher(host_requests):
    """Dispatcher for all host acquisition queries.

    @param host_requests: A list of requests for acquiring hosts, as defined in
        AcquireHostRequest.
    @return: A dictionary mapping each request to a list of hosts, or
        an empty list if none could satisfy the request. Eg:
        {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    validation_requests = []
    require_hosts_requests = []

    # Validation requests are made by a job scheduled against a
    # specific host (eg: through the frontend) and only require the rdb to
    # match the parameters of the host against the request. Acquisition
    # requests are made by jobs that need hosts (eg: suites) and the rdb needs
    # to find hosts matching the parameters of the request.
    for request in host_requests:
        if request.host_id:
            validation_requests.append(request)
        else:
            require_hosts_requests.append(request)

    rdb_handler = AvailableHostRequestHandler()
    rdb_handler.batch_validate_hosts(validation_requests)
    rdb_handler.batch_acquire_hosts(require_hosts_requests)
    return rdb_handler.get_response()