#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import unittest

import mox

import common
# We want to import setup_django_lite_environment first so that the database
# is setup correctly.
from autotest_lib.frontend import setup_django_lite_environment
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, rpc_interface
from django import test
from autotest_lib.server.cros import repair_utils


# See complete_failures_functional_tests.py for why we need this.
class MockDatetime(datetime.datetime):
    """Used to mock out parts of datetime.datetime.

    The tests stub out `today` on this subclass and temporarily install the
    subclass as `datetime.datetime`.
    NOTE(review): presumably needed because attributes of the built-in
    datetime.datetime type cannot be replaced directly -- confirm against
    complete_failures_functional_tests.py.
    """
    pass


# We use a mock rpc client object so that we instead directly use the server.
class MockAFE(object):
    """Used to mock out the rpc client."""

    def run(self, func, **kwargs):
        """
        The fake run call directly contacts the server.

        @param func: The name of the remote function that is being called.
        @param kwargs: The arguments to the remotely called function.

        @return: Whatever the rpc_interface function named by func returns,
                 passed through utils.strip_unicode.
        """
        rpc_method = getattr(rpc_interface, func)
        return utils.strip_unicode(rpc_method(**kwargs))


class FindProblemTestTests(mox.MoxTestBase, test.TestCase):
    """Test that we properly find the last job run on a host."""

    # Values written into repair_utils for the duration of each test.
    _TEST_TIMEOUT_MINS = 1440
    _CUTOFF_MINS = 60

    def setUp(self):
        """Install MockDatetime and remember the repair_utils settings."""
        super(FindProblemTestTests, self).setUp()
        self.mox.StubOutWithMock(MockDatetime, 'today')
        # Keep the real class so that tearDown can put it back.
        self.datetime = datetime.datetime
        datetime.datetime = MockDatetime
        self._orig_cutoff = repair_utils._CUTOFF_AFTER_TIMEOUT_MINS
        self._orig_timeout = repair_utils._DEFAULT_TEST_TIMEOUT_MINS

    def tearDown(self):
        """Restore the repair_utils settings and the real datetime class."""
        repair_utils._DEFAULT_TEST_TIMEOUT_MINS = self._orig_timeout
        repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = self._orig_cutoff
        datetime.datetime = self.datetime
        super(FindProblemTestTests, self).tearDown()

    def _create_host(self, hostname):
        """
        Create and save a host.

        @param hostname: The hostname to give the new host.

        @return: The saved models.Host object.
        """
        host = models.Host(hostname=hostname)
        host.save()
        return host

    def _create_queue_entry(self, host, job_name, created_on, started_on):
        """
        Create and save a job along with a host queue entry that ties it to
        the given host.

        @param host: The models.Host the queue entry is for.
        @param job_name: The name to give the new job.
        @param created_on: The creation time to record on the job.
        @param started_on: The start time to record on the queue entry.

        @return: The saved models.HostQueueEntry object.
        """
        job = models.Job(owner='me', name=job_name, created_on=created_on)
        job.save()
        queue_entry = models.HostQueueEntry(
                job=job, host=host, status='test', started_on=started_on)
        queue_entry.save()
        return queue_entry

    def _find_problem_test_for(self, hostname, today):
        """
        Run repair_utils._find_problem_test with a fixed notion of "today".

        @param hostname: The hostname to look up.
        @param today: The value the stubbed datetime.datetime.today() is
                      expected to be asked for once and will return.

        @return: Whatever repair_utils._find_problem_test returns.
        """
        # Record the expected call; mox is still in record mode here.
        datetime.datetime.today().AndReturn(today)
        repair_utils._DEFAULT_TEST_TIMEOUT_MINS = self._TEST_TIMEOUT_MINS
        repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = self._CUTOFF_MINS
        self.mox.ReplayAll()
        return repair_utils._find_problem_test(hostname, MockAFE())

    def test_should_get_most_recent_job(self):
        """Test that, for a given host, we get the last job ran on that host."""
        host = self._create_host('host')
        # Both jobs share a creation time; only the start times differ.
        self._create_queue_entry(
                host, 'old_job',
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 1))
        self._create_queue_entry(
                host, 'new_job',
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 2))

        result = self._find_problem_test_for(
                'host', datetime.datetime(2012, 1, 1))

        self.assertEqual(result['job']['name'], 'new_job')

    def test_should_get_job_for_specified_host_only(self):
        """Test that we only get a job that is for the given host."""
        correct_host = self._create_host('correct_host')
        self._create_queue_entry(
                correct_host, 'correct_job',
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 1))
        # The job on the other host starts later, so picking it up would mean
        # the lookup did not restrict itself to the requested host.
        wrong_host = self._create_host('wrong_host')
        self._create_queue_entry(
                wrong_host, 'wrong_job',
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 2))

        result = self._find_problem_test_for(
                'correct_host', datetime.datetime(2012, 1, 1))

        self.assertEqual(result['job']['name'], 'correct_job')

    def test_return_jobs_ran_soon_after_max_job_runtime(self):
        """Test that we get jobs that are just past the max runtime."""
        host = self._create_host('host')
        self._create_queue_entry(
                host, 'new_job',
                created_on=datetime.datetime(2012, 1, 1, 0, 0),
                started_on=datetime.datetime(2012, 1, 1, 2))

        # "Today" is 24.5 hours after the job was created, while the timeout
        # is 1440 minutes and the cutoff after it is 60 minutes.
        result = self._find_problem_test_for(
                'host', datetime.datetime(2012, 1, 2, 0, 30))

        self.assertEqual(result['job']['name'], 'new_job')


if __name__ == '__main__':
    unittest.main()