1#!/usr/bin/python2
2# pylint: disable=missing-docstring
3
4import logging
5import os
6import re
7import shutil
8import sys
9import tempfile
10import unittest
11
12from six.moves import range
13import six
14
15import common
16from autotest_lib.client.bin import job, sysinfo, harness
17from autotest_lib.client.bin import utils
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.common_lib import logging_manager, logging_config
20from autotest_lib.client.common_lib import base_job_unittest
21from autotest_lib.client.common_lib.test_utils import mock
22
23
24class job_test_case(unittest.TestCase):
25    """Generic job TestCase class that defines a standard job setUp and
26    tearDown, with some standard stubs."""
27
28    job_class = job.base_client_job
29
30    def setUp(self):
31        self.god = mock.mock_god(ut=self)
32        self.god.stub_with(job.base_client_job, '_get_environ_autodir',
33                           classmethod(lambda cls: '/adir'))
34        self.job = self.job_class.__new__(self.job_class)
35        self.job._job_directory = base_job_unittest.stub_job_directory
36
37        _, self.control_file = tempfile.mkstemp()
38
39
40    def tearDown(self):
41        self.god.unstub_all()
42        os.remove(self.control_file)
43
44
45class test_find_base_directories(
46        base_job_unittest.test_find_base_directories.generic_tests,
47        job_test_case):
48
49    def test_autodir_equals_clientdir(self):
50        autodir, clientdir, _ = self.job._find_base_directories()
51        self.assertEqual(autodir, '/adir')
52        self.assertEqual(clientdir, '/adir')
53
54
55    def test_serverdir_is_none(self):
56        _, _, serverdir = self.job._find_base_directories()
57        self.assertEqual(serverdir, None)
58
59
60class abstract_test_init(base_job_unittest.test_init.generic_tests):
61    """Generic client job mixin used when defining variations on the
62    job.__init__ generic tests."""
63    OPTIONAL_ATTRIBUTES = (
64        base_job_unittest.test_init.generic_tests.OPTIONAL_ATTRIBUTES
65        - set(['control', 'harness']))
66
67
68class test_init_minimal_options(abstract_test_init, job_test_case):
69
70    def call_init(self):
71        # TODO(jadmanski): refactor more of the __init__ code to not need to
72        # stub out countless random APIs
73        self.god.stub_function_to_return(job.os, 'mkdir', None)
74        self.god.stub_function_to_return(job.os.path, 'exists', True)
75        self.god.stub_function_to_return(self.job, '_load_state', None)
76        self.god.stub_function_to_return(self.job, 'record', None)
77        self.god.stub_function_to_return(job.shutil, 'copyfile', None)
78        self.god.stub_function_to_return(job.logging_manager,
79                                         'configure_logging', None)
80        class manager:
81            def start_logging(self):
82                return None
83        self.god.stub_function_to_return(job.logging_manager,
84                                         'get_logging_manager', manager())
85        class stub_sysinfo:
86            def log_per_reboot_data(self):
87                return None
88        self.god.stub_function_to_return(job.sysinfo, 'sysinfo',
89                                         stub_sysinfo())
90        class stub_harness:
91            run_start = lambda self: None
92        self.god.stub_function_to_return(job.harness, 'select', stub_harness())
93        class options:
94            tag = ''
95            verbose = False
96            cont = False
97            harness = 'stub'
98            harness_args = None
99            hostname = None
100            user = None
101            log = False
102            args = ''
103            output_dir = ''
104        self.god.stub_function_to_return(job.utils, 'drop_caches', None)
105
106        self.job._job_state = base_job_unittest.stub_job_state
107        self.job.__init__(self.control_file, options)
108
109
110class placeholder(object):
111    """A simple placeholder for attributes"""
112    pass
113
114
115class first_line_comparator(mock.argument_comparator):
116    def __init__(self, first_line):
117        self.first_line = first_line
118
119
120    def is_satisfied_by(self, parameter):
121        return self.first_line == parameter.splitlines()[0]
122
123
124class test_base_job(unittest.TestCase):
125    def setUp(self):
126        # make god
127        self.god = mock.mock_god(ut=self)
128
129        # need to set some environ variables
130        self.autodir = "autodir"
131        os.environ['AUTODIR'] = self.autodir
132
133        # set up some variables
134        _, self.control = tempfile.mkstemp()
135        self.jobtag = "jobtag"
136
137        # get rid of stdout and logging
138        sys.stdout = six.StringIO()
139        logging_manager.configure_logging(logging_config.TestingConfig())
140        logging.disable(logging.CRITICAL)
141        def placeholder_configure_logging(*args, **kwargs):
142            pass
143        self.god.stub_with(logging_manager, 'configure_logging',
144                           placeholder_configure_logging)
145        real_get_logging_manager = logging_manager.get_logging_manager
146        def get_logging_manager_no_fds(manage_stdout_and_stderr=False,
147                                       redirect_fds=False):
148            return real_get_logging_manager(manage_stdout_and_stderr, False)
149        self.god.stub_with(logging_manager, 'get_logging_manager',
150                           get_logging_manager_no_fds)
151
152        # stub out some stuff
153        self.god.stub_function(os.path, 'exists')
154        self.god.stub_function(os.path, 'isdir')
155        self.god.stub_function(os, 'makedirs')
156        self.god.stub_function(os, 'mkdir')
157        self.god.stub_function(os, 'remove')
158        self.god.stub_function(shutil, 'rmtree')
159        self.god.stub_function(shutil, 'copyfile')
160        self.god.stub_function(job, 'open')
161        self.god.stub_function(utils, 'system')
162        self.god.stub_function(utils, 'drop_caches')
163        self.god.stub_function(harness, 'select')
164        self.god.stub_function(sysinfo, 'log_per_reboot_data')
165
166        self.god.stub_class(job.local_host, 'LocalHost')
167        self.god.stub_class(sysinfo, 'sysinfo')
168
169        self.god.stub_class_method(job.base_client_job,
170                                   '_cleanup_debugdir_files')
171        self.god.stub_class_method(job.base_client_job, '_cleanup_results_dir')
172
173        self.god.stub_with(job.base_job.job_directory, '_ensure_valid',
174                           lambda *_: None)
175
176
177    def tearDown(self):
178        sys.stdout = sys.__stdout__
179        self.god.unstub_all()
180        os.remove(self.control)
181
182
183    def _setup_pre_record_init(self, cont):
184        self.god.stub_function(self.job, '_load_state')
185
186        resultdir = os.path.join(self.autodir, 'results', self.jobtag)
187        tmpdir = os.path.join(self.autodir, 'tmp')
188        if not cont:
189            job.base_client_job._cleanup_debugdir_files.expect_call()
190            job.base_client_job._cleanup_results_dir.expect_call()
191
192        self.job._load_state.expect_call()
193
194        my_harness = self.god.create_mock_class(harness.harness,
195                                                'my_harness')
196        harness.select.expect_call(None,
197                                   self.job,
198                                   None).and_return(my_harness)
199
200        return resultdir, my_harness
201
202
203    def _setup_post_record_init(self, cont, resultdir, my_harness):
204        # now some specific stubs
205        self.god.stub_function(self.job, 'config_get')
206        self.god.stub_function(self.job, 'config_set')
207        self.god.stub_function(self.job, 'record')
208
209        # other setup
210        results = os.path.join(self.autodir, 'results')
211        download = os.path.join(self.autodir, 'tests', 'download')
212        pkgdir = os.path.join(self.autodir, 'packages')
213
214        utils.drop_caches.expect_call()
215        job_sysinfo = sysinfo.sysinfo.expect_new(resultdir)
216        if not cont:
217            os.path.exists.expect_call(download).and_return(False)
218            os.mkdir.expect_call(download)
219            shutil.copyfile.expect_call(mock.is_string_comparator(),
220                                 os.path.join(resultdir, 'control'))
221
222        job.local_host.LocalHost.expect_new(hostname='localhost')
223        job_sysinfo.log_per_reboot_data.expect_call()
224        if not cont:
225            self.job.record.expect_call('START', None, None)
226
227        my_harness.run_start.expect_call()
228
229
230    def construct_job(self, cont):
231        # will construct class instance using __new__
232        self.job = job.base_client_job.__new__(job.base_client_job)
233
234        # record
235        resultdir, my_harness = self._setup_pre_record_init(cont)
236        self._setup_post_record_init(cont, resultdir, my_harness)
237
238        # finish constructor
239        options = placeholder()
240        options.tag = self.jobtag
241        options.cont = cont
242        options.harness = None
243        options.harness_args = None
244        options.log = False
245        options.verbose = False
246        options.hostname = 'localhost'
247        options.user = 'my_user'
248        options.args = ''
249        options.output_dir = ''
250        self.job.__init__(self.control, options)
251
252        # check
253        self.god.check_playback()
254
255
256    def get_partition_mock(self, devname):
257        """
258        Create a mock of a partition object and return it.
259        """
260        class mock(object):
261            device = devname
262            get_mountpoint = self.god.create_mock_function('get_mountpoint')
263        return mock
264
265
266    def test_constructor_first_run(self):
267        self.construct_job(False)
268
269
270    def test_constructor_continuation(self):
271        self.construct_job(True)
272
273
274    def test_constructor_post_record_failure(self):
275        """
276        Test post record initialization failure.
277        """
278        self.job = job.base_client_job.__new__(job.base_client_job)
279        options = placeholder()
280        options.tag = self.jobtag
281        options.cont = False
282        options.harness = None
283        options.harness_args = None
284        options.log = False
285        options.verbose = False
286        options.hostname = 'localhost'
287        options.user = 'my_user'
288        options.args = ''
289        options.output_dir = ''
290        error = Exception('fail')
291
292        self.god.stub_function(self.job, '_post_record_init')
293        self.god.stub_function(self.job, 'record')
294
295        self._setup_pre_record_init(False)
296        self.job._post_record_init.expect_call(
297                self.control, options, True).and_raises(error)
298        self.job.record.expect_call(
299                'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
300                str(error))
301
302        self.assertRaises(
303                Exception, self.job.__init__, self.control, options,
304                drop_caches=True)
305
306        # check
307        self.god.check_playback()
308
309
310    def test_control_functions(self):
311        self.construct_job(True)
312        control_file = "blah"
313        self.job.control_set(control_file)
314        self.assertEquals(self.job.control_get(), os.path.abspath(control_file))
315
316
317    def test_harness_select(self):
318        self.construct_job(True)
319
320        # record
321        which = "which"
322        harness_args = ''
323        harness.select.expect_call(which, self.job,
324                                   harness_args).and_return(None)
325
326        # run and test
327        self.job.harness_select(which, harness_args)
328        self.god.check_playback()
329
330
331    def test_setup_dirs_raise(self):
332        self.construct_job(True)
333
334        # setup
335        results_dir = 'foo'
336        tmp_dir = 'bar'
337
338        # record
339        os.path.exists.expect_call(tmp_dir).and_return(True)
340        os.path.isdir.expect_call(tmp_dir).and_return(False)
341
342        # test
343        self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
344        self.god.check_playback()
345
346
347    def test_setup_dirs(self):
348        self.construct_job(True)
349
350        # setup
351        results_dir1 = os.path.join(self.job.resultdir, 'build')
352        results_dir2 = os.path.join(self.job.resultdir, 'build.2')
353        results_dir3 = os.path.join(self.job.resultdir, 'build.3')
354        tmp_dir = 'bar'
355
356        # record
357        os.path.exists.expect_call(tmp_dir).and_return(False)
358        os.mkdir.expect_call(tmp_dir)
359        os.path.isdir.expect_call(tmp_dir).and_return(True)
360        os.path.exists.expect_call(results_dir1).and_return(True)
361        os.path.exists.expect_call(results_dir2).and_return(True)
362        os.path.exists.expect_call(results_dir3).and_return(False)
363        os.path.exists.expect_call(results_dir3).and_return(False)
364        os.mkdir.expect_call(results_dir3)
365
366        # test
367        self.assertEqual(self.job.setup_dirs(None, tmp_dir),
368                         (results_dir3, tmp_dir))
369        self.god.check_playback()
370
371
372    def test_run_test_logs_test_error_from_unhandled_error(self):
373        self.construct_job(True)
374
375        # set up stubs
376        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
377        self.god.stub_function(self.job, "_runtest")
378
379        # create an unhandled error object
380        class MyError(error.TestError):
381            pass
382        real_error = MyError("this is the real error message")
383        unhandled_error = error.UnhandledTestError(real_error)
384
385        # set up the recording
386        testname = "error_test"
387        outputdir = os.path.join(self.job.resultdir, testname)
388        self.job.pkgmgr.get_package_name.expect_call(
389            testname, 'test').and_return(("", testname))
390        os.path.exists.expect_call(outputdir).and_return(False)
391        self.job.record.expect_call("START", testname, testname,
392                                    optional_fields=None)
393        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
394            unhandled_error)
395        self.job.record.expect_call("ERROR", testname, testname,
396                                    first_line_comparator(str(real_error)))
397        self.job.record.expect_call("END ERROR", testname, testname)
398        self.job.harness.run_test_complete.expect_call()
399        utils.drop_caches.expect_call()
400
401        # run and check
402        self.job.run_test(testname)
403        self.god.check_playback()
404
405
406    def test_run_test_logs_non_test_error_from_unhandled_error(self):
407        self.construct_job(True)
408
409        # set up stubs
410        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
411        self.god.stub_function(self.job, "_runtest")
412
413        # create an unhandled error object
414        class MyError(Exception):
415            pass
416        real_error = MyError("this is the real error message")
417        unhandled_error = error.UnhandledTestError(real_error)
418        reason = first_line_comparator("Unhandled MyError: %s" % real_error)
419
420        # set up the recording
421        testname = "error_test"
422        outputdir = os.path.join(self.job.resultdir, testname)
423        self.job.pkgmgr.get_package_name.expect_call(
424            testname, 'test').and_return(("", testname))
425        os.path.exists.expect_call(outputdir).and_return(False)
426        self.job.record.expect_call("START", testname, testname,
427                                    optional_fields=None)
428        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
429            unhandled_error)
430        self.job.record.expect_call("ERROR", testname, testname, reason)
431        self.job.record.expect_call("END ERROR", testname, testname)
432        self.job.harness.run_test_complete.expect_call()
433        utils.drop_caches.expect_call()
434
435        # run and check
436        self.job.run_test(testname)
437        self.god.check_playback()
438
439
440    def test_report_reboot_failure(self):
441        self.construct_job(True)
442
443        # record
444        self.job.record.expect_call("ABORT", "sub", "reboot.verify",
445                                    "boot failure")
446        self.job.record.expect_call("END ABORT", "sub", "reboot",
447                                    optional_fields={"kernel": "2.6.15-smp"})
448
449        # playback
450        self.job._record_reboot_failure("sub", "reboot.verify", "boot failure",
451                                        running_id="2.6.15-smp")
452        self.god.check_playback()
453
454
455    def _setup_check_post_reboot(self, mount_info, cpu_count):
456        # setup
457        self.god.stub_function(job.partition_lib, "get_partition_list")
458        self.god.stub_function(utils, "count_cpus")
459
460        part_list = [self.get_partition_mock("/dev/hda1"),
461                     self.get_partition_mock("/dev/hdb1")]
462        mount_list = ["/mnt/hda1", "/mnt/hdb1"]
463
464        # record
465        job.partition_lib.get_partition_list.expect_call(
466                self.job, exclude_swap=False).and_return(part_list)
467        for i in range(len(part_list)):
468            part_list[i].get_mountpoint.expect_call().and_return(mount_list[i])
469        if cpu_count is not None:
470            utils.count_cpus.expect_call().and_return(cpu_count)
471        self.job._state.set('client', 'mount_info', mount_info)
472        self.job._state.set('client', 'cpu_count', 8)
473
474
475    def test_check_post_reboot_success(self):
476        self.construct_job(True)
477
478        mount_info = set([("/dev/hda1", "/mnt/hda1"),
479                          ("/dev/hdb1", "/mnt/hdb1")])
480        self._setup_check_post_reboot(mount_info, 8)
481
482        # playback
483        self.job._check_post_reboot("sub")
484        self.god.check_playback()
485
486
487    def test_check_post_reboot_mounts_failure(self):
488        self.construct_job(True)
489
490        mount_info = set([("/dev/hda1", "/mnt/hda1")])
491        self._setup_check_post_reboot(mount_info, None)
492
493        self.god.stub_function(self.job, "_record_reboot_failure")
494        self.job._record_reboot_failure.expect_call("sub",
495                "reboot.verify_config", "mounted partitions are different after"
496                " reboot (old entries: set([]), new entries: set([('/dev/hdb1',"
497                " '/mnt/hdb1')]))", running_id=None)
498
499        # playback
500        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
501        self.god.check_playback()
502
503
504    def test_check_post_reboot_cpu_failure(self):
505        self.construct_job(True)
506
507        mount_info = set([("/dev/hda1", "/mnt/hda1"),
508                          ("/dev/hdb1", "/mnt/hdb1")])
509        self._setup_check_post_reboot(mount_info, 4)
510
511        self.god.stub_function(self.job, "_record_reboot_failure")
512        self.job._record_reboot_failure.expect_call(
513            'sub', 'reboot.verify_config',
514            'Number of CPUs changed after reboot (old count: 8, new count: 4)',
515            running_id=None)
516
517        # playback
518        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
519        self.god.check_playback()
520
521
522    def test_parse_args(self):
523        test_set = {"a='foo bar baz' b='moo apt'":
524                    ["a='foo bar baz'", "b='moo apt'"],
525                    "a='foo bar baz' only=gah":
526                    ["a='foo bar baz'", "only=gah"],
527                    "a='b c d' no=argh":
528                    ["a='b c d'", "no=argh"]}
529        for t in test_set:
530            parsed_args = job.base_client_job._parse_args(t)
531            expected_args = test_set[t]
532            self.assertEqual(parsed_args, expected_args)
533
534
535    def test_run_test_timeout_parameter_is_propagated(self):
536        self.construct_job(True)
537
538        # set up stubs
539        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
540        self.god.stub_function(self.job, "_runtest")
541
542        # create an unhandled error object
543        #class MyError(error.TestError):
544        #    pass
545        #real_error = MyError("this is the real error message")
546        #unhandled_error = error.UnhandledTestError(real_error)
547
548        # set up the recording
549        testname = "test"
550        outputdir = os.path.join(self.job.resultdir, testname)
551        self.job.pkgmgr.get_package_name.expect_call(
552            testname, 'test').and_return(("", testname))
553        os.path.exists.expect_call(outputdir).and_return(False)
554        timeout = 60
555        optional_fields = {}
556        optional_fields['timeout'] = timeout
557        self.job.record.expect_call("START", testname, testname,
558                                    optional_fields=optional_fields)
559        self.job._runtest.expect_call(testname, "", timeout, (), {})
560        self.job.record.expect_call("GOOD", testname, testname,
561                                    "completed successfully")
562        self.job.record.expect_call("END GOOD", testname, testname)
563        self.job.harness.run_test_complete.expect_call()
564        utils.drop_caches.expect_call()
565
566        # run and check
567        self.job.run_test(testname, timeout=timeout)
568        self.god.check_playback()
569
570
571class test_name_pattern(unittest.TestCase):
572    """Tests for _NAME_PATTERN."""
573
574    def _one_name_pattern_test(self, line, want):
575        """Parametrized test."""
576        match = re.match(job._NAME_PATTERN, line)
577        self.assertIsNotNone(match)
578        self.assertEqual(match.group(1), want)
579
580    def test_name_pattern_nospace_single(self):
581        self._one_name_pattern_test("NAME='some_Test'", 'some_Test')
582
583    def test_name_pattern_nospace_double(self):
584        self._one_name_pattern_test('NAME="some_Test"', 'some_Test')
585
586    def test_name_pattern_space_single(self):
587        self._one_name_pattern_test("NAME  =  'some_Test'", 'some_Test')
588
589    def test_name_pattern_space_double(self):
590        self._one_name_pattern_test('NAME  =  "some_Test"', 'some_Test')
591
592
593if __name__ == "__main__":
594    unittest.main()
595