1# pylint: disable=missing-docstring 2 3import logging 4from datetime import datetime 5import django.core 6try: 7 from django.db import models as dbmodels, connection 8except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14from xml.sax import saxutils 15import common 16from autotest_lib.frontend.afe import model_logic, model_attributes 17from autotest_lib.frontend.afe import rdb_model_extensions 18from autotest_lib.frontend import settings, thread_local 19from autotest_lib.client.common_lib import enum, error, host_protections 20from autotest_lib.client.common_lib import global_config 21from autotest_lib.client.common_lib import host_queue_entry_states 22from autotest_lib.client.common_lib import control_data, priorities, decorators 23from autotest_lib.client.common_lib import site_utils 24from autotest_lib.client.common_lib.cros.graphite import autotest_es 25from autotest_lib.server import utils as server_utils 26 27# job options and user preferences 28DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 29DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 30 31 32class AclAccessViolation(Exception): 33 """\ 34 Raised when an operation is attempted with proper permissions as 35 dictated by ACLs. 36 """ 37 38 39class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 40 """\ 41 An atomic group defines a collection of hosts which must only be scheduled 42 all at once. Any host with a label having an atomic group will only be 43 scheduled for a job at the same time as other hosts sharing that label. 44 45 Required: 46 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 47 max_number_of_machines: The maximum number of machines that will be 48 scheduled at once when scheduling jobs to this atomic group. 49 The job.synch_count is considered the minimum. 50 51 Optional: 52 description: Arbitrary text description of this group's purpose. 53 """ 54 name = dbmodels.CharField(max_length=255, unique=True) 55 description = dbmodels.TextField(blank=True) 56 # This magic value is the default to simplify the scheduler logic. 57 # It must be "large". The common use of atomic groups is to want all 58 # machines in the group to be used, limits on which subset used are 59 # often chosen via dependency labels. 60 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 61 # "infinity" is around 3.3 million. 62 INFINITE_MACHINES = 333333333 63 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 64 invalid = dbmodels.BooleanField(default=False, 65 editable=settings.FULL_ADMIN) 66 67 name_field = 'name' 68 objects = model_logic.ModelWithInvalidManager() 69 valid_objects = model_logic.ValidObjectsManager() 70 71 72 def enqueue_job(self, job, is_template=False): 73 """Enqueue a job on an associated atomic group of hosts. 74 75 @param job: A job to enqueue. 76 @param is_template: Whether the status should be "Template". 77 """ 78 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 79 is_template=is_template) 80 queue_entry.save() 81 82 83 def clean_object(self): 84 self.label_set.clear() 85 86 87 class Meta: 88 """Metadata for class AtomicGroup.""" 89 db_table = 'afe_atomic_groups' 90 91 92 def __unicode__(self): 93 return unicode(self.name) 94 95 96class Label(model_logic.ModelWithInvalid, dbmodels.Model): 97 """\ 98 Required: 99 name: label name 100 101 Optional: 102 kernel_config: URL/path to kernel config for jobs run on this label. 103 platform: If True, this is a platform label (defaults to False). 104 only_if_needed: If True, a Host with this label can only be used if that 105 label is requested by the job/test (either as the meta_host or 106 in the job_dependencies). 107 atomic_group: The atomic group associated with this label. 108 """ 109 name = dbmodels.CharField(max_length=255, unique=True) 110 kernel_config = dbmodels.CharField(max_length=255, blank=True) 111 platform = dbmodels.BooleanField(default=False) 112 invalid = dbmodels.BooleanField(default=False, 113 editable=settings.FULL_ADMIN) 114 only_if_needed = dbmodels.BooleanField(default=False) 115 116 name_field = 'name' 117 objects = model_logic.ModelWithInvalidManager() 118 valid_objects = model_logic.ValidObjectsManager() 119 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 120 121 122 def clean_object(self): 123 self.host_set.clear() 124 self.test_set.clear() 125 126 127 def enqueue_job(self, job, is_template=False): 128 """Enqueue a job on any host of this label. 129 130 @param job: A job to enqueue. 131 @param is_template: Whether the status should be "Template". 132 """ 133 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 134 is_template=is_template) 135 queue_entry.save() 136 137 138 139 class Meta: 140 """Metadata for class Label.""" 141 db_table = 'afe_labels' 142 143 144 def __unicode__(self): 145 return unicode(self.name) 146 147 148class Shard(dbmodels.Model, model_logic.ModelExtensions): 149 150 hostname = dbmodels.CharField(max_length=255, unique=True) 151 152 name_field = 'hostname' 153 154 labels = dbmodels.ManyToManyField(Label, blank=True, 155 db_table='afe_shards_labels') 156 157 class Meta: 158 """Metadata for class ParameterizedJob.""" 159 db_table = 'afe_shards' 160 161 162 def rpc_hostname(self): 163 """Get the rpc hostname of the shard. 164 165 @return: Just the shard hostname for all non-testing environments. 166 The address of the default gateway for vm testing environments. 167 """ 168 # TODO: Figure out a better solution for testing. Since no 2 shards 169 # can run on the same host, if the shard hostname is localhost we 170 # conclude that it must be a vm in a test cluster. In such situations 171 # a name of localhost:<port> is necessary to achieve the correct 172 # afe links/redirection from the frontend (this happens through the 173 # host), but for rpcs that are performed *on* the shard, they need to 174 # use the address of the gateway. 175 # In the virtual machine testing environment (i.e., puppylab), each 176 # shard VM has a hostname like localhost:<port>. In the real cluster 177 # environment, a shard node does not have 'localhost' for its hostname. 178 # The following hostname substitution is needed only for the VM 179 # in puppylab. 180 # The 'hostname' should not be replaced in the case of real cluster. 181 if site_utils.is_puppylab_vm(self.hostname): 182 hostname = self.hostname.split(':')[0] 183 return self.hostname.replace( 184 hostname, site_utils.DEFAULT_VM_GATEWAY) 185 return self.hostname 186 187 188class Drone(dbmodels.Model, model_logic.ModelExtensions): 189 """ 190 A scheduler drone 191 192 hostname: the drone's hostname 193 """ 194 hostname = dbmodels.CharField(max_length=255, unique=True) 195 196 name_field = 'hostname' 197 objects = model_logic.ExtendedManager() 198 199 200 def save(self, *args, **kwargs): 201 if not User.current_user().is_superuser(): 202 raise Exception('Only superusers may edit drones') 203 super(Drone, self).save(*args, **kwargs) 204 205 206 def delete(self): 207 if not User.current_user().is_superuser(): 208 raise Exception('Only superusers may delete drones') 209 super(Drone, self).delete() 210 211 212 class Meta: 213 """Metadata for class Drone.""" 214 db_table = 'afe_drones' 215 216 def __unicode__(self): 217 return unicode(self.hostname) 218 219 220class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 221 """ 222 A set of scheduler drones 223 224 These will be used by the scheduler to decide what drones a job is allowed 225 to run on. 226 227 name: the drone set's name 228 drones: the drones that are part of the set 229 """ 230 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 231 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 232 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 233 'SCHEDULER', 'default_drone_set_name', default=None) 234 235 name = dbmodels.CharField(max_length=255, unique=True) 236 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 237 238 name_field = 'name' 239 objects = model_logic.ExtendedManager() 240 241 242 def save(self, *args, **kwargs): 243 if not User.current_user().is_superuser(): 244 raise Exception('Only superusers may edit drone sets') 245 super(DroneSet, self).save(*args, **kwargs) 246 247 248 def delete(self): 249 if not User.current_user().is_superuser(): 250 raise Exception('Only superusers may delete drone sets') 251 super(DroneSet, self).delete() 252 253 254 @classmethod 255 def drone_sets_enabled(cls): 256 """Returns whether drone sets are enabled. 257 258 @param cls: Implicit class object. 259 """ 260 return cls.DRONE_SETS_ENABLED 261 262 263 @classmethod 264 def default_drone_set_name(cls): 265 """Returns the default drone set name. 266 267 @param cls: Implicit class object. 268 """ 269 return cls.DEFAULT_DRONE_SET_NAME 270 271 272 @classmethod 273 def get_default(cls): 274 """Gets the default drone set name, compatible with Job.add_object. 275 276 @param cls: Implicit class object. 277 """ 278 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 279 280 281 @classmethod 282 def resolve_name(cls, drone_set_name): 283 """ 284 Returns the name of one of these, if not None, in order of preference: 285 1) the drone set given, 286 2) the current user's default drone set, or 287 3) the global default drone set 288 289 or returns None if drone sets are disabled 290 291 @param cls: Implicit class object. 292 @param drone_set_name: A drone set name. 293 """ 294 if not cls.drone_sets_enabled(): 295 return None 296 297 user = User.current_user() 298 user_drone_set_name = user.drone_set and user.drone_set.name 299 300 return drone_set_name or user_drone_set_name or cls.get_default().name 301 302 303 def get_drone_hostnames(self): 304 """ 305 Gets the hostnames of all drones in this drone set 306 """ 307 return set(self.drones.all().values_list('hostname', flat=True)) 308 309 310 class Meta: 311 """Metadata for class DroneSet.""" 312 db_table = 'afe_drone_sets' 313 314 def __unicode__(self): 315 return unicode(self.name) 316 317 318class User(dbmodels.Model, model_logic.ModelExtensions): 319 """\ 320 Required: 321 login :user login name 322 323 Optional: 324 access_level: 0=User (default), 1=Admin, 100=Root 325 """ 326 ACCESS_ROOT = 100 327 ACCESS_ADMIN = 1 328 ACCESS_USER = 0 329 330 AUTOTEST_SYSTEM = 'autotest_system' 331 332 login = dbmodels.CharField(max_length=255, unique=True) 333 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 334 335 # user preferences 336 reboot_before = dbmodels.SmallIntegerField( 337 choices=model_attributes.RebootBefore.choices(), blank=True, 338 default=DEFAULT_REBOOT_BEFORE) 339 reboot_after = dbmodels.SmallIntegerField( 340 choices=model_attributes.RebootAfter.choices(), blank=True, 341 default=DEFAULT_REBOOT_AFTER) 342 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 343 show_experimental = dbmodels.BooleanField(default=False) 344 345 name_field = 'login' 346 objects = model_logic.ExtendedManager() 347 348 349 def save(self, *args, **kwargs): 350 # is this a new object being saved for the first time? 351 first_time = (self.id is None) 352 user = thread_local.get_user() 353 if user and not user.is_superuser() and user.login != self.login: 354 raise AclAccessViolation("You cannot modify user " + self.login) 355 super(User, self).save(*args, **kwargs) 356 if first_time: 357 everyone = AclGroup.objects.get(name='Everyone') 358 everyone.users.add(self) 359 360 361 def is_superuser(self): 362 """Returns whether the user has superuser access.""" 363 return self.access_level >= self.ACCESS_ROOT 364 365 366 @classmethod 367 def current_user(cls): 368 """Returns the current user. 369 370 @param cls: Implicit class object. 371 """ 372 user = thread_local.get_user() 373 if user is None: 374 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 375 user.access_level = cls.ACCESS_ROOT 376 user.save() 377 return user 378 379 380 @classmethod 381 def get_record(cls, data): 382 """Check the database for an identical record. 383 384 Check for a record with matching id and login. If one exists, 385 return it. If one does not exist there is a possibility that 386 the following cases have happened: 387 1. Same id, different login 388 We received: "1 chromeos-test" 389 And we have: "1 debug-user" 390 In this case we need to delete "1 debug_user" and insert 391 "1 chromeos-test". 392 393 2. Same login, different id: 394 We received: "1 chromeos-test" 395 And we have: "2 chromeos-test" 396 In this case we need to delete "2 chromeos-test" and insert 397 "1 chromeos-test". 398 399 As long as this method deletes bad records and raises the 400 DoesNotExist exception the caller will handle creating the 401 new record. 402 403 @raises: DoesNotExist, if a record with the matching login and id 404 does not exist. 405 """ 406 407 # Both the id and login should be uniqe but there are cases when 408 # we might already have a user with the same login/id because 409 # current_user will proactively create a user record if it doesn't 410 # exist. Since we want to avoid conflict between the master and 411 # shard, just delete any existing user records that don't match 412 # what we're about to deserialize from the master. 413 try: 414 return cls.objects.get(login=data['login'], id=data['id']) 415 except cls.DoesNotExist: 416 cls.delete_matching_record(login=data['login']) 417 cls.delete_matching_record(id=data['id']) 418 raise 419 420 421 class Meta: 422 """Metadata for class User.""" 423 db_table = 'afe_users' 424 425 def __unicode__(self): 426 return unicode(self.login) 427 428 429class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 430 model_logic.ModelWithAttributes): 431 """\ 432 Required: 433 hostname 434 435 optional: 436 locked: if true, host is locked and will not be queued 437 438 Internal: 439 From AbstractHostModel: 440 status: string describing status of host 441 invalid: true if the host has been deleted 442 protection: indicates what can be done to this host during repair 443 lock_time: DateTime at which the host was locked 444 dirty: true if the host has been used without being rebooted 445 Local: 446 locked_by: user that locked the host, or null if the host is unlocked 447 """ 448 449 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 450 'hostattribute_set', 451 'labels', 452 'shard']) 453 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 454 455 456 def custom_deserialize_relation(self, link, data): 457 assert link == 'shard', 'Link %s should not be deserialized' % link 458 self.shard = Shard.deserialize(data) 459 460 461 # Note: Only specify foreign keys here, specify all native host columns in 462 # rdb_model_extensions instead. 463 Protection = host_protections.Protection 464 labels = dbmodels.ManyToManyField(Label, blank=True, 465 db_table='afe_hosts_labels') 466 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 467 name_field = 'hostname' 468 objects = model_logic.ModelWithInvalidManager() 469 valid_objects = model_logic.ValidObjectsManager() 470 leased_objects = model_logic.LeasedHostManager() 471 472 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 473 474 def __init__(self, *args, **kwargs): 475 super(Host, self).__init__(*args, **kwargs) 476 self._record_attributes(['status']) 477 478 479 @staticmethod 480 def create_one_time_host(hostname): 481 """Creates a one-time host. 482 483 @param hostname: The name for the host. 484 """ 485 query = Host.objects.filter(hostname=hostname) 486 if query.count() == 0: 487 host = Host(hostname=hostname, invalid=True) 488 host.do_validate() 489 else: 490 host = query[0] 491 if not host.invalid: 492 raise model_logic.ValidationError({ 493 'hostname' : '%s already exists in the autotest DB. ' 494 'Select it rather than entering it as a one time ' 495 'host.' % hostname 496 }) 497 host.protection = host_protections.Protection.DO_NOT_REPAIR 498 host.locked = False 499 host.save() 500 host.clean_object() 501 return host 502 503 504 @classmethod 505 def assign_to_shard(cls, shard, known_ids): 506 """Assigns hosts to a shard. 507 508 For all labels that have been assigned to a shard, all hosts that 509 have at least one of the shard's labels are assigned to the shard. 510 Hosts that are assigned to the shard but aren't already present on the 511 shard are returned. 512 513 Board to shard mapping is many-to-one. Many different boards can be 514 hosted in a shard. However, DUTs of a single board cannot be distributed 515 into more than one shard. 516 517 @param shard: The shard object to assign labels/hosts for. 518 @param known_ids: List of all host-ids the shard already knows. 519 This is used to figure out which hosts should be sent 520 to the shard. If shard_ids were used instead, hosts 521 would only be transferred once, even if the client 522 failed persisting them. 523 The number of hosts usually lies in O(100), so the 524 overhead is acceptable. 525 526 @returns the hosts objects that should be sent to the shard. 527 """ 528 529 # Disclaimer: concurrent heartbeats should theoretically not occur in 530 # the current setup. As they may be introduced in the near future, 531 # this comment will be left here. 532 533 # Sending stuff twice is acceptable, but forgetting something isn't. 534 # Detecting duplicates on the client is easy, but here it's harder. The 535 # following options were considered: 536 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 537 # than select returned, as concurrently more hosts might have been 538 # inserted 539 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 540 # hosts for the shard, this is overhead 541 # - SELECT and then UPDATE only selected without requerying afterwards: 542 # returns the old state of the records. 543 host_ids = set(Host.objects.filter( 544 labels__in=shard.labels.all(), 545 leased=False 546 ).exclude( 547 id__in=known_ids, 548 ).values_list('pk', flat=True)) 549 550 if host_ids: 551 Host.objects.filter(pk__in=host_ids).update(shard=shard) 552 return list(Host.objects.filter(pk__in=host_ids).all()) 553 return [] 554 555 def resurrect_object(self, old_object): 556 super(Host, self).resurrect_object(old_object) 557 # invalid hosts can be in use by the scheduler (as one-time hosts), so 558 # don't change the status 559 self.status = old_object.status 560 561 562 def clean_object(self): 563 self.aclgroup_set.clear() 564 self.labels.clear() 565 566 567 def record_state(self, type_str, state, value, other_metadata=None): 568 """Record metadata in elasticsearch. 569 570 @param type_str: sets the _type field in elasticsearch db. 571 @param state: string representing what state we are recording, 572 e.g. 'locked' 573 @param value: value of the state, e.g. True 574 @param other_metadata: Other metadata to store in metaDB. 575 """ 576 metadata = { 577 state: value, 578 'hostname': self.hostname, 579 } 580 if other_metadata: 581 metadata = dict(metadata.items() + other_metadata.items()) 582 autotest_es.post(use_http=True, type_str=type_str, metadata=metadata) 583 584 585 def save(self, *args, **kwargs): 586 # extra spaces in the hostname can be a sneaky source of errors 587 self.hostname = self.hostname.strip() 588 # is this a new object being saved for the first time? 589 first_time = (self.id is None) 590 if not first_time: 591 AclGroup.check_for_acl_violation_hosts([self]) 592 # If locked is changed, send its status and user made the change to 593 # metaDB. Locks are important in host history because if a device is 594 # locked then we don't really care what state it is in. 595 if self.locked and not self.locked_by: 596 self.locked_by = User.current_user() 597 if not self.lock_time: 598 self.lock_time = datetime.now() 599 self.record_state('lock_history', 'locked', self.locked, 600 {'changed_by': self.locked_by.login, 601 'lock_reason': self.lock_reason}) 602 self.dirty = True 603 elif not self.locked and self.locked_by: 604 self.record_state('lock_history', 'locked', self.locked, 605 {'changed_by': self.locked_by.login}) 606 self.locked_by = None 607 self.lock_time = None 608 super(Host, self).save(*args, **kwargs) 609 if first_time: 610 everyone = AclGroup.objects.get(name='Everyone') 611 everyone.hosts.add(self) 612 self._check_for_updated_attributes() 613 614 615 def delete(self): 616 AclGroup.check_for_acl_violation_hosts([self]) 617 for queue_entry in self.hostqueueentry_set.all(): 618 queue_entry.deleted = True 619 queue_entry.abort() 620 super(Host, self).delete() 621 622 623 def on_attribute_changed(self, attribute, old_value): 624 assert attribute == 'status' 625 logging.info(self.hostname + ' -> ' + self.status) 626 627 628 def enqueue_job(self, job, is_template=False): 629 """Enqueue a job on this host. 630 631 @param job: A job to enqueue. 632 @param is_template: Whther the status should be "Template". 633 """ 634 queue_entry = HostQueueEntry.create(host=self, job=job, 635 is_template=is_template) 636 # allow recovery of dead hosts from the frontend 637 if not self.active_queue_entry() and self.is_dead(): 638 self.status = Host.Status.READY 639 self.save() 640 queue_entry.save() 641 642 block = IneligibleHostQueue(job=job, host=self) 643 block.save() 644 645 646 def platform(self): 647 """The platform of the host.""" 648 # TODO(showard): slighly hacky? 649 platforms = self.labels.filter(platform=True) 650 if len(platforms) == 0: 651 return None 652 return platforms[0] 653 platform.short_description = 'Platform' 654 655 656 @classmethod 657 def check_no_platform(cls, hosts): 658 """Verify the specified hosts have no associated platforms. 659 660 @param cls: Implicit class object. 661 @param hosts: The hosts to verify. 662 @raises model_logic.ValidationError if any hosts already have a 663 platform. 664 """ 665 Host.objects.populate_relationships(hosts, Label, 'label_list') 666 errors = [] 667 for host in hosts: 668 platforms = [label.name for label in host.label_list 669 if label.platform] 670 if platforms: 671 # do a join, just in case this host has multiple platforms, 672 # we'll be able to see it 673 errors.append('Host %s already has a platform: %s' % ( 674 host.hostname, ', '.join(platforms))) 675 if errors: 676 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 677 678 679 @classmethod 680 def check_board_labels_allowed(cls, hosts, new_labels=[]): 681 """Verify the specified hosts have valid board labels and the given 682 new board labels can be added. 683 684 @param cls: Implicit class object. 685 @param hosts: The hosts to verify. 686 @param new_labels: A list of labels to be added to the hosts. 687 688 @raises model_logic.ValidationError if any host has invalid board labels 689 or the given board labels cannot be added to the hsots. 690 """ 691 Host.objects.populate_relationships(hosts, Label, 'label_list') 692 errors = [] 693 for host in hosts: 694 boards = [label.name for label in host.label_list 695 if label.name.startswith('board:')] 696 if not server_utils.board_labels_allowed(boards + new_labels): 697 # do a join, just in case this host has multiple boards, 698 # we'll be able to see it 699 errors.append('Host %s already has board labels: %s' % ( 700 host.hostname, ', '.join(boards))) 701 if errors: 702 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 703 704 705 def is_dead(self): 706 """Returns whether the host is dead (has status repair failed).""" 707 return self.status == Host.Status.REPAIR_FAILED 708 709 710 def active_queue_entry(self): 711 """Returns the active queue entry for this host, or None if none.""" 712 active = list(self.hostqueueentry_set.filter(active=True)) 713 if not active: 714 return None 715 assert len(active) == 1, ('More than one active entry for ' 716 'host ' + self.hostname) 717 return active[0] 718 719 720 def _get_attribute_model_and_args(self, attribute): 721 return HostAttribute, dict(host=self, attribute=attribute) 722 723 724 @classmethod 725 def get_attribute_model(cls): 726 """Return the attribute model. 727 728 Override method in parent class. See ModelExtensions for details. 729 @returns: The attribute model of Host. 730 """ 731 return HostAttribute 732 733 734 class Meta: 735 """Metadata for the Host class.""" 736 db_table = 'afe_hosts' 737 738 739 def __unicode__(self): 740 return unicode(self.hostname) 741 742 743class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 744 """Arbitrary keyvals associated with hosts.""" 745 746 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 747 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 748 host = dbmodels.ForeignKey(Host) 749 attribute = dbmodels.CharField(max_length=90) 750 value = dbmodels.CharField(max_length=300) 751 752 objects = model_logic.ExtendedManager() 753 754 class Meta: 755 """Metadata for the HostAttribute class.""" 756 db_table = 'afe_host_attributes' 757 758 759 @classmethod 760 def get_record(cls, data): 761 """Check the database for an identical record. 762 763 Use host_id and attribute to search for a existing record. 764 765 @raises: DoesNotExist, if no record found 766 @raises: MultipleObjectsReturned if multiple records found. 767 """ 768 # TODO(fdeng): We should use host_id and attribute together as 769 # a primary key in the db. 770 return cls.objects.get(host_id=data['host_id'], 771 attribute=data['attribute']) 772 773 774 @classmethod 775 def deserialize(cls, data): 776 """Override deserialize in parent class. 777 778 Do not deserialize id as id is not kept consistent on master and shards. 779 780 @param data: A dictionary of data to deserialize. 781 782 @returns: A HostAttribute object. 783 """ 784 if data: 785 data.pop('id') 786 return super(HostAttribute, cls).deserialize(data) 787 788 789class Test(dbmodels.Model, model_logic.ModelExtensions): 790 """\ 791 Required: 792 author: author name 793 description: description of the test 794 name: test name 795 time: short, medium, long 796 test_class: This describes the class for your the test belongs in. 797 test_category: This describes the category for your tests 798 test_type: Client or Server 799 path: path to pass to run_test() 800 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 801 async job. If it's >1 it's sync job for that number of machines 802 i.e. if sync_count = 2 it is a sync job that requires two 803 machines. 804 Optional: 805 dependencies: What the test requires to run. Comma deliminated list 806 dependency_labels: many-to-many relationship with labels corresponding to 807 test dependencies. 808 experimental: If this is set to True production servers will ignore the test 809 run_verify: Whether or not the scheduler should run the verify stage 810 run_reset: Whether or not the scheduler should run the reset stage 811 test_retry: Number of times to retry test if the test did not complete 812 successfully. (optional, default: 0) 813 """ 814 TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1) 815 816 name = dbmodels.CharField(max_length=255, unique=True) 817 author = dbmodels.CharField(max_length=255) 818 test_class = dbmodels.CharField(max_length=255) 819 test_category = dbmodels.CharField(max_length=255) 820 dependencies = dbmodels.CharField(max_length=255, blank=True) 821 description = dbmodels.TextField(blank=True) 822 experimental = dbmodels.BooleanField(default=True) 823 run_verify = dbmodels.BooleanField(default=False) 824 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 825 default=TestTime.MEDIUM) 826 test_type = dbmodels.SmallIntegerField( 827 choices=control_data.CONTROL_TYPE.choices()) 828 sync_count = dbmodels.IntegerField(default=1) 829 path = dbmodels.CharField(max_length=255, unique=True) 830 test_retry = dbmodels.IntegerField(blank=True, default=0) 831 run_reset = dbmodels.BooleanField(default=True) 832 833 dependency_labels = ( 834 dbmodels.ManyToManyField(Label, blank=True, 835 db_table='afe_autotests_dependency_labels')) 836 name_field = 'name' 837 objects = model_logic.ExtendedManager() 838 839 840 def admin_description(self): 841 """Returns a string representing the admin description.""" 842 escaped_description = saxutils.escape(self.description) 843 return '<span style="white-space:pre">%s</span>' % escaped_description 844 admin_description.allow_tags = True 845 admin_description.short_description = 'Description' 846 847 848 class Meta: 849 """Metadata for class Test.""" 850 db_table = 'afe_autotests' 851 852 def __unicode__(self): 853 return unicode(self.name) 854 855 856class TestParameter(dbmodels.Model): 857 """ 858 A declared parameter of a test 859 """ 860 test = dbmodels.ForeignKey(Test) 861 name = dbmodels.CharField(max_length=255) 862 863 class Meta: 864 """Metadata for class TestParameter.""" 865 db_table = 'afe_test_parameters' 866 unique_together = ('test', 'name') 867 868 def __unicode__(self): 869 return u'%s (%s)' % (self.name, self.test.name) 870 871 872class Profiler(dbmodels.Model, model_logic.ModelExtensions): 873 """\ 874 Required: 875 name: profiler name 876 test_type: Client or Server 877 878 Optional: 879 description: arbirary text description 880 """ 881 name = dbmodels.CharField(max_length=255, unique=True) 882 description = dbmodels.TextField(blank=True) 883 884 name_field = 'name' 885 objects = model_logic.ExtendedManager() 886 887 888 class Meta: 889 """Metadata for class Profiler.""" 890 db_table = 'afe_profilers' 891 892 def __unicode__(self): 893 return unicode(self.name) 894 895 896class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 897 """\ 898 Required: 899 name: name of ACL group 900 901 Optional: 902 description: arbitrary description of group 903 """ 904 905 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 906 907 name = dbmodels.CharField(max_length=255, unique=True) 908 description = dbmodels.CharField(max_length=255, blank=True) 909 users = dbmodels.ManyToManyField(User, blank=False, 910 db_table='afe_acl_groups_users') 911 hosts = dbmodels.ManyToManyField(Host, blank=True, 912 db_table='afe_acl_groups_hosts') 913 914 name_field = 'name' 915 objects = model_logic.ExtendedManager() 916 917 @staticmethod 918 def check_for_acl_violation_hosts(hosts): 919 """Verify the current user has access to the specified hosts. 920 921 @param hosts: The hosts to verify against. 922 @raises AclAccessViolation if the current user doesn't have access 923 to a host. 924 """ 925 user = User.current_user() 926 if user.is_superuser(): 927 return 928 accessible_host_ids = set( 929 host.id for host in Host.objects.filter(aclgroup__users=user)) 930 for host in hosts: 931 # Check if the user has access to this host, 932 # but only if it is not a metahost or a one-time-host. 933 no_access = (isinstance(host, Host) 934 and not host.invalid 935 and int(host.id) not in accessible_host_ids) 936 if no_access: 937 raise AclAccessViolation("%s does not have access to %s" % 938 (str(user), str(host))) 939 940 941 @staticmethod 942 def check_abort_permissions(queue_entries): 943 """Look for queue entries that aren't abortable by the current user. 944 945 An entry is not abortable if: 946 * the job isn't owned by this user, and 947 * the machine isn't ACL-accessible, or 948 * the machine is in the "Everyone" ACL 949 950 @param queue_entries: The queue entries to check. 951 @raises AclAccessViolation if a queue entry is not abortable by the 952 current user. 953 """ 954 user = User.current_user() 955 if user.is_superuser(): 956 return 957 not_owned = queue_entries.exclude(job__owner=user.login) 958 # I do this using ID sets instead of just Django filters because 959 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 960 # 1.0. 961 # TODO: Use Django filters, now that we're using 1.0. 962 accessible_ids = set( 963 entry.id for entry 964 in not_owned.filter(host__aclgroup__users__login=user.login)) 965 public_ids = set(entry.id for entry 966 in not_owned.filter(host__aclgroup__name='Everyone')) 967 cannot_abort = [entry for entry in not_owned.select_related() 968 if entry.id not in accessible_ids 969 or entry.id in public_ids] 970 if len(cannot_abort) == 0: 971 return 972 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 973 entry.host_or_metahost_name()) 974 for entry in cannot_abort) 975 raise AclAccessViolation('You cannot abort the following job entries: ' 976 + entry_names) 977 978 979 def check_for_acl_violation_acl_group(self): 980 """Verifies the current user has acces to this ACL group. 981 982 @raises AclAccessViolation if the current user doesn't have access to 983 this ACL group. 984 """ 985 user = User.current_user() 986 if user.is_superuser(): 987 return 988 if self.name == 'Everyone': 989 raise AclAccessViolation("You cannot modify 'Everyone'!") 990 if not user in self.users.all(): 991 raise AclAccessViolation("You do not have access to %s" 992 % self.name) 993 994 @staticmethod 995 def on_host_membership_change(): 996 """Invoked when host membership changes.""" 997 everyone = AclGroup.objects.get(name='Everyone') 998 999 # find hosts that aren't in any ACL group and add them to Everyone 1000 # TODO(showard): this is a bit of a hack, since the fact that this query 1001 # works is kind of a coincidence of Django internals. This trick 1002 # doesn't work in general (on all foreign key relationships). I'll 1003 # replace it with a better technique when the need arises. 1004 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 1005 everyone.hosts.add(*orphaned_hosts.distinct()) 1006 1007 # find hosts in both Everyone and another ACL group, and remove them 1008 # from Everyone 1009 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 1010 acled_hosts = set() 1011 for host in hosts_in_everyone: 1012 # Has an ACL group other than Everyone 1013 if host.aclgroup_set.count() > 1: 1014 acled_hosts.add(host) 1015 everyone.hosts.remove(*acled_hosts) 1016 1017 1018 def delete(self): 1019 if (self.name == 'Everyone'): 1020 raise AclAccessViolation("You cannot delete 'Everyone'!") 1021 self.check_for_acl_violation_acl_group() 1022 super(AclGroup, self).delete() 1023 self.on_host_membership_change() 1024 1025 1026 def add_current_user_if_empty(self): 1027 """Adds the current user if the set of users is empty.""" 1028 if not self.users.count(): 1029 self.users.add(User.current_user()) 1030 1031 1032 def perform_after_save(self, change): 1033 """Called after a save. 1034 1035 @param change: Whether there was a change. 1036 """ 1037 if not change: 1038 self.users.add(User.current_user()) 1039 self.add_current_user_if_empty() 1040 self.on_host_membership_change() 1041 1042 1043 def save(self, *args, **kwargs): 1044 change = bool(self.id) 1045 if change: 1046 # Check the original object for an ACL violation 1047 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 1048 super(AclGroup, self).save(*args, **kwargs) 1049 self.perform_after_save(change) 1050 1051 1052 class Meta: 1053 """Metadata for class AclGroup.""" 1054 db_table = 'afe_acl_groups' 1055 1056 def __unicode__(self): 1057 return unicode(self.name) 1058 1059 1060class ParameterizedJob(dbmodels.Model): 1061 """ 1062 Auxiliary configuration for a parameterized job. 1063 1064 This class is obsolete, and ought to be dead. Due to a series of 1065 unfortunate events, it can't be deleted: 1066 * In `class Job` we're required to keep a reference to this class 1067 for the sake of the scheduler unit tests. 1068 * The existence of the reference in `Job` means that certain 1069 methods here will get called from the `get_jobs` RPC. 1070 So, the definitions below seem to be the minimum stub we can support 1071 unless/until we change the database schema. 1072 """ 1073 1074 @classmethod 1075 def smart_get(cls, id_or_name, *args, **kwargs): 1076 """For compatibility with Job.add_object. 1077 1078 @param cls: Implicit class object. 1079 @param id_or_name: The ID or name to get. 1080 @param args: Non-keyword arguments. 1081 @param kwargs: Keyword arguments. 1082 """ 1083 return cls.objects.get(pk=id_or_name) 1084 1085 1086 def job(self): 1087 """Returns the job if it exists, or else None.""" 1088 jobs = self.job_set.all() 1089 assert jobs.count() <= 1 1090 return jobs and jobs[0] or None 1091 1092 1093 class Meta: 1094 """Metadata for class ParameterizedJob.""" 1095 db_table = 'afe_parameterized_jobs' 1096 1097 def __unicode__(self): 1098 return u'%s (parameterized) - %s' % (self.test.name, self.job()) 1099 1100 1101class JobManager(model_logic.ExtendedManager): 1102 'Custom manager to provide efficient status counts querying.' 1103 def get_status_counts(self, job_ids): 1104 """Returns a dict mapping the given job IDs to their status count dicts. 1105 1106 @param job_ids: A list of job IDs. 1107 """ 1108 if not job_ids: 1109 return {} 1110 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1111 cursor = connection.cursor() 1112 cursor.execute(""" 1113 SELECT job_id, status, aborted, complete, COUNT(*) 1114 FROM afe_host_queue_entries 1115 WHERE job_id IN %s 1116 GROUP BY job_id, status, aborted, complete 1117 """ % id_list) 1118 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1119 for job_id, status, aborted, complete, count in cursor.fetchall(): 1120 job_dict = all_job_counts[job_id] 1121 full_status = HostQueueEntry.compute_full_status(status, aborted, 1122 complete) 1123 job_dict.setdefault(full_status, 0) 1124 job_dict[full_status] += count 1125 return all_job_counts 1126 1127 1128class Job(dbmodels.Model, model_logic.ModelExtensions): 1129 """\ 1130 owner: username of job owner 1131 name: job name (does not have to be unique) 1132 priority: Integer priority value. Higher is more important. 1133 control_file: contents of control file 1134 control_type: Client or Server 1135 created_on: date of job creation 1136 submitted_on: date of job submission 1137 synch_count: how many hosts should be used per autoserv execution 1138 run_verify: Whether or not to run the verify phase 1139 run_reset: Whether or not to run the reset phase 1140 timeout: DEPRECATED - hours from queuing time until job times out 1141 timeout_mins: minutes from job queuing time until the job times out 1142 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1143 times out 1144 max_runtime_mins: minutes from job starting time until job times out 1145 email_list: list of people to email on completion delimited by any of: 1146 white space, ',', ':', ';' 1147 dependency_labels: many-to-many relationship with labels corresponding to 1148 job dependencies 1149 reboot_before: Never, If dirty, or Always 1150 reboot_after: Never, If all tests passed, or Always 1151 parse_failed_repair: if True, a failed repair launched by this job will have 1152 its results parsed as part of the job. 1153 drone_set: The set of drones to run this job on 1154 parent_job: Parent job (optional) 1155 test_retry: Number of times to retry test if the test did not complete 1156 successfully. (optional, default: 0) 1157 require_ssp: Require server-side packaging unless require_ssp is set to 1158 False. (optional, default: None) 1159 """ 1160 1161 # TODO: Investigate, if jobkeyval_set is really needed. 1162 # dynamic_suite will write them into an attached file for the drone, but 1163 # it doesn't seem like they are actually used. If they aren't used, remove 1164 # jobkeyval_set here. 1165 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1166 'hostqueueentry_set', 1167 'jobkeyval_set', 1168 'shard']) 1169 1170 # SQL for selecting jobs that should be sent to shard. 1171 # We use raw sql as django filters were not optimized. 1172 # The following jobs are excluded by the SQL. 1173 # - Non-aborted jobs known to shard as specified in |known_ids|. 1174 # Note for jobs aborted on master, even if already known to shard, 1175 # will be sent to shard again so that shard can abort them. 1176 # - Completed jobs 1177 # - Active jobs 1178 # - Jobs without host_queue_entries 1179 NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))' 1180 1181 SQL_SHARD_JOBS = ( 1182 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1183 'INNER JOIN afe_host_queue_entries t2 ON ' 1184 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1185 ' %(check_known_jobs)s) ' 1186 'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) ' 1187 'JOIN afe_shards_labels t4 ' 1188 ' ON (t4.label_id = t3.label_id OR t4.label_id = t2.meta_host) ' 1189 'WHERE t4.shard_id = %(shard_id)s' 1190 ) 1191 1192 # Jobs can be created with assigned hosts and have no dependency 1193 # labels nor meta_host. 1194 # We are looking for: 1195 # - a job whose hqe's meta_host is null 1196 # - a job whose hqe has a host 1197 # - one of the host's labels matches the shard's label. 1198 # Non-aborted known jobs, completed jobs, active jobs, jobs 1199 # without hqe are exluded as we do with SQL_SHARD_JOBS. 1200 SQL_SHARD_JOBS_WITH_HOSTS = ( 1201 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1202 'INNER JOIN afe_host_queue_entries t2 ON ' 1203 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1204 ' AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL ' 1205 ' %(check_known_jobs)s) ' 1206 'LEFT OUTER JOIN afe_hosts_labels t3 ON (t2.host_id = t3.host_id) ' 1207 'WHERE (t3.label_id IN ' 1208 ' (SELECT label_id FROM afe_shards_labels ' 1209 ' WHERE shard_id = %(shard_id)s))' 1210 ) 1211 1212 # Even if we had filters about complete, active and aborted 1213 # bits in the above two SQLs, there is a chance that 1214 # the result may still contain a job with an hqe with 'complete=1' 1215 # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.' 1216 # This happens when a job has two (or more) hqes and at least 1217 # one hqe has different bits than others. 1218 # We use a second sql to ensure we exclude all un-desired jobs. 1219 SQL_JOBS_TO_EXCLUDE =( 1220 'SELECT t1.id FROM afe_jobs t1 ' 1221 'INNER JOIN afe_host_queue_entries t2 ON ' 1222 ' (t1.id = t2.job_id) ' 1223 'WHERE (t1.id in (%(candidates)s) ' 1224 ' AND (t2.complete=1 OR t2.active=1 ' 1225 ' %(check_known_jobs)s))' 1226 ) 1227 1228 def _deserialize_relation(self, link, data): 1229 if link in ['hostqueueentry_set', 'jobkeyval_set']: 1230 for obj in data: 1231 obj['job_id'] = self.id 1232 1233 super(Job, self)._deserialize_relation(link, data) 1234 1235 1236 def custom_deserialize_relation(self, link, data): 1237 assert link == 'shard', 'Link %s should not be deserialized' % link 1238 self.shard = Shard.deserialize(data) 1239 1240 1241 def sanity_check_update_from_shard(self, shard, updated_serialized): 1242 # If the job got aborted on the master after the client fetched it 1243 # no shard_id will be set. The shard might still push updates though, 1244 # as the job might complete before the abort bit syncs to the shard. 1245 # Alternative considered: The master scheduler could be changed to not 1246 # set aborted jobs to completed that are sharded out. But that would 1247 # require database queries and seemed more complicated to implement. 1248 # This seems safe to do, as there won't be updates pushed from the wrong 1249 # shards should be powered off and wiped hen they are removed from the 1250 # master. 1251 if self.shard_id and self.shard_id != shard.id: 1252 raise error.UnallowedRecordsSentToMaster( 1253 'Job id=%s is assigned to shard (%s). Cannot update it with %s ' 1254 'from shard %s.' % (self.id, self.shard_id, updated_serialized, 1255 shard.id)) 1256 1257 1258 # TIMEOUT is deprecated. 1259 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1260 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1261 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1262 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1263 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1264 # completed. 1265 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1266 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1267 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1268 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1269 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1270 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, 1271 default=False) 1272 1273 owner = dbmodels.CharField(max_length=255) 1274 name = dbmodels.CharField(max_length=255) 1275 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1276 control_file = dbmodels.TextField(null=True, blank=True) 1277 control_type = dbmodels.SmallIntegerField( 1278 choices=control_data.CONTROL_TYPE.choices(), 1279 blank=True, # to allow 0 1280 default=control_data.CONTROL_TYPE.CLIENT) 1281 created_on = dbmodels.DateTimeField() 1282 synch_count = dbmodels.IntegerField(blank=True, default=0) 1283 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1284 run_verify = dbmodels.BooleanField(default=False) 1285 email_list = dbmodels.CharField(max_length=250, blank=True) 1286 dependency_labels = ( 1287 dbmodels.ManyToManyField(Label, blank=True, 1288 db_table='afe_jobs_dependency_labels')) 1289 reboot_before = dbmodels.SmallIntegerField( 1290 choices=model_attributes.RebootBefore.choices(), blank=True, 1291 default=DEFAULT_REBOOT_BEFORE) 1292 reboot_after = dbmodels.SmallIntegerField( 1293 choices=model_attributes.RebootAfter.choices(), blank=True, 1294 default=DEFAULT_REBOOT_AFTER) 1295 parse_failed_repair = dbmodels.BooleanField( 1296 default=DEFAULT_PARSE_FAILED_REPAIR) 1297 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1298 # completed. 1299 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1300 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1301 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1302 1303 # TODO(jrbarnette) We have to keep `parameterized_job` around or it 1304 # breaks the scheduler_models unit tests (and fixing the unit tests 1305 # will break the scheduler, so don't do that). 1306 # 1307 # The ultimate fix is to delete the column from the database table 1308 # at which point, you _must_ delete this. Until you're ready to do 1309 # that, DON'T MUCK WITH IT. 1310 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1311 blank=True) 1312 1313 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1314 1315 test_retry = dbmodels.IntegerField(blank=True, default=0) 1316 1317 run_reset = dbmodels.BooleanField(default=True) 1318 1319 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1320 1321 # If this is None on the master, a slave should be found. 1322 # If this is None on a slave, it should be synced back to the master 1323 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1324 1325 # If this is None, server-side packaging will be used for server side test, 1326 # unless it's disabled in global config AUTOSERV/enable_ssp_container. 1327 require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True) 1328 1329 # custom manager 1330 objects = JobManager() 1331 1332 1333 @decorators.cached_property 1334 def labels(self): 1335 """All the labels of this job""" 1336 # We need to convert dependency_labels to a list, because all() gives us 1337 # back an iterator, and storing/caching an iterator means we'd only be 1338 # able to read from it once. 1339 return list(self.dependency_labels.all()) 1340 1341 1342 def is_server_job(self): 1343 """Returns whether this job is of type server.""" 1344 return self.control_type == control_data.CONTROL_TYPE.SERVER 1345 1346 1347 @classmethod 1348 def create(cls, owner, options, hosts): 1349 """Creates a job. 1350 1351 The job is created by taking some information (the listed args) and 1352 filling in the rest of the necessary information. 1353 1354 @param cls: Implicit class object. 1355 @param owner: The owner for the job. 1356 @param options: An options object. 1357 @param hosts: The hosts to use. 1358 """ 1359 AclGroup.check_for_acl_violation_hosts(hosts) 1360 1361 control_file = options.get('control_file') 1362 1363 user = User.current_user() 1364 if options.get('reboot_before') is None: 1365 options['reboot_before'] = user.get_reboot_before_display() 1366 if options.get('reboot_after') is None: 1367 options['reboot_after'] = user.get_reboot_after_display() 1368 1369 drone_set = DroneSet.resolve_name(options.get('drone_set')) 1370 1371 if options.get('timeout_mins') is None and options.get('timeout'): 1372 options['timeout_mins'] = options['timeout'] * 60 1373 1374 job = cls.add_object( 1375 owner=owner, 1376 name=options['name'], 1377 priority=options['priority'], 1378 control_file=control_file, 1379 control_type=options['control_type'], 1380 synch_count=options.get('synch_count'), 1381 # timeout needs to be deleted in the future. 1382 timeout=options.get('timeout'), 1383 timeout_mins=options.get('timeout_mins'), 1384 max_runtime_mins=options.get('max_runtime_mins'), 1385 run_verify=options.get('run_verify'), 1386 email_list=options.get('email_list'), 1387 reboot_before=options.get('reboot_before'), 1388 reboot_after=options.get('reboot_after'), 1389 parse_failed_repair=options.get('parse_failed_repair'), 1390 created_on=datetime.now(), 1391 drone_set=drone_set, 1392 parent_job=options.get('parent_job_id'), 1393 test_retry=options.get('test_retry'), 1394 run_reset=options.get('run_reset'), 1395 require_ssp=options.get('require_ssp')) 1396 1397 job.dependency_labels = options['dependencies'] 1398 1399 if options.get('keyvals'): 1400 for key, value in options['keyvals'].iteritems(): 1401 JobKeyval.objects.create(job=job, key=key, value=value) 1402 1403 return job 1404 1405 1406 @classmethod 1407 def assign_to_shard(cls, shard, known_ids): 1408 """Assigns unassigned jobs to a shard. 1409 1410 For all labels that have been assigned to this shard, all jobs that 1411 have this label, are assigned to this shard. 1412 1413 Jobs that are assigned to the shard but aren't already present on the 1414 shard are returned. 1415 1416 @param shard: The shard to assign jobs to. 1417 @param known_ids: List of all ids of incomplete jobs, the shard already 1418 knows about. 1419 This is used to figure out which jobs should be sent 1420 to the shard. If shard_ids were used instead, jobs 1421 would only be transferred once, even if the client 1422 failed persisting them. 1423 The number of unfinished jobs usually lies in O(1000). 1424 Assuming one id takes 8 chars in the json, this means 1425 overhead that lies in the lower kilobyte range. 1426 A not in query with 5000 id's takes about 30ms. 1427 1428 @returns The job objects that should be sent to the shard. 1429 """ 1430 # Disclaimer: Concurrent heartbeats should not occur in today's setup. 1431 # If this changes or they are triggered manually, this applies: 1432 # Jobs may be returned more than once by concurrent calls of this 1433 # function, as there is a race condition between SELECT and UPDATE. 1434 job_ids = set([]) 1435 check_known_jobs_exclude = '' 1436 check_known_jobs_include = '' 1437 1438 if known_ids: 1439 check_known_jobs = ( 1440 cls.NON_ABORTED_KNOWN_JOBS % 1441 {'known_ids': ','.join([str(i) for i in known_ids])}) 1442 check_known_jobs_exclude = 'AND NOT ' + check_known_jobs 1443 check_known_jobs_include = 'OR ' + check_known_jobs 1444 1445 for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS]: 1446 query = Job.objects.raw(sql % { 1447 'check_known_jobs': check_known_jobs_exclude, 1448 'shard_id': shard.id}) 1449 job_ids |= set([j.id for j in query]) 1450 1451 if job_ids: 1452 query = Job.objects.raw( 1453 cls.SQL_JOBS_TO_EXCLUDE % 1454 {'check_known_jobs': check_known_jobs_include, 1455 'candidates': ','.join([str(i) for i in job_ids])}) 1456 job_ids -= set([j.id for j in query]) 1457 1458 if job_ids: 1459 Job.objects.filter(pk__in=job_ids).update(shard=shard) 1460 return list(Job.objects.filter(pk__in=job_ids).all()) 1461 return [] 1462 1463 1464 def queue(self, hosts, is_template=False): 1465 """Enqueue a job on the given hosts. 1466 1467 @param hosts: The hosts to use. 1468 @param is_template: Whether the status should be "Template". 1469 """ 1470 if not hosts: 1471 # hostless job 1472 entry = HostQueueEntry.create(job=self, is_template=is_template) 1473 entry.save() 1474 return 1475 1476 for host in hosts: 1477 host.enqueue_job(self, is_template=is_template) 1478 1479 1480 def user(self): 1481 """Gets the user of this job, or None if it doesn't exist.""" 1482 try: 1483 return User.objects.get(login=self.owner) 1484 except self.DoesNotExist: 1485 return None 1486 1487 1488 def abort(self): 1489 """Aborts this job.""" 1490 for queue_entry in self.hostqueueentry_set.all(): 1491 queue_entry.abort() 1492 1493 1494 def tag(self): 1495 """Returns a string tag for this job.""" 1496 return server_utils.get_job_tag(self.id, self.owner) 1497 1498 1499 def keyval_dict(self): 1500 """Returns all keyvals for this job as a dictionary.""" 1501 return dict((keyval.key, keyval.value) 1502 for keyval in self.jobkeyval_set.all()) 1503 1504 1505 @classmethod 1506 def get_attribute_model(cls): 1507 """Return the attribute model. 1508 1509 Override method in parent class. This class is called when 1510 deserializing the one-to-many relationship betwen Job and JobKeyval. 1511 On deserialization, we will try to clear any existing job keyvals 1512 associated with a job to avoid any inconsistency. 1513 Though Job doesn't implement ModelWithAttribute, we still treat 1514 it as an attribute model for this purpose. 1515 1516 @returns: The attribute model of Job. 1517 """ 1518 return JobKeyval 1519 1520 1521 class Meta: 1522 """Metadata for class Job.""" 1523 db_table = 'afe_jobs' 1524 1525 def __unicode__(self): 1526 return u'%s (%s-%s)' % (self.name, self.id, self.owner) 1527 1528 1529class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): 1530 """Keyvals associated with jobs""" 1531 1532 SERIALIZATION_LINKS_TO_KEEP = set(['job']) 1533 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 1534 1535 job = dbmodels.ForeignKey(Job) 1536 key = dbmodels.CharField(max_length=90) 1537 value = dbmodels.CharField(max_length=300) 1538 1539 objects = model_logic.ExtendedManager() 1540 1541 1542 @classmethod 1543 def get_record(cls, data): 1544 """Check the database for an identical record. 1545 1546 Use job_id and key to search for a existing record. 1547 1548 @raises: DoesNotExist, if no record found 1549 @raises: MultipleObjectsReturned if multiple records found. 1550 """ 1551 # TODO(fdeng): We should use job_id and key together as 1552 # a primary key in the db. 1553 return cls.objects.get(job_id=data['job_id'], key=data['key']) 1554 1555 1556 @classmethod 1557 def deserialize(cls, data): 1558 """Override deserialize in parent class. 1559 1560 Do not deserialize id as id is not kept consistent on master and shards. 1561 1562 @param data: A dictionary of data to deserialize. 1563 1564 @returns: A JobKeyval object. 1565 """ 1566 if data: 1567 data.pop('id') 1568 return super(JobKeyval, cls).deserialize(data) 1569 1570 1571 class Meta: 1572 """Metadata for class JobKeyval.""" 1573 db_table = 'afe_job_keyvals' 1574 1575 1576class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1577 """Represents an ineligible host queue.""" 1578 job = dbmodels.ForeignKey(Job) 1579 host = dbmodels.ForeignKey(Host) 1580 1581 objects = model_logic.ExtendedManager() 1582 1583 class Meta: 1584 """Metadata for class IneligibleHostQueue.""" 1585 db_table = 'afe_ineligible_host_queues' 1586 1587 1588class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1589 """Represents a host queue entry.""" 1590 1591 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1592 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 1593 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) 1594 1595 1596 def custom_deserialize_relation(self, link, data): 1597 assert link == 'meta_host' 1598 self.meta_host = Label.deserialize(data) 1599 1600 1601 def sanity_check_update_from_shard(self, shard, updated_serialized, 1602 job_ids_sent): 1603 if self.job_id not in job_ids_sent: 1604 raise error.UnallowedRecordsSentToMaster( 1605 'Sent HostQueueEntry without corresponding ' 1606 'job entry: %s' % updated_serialized) 1607 1608 1609 Status = host_queue_entry_states.Status 1610 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1611 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1612 PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES 1613 IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES 1614 1615 job = dbmodels.ForeignKey(Job) 1616 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1617 status = dbmodels.CharField(max_length=255) 1618 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, 1619 db_column='meta_host') 1620 active = dbmodels.BooleanField(default=False) 1621 complete = dbmodels.BooleanField(default=False) 1622 deleted = dbmodels.BooleanField(default=False) 1623 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1624 default='') 1625 # If atomic_group is set, this is a virtual HostQueueEntry that will 1626 # be expanded into many actual hosts within the group at schedule time. 1627 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) 1628 aborted = dbmodels.BooleanField(default=False) 1629 started_on = dbmodels.DateTimeField(null=True, blank=True) 1630 finished_on = dbmodels.DateTimeField(null=True, blank=True) 1631 1632 objects = model_logic.ExtendedManager() 1633 1634 1635 def __init__(self, *args, **kwargs): 1636 super(HostQueueEntry, self).__init__(*args, **kwargs) 1637 self._record_attributes(['status']) 1638 1639 1640 @classmethod 1641 def create(cls, job, host=None, meta_host=None, 1642 is_template=False): 1643 """Creates a new host queue entry. 1644 1645 @param cls: Implicit class object. 1646 @param job: The associated job. 1647 @param host: The associated host. 1648 @param meta_host: The associated meta host. 1649 @param is_template: Whether the status should be "Template". 1650 """ 1651 if is_template: 1652 status = cls.Status.TEMPLATE 1653 else: 1654 status = cls.Status.QUEUED 1655 1656 return cls(job=job, host=host, meta_host=meta_host, status=status) 1657 1658 1659 def save(self, *args, **kwargs): 1660 self._set_active_and_complete() 1661 super(HostQueueEntry, self).save(*args, **kwargs) 1662 self._check_for_updated_attributes() 1663 1664 1665 def execution_path(self): 1666 """ 1667 Path to this entry's results (relative to the base results directory). 1668 """ 1669 return server_utils.get_hqe_exec_path(self.job.tag(), 1670 self.execution_subdir) 1671 1672 1673 def host_or_metahost_name(self): 1674 """Returns the first non-None name found in priority order. 1675 1676 The priority order checked is: (1) host name; (2) meta host name 1677 """ 1678 if self.host: 1679 return self.host.hostname 1680 else: 1681 assert self.meta_host 1682 return self.meta_host.name 1683 1684 1685 def _set_active_and_complete(self): 1686 if self.status in self.ACTIVE_STATUSES: 1687 self.active, self.complete = True, False 1688 elif self.status in self.COMPLETE_STATUSES: 1689 self.active, self.complete = False, True 1690 else: 1691 self.active, self.complete = False, False 1692 1693 1694 def on_attribute_changed(self, attribute, old_value): 1695 assert attribute == 'status' 1696 logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id, 1697 self.status) 1698 1699 1700 def is_meta_host_entry(self): 1701 'True if this is a entry has a meta_host instead of a host.' 1702 return self.host is None and self.meta_host is not None 1703 1704 1705 # This code is shared between rpc_interface and models.HostQueueEntry. 1706 # Sadly due to circular imports between the 2 (crbug.com/230100) making it 1707 # a class method was the best way to refactor it. Attempting to put it in 1708 # rpc_utils or a new utils module failed as that would require us to import 1709 # models.py but to call it from here we would have to import the utils.py 1710 # thus creating a cycle. 1711 @classmethod 1712 def abort_host_queue_entries(cls, host_queue_entries): 1713 """Aborts a collection of host_queue_entries. 1714 1715 Abort these host queue entry and all host queue entries of jobs created 1716 by them. 1717 1718 @param host_queue_entries: List of host queue entries we want to abort. 1719 """ 1720 # This isn't completely immune to race conditions since it's not atomic, 1721 # but it should be safe given the scheduler's behavior. 1722 1723 # TODO(milleral): crbug.com/230100 1724 # The |abort_host_queue_entries| rpc does nearly exactly this, 1725 # however, trying to re-use the code generates some horrible 1726 # circular import error. I'd be nice to refactor things around 1727 # sometime so the code could be reused. 1728 1729 # Fixpoint algorithm to find the whole tree of HQEs to abort to 1730 # minimize the total number of database queries: 1731 children = set() 1732 new_children = set(host_queue_entries) 1733 while new_children: 1734 children.update(new_children) 1735 new_child_ids = [hqe.job_id for hqe in new_children] 1736 new_children = HostQueueEntry.objects.filter( 1737 job__parent_job__in=new_child_ids, 1738 complete=False, aborted=False).all() 1739 # To handle circular parental relationships 1740 new_children = set(new_children) - children 1741 1742 # Associate a user with the host queue entries that we're about 1743 # to abort so that we can look up who to blame for the aborts. 1744 now = datetime.now() 1745 user = User.current_user() 1746 aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, 1747 aborted_by=user, aborted_on=now) for hqe in children] 1748 AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) 1749 # Bulk update all of the HQEs to set the abort bit. 1750 child_ids = [hqe.id for hqe in children] 1751 HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) 1752 1753 1754 def abort(self): 1755 """ Aborts this host queue entry. 1756 1757 Abort this host queue entry and all host queue entries of jobs created by 1758 this one. 1759 1760 """ 1761 if not self.complete and not self.aborted: 1762 HostQueueEntry.abort_host_queue_entries([self]) 1763 1764 1765 @classmethod 1766 def compute_full_status(cls, status, aborted, complete): 1767 """Returns a modified status msg if the host queue entry was aborted. 1768 1769 @param cls: Implicit class object. 1770 @param status: The original status message. 1771 @param aborted: Whether the host queue entry was aborted. 1772 @param complete: Whether the host queue entry was completed. 1773 """ 1774 if aborted and not complete: 1775 return 'Aborted (%s)' % status 1776 return status 1777 1778 1779 def full_status(self): 1780 """Returns the full status of this host queue entry, as a string.""" 1781 return self.compute_full_status(self.status, self.aborted, 1782 self.complete) 1783 1784 1785 def _postprocess_object_dict(self, object_dict): 1786 object_dict['full_status'] = self.full_status() 1787 1788 1789 class Meta: 1790 """Metadata for class HostQueueEntry.""" 1791 db_table = 'afe_host_queue_entries' 1792 1793 1794 1795 def __unicode__(self): 1796 hostname = None 1797 if self.host: 1798 hostname = self.host.hostname 1799 return u"%s/%d (%d)" % (hostname, self.job.id, self.id) 1800 1801 1802class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1803 """Represents an aborted host queue entry.""" 1804 queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True) 1805 aborted_by = dbmodels.ForeignKey(User) 1806 aborted_on = dbmodels.DateTimeField() 1807 1808 objects = model_logic.ExtendedManager() 1809 1810 1811 def save(self, *args, **kwargs): 1812 self.aborted_on = datetime.now() 1813 super(AbortedHostQueueEntry, self).save(*args, **kwargs) 1814 1815 class Meta: 1816 """Metadata for class AbortedHostQueueEntry.""" 1817 db_table = 'afe_aborted_host_queue_entries' 1818 1819 1820class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): 1821 """\ 1822 Tasks to run on hosts at the next time they are in the Ready state. Use this 1823 for high-priority tasks, such as forced repair or forced reinstall. 1824 1825 host: host to run this task on 1826 task: special task to run 1827 time_requested: date and time the request for this task was made 1828 is_active: task is currently running 1829 is_complete: task has finished running 1830 is_aborted: task was aborted 1831 time_started: date and time the task started 1832 time_finished: date and time the task finished 1833 queue_entry: Host queue entry waiting on this task (or None, if task was not 1834 started in preparation of a job) 1835 """ 1836 Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision', 1837 string_values=True) 1838 1839 host = dbmodels.ForeignKey(Host, blank=False, null=False) 1840 task = dbmodels.CharField(max_length=64, choices=Task.choices(), 1841 blank=False, null=False) 1842 requested_by = dbmodels.ForeignKey(User) 1843 time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, 1844 null=False) 1845 is_active = dbmodels.BooleanField(default=False, blank=False, null=False) 1846 is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) 1847 is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) 1848 time_started = dbmodels.DateTimeField(null=True, blank=True) 1849 queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) 1850 success = dbmodels.BooleanField(default=False, blank=False, null=False) 1851 time_finished = dbmodels.DateTimeField(null=True, blank=True) 1852 1853 objects = model_logic.ExtendedManager() 1854 1855 1856 def save(self, **kwargs): 1857 if self.queue_entry: 1858 self.requested_by = User.objects.get( 1859 login=self.queue_entry.job.owner) 1860 super(SpecialTask, self).save(**kwargs) 1861 1862 1863 def execution_path(self): 1864 """Returns the execution path for a special task.""" 1865 return server_utils.get_special_task_exec_path( 1866 self.host.hostname, self.id, self.task, self.time_requested) 1867 1868 1869 # property to emulate HostQueueEntry.status 1870 @property 1871 def status(self): 1872 """Returns a host queue entry status appropriate for a speical task.""" 1873 return server_utils.get_special_task_status( 1874 self.is_complete, self.success, self.is_active) 1875 1876 1877 # property to emulate HostQueueEntry.started_on 1878 @property 1879 def started_on(self): 1880 """Returns the time at which this special task started.""" 1881 return self.time_started 1882 1883 1884 @classmethod 1885 def schedule_special_task(cls, host, task): 1886 """Schedules a special task on a host if not already scheduled. 1887 1888 @param cls: Implicit class object. 1889 @param host: The host to use. 1890 @param task: The task to schedule. 1891 """ 1892 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 1893 is_active=False, 1894 is_complete=False) 1895 if existing_tasks: 1896 return existing_tasks[0] 1897 1898 special_task = SpecialTask(host=host, task=task, 1899 requested_by=User.current_user()) 1900 special_task.save() 1901 return special_task 1902 1903 1904 def abort(self): 1905 """ Abort this special task.""" 1906 self.is_aborted = True 1907 self.save() 1908 1909 1910 def activate(self): 1911 """ 1912 Sets a task as active and sets the time started to the current time. 1913 """ 1914 logging.info('Starting: %s', self) 1915 self.is_active = True 1916 self.time_started = datetime.now() 1917 self.save() 1918 1919 1920 def finish(self, success): 1921 """Sets a task as completed. 1922 1923 @param success: Whether or not the task was successful. 1924 """ 1925 logging.info('Finished: %s', self) 1926 self.is_active = False 1927 self.is_complete = True 1928 self.success = success 1929 if self.time_started: 1930 self.time_finished = datetime.now() 1931 self.save() 1932 1933 1934 class Meta: 1935 """Metadata for class SpecialTask.""" 1936 db_table = 'afe_special_tasks' 1937 1938 1939 def __unicode__(self): 1940 result = u'Special Task %s (host %s, task %s, time %s)' % ( 1941 self.id, self.host, self.task, self.time_requested) 1942 if self.is_complete: 1943 result += u' (completed)' 1944 elif self.is_active: 1945 result += u' (active)' 1946 1947 return result 1948 1949 1950class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 1951 1952 board = dbmodels.CharField(max_length=255, unique=True) 1953 version = dbmodels.CharField(max_length=255) 1954 1955 class Meta: 1956 """Metadata for class StableVersion.""" 1957 db_table = 'afe_stable_versions' 1958