import datetime

import common
from autotest_lib.frontend import setup_test_environment
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.test_utils import mock


class FrontendTestMixin(object):
    """Mixin with shared set-up, tear-down and job-creation test helpers."""
    # pylint: disable=missing-docstring

    def _fill_in_test_data(self):
        """Populate the test database with some hosts and labels.

        Creates (and stores on self):
          - self.hosts: nine hosts named 'host1' .. 'host9'.
          - self.labels: labels 'label1', 'label2', 'label3', 'label6',
            'label7' and 'unused'; the first five are also exposed as
            self.label1 .. self.label7.

        Additionally creates the default drone set (when drone sets are
        enabled), an ACL group 'my_acl' containing the current user and all
        hosts, and a platform label 'myplatform' attached to every host.
        """
        if models.DroneSet.drone_sets_enabled():
            models.DroneSet.objects.create(
                    name=models.DroneSet.default_drone_set_name())

        acl_group = models.AclGroup.objects.create(name='my_acl')
        acl_group.users.add(models.User.current_user())

        self.hosts = [models.Host.objects.create(hostname=hostname)
                      for hostname in
                      ('host1', 'host2', 'host3', 'host4', 'host5', 'host6',
                       'host7', 'host8', 'host9')]

        # Move every host into 'my_acl' and empty the 'Everyone' group.
        acl_group.hosts = self.hosts
        models.AclGroup.smart_get('Everyone').hosts = []

        self.labels = [models.Label.objects.create(name=name) for name in
                       ('label1', 'label2', 'label3',
                        'label6', 'label7', 'unused')]

        platform = models.Label.objects.create(name='myplatform', platform=True)
        for host in self.hosts:
            host.labels.add(platform)

        # The trailing 'unused' label is deliberately not bound to a name.
        self.label1, self.label2, self.label3, self.label6, self.label7, _ \
            = self.labels

        self.label3.only_if_needed = True
        self.label3.save()
        self.hosts[0].labels.add(self.label1)
        self.hosts[1].labels.add(self.label2)
        for hostnum in range(4, 7):  # host5..host7
            self.hosts[hostnum].labels.add(self.label6)
        self.hosts[6].labels.add(self.label7)
        for hostnum in range(7, 9):  # host8..host9
            self.hosts[hostnum].labels.add(self.label6)
            self.hosts[hostnum].labels.add(self.label7)


    def _frontend_common_setup(self, fill_data=True, setup_tables=True):
        """
        Prepare the mock god, test environment and (optionally) test data.

        @param fill_data - bool, if True (and setup_tables is also True)
                populate the database via _fill_in_test_data().
        @param setup_tables - bool, if True call
                setup_test_environment.set_up().
        """
        self.god = mock.mock_god(ut=self)
        if setup_tables:
            setup_test_environment.set_up()
        global_config.global_config.override_config_value(
                'SERVER', 'rpc_logging', 'False')
        # Test data can only be inserted when the tables were set up.
        if fill_data and setup_tables:
            self._fill_in_test_data()


    def _frontend_common_teardown(self):
        """Tear down the test environment, clear the user and unstub mocks."""
        setup_test_environment.tear_down()
        thread_local.set_user(None)
        self.god.unstub_all()


    def _create_job(self, hosts=(), metahosts=(), priority=0, active=False,
                    synchronous=False, hostless=False,
                    drone_set=None, control_file='control',
                    owner='autotest_system', parent_job_id=None):
        """
        Create a job row in the test database.

        @param hosts - A sequence of explicit host ids for this job to be
                scheduled on.
        @param metahosts - A sequence of label ids for each host that this
                job should be scheduled on (meta host scheduling).
        @param priority - The job priority (integer).
        @param active - bool, mark this job as running or not in the database?
        @param synchronous - bool, if True use synch_count=2 otherwise use
                synch_count=1.
        @param hostless - if True, this job is intended to be hostless (in that
                case, hosts, and metahosts must all be empty)
        @param drone_set - The drone set to store on the job. If falsy,
                models.DroneSet.get_default() is used when
                models.DroneSet.default_drone_set_name() is truthy.
        @param control_file - The control file value stored on the job.
        @param owner - The owner of the job. Aclgroups from which a job can
                acquire hosts change with the aclgroups of the owners.
        @param parent_job_id - The id of an already existing parent job, or
                a falsy value for no parent.

        @raises model.DoesNotExist: If parent_job_id is specified but a job with
                id=parent_job_id does not exist.
        @raises AssertionError: If hostless is True while hosts or metahosts
                are given.

        @returns A Django frontend.afe.models.Job instance.
        """
        if hostless:
            # Checked before touching the database so that an invalid call
            # does not leave a job and queue entries behind.
            assert not (hosts or metahosts), (
                    'hostless jobs must not specify hosts or metahosts')

        if not drone_set:
            # Evaluates to the (falsy) default name when none is configured.
            drone_set = (models.DroneSet.default_drone_set_name()
                         and models.DroneSet.get_default())

        synch_count = 2 if synchronous else 1
        # Fixed timestamp keeps created jobs deterministic across runs.
        created_on = datetime.datetime(2008, 1, 1)
        status = models.HostQueueEntry.Status.QUEUED
        if active:
            status = models.HostQueueEntry.Status.RUNNING

        parent_job = (models.Job.objects.get(id=parent_job_id)
                      if parent_job_id else None)
        job = models.Job.objects.create(
                name='test', owner=owner, priority=priority,
                synch_count=synch_count, created_on=created_on,
                reboot_before=model_attributes.RebootBefore.NEVER,
                drone_set=drone_set, control_file=control_file,
                parent_job=parent_job, require_ssp=None)

        # Update the job's dependencies to include the metahost.
        for metahost_label in metahosts:
            dep = models.Label.objects.get(id=metahost_label)
            job.dependency_labels.add(dep)

        for host_id in hosts:
            models.HostQueueEntry.objects.create(job=job, host_id=host_id,
                                                 status=status)
            models.IneligibleHostQueue.objects.create(job=job, host_id=host_id)
        for label_id in metahosts:
            models.HostQueueEntry.objects.create(job=job, meta_host_id=label_id,
                                                 status=status)

        if hostless:
            # A hostless job gets a single entry with no host or meta host.
            models.HostQueueEntry.objects.create(job=job, status=status)
        return job


    def _create_job_simple(self, hosts, use_metahost=False,
                           priority=0, active=False, drone_set=None,
                           parent_job_id=None):
        """
        An alternative interface to _create_job.

        @param hosts - A list of ids; interpreted as label ids (metahosts)
                when use_metahost is True, otherwise as explicit host ids.
        @param use_metahost - bool, selects how hosts is interpreted.
        @param priority - Forwarded to _create_job.
        @param active - Forwarded to _create_job.
        @param drone_set - Forwarded to _create_job.
        @param parent_job_id - Forwarded to _create_job.

        @returns The job returned by _create_job.
        """
        args = {'hosts' : [], 'metahosts' : []}
        if use_metahost:
            args['metahosts'] = hosts
        else:
            args['hosts'] = hosts
        return self._create_job(
                priority=priority, active=active, drone_set=drone_set,
                parent_job_id=parent_job_id, **args)