1"""The ABAT harness interface
2
3The interface as required for ABAT.
4"""
5
6__author__ = """Copyright Andy Whitcroft 2006"""
7
8from autotest_lib.client.bin import utils
9import os, harness, time, re
10
def autobench_load(fn):
    """Parse selected settings out of an autobench configuration file.

    Only three assignments are recognised; every other line is ignored:

        DATS_FREE_DISKS       -> conf['disks']
        DATS_FREE_PARTITIONS  -> conf['partitions']
        INITRD_MODULES        -> conf['modules']

    Each value has its surrounding whitespace and double quotes removed
    and is then split on whitespace into a list of strings.  If a variable
    is assigned more than once, the last assignment wins.

    Args:
        fn: Path of the configuration file to read.

    Returns:
        A dict with a list of strings for each recognised variable found
        in the file.  The dict is empty if the file cannot be opened.
    """
    patterns = (
        ('disks',
         re.compile(r'^\s*DATS_FREE_DISKS\s*=(.*\S)\s*$')),
        ('partitions',
         re.compile(r'^\s*DATS_FREE_PARTITIONS\s*=(.*\S)\s*$')),
        ('modules',
         re.compile(r'^\s*INITRD_MODULES\s*=(.*\S)\s*$')),
    )

    conf = {}

    # A missing or unreadable configuration file is not an error: the
    # caller simply gets an empty configuration.  Only I/O failures are
    # treated this way so that genuine programming errors still surface.
    try:
        fd = open(fn, "r")
    except (IOError, OSError):
        return conf

    # try/finally (rather than 'with') keeps this usable on the old
    # interpreters the rest of the file targets, while still guaranteeing
    # the file is closed if reading fails part-way through.
    try:
        for ln in fd:
            for key, pattern in patterns:
                m = pattern.match(ln)
                if m:
                    # Drop whitespace between '=' and the value first so
                    # that a leading double quote really is stripped.
                    conf[key] = m.group(1).strip().strip('"').split()
    finally:
        fd.close()

    return conf
38
39
class harness_ABAT(harness.harness):
    """The ABAT server harness

    Reports job progress as one-line text messages (STATUS, SUMMARY,
    REBOOT, DONE) written to the file named by the ABAT_STATUS
    environment variable.  When that variable is not set the messages
    are silently dropped.

    Properties:
            job
                    The job object for this job
            status
                    The open status file, or None when ABAT_STATUS
                    is not set in the environment
    """

    def __init__(self, job, harness_args):
        """
                job
                        The job object for this job
                harness_args
                        Accepted for interface compatibility; not used
                        by this harness
        """
        self.setup(job)

        if 'ABAT_STATUS' in os.environ:
            # open() rather than the Python-2-only file() builtin.
            self.status = open(os.environ['ABAT_STATUS'], "w")
        else:
            self.status = None


    def __send(self, msg):
        """Write one message line to the status file, if there is one.

        Trailing whitespace is stripped from msg and a single newline
        is appended.  The file is flushed after every message.
        """
        if self.status:
            msg = msg.rstrip()
            self.status.write(msg + "\n")
            self.status.flush()


    def __send_status(self, code, subdir, operation, msg):
        """Send a STATUS message built from the four given fields."""
        self.__send("STATUS %s %s %s %s" % (code, subdir, operation, msg))


    def __root_device(self):
        """Return the /dev/... device mounted on /, or None if not found.

        Scans the output of 'df -lP' for lines that start with a field
        containing a /dev/ path and end with the mount point /.  If
        several lines match, the last one wins.
        """
        device = None
        root = re.compile(r'^\S*(/dev/\S+).*\s/\s*$')

        df = utils.system_output('df -lP')
        for line in df.split("\n"):
            m = root.match(line)
            if m:
                device = m.group(1)

        return device


    def run_start(self):
        """A run within this job is starting"""
        self.__send_status('GOOD', '----', '----', 'run starting')

        # Load up the autobench.conf if it exists.
        conf = autobench_load("/etc/autobench.conf")
        if 'partitions' in conf:
            self.job.config_set('partition.partitions',
                    conf['partitions'])

        # Search the boot loader configuration for the autobench entry,
        # and extract its args.  If several entries match, the last one
        # visited wins.
        args = None
        for entry in self.job.bootloader.get_entries().values():
            if entry['title'].startswith('autobench'):
                args = entry.get('args')

        if args:
            args = re.sub(r'autobench_args:.*', '', args)

            # Only replace the root= argument when the current root
            # device could actually be determined; otherwise keep the
            # one from the boot loader entry instead of failing on
            # a None device.
            device = self.__root_device()
            if device:
                args = re.sub(r'root=\S*', '', args)
                args += " root=" + device

            self.job.config_set('boot.default_args', args)

        # Turn off boot_once semantics.
        self.job.config_set('boot.set_default', True)

        # For RedHat installs we do not load up the module.conf
        # as they cannot be builtin.  Pass them as arguments.
        vendor = utils.get_os_vendor()
        if vendor in ['Red Hat', 'Fedora Core'] and 'modules' in conf:
            extra = ['--allow-missing']
            extra.extend(["--with " + mod for mod in conf['modules']])
            self.job.config_set('kernel.mkinitrd_extra_args',
                    " ".join(extra))


    def run_reboot(self):
        """A run within this job is performing a reboot
           (expect continue following reboot)
        """
        self.__send("REBOOT")


    def run_complete(self):
        """A run within this job is completing (all done)"""
        self.__send("DONE")


    def test_status_detail(self, code, subdir, operation, msg, tag,
                           optional_fields):
        """A test within this job is completing (detail)

        Only the first line of msg is reported.  tag and optional_fields
        are accepted but not used by this harness.
        """

        # Send the first line with the status code as a STATUS message.
        lines = msg.split("\n")
        self.__send_status(code, subdir, operation, lines[0])


    def test_status(self, msg, tag):
        """A test within this job is completing

        Every line of msg is reported separately.  tag is accepted but
        not used by this harness.
        """
        lines = msg.split("\n")

        # Send each line as a SUMMARY message.
        for line in lines:
            self.__send("SUMMARY :" + line)
149