1# pylint: disable=missing-docstring 2 3"""Database model classes for the scheduler. 4 5Contains model classes abstracting the various DB tables used by the scheduler. 6These overlap the Django models in basic functionality, but were written before 7the Django models existed and have not yet been phased out. Some of them 8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic 9which would probably be ill-suited for inclusion in the general Django model 10classes. 11 12Globals: 13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches 14 one of these statuses, an email will be sent to the job's email_list. 15 comes from global_config. 16_base_url: URL to the local AFE server, used to construct URLs for emails. 17_db: DatabaseConnection for this module. 18_drone_manager: reference to global DroneManager instance. 19""" 20 21import datetime 22import itertools 23import logging 24import re 25import time 26import weakref 27 28from autotest_lib.client.common_lib import global_config, host_protections 29from autotest_lib.client.common_lib import time_utils 30from autotest_lib.client.common_lib import utils 31from autotest_lib.client.common_lib.cros.graphite import autotest_es 32from autotest_lib.frontend.afe import models, model_attributes 33from autotest_lib.scheduler import drone_manager, email_manager 34from autotest_lib.scheduler import rdb_lib 35from autotest_lib.scheduler import scheduler_config 36from autotest_lib.scheduler import scheduler_lib 37from autotest_lib.server import afe_urls 38from autotest_lib.server.cros import provision 39 40try: 41 from chromite.lib import metrics 42except ImportError: 43 metrics = utils.metrics_mock 44 45 46_notify_email_statuses = [] 47_base_url = None 48 49_db = None 50_drone_manager = None 51 52def initialize(): 53 global _db 54 _db = scheduler_lib.ConnectionManager().get_connection() 55 56 notify_statuses_list = global_config.global_config.get_config_value( 57 scheduler_config.CONFIG_SECTION, "notify_email_statuses", 58 default='') 59 global _notify_email_statuses 60 _notify_email_statuses = [status for status in 61 re.split(r'[\s,;:]', notify_statuses_list.lower()) 62 if status] 63 64 # AUTOTEST_WEB.base_url is still a supported config option as some people 65 # may wish to override the entire url. 66 global _base_url 67 config_base_url = global_config.global_config.get_config_value( 68 scheduler_config.CONFIG_SECTION, 'base_url', default='') 69 if config_base_url: 70 _base_url = config_base_url 71 else: 72 _base_url = afe_urls.ROOT_URL 73 74 initialize_globals() 75 76 77def initialize_globals(): 78 global _drone_manager 79 _drone_manager = drone_manager.instance() 80 81 82def get_job_metadata(job): 83 """Get a dictionary of the job information. 84 85 The return value is a dictionary that includes job information like id, 86 name and parent job information. The value will be stored in metadata 87 database. 88 89 @param job: A Job object. 90 @return: A dictionary containing the job id, owner and name. 91 """ 92 if not job: 93 logging.error('Job is None, no metadata returned.') 94 return {} 95 try: 96 return {'job_id': job.id, 97 'owner': job.owner, 98 'job_name': job.name, 99 'parent_job_id': job.parent_job_id} 100 except AttributeError as e: 101 logging.error('Job has missing attribute: %s', e) 102 return {} 103 104 105class DelayedCallTask(object): 106 """ 107 A task object like AgentTask for an Agent to run that waits for the 108 specified amount of time to have elapsed before calling the supplied 109 callback once and finishing. If the callback returns anything, it is 110 assumed to be a new Agent instance and will be added to the dispatcher. 111 112 @attribute end_time: The absolute posix time after which this task will 113 call its callback when it is polled and be finished. 114 115 Also has all attributes required by the Agent class. 116 """ 117 def __init__(self, delay_seconds, callback, now_func=None): 118 """ 119 @param delay_seconds: The delay in seconds from now that this task 120 will call the supplied callback and be done. 121 @param callback: A callable to be called by this task once after at 122 least delay_seconds time has elapsed. It must return None 123 or a new Agent instance. 124 @param now_func: A time.time like function. Default: time.time. 125 Used for testing. 126 """ 127 assert delay_seconds > 0 128 assert callable(callback) 129 if not now_func: 130 now_func = time.time 131 self._now_func = now_func 132 self._callback = callback 133 134 self.end_time = self._now_func() + delay_seconds 135 136 # These attributes are required by Agent. 137 self.aborted = False 138 self.host_ids = () 139 self.success = False 140 self.queue_entry_ids = () 141 self.num_processes = 0 142 143 144 def poll(self): 145 if not self.is_done() and self._now_func() >= self.end_time: 146 self._callback() 147 self.success = True 148 149 150 def is_done(self): 151 return self.success or self.aborted 152 153 154 def abort(self): 155 self.aborted = True 156 157 158class DBError(Exception): 159 """Raised by the DBObject constructor when its select fails.""" 160 161 162class DBObject(object): 163 """A miniature object relational model for the database.""" 164 165 # Subclasses MUST override these: 166 _table_name = '' 167 _fields = () 168 169 # A mapping from (type, id) to the instance of the object for that 170 # particular id. This prevents us from creating new Job() and Host() 171 # instances for every HostQueueEntry object that we instantiate as 172 # multiple HQEs often share the same Job. 173 _instances_by_type_and_id = weakref.WeakValueDictionary() 174 _initialized = False 175 176 177 def __new__(cls, id=None, **kwargs): 178 """ 179 Look to see if we already have an instance for this particular type 180 and id. If so, use it instead of creating a duplicate instance. 181 """ 182 if id is not None: 183 instance = cls._instances_by_type_and_id.get((cls, id)) 184 if instance: 185 return instance 186 return super(DBObject, cls).__new__(cls, id=id, **kwargs) 187 188 189 def __init__(self, id=None, row=None, new_record=False, always_query=True): 190 assert bool(id) or bool(row) 191 if id is not None and row is not None: 192 assert id == row[0] 193 assert self._table_name, '_table_name must be defined in your class' 194 assert self._fields, '_fields must be defined in your class' 195 if not new_record: 196 if self._initialized and not always_query: 197 return # We've already been initialized. 198 if id is None: 199 id = row[0] 200 # Tell future constructors to use us instead of re-querying while 201 # this instance is still around. 202 self._instances_by_type_and_id[(type(self), id)] = self 203 204 self.__table = self._table_name 205 206 self.__new_record = new_record 207 208 if row is None: 209 row = self._fetch_row_from_db(id) 210 211 if self._initialized: 212 differences = self._compare_fields_in_row(row) 213 if differences: 214 logging.warning( 215 'initialized %s %s instance requery is updating: %s', 216 type(self), self.id, differences) 217 self._update_fields_from_row(row) 218 self._initialized = True 219 220 221 @classmethod 222 def _clear_instance_cache(cls): 223 """Used for testing, clear the internal instance cache.""" 224 cls._instances_by_type_and_id.clear() 225 226 227 def _fetch_row_from_db(self, row_id): 228 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table 229 rows = _db.execute(sql, (row_id,)) 230 if not rows: 231 raise DBError("row not found (table=%s, row id=%s)" 232 % (self.__table, row_id)) 233 return rows[0] 234 235 236 def _assert_row_length(self, row): 237 assert len(row) == len(self._fields), ( 238 "table = %s, row = %s/%d, fields = %s/%d" % ( 239 self.__table, row, len(row), self._fields, len(self._fields))) 240 241 242 def _compare_fields_in_row(self, row): 243 """ 244 Given a row as returned by a SELECT query, compare it to our existing in 245 memory fields. Fractional seconds are stripped from datetime values 246 before comparison. 247 248 @param row - A sequence of values corresponding to fields named in 249 The class attribute _fields. 250 251 @returns A dictionary listing the differences keyed by field name 252 containing tuples of (current_value, row_value). 253 """ 254 self._assert_row_length(row) 255 differences = {} 256 for field, row_value in itertools.izip(self._fields, row): 257 current_value = getattr(self, field) 258 if (isinstance(current_value, datetime.datetime) 259 and isinstance(row_value, datetime.datetime)): 260 current_value = current_value.strftime(time_utils.TIME_FMT) 261 row_value = row_value.strftime(time_utils.TIME_FMT) 262 if current_value != row_value: 263 differences[field] = (current_value, row_value) 264 return differences 265 266 267 def _update_fields_from_row(self, row): 268 """ 269 Update our field attributes using a single row returned by SELECT. 270 271 @param row - A sequence of values corresponding to fields named in 272 the class fields list. 273 """ 274 self._assert_row_length(row) 275 276 self._valid_fields = set() 277 for field, value in itertools.izip(self._fields, row): 278 setattr(self, field, value) 279 self._valid_fields.add(field) 280 281 self._valid_fields.remove('id') 282 283 284 def update_from_database(self): 285 assert self.id is not None 286 row = self._fetch_row_from_db(self.id) 287 self._update_fields_from_row(row) 288 289 290 def count(self, where, table = None): 291 if not table: 292 table = self.__table 293 294 rows = _db.execute(""" 295 SELECT count(*) FROM %s 296 WHERE %s 297 """ % (table, where)) 298 299 assert len(rows) == 1 300 301 return int(rows[0][0]) 302 303 304 def update_field(self, field, value): 305 assert field in self._valid_fields 306 307 if getattr(self, field) == value: 308 return 309 310 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field) 311 _db.execute(query, (value, self.id)) 312 313 setattr(self, field, value) 314 315 316 def save(self): 317 if self.__new_record: 318 keys = self._fields[1:] # avoid id 319 columns = ','.join([str(key) for key in keys]) 320 values = [] 321 for key in keys: 322 value = getattr(self, key) 323 if value is None: 324 values.append('NULL') 325 else: 326 values.append('"%s"' % value) 327 values_str = ','.join(values) 328 query = ('INSERT INTO %s (%s) VALUES (%s)' % 329 (self.__table, columns, values_str)) 330 _db.execute(query) 331 # Update our id to the one the database just assigned to us. 332 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0] 333 334 335 def delete(self): 336 self._instances_by_type_and_id.pop((type(self), id), None) 337 self._initialized = False 338 self._valid_fields.clear() 339 query = 'DELETE FROM %s WHERE id=%%s' % self.__table 340 _db.execute(query, (self.id,)) 341 342 343 @staticmethod 344 def _prefix_with(string, prefix): 345 if string: 346 string = prefix + string 347 return string 348 349 350 @classmethod 351 def fetch(cls, where='', params=(), joins='', order_by=''): 352 """ 353 Construct instances of our class based on the given database query. 354 355 @yields One class instance for each row fetched. 356 """ 357 order_by = cls._prefix_with(order_by, 'ORDER BY ') 358 where = cls._prefix_with(where, 'WHERE ') 359 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s ' 360 '%(where)s %(order_by)s' % {'table' : cls._table_name, 361 'joins' : joins, 362 'where' : where, 363 'order_by' : order_by}) 364 rows = _db.execute(query, params) 365 return [cls(id=row[0], row=row) for row in rows] 366 367 368class IneligibleHostQueue(DBObject): 369 _table_name = 'afe_ineligible_host_queues' 370 _fields = ('id', 'job_id', 'host_id') 371 372 373class AtomicGroup(DBObject): 374 _table_name = 'afe_atomic_groups' 375 _fields = ('id', 'name', 'description', 'max_number_of_machines', 376 'invalid') 377 378 379class Label(DBObject): 380 _table_name = 'afe_labels' 381 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid', 382 'only_if_needed', 'atomic_group_id') 383 384 385 def __repr__(self): 386 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % ( 387 self.name, self.id, self.atomic_group_id) 388 389 390class Host(DBObject): 391 _table_name = 'afe_hosts' 392 # TODO(ayatane): synch_id is not used, remove after fixing DB. 393 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status', 394 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty', 395 'leased', 'shard_id', 'lock_reason') 396 397 398 def set_status(self,status): 399 logging.info('%s -> %s', self.hostname, status) 400 self.update_field('status',status) 401 402 403 def platform_and_labels(self): 404 """ 405 Returns a tuple (platform_name, list_of_all_label_names). 406 """ 407 rows = _db.execute(""" 408 SELECT afe_labels.name, afe_labels.platform 409 FROM afe_labels 410 INNER JOIN afe_hosts_labels ON 411 afe_labels.id = afe_hosts_labels.label_id 412 WHERE afe_hosts_labels.host_id = %s 413 ORDER BY afe_labels.name 414 """, (self.id,)) 415 platform = None 416 all_labels = [] 417 for label_name, is_platform in rows: 418 if is_platform: 419 platform = label_name 420 all_labels.append(label_name) 421 return platform, all_labels 422 423 424 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE) 425 426 427 @classmethod 428 def cmp_for_sort(cls, a, b): 429 """ 430 A comparison function for sorting Host objects by hostname. 431 432 This strips any trailing numeric digits, ignores leading 0s and 433 compares hostnames by the leading name and the trailing digits as a 434 number. If both hostnames do not match this pattern, they are simply 435 compared as lower case strings. 436 437 Example of how hostnames will be sorted: 438 439 alice, host1, host2, host09, host010, host10, host11, yolkfolk 440 441 This hopefully satisfy most people's hostname sorting needs regardless 442 of their exact naming schemes. Nobody sane should have both a host10 443 and host010 (but the algorithm works regardless). 444 """ 445 lower_a = a.hostname.lower() 446 lower_b = b.hostname.lower() 447 match_a = cls._ALPHANUM_HOST_RE.match(lower_a) 448 match_b = cls._ALPHANUM_HOST_RE.match(lower_b) 449 if match_a and match_b: 450 name_a, number_a_str = match_a.groups() 451 name_b, number_b_str = match_b.groups() 452 number_a = int(number_a_str.lstrip('0')) 453 number_b = int(number_b_str.lstrip('0')) 454 result = cmp((name_a, number_a), (name_b, number_b)) 455 if result == 0 and lower_a != lower_b: 456 # If they compared equal above but the lower case names are 457 # indeed different, don't report equality. abc012 != abc12. 458 return cmp(lower_a, lower_b) 459 return result 460 else: 461 return cmp(lower_a, lower_b) 462 463 464class HostQueueEntry(DBObject): 465 _table_name = 'afe_host_queue_entries' 466 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host', 467 'active', 'complete', 'deleted', 'execution_subdir', 468 'atomic_group_id', 'aborted', 'started_on', 'finished_on') 469 470 _COMPLETION_COUNT_METRIC = metrics.Counter( 471 'chromeos/autotest/scheduler/hqe_completion_count') 472 473 def __init__(self, id=None, row=None, **kwargs): 474 assert id or row 475 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs) 476 self.job = Job(self.job_id) 477 478 if self.host_id: 479 self.host = rdb_lib.get_hosts([self.host_id])[0] 480 self.host.dbg_str = self.get_dbg_str() 481 self.host.metadata = get_job_metadata(self.job) 482 else: 483 self.host = None 484 485 486 @classmethod 487 def clone(cls, template): 488 """ 489 Creates a new row using the values from a template instance. 490 491 The new instance will not exist in the database or have a valid 492 id attribute until its save() method is called. 493 """ 494 assert isinstance(template, cls) 495 new_row = [getattr(template, field) for field in cls._fields] 496 clone = cls(row=new_row, new_record=True) 497 clone.id = None 498 return clone 499 500 501 def _view_job_url(self): 502 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id) 503 504 505 def get_labels(self): 506 """ 507 Get all labels associated with this host queue entry (either via the 508 meta_host or as a job dependency label). The labels yielded are not 509 guaranteed to be unique. 510 511 @yields Label instances associated with this host_queue_entry. 512 """ 513 if self.meta_host: 514 yield Label(id=self.meta_host, always_query=False) 515 labels = Label.fetch( 516 joins="JOIN afe_jobs_dependency_labels AS deps " 517 "ON (afe_labels.id = deps.label_id)", 518 where="deps.job_id = %d" % self.job.id) 519 for label in labels: 520 yield label 521 522 523 def set_host(self, host): 524 if host: 525 logging.info('Assigning host %s to entry %s', host.hostname, self) 526 self.update_field('host_id', host.id) 527 self.block_host(host.id) 528 else: 529 logging.info('Releasing host from %s', self) 530 self.unblock_host(self.host.id) 531 self.update_field('host_id', None) 532 533 self.host = host 534 535 536 def block_host(self, host_id): 537 logging.info("creating block %s/%s", self.job.id, host_id) 538 row = [0, self.job.id, host_id] 539 block = IneligibleHostQueue(row=row, new_record=True) 540 block.save() 541 542 543 def unblock_host(self, host_id): 544 logging.info("removing block %s/%s", self.job.id, host_id) 545 blocks = IneligibleHostQueue.fetch( 546 'job_id=%d and host_id=%d' % (self.job.id, host_id)) 547 for block in blocks: 548 block.delete() 549 550 551 def set_execution_subdir(self, subdir=None): 552 if subdir is None: 553 assert self.host 554 subdir = self.host.hostname 555 self.update_field('execution_subdir', subdir) 556 557 558 def _get_hostname(self): 559 if self.host: 560 return self.host.hostname 561 return 'no host' 562 563 564 def get_dbg_str(self): 565 """Get a debug string to identify this host. 566 567 @return: A string containing the hqe and job id. 568 """ 569 try: 570 return 'HQE: %s, for job: %s' % (self.id, self.job_id) 571 except AttributeError as e: 572 return 'HQE has not been initialized yet: %s' % e 573 574 575 def __str__(self): 576 flags = [] 577 if self.active: 578 flags.append('active') 579 if self.complete: 580 flags.append('complete') 581 if self.deleted: 582 flags.append('deleted') 583 if self.aborted: 584 flags.append('aborted') 585 flags_str = ','.join(flags) 586 if flags_str: 587 flags_str = ' [%s]' % flags_str 588 return ("%s and host: %s has status:%s%s" % 589 (self.get_dbg_str(), self._get_hostname(), self.status, 590 flags_str)) 591 592 593 def record_state(self, type_str, state, value): 594 """Record metadata in elasticsearch. 595 596 If ES configured to use http, then we will time that http request. 597 Otherwise, it uses UDP, so we will not need to time it. 598 599 @param type_str: sets the _type field in elasticsearch db. 600 @param state: string representing what state we are recording, 601 e.g. 'status' 602 @param value: value of the state, e.g. 'verifying' 603 """ 604 metadata = { 605 'time_changed': time.time(), 606 state: value, 607 'job_id': self.job_id, 608 } 609 if self.host: 610 metadata['hostname'] = self.host.hostname 611 autotest_es.post(type_str=type_str, metadata=metadata) 612 613 614 def set_status(self, status): 615 logging.info("%s -> %s", self, status) 616 617 self.update_field('status', status) 618 619 active = (status in models.HostQueueEntry.ACTIVE_STATUSES) 620 complete = (status in models.HostQueueEntry.COMPLETE_STATUSES) 621 assert not (active and complete) 622 623 self.update_field('active', active) 624 625 # The ordering of these operations is important. Once we set the 626 # complete bit this job will become indistinguishable from all 627 # the other complete jobs, unless we first set shard_id to NULL 628 # to signal to the shard_client that we need to upload it. However, 629 # we can only set both these after we've updated finished_on etc 630 # within _on_complete or the job will get synced in an intermediate 631 # state. This means that if someone sigkills the scheduler between 632 # setting finished_on and complete, we will have inconsistent jobs. 633 # This should be fine, because nothing critical checks finished_on, 634 # and the scheduler should never be killed mid-tick. 635 if complete: 636 self._on_complete(status) 637 self._email_on_job_complete() 638 639 self.update_field('complete', complete) 640 641 should_email_status = (status.lower() in _notify_email_statuses or 642 'all' in _notify_email_statuses) 643 if should_email_status: 644 self._email_on_status(status) 645 logging.debug('HQE Set Status Complete') 646 self.record_state('hqe_status', 'status', status) 647 648 649 def _on_complete(self, status): 650 metric_fields = {'status': status.lower()} 651 if self.host: 652 metric_fields['board'] = self.host.board or '' 653 if len(self.host.pools) == 1: 654 metric_fields['pool'] = self.host.pools[0] 655 else: 656 metric_fields['pool'] = 'MULTIPLE' 657 else: 658 metric_fields['board'] = 'NO_HOST' 659 metric_fields['pool'] = 'NO_HOST' 660 self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields) 661 if status is not models.HostQueueEntry.Status.ABORTED: 662 self.job.stop_if_necessary() 663 if self.started_on: 664 self.set_finished_on_now() 665 if self.job.shard_id is not None: 666 # If shard_id is None, the job will be synced back to the master 667 self.job.update_field('shard_id', None) 668 if not self.execution_subdir: 669 return 670 # unregister any possible pidfiles associated with this queue entry 671 for pidfile_name in drone_manager.ALL_PIDFILE_NAMES: 672 pidfile_id = _drone_manager.get_pidfile_id_from( 673 self.execution_path(), pidfile_name=pidfile_name) 674 _drone_manager.unregister_pidfile(pidfile_id) 675 676 677 def _get_status_email_contents(self, status, summary=None, hostname=None): 678 """ 679 Gather info for the status notification e-mails. 680 681 If needed, we could start using the Django templating engine to create 682 the subject and the e-mail body, but that doesn't seem necessary right 683 now. 684 685 @param status: Job status text. Mandatory. 686 @param summary: Job summary text. Optional. 687 @param hostname: A hostname for the job. Optional. 688 689 @return: Tuple (subject, body) for the notification e-mail. 690 """ 691 job_stats = Job(id=self.job.id).get_execution_details() 692 693 subject = ('Autotest | Job ID: %s "%s" | Status: %s ' % 694 (self.job.id, self.job.name, status)) 695 696 if hostname is not None: 697 subject += '| Hostname: %s ' % hostname 698 699 if status not in ["1 Failed", "Failed"]: 700 subject += '| Success Rate: %.2f %%' % job_stats['success_rate'] 701 702 body = "Job ID: %s\n" % self.job.id 703 body += "Job name: %s\n" % self.job.name 704 if hostname is not None: 705 body += "Host: %s\n" % hostname 706 if summary is not None: 707 body += "Summary: %s\n" % summary 708 body += "Status: %s\n" % status 709 body += "Results interface URL: %s\n" % self._view_job_url() 710 body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time'] 711 if int(job_stats['total_executed']) > 0: 712 body += "User tests executed: %s\n" % job_stats['total_executed'] 713 body += "User tests passed: %s\n" % job_stats['total_passed'] 714 body += "User tests failed: %s\n" % job_stats['total_failed'] 715 body += ("User tests success rate: %.2f %%\n" % 716 job_stats['success_rate']) 717 718 if job_stats['failed_rows']: 719 body += "Failures:\n" 720 body += job_stats['failed_rows'] 721 722 return subject, body 723 724 725 def _email_on_status(self, status): 726 hostname = self._get_hostname() 727 subject, body = self._get_status_email_contents(status, None, hostname) 728 email_manager.manager.send_email(self.job.email_list, subject, body) 729 730 731 def _email_on_job_complete(self): 732 if not self.job.is_finished(): 733 return 734 735 summary = [] 736 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id) 737 for queue_entry in hosts_queue: 738 summary.append("Host: %s Status: %s" % 739 (queue_entry._get_hostname(), 740 queue_entry.status)) 741 742 summary = "\n".join(summary) 743 status_counts = models.Job.objects.get_status_counts( 744 [self.job.id])[self.job.id] 745 status = ', '.join('%d %s' % (count, status) for status, count 746 in status_counts.iteritems()) 747 748 subject, body = self._get_status_email_contents(status, summary, None) 749 email_manager.manager.send_email(self.job.email_list, subject, body) 750 751 752 def schedule_pre_job_tasks(self): 753 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s", 754 self.job.name, self.meta_host, self.atomic_group_id, 755 self.job.id, self.id, self.host.hostname, self.status) 756 757 self._do_schedule_pre_job_tasks() 758 759 760 def _do_schedule_pre_job_tasks(self): 761 self.job.schedule_pre_job_tasks(queue_entry=self) 762 763 764 def requeue(self): 765 assert self.host 766 self.set_status(models.HostQueueEntry.Status.QUEUED) 767 self.update_field('started_on', None) 768 self.update_field('finished_on', None) 769 # verify/cleanup failure sets the execution subdir, so reset it here 770 self.set_execution_subdir('') 771 if self.meta_host: 772 self.set_host(None) 773 774 775 @property 776 def aborted_by(self): 777 self._load_abort_info() 778 return self._aborted_by 779 780 781 @property 782 def aborted_on(self): 783 self._load_abort_info() 784 return self._aborted_on 785 786 787 def _load_abort_info(self): 788 """ Fetch info about who aborted the job. """ 789 if hasattr(self, "_aborted_by"): 790 return 791 rows = _db.execute(""" 792 SELECT afe_users.login, 793 afe_aborted_host_queue_entries.aborted_on 794 FROM afe_aborted_host_queue_entries 795 INNER JOIN afe_users 796 ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id 797 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s 798 """, (self.id,)) 799 if rows: 800 self._aborted_by, self._aborted_on = rows[0] 801 else: 802 self._aborted_by = self._aborted_on = None 803 804 805 def on_pending(self): 806 """ 807 Called when an entry in a synchronous job has passed verify. If the 808 job is ready to run, sets the entries to STARTING. Otherwise, it leaves 809 them in PENDING. 810 """ 811 self.set_status(models.HostQueueEntry.Status.PENDING) 812 self.host.set_status(models.Host.Status.PENDING) 813 814 # Some debug code here: sends an email if an asynchronous job does not 815 # immediately enter Starting. 816 # TODO: Remove this once we figure out why asynchronous jobs are getting 817 # stuck in Pending. 818 self.job.run_if_ready(queue_entry=self) 819 if (self.job.synch_count == 1 and 820 self.status == models.HostQueueEntry.Status.PENDING): 821 subject = 'Job %s (id %s)' % (self.job.name, self.job.id) 822 message = 'Asynchronous job stuck in Pending' 823 email_manager.manager.enqueue_notify_email(subject, message) 824 825 826 def abort(self, dispatcher): 827 assert self.aborted and not self.complete 828 829 Status = models.HostQueueEntry.Status 830 if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING): 831 # do nothing; post-job tasks will finish and then mark this entry 832 # with status "Aborted" and take care of the host 833 return 834 835 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING, 836 Status.WAITING): 837 # If hqe is in any of these status, it should not have any 838 # unfinished agent before it can be aborted. 839 agents = dispatcher.get_agents_for_entry(self) 840 # Agent with finished task can be left behind. This is added to 841 # handle the special case of aborting hostless job in STARTING 842 # status, in which the agent has only a HostlessQueueTask 843 # associated. The finished HostlessQueueTask will be cleaned up in 844 # the next tick, so it's safe to leave the agent there. Without 845 # filtering out finished agent, HQE abort won't be able to proceed. 846 assert all([agent.is_done() for agent in agents]) 847 # If hqe is still in STARTING status, it may not have assigned a 848 # host yet. 849 if self.host: 850 self.host.set_status(models.Host.Status.READY) 851 elif (self.status == Status.VERIFYING or 852 self.status == Status.RESETTING): 853 models.SpecialTask.objects.create( 854 task=models.SpecialTask.Task.CLEANUP, 855 host=models.Host.objects.get(id=self.host.id), 856 requested_by=self.job.owner_model()) 857 elif self.status == Status.PROVISIONING: 858 models.SpecialTask.objects.create( 859 task=models.SpecialTask.Task.REPAIR, 860 host=models.Host.objects.get(id=self.host.id), 861 requested_by=self.job.owner_model()) 862 863 self.set_status(Status.ABORTED) 864 865 866 def execution_tag(self): 867 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE ' 868 'complete!=1 AND execution_subdir="" AND ' 869 'status!="Queued";') 870 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET ' 871 'status="Aborted" WHERE id=%s;') 872 try: 873 assert self.execution_subdir 874 except AssertionError: 875 # TODO(scottz): Remove temporary fix/info gathering pathway for 876 # crosbug.com/31595 once issue is root caused. 877 logging.error('No execution_subdir for host queue id:%s.', self.id) 878 logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES) 879 for row in _db.execute(SQL_SUSPECT_ENTRIES): 880 logging.error(row) 881 logging.error('====DB DEBUG====\n') 882 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id 883 logging.error('EXECUTING: %s', fix_query) 884 _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id) 885 raise AssertionError(('self.execution_subdir not found. ' 886 'See log for details.')) 887 888 return "%s/%s" % (self.job.tag(), self.execution_subdir) 889 890 891 def execution_path(self): 892 return self.execution_tag() 893 894 895 def set_started_on_now(self): 896 self.update_field('started_on', datetime.datetime.now()) 897 898 899 def set_finished_on_now(self): 900 self.update_field('finished_on', datetime.datetime.now()) 901 902 903 def is_hostless(self): 904 return (self.host_id is None 905 and self.meta_host is None) 906 907 908class Job(DBObject): 909 _table_name = 'afe_jobs' 910 _fields = ('id', 'owner', 'name', 'priority', 'control_file', 911 'control_type', 'created_on', 'synch_count', 'timeout', 912 'run_verify', 'email_list', 'reboot_before', 'reboot_after', 913 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id', 914 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id', 915 'test_retry', 'run_reset', 'timeout_mins', 'shard_id', 916 'require_ssp') 917 918 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on 919 # all status='Pending' atomic group HQEs incase a delay was running when the 920 # scheduler was restarted and no more hosts ever successfully exit Verify. 921 922 def __init__(self, id=None, row=None, **kwargs): 923 assert id or row 924 super(Job, self).__init__(id=id, row=row, **kwargs) 925 self._owner_model = None # caches model instance of owner 926 self.update_image_path = None # path of OS image to install 927 928 929 def model(self): 930 return models.Job.objects.get(id=self.id) 931 932 933 def owner_model(self): 934 # work around the fact that the Job owner field is a string, not a 935 # foreign key 936 if not self._owner_model: 937 self._owner_model = models.User.objects.get(login=self.owner) 938 return self._owner_model 939 940 941 def tag(self): 942 return "%s-%s" % (self.id, self.owner) 943 944 945 def is_image_update_job(self): 946 """ 947 Discover if the current job requires an OS update. 948 949 @return: True/False if OS should be updated before job is run. 950 """ 951 # All image update jobs have the parameterized_job_id set. 952 if not self.parameterized_job_id: 953 return False 954 955 # Retrieve the ID of the ParameterizedJob this job is an instance of. 956 rows = _db.execute(""" 957 SELECT test_id 958 FROM afe_parameterized_jobs 959 WHERE id = %s 960 """, (self.parameterized_job_id,)) 961 if not rows: 962 return False 963 test_id = rows[0][0] 964 965 # Retrieve the ID of the known autoupdate_ParameterizedJob. 966 rows = _db.execute(""" 967 SELECT id 968 FROM afe_autotests 969 WHERE name = 'autoupdate_ParameterizedJob' 970 """) 971 if not rows: 972 return False 973 update_id = rows[0][0] 974 975 # If the IDs are the same we've found an image update job. 976 if test_id == update_id: 977 # Finally, get the path to the OS image to install. 978 rows = _db.execute(""" 979 SELECT parameter_value 980 FROM afe_parameterized_job_parameters 981 WHERE parameterized_job_id = %s 982 """, (self.parameterized_job_id,)) 983 if rows: 984 # Save the path in update_image_path to use later as a command 985 # line parameter to autoserv. 986 self.update_image_path = rows[0][0] 987 return True 988 989 return False 990 991 992 def get_execution_details(self): 993 """ 994 Get test execution details for this job. 995 996 @return: Dictionary with test execution details 997 """ 998 def _find_test_jobs(rows): 999 """ 1000 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.* 1001 Those are autotest 'internal job' tests, so they should not be 1002 counted when evaluating the test stats. 1003 1004 @param rows: List of rows (matrix) with database results. 1005 """ 1006 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]') 1007 n_test_jobs = 0 1008 for r in rows: 1009 test_name = r[0] 1010 if job_test_pattern.match(test_name): 1011 n_test_jobs += 1 1012 1013 return n_test_jobs 1014 1015 stats = {} 1016 1017 rows = _db.execute(""" 1018 SELECT t.test, s.word, t.reason 1019 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s 1020 WHERE t.job_idx = j.job_idx 1021 AND s.status_idx = t.status 1022 AND j.afe_job_id = %s 1023 ORDER BY t.reason 1024 """ % self.id) 1025 1026 failed_rows = [r for r in rows if not r[1] == 'GOOD'] 1027 1028 n_test_jobs = _find_test_jobs(rows) 1029 n_test_jobs_failed = _find_test_jobs(failed_rows) 1030 1031 total_executed = len(rows) - n_test_jobs 1032 total_failed = len(failed_rows) - n_test_jobs_failed 1033 1034 if total_executed > 0: 1035 success_rate = 100 - ((total_failed / float(total_executed)) * 100) 1036 else: 1037 success_rate = 0 1038 1039 stats['total_executed'] = total_executed 1040 stats['total_failed'] = total_failed 1041 stats['total_passed'] = total_executed - total_failed 1042 stats['success_rate'] = success_rate 1043 1044 status_header = ("Test Name", "Status", "Reason") 1045 if failed_rows: 1046 stats['failed_rows'] = utils.matrix_to_string(failed_rows, 1047 status_header) 1048 else: 1049 stats['failed_rows'] = '' 1050 1051 time_row = _db.execute(""" 1052 SELECT started_time, finished_time 1053 FROM tko_jobs 1054 WHERE afe_job_id = %s 1055 """ % self.id) 1056 1057 if time_row: 1058 t_begin, t_end = time_row[0] 1059 try: 1060 delta = t_end - t_begin 1061 minutes, seconds = divmod(delta.seconds, 60) 1062 hours, minutes = divmod(minutes, 60) 1063 stats['execution_time'] = ("%02d:%02d:%02d" % 1064 (hours, minutes, seconds)) 1065 # One of t_end or t_begin are None 1066 except TypeError: 1067 stats['execution_time'] = '(could not determine)' 1068 else: 1069 stats['execution_time'] = '(none)' 1070 1071 return stats 1072 1073 1074 def keyval_dict(self): 1075 return self.model().keyval_dict() 1076 1077 1078 def _pending_count(self): 1079 """The number of HostQueueEntries for this job in the Pending state.""" 1080 pending_entries = models.HostQueueEntry.objects.filter( 1081 job=self.id, status=models.HostQueueEntry.Status.PENDING) 1082 return pending_entries.count() 1083 1084 1085 def is_ready(self): 1086 pending_count = self._pending_count() 1087 ready = (pending_count >= self.synch_count) 1088 1089 if not ready: 1090 logging.info( 1091 'Job %s not ready: %s pending, %s required ', 1092 self, pending_count, self.synch_count) 1093 1094 return ready 1095 1096 1097 def num_machines(self, clause = None): 1098 sql = "job_id=%s" % self.id 1099 if clause: 1100 sql += " AND (%s)" % clause 1101 return self.count(sql, table='afe_host_queue_entries') 1102 1103 1104 def num_queued(self): 1105 return self.num_machines('not complete') 1106 1107 1108 def num_active(self): 1109 return self.num_machines('active') 1110 1111 1112 def num_complete(self): 1113 return self.num_machines('complete') 1114 1115 1116 def is_finished(self): 1117 return self.num_complete() == self.num_machines() 1118 1119 1120 def _not_yet_run_entries(self, include_active=True): 1121 if include_active: 1122 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES) 1123 else: 1124 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES) 1125 return models.HostQueueEntry.objects.filter(job=self.id, 1126 status__in=statuses) 1127 1128 1129 def _stop_all_entries(self): 1130 """Stops the job's inactive pre-job HQEs.""" 1131 entries_to_stop = self._not_yet_run_entries( 1132 include_active=False) 1133 for child_entry in entries_to_stop: 1134 assert not child_entry.complete, ( 1135 '%s status=%s, active=%s, complete=%s' % 1136 (child_entry.id, child_entry.status, child_entry.active, 1137 child_entry.complete)) 1138 if child_entry.status == models.HostQueueEntry.Status.PENDING: 1139 child_entry.host.status = models.Host.Status.READY 1140 child_entry.host.save() 1141 child_entry.status = models.HostQueueEntry.Status.STOPPED 1142 child_entry.save() 1143 1144 1145 def stop_if_necessary(self): 1146 not_yet_run = self._not_yet_run_entries() 1147 if not_yet_run.count() < self.synch_count: 1148 self._stop_all_entries() 1149 1150 1151 def _next_group_name(self): 1152 """@returns a directory name to use for the next host group results.""" 1153 group_name = '' 1154 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name)) 1155 query = models.HostQueueEntry.objects.filter( 1156 job=self.id).values('execution_subdir').distinct() 1157 subdirs = (entry['execution_subdir'] for entry in query) 1158 group_matches = (group_count_re.match(subdir) for subdir in subdirs) 1159 ids = [int(match.group(1)) for match in group_matches if match] 1160 if ids: 1161 next_id = max(ids) + 1 1162 else: 1163 next_id = 0 1164 return '%sgroup%d' % (group_name, next_id) 1165 1166 1167 def get_group_entries(self, queue_entry_from_group): 1168 """ 1169 @param queue_entry_from_group: A HostQueueEntry instance to find other 1170 group entries on this job for. 1171 1172 @returns A list of HostQueueEntry objects all executing this job as 1173 part of the same group as the one supplied (having the same 1174 execution_subdir). 1175 """ 1176 execution_subdir = queue_entry_from_group.execution_subdir 1177 return list(HostQueueEntry.fetch( 1178 where='job_id=%s AND execution_subdir=%s', 1179 params=(self.id, execution_subdir))) 1180 1181 1182 def _should_run_cleanup(self, queue_entry): 1183 if self.reboot_before == model_attributes.RebootBefore.ALWAYS: 1184 return True 1185 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY: 1186 return queue_entry.host.dirty 1187 return False 1188 1189 1190 def _should_run_verify(self, queue_entry): 1191 do_not_verify = (queue_entry.host.protection == 1192 host_protections.Protection.DO_NOT_VERIFY) 1193 if do_not_verify: 1194 return False 1195 # If RebootBefore is set to NEVER, then we won't run reset because 1196 # we can't cleanup, so we need to weaken a Reset into a Verify. 1197 weaker_reset = (self.run_reset and 1198 self.reboot_before == model_attributes.RebootBefore.NEVER) 1199 return self.run_verify or weaker_reset 1200 1201 1202 def _should_run_reset(self, queue_entry): 1203 can_verify = (queue_entry.host.protection != 1204 host_protections.Protection.DO_NOT_VERIFY) 1205 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER 1206 return (can_reboot and can_verify and (self.run_reset or 1207 (self._should_run_cleanup(queue_entry) and 1208 self._should_run_verify(queue_entry)))) 1209 1210 1211 def _should_run_provision(self, queue_entry): 1212 """ 1213 Determine if the queue_entry needs to have a provision task run before 1214 it to provision queue_entry.host. 1215 1216 @param queue_entry: The host queue entry in question. 1217 @returns: True if we should schedule a provision task, False otherwise. 1218 1219 """ 1220 # If we get to this point, it means that the scheduler has already 1221 # vetted that all the unprovisionable labels match, so we can just 1222 # find all labels on the job that aren't on the host to get the list 1223 # of what we need to provision. (See the scheduling logic in 1224 # host_scheduler.py:is_host_eligable_for_job() where we discard all 1225 # actionable labels when assigning jobs to hosts.) 1226 job_labels = {x.name for x in queue_entry.get_labels()} 1227 # Skip provision if `skip_provision` is listed in the job labels. 1228 if provision.SKIP_PROVISION in job_labels: 1229 return False 1230 _, host_labels = queue_entry.host.platform_and_labels() 1231 # If there are any labels on the job that are not on the host and they 1232 # are labels that provisioning knows how to change, then that means 1233 # there is provisioning work to do. If there's no provisioning work to 1234 # do, then obviously we have no reason to schedule a provision task! 1235 diff = job_labels - set(host_labels) 1236 if any([provision.Provision.acts_on(x) for x in diff]): 1237 return True 1238 return False 1239 1240 1241 def _queue_special_task(self, queue_entry, task): 1242 """ 1243 Create a special task and associate it with a host queue entry. 1244 1245 @param queue_entry: The queue entry this special task should be 1246 associated with. 1247 @param task: One of the members of the enum models.SpecialTask.Task. 1248 @returns: None 1249 1250 """ 1251 models.SpecialTask.objects.create( 1252 host=models.Host.objects.get(id=queue_entry.host_id), 1253 queue_entry=queue_entry, task=task) 1254 1255 1256 def schedule_pre_job_tasks(self, queue_entry): 1257 """ 1258 Queue all of the special tasks that need to be run before a host 1259 queue entry may run. 1260 1261 If no special taskes need to be scheduled, then |on_pending| will be 1262 called directly. 1263 1264 @returns None 1265 1266 """ 1267 task_queued = False 1268 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id) 1269 1270 if self._should_run_provision(queue_entry): 1271 self._queue_special_task(hqe_model, 1272 models.SpecialTask.Task.PROVISION) 1273 task_queued = True 1274 elif self._should_run_reset(queue_entry): 1275 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET) 1276 task_queued = True 1277 else: 1278 if self._should_run_cleanup(queue_entry): 1279 self._queue_special_task(hqe_model, 1280 models.SpecialTask.Task.CLEANUP) 1281 task_queued = True 1282 if self._should_run_verify(queue_entry): 1283 self._queue_special_task(hqe_model, 1284 models.SpecialTask.Task.VERIFY) 1285 task_queued = True 1286 1287 if not task_queued: 1288 queue_entry.on_pending() 1289 1290 1291 def _assign_new_group(self, queue_entries): 1292 if len(queue_entries) == 1: 1293 group_subdir_name = queue_entries[0].host.hostname 1294 else: 1295 group_subdir_name = self._next_group_name() 1296 logging.info('Running synchronous job %d hosts %s as %s', 1297 self.id, [entry.host.hostname for entry in queue_entries], 1298 group_subdir_name) 1299 1300 for queue_entry in queue_entries: 1301 queue_entry.set_execution_subdir(group_subdir_name) 1302 1303 1304 def _choose_group_to_run(self, include_queue_entry): 1305 """ 1306 @returns A tuple containing a list of HostQueueEntry instances to be 1307 used to run this Job, a string group name to suggest giving 1308 to this job in the results database. 1309 """ 1310 chosen_entries = [include_queue_entry] 1311 num_entries_wanted = self.synch_count 1312 num_entries_wanted -= len(chosen_entries) 1313 1314 if num_entries_wanted > 0: 1315 where_clause = 'job_id = %s AND status = "Pending" AND id != %s' 1316 pending_entries = list(HostQueueEntry.fetch( 1317 where=where_clause, 1318 params=(self.id, include_queue_entry.id))) 1319 1320 # Sort the chosen hosts by hostname before slicing. 1321 def cmp_queue_entries_by_hostname(entry_a, entry_b): 1322 return Host.cmp_for_sort(entry_a.host, entry_b.host) 1323 pending_entries.sort(cmp=cmp_queue_entries_by_hostname) 1324 chosen_entries += pending_entries[:num_entries_wanted] 1325 1326 # Sanity check. We'll only ever be called if this can be met. 1327 if len(chosen_entries) < self.synch_count: 1328 message = ('job %s got less than %s chosen entries: %s' % ( 1329 self.id, self.synch_count, chosen_entries)) 1330 logging.error(message) 1331 email_manager.manager.enqueue_notify_email( 1332 'Job not started, too few chosen entries', message) 1333 return [] 1334 1335 self._assign_new_group(chosen_entries) 1336 return chosen_entries 1337 1338 1339 def run_if_ready(self, queue_entry): 1340 """ 1341 Run this job by kicking its HQEs into status='Starting' if enough 1342 hosts are ready for it to run. 1343 1344 Cleans up by kicking HQEs into status='Stopped' if this Job is not 1345 ready to run. 1346 """ 1347 if not self.is_ready(): 1348 self.stop_if_necessary() 1349 else: 1350 self.run(queue_entry) 1351 1352 1353 def request_abort(self): 1354 """Request that this Job be aborted on the next scheduler cycle.""" 1355 self.model().abort() 1356 1357 1358 def run(self, queue_entry): 1359 """ 1360 @param queue_entry: The HostQueueEntry instance calling this method. 1361 """ 1362 queue_entries = self._choose_group_to_run(queue_entry) 1363 if queue_entries: 1364 self._finish_run(queue_entries) 1365 1366 1367 def _finish_run(self, queue_entries): 1368 for queue_entry in queue_entries: 1369 queue_entry.set_status(models.HostQueueEntry.Status.STARTING) 1370 1371 1372 def __str__(self): 1373 return '%s-%s' % (self.id, self.owner) 1374