1#!/usr/bin/python
2
3import unittest
4import common
5from autotest_lib.tko import status_lib, parser_lib
6from autotest_lib.client.common_lib import log
7
8
9class clean_raw_line_test(unittest.TestCase):
10    def test_default(self):
11        raw_line_temp = 'this \r is a %s line \x00 yeah\n'
12        raw_line = raw_line_temp % status_lib.DEFAULT_BLACKLIST[0]
13        cleaned = status_lib.clean_raw_line(raw_line)
14        self.assertEquals(cleaned, raw_line_temp % '')
15
16
17    def test_multi(self):
18        blacklist = ('\r\x00', 'FOOBAR', 'BLAh')
19        raw_line_temp = 'this \x00 FOO is BAR \r a %s line %s BL yeah %s ah\n'
20        raw_line = raw_line_temp % blacklist
21        cleaned = status_lib.clean_raw_line(raw_line, blacklist)
22        self.assertEquals(
23            cleaned, raw_line_temp % (('',) * len(blacklist)))
24
25
26class line_buffer_test(unittest.TestCase):
27    def test_get_empty(self):
28        buf = status_lib.line_buffer()
29        self.assertRaises(IndexError, buf.get)
30
31
32    def test_get_single(self):
33        buf = status_lib.line_buffer()
34        buf.put("single line")
35        self.assertEquals(buf.get(), "single line")
36        self.assertRaises(IndexError, buf.get)
37
38
39    def test_is_fifo(self):
40        buf = status_lib.line_buffer()
41        lines = ["line #%d" for x in xrange(10)]
42        for line in lines:
43            buf.put(line)
44        results = []
45        while buf.size():
46            results.append(buf.get())
47        self.assertEquals(lines, results)
48
49
50    def test_put_multiple_same_as_multiple_puts(self):
51        buf_put, buf_multi = [status_lib.line_buffer()
52                              for x in xrange(2)]
53        lines = ["line #%d" % x for x in xrange(10)]
54        for line in lines:
55            buf_put.put(line)
56        buf_multi.put_multiple(lines)
57        counter = 0
58        while buf_put.size():
59            self.assertEquals(buf_put.size(), buf_multi.size())
60            line = "line #%d" % counter
61            self.assertEquals(buf_put.get(), line)
62            self.assertEquals(buf_multi.get(), line)
63            counter += 1
64
65
66    def test_put_back_is_lifo(self):
67        buf = status_lib.line_buffer()
68        lines = ["1", "2", "3"]
69        for line in lines:
70            buf.put(line)
71        results = []
72        results.append(buf.get())
73        buf.put_back("1")
74        buf.put_back("0")
75        while buf.size():
76            results.append(buf.get())
77        self.assertEquals(results, ["1", "0", "1", "2", "3"])
78
79
80    def test_size_increased_by_put(self):
81        buf = status_lib.line_buffer()
82        self.assertEquals(buf.size(), 0)
83        buf.put("1")
84        buf.put("2")
85        self.assertEquals(buf.size(), 2)
86        buf.put("3")
87        self.assertEquals(buf.size(), 3)
88
89
90    def test_size_decreased_by_get(self):
91        buf = status_lib.line_buffer()
92        buf.put("1")
93        buf.put("2")
94        buf.put("3")
95        self.assertEquals(buf.size(), 3)
96        buf.get()
97        self.assertEquals(buf.size(), 2)
98        buf.get()
99        buf.get()
100        self.assertEquals(buf.size(), 0)
101
102
103class status_stack_test(unittest.TestCase):
104    statuses = log.job_statuses
105
106    def test_default_to_nostatus(self):
107        stack = status_lib.status_stack()
108        self.assertEquals(stack.current_status(), "NOSTATUS")
109
110
111    def test_default_on_start_to_nostatus(self):
112        stack = status_lib.status_stack()
113        stack.update("FAIL")
114        stack.start()
115        self.assertEquals(stack.current_status(), "NOSTATUS")
116
117
118    def test_size_always_at_least_zero(self):
119        stack = status_lib.status_stack()
120        self.assertEquals(stack.size(), 0)
121        stack.start()
122        stack.end()
123        self.assertEquals(stack.size(), 0)
124        stack.end()
125        self.assertEquals(stack.size(), 0)
126
127
128    def test_anything_overrides_nostatus(self):
129        for status in self.statuses:
130            stack = status_lib.status_stack()
131            stack.update(status)
132            self.assertEquals(stack.current_status(), status)
133
134
135    def test_worse_overrides_better(self):
136        for i in xrange(len(self.statuses)):
137            worse_status = self.statuses[i]
138            for j in xrange(i + 1, len(self.statuses)):
139                stack = status_lib.status_stack()
140                better_status = self.statuses[j]
141                stack.update(better_status)
142                stack.update(worse_status)
143                self.assertEquals(stack.current_status(),
144                                  worse_status)
145
146
147    def test_better_never_overrides_better(self):
148        for i in xrange(len(self.statuses)):
149            better_status = self.statuses[i]
150            for j in xrange(i):
151                stack = status_lib.status_stack()
152                worse_status = self.statuses[j]
153                stack.update(worse_status)
154                stack.update(better_status)
155                self.assertEquals(stack.current_status(),
156                                  worse_status)
157
158
159    def test_stack_is_lifo(self):
160        stack = status_lib.status_stack()
161        stack.update("GOOD")
162        stack.start()
163        stack.update("FAIL")
164        stack.start()
165        stack.update("WARN")
166        self.assertEquals(stack.end(), "WARN")
167        self.assertEquals(stack.end(), "FAIL")
168        self.assertEquals(stack.end(), "GOOD")
169        self.assertEquals(stack.end(), "NOSTATUS")
170
171
172class parser_test(unittest.TestCase):
173    available_versions = [0, 1]
174    def test_can_import_available_versions(self):
175        for version in self.available_versions:
176            p = parser_lib.parser(0)
177            self.assertNotEqual(p, None)
178
179
180if __name__ == "__main__":
181    unittest.main()
182