1"""The ABAT harness interface 2 3The interface as required for ABAT. 4""" 5 6__author__ = """Copyright Andy Whitcroft 2006""" 7 8from autotest_lib.client.bin import utils 9import os, harness, time, re 10 11def autobench_load(fn): 12 disks = re.compile(r'^\s*DATS_FREE_DISKS\s*=(.*\S)\s*$') 13 parts = re.compile(r'^\s*DATS_FREE_PARTITIONS\s*=(.*\S)\s*$') 14 modules = re.compile(r'^\s*INITRD_MODULES\s*=(.*\S)\s*$') 15 16 conf = {} 17 18 try: 19 fd = file(fn, "r") 20 except: 21 return conf 22 for ln in fd.readlines(): 23 m = disks.match(ln) 24 if m: 25 val = m.groups()[0] 26 conf['disks'] = val.strip('"').split() 27 m = parts.match(ln) 28 if m: 29 val = m.groups()[0] 30 conf['partitions'] = val.strip('"').split() 31 m = modules.match(ln) 32 if m: 33 val = m.groups()[0] 34 conf['modules'] = val.strip('"').split() 35 fd.close() 36 37 return conf 38 39 40class harness_ABAT(harness.harness): 41 """The ABAT server harness 42 43 Properties: 44 job 45 The job object for this job 46 """ 47 48 def __init__(self, job, harness_args): 49 """ 50 job 51 The job object for this job 52 """ 53 self.setup(job) 54 55 if 'ABAT_STATUS' in os.environ: 56 self.status = file(os.environ['ABAT_STATUS'], "w") 57 else: 58 self.status = None 59 60 61 def __send(self, msg): 62 if self.status: 63 msg = msg.rstrip() 64 self.status.write(msg + "\n") 65 self.status.flush() 66 67 68 def __send_status(self, code, subdir, operation, msg): 69 self.__send("STATUS %s %s %s %s" % (code, subdir, operation, msg)) 70 71 72 def __root_device(self): 73 device = None 74 root = re.compile(r'^\S*(/dev/\S+).*\s/\s*$') 75 76 df = utils.system_output('df -lP') 77 for line in df.split("\n"): 78 m = root.match(line) 79 if m: 80 device = m.groups()[0] 81 82 return device 83 84 85 def run_start(self): 86 """A run within this job is starting""" 87 self.__send_status('GOOD', '----', '----', 'run starting') 88 89 # Load up the autobench.conf if it exists. 90 conf = autobench_load("/etc/autobench.conf") 91 if 'partitions' in conf: 92 self.job.config_set('partition.partitions', 93 conf['partitions']) 94 95 # Search the boot loader configuration for the autobench entry, 96 # and extract its args. 97 args = None 98 for entry in self.job.bootloader.get_entries().itervalues(): 99 if entry['title'].startswith('autobench'): 100 args = entry.get('args') 101 102 if args: 103 args = re.sub(r'autobench_args:.*', '', args) 104 args = re.sub(r'root=\S*', '', args) 105 args += " root=" + self.__root_device() 106 107 self.job.config_set('boot.default_args', args) 108 109 # Turn off boot_once semantics. 110 self.job.config_set('boot.set_default', True) 111 112 # For RedHat installs we do not load up the module.conf 113 # as they cannot be builtin. Pass them as arguments. 114 vendor = utils.get_os_vendor() 115 if vendor in ['Red Hat', 'Fedora Core'] and 'modules' in conf: 116 args = '--allow-missing' 117 for mod in conf['modules']: 118 args += " --with " + mod 119 self.job.config_set('kernel.mkinitrd_extra_args', args) 120 121 122 def run_reboot(self): 123 """A run within this job is performing a reboot 124 (expect continue following reboot) 125 """ 126 self.__send("REBOOT") 127 128 129 def run_complete(self): 130 """A run within this job is completing (all done)""" 131 self.__send("DONE") 132 133 134 def test_status_detail(self, code, subdir, operation, msg, tag, 135 optional_fields): 136 """A test within this job is completing (detail)""" 137 138 # Send the first line with the status code as a STATUS message. 139 lines = msg.split("\n") 140 self.__send_status(code, subdir, operation, lines[0]) 141 142 143 def test_status(self, msg, tag): 144 lines = msg.split("\n") 145 146 # Send each line as a SUMMARY message. 147 for line in lines: 148 self.__send("SUMMARY :" + line) 149