#!/usr/bin/env python

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import task_loop

import glib
import logging
import mox
import time
import unittest


class TaskLoopTestCase(unittest.TestCase):
    """
    Test fixture for TaskLoop class.

    These unit-tests have a trade-off between speed and stability. The whole
    suite takes about half a minute to run, and could probably be sped up a
    little bit. But making the numbers too small might make the tests flaky.

    NOTE(review): the task loop is obtained through task_loop.get_instance(),
    so it is presumably shared between tests. Repeated tasks posted by a test
    are never cancelled here and may keep firing during later tests -- confirm
    whether post_repeated_task returns an id that can be cancelled.

    """

    # Attributes of the task loop that some tests overwrite. They are
    # snapshotted in setUp and restored in tearDown so that one test's
    # configuration does not leak into the next one.
    _TASK_LOOP_SETTINGS = (
        'ignore_delays',
        'random_delays',
        'max_random_delay_ms',
    )

    def setUp(self):
        """Create the mock helpers and remember the task loop's settings."""
        self._mox = mox.Mox()
        self._callback_mocker = self._mox.CreateMock(TaskLoopTestCase)
        self._task_loop = task_loop.get_instance()
        # Only settings that already exist are remembered; anything a test
        # adds that was not present before is left untouched by tearDown.
        self._saved_settings = {
            name: getattr(self._task_loop, name)
            for name in self._TASK_LOOP_SETTINGS
            if hasattr(self._task_loop, name)
        }
        # Results of cancelling an already cancelled task, filled in by
        # _callback_cancel_cancelled_task.
        self._recancel_results = []

    def tearDown(self):
        """Restore the task loop settings captured in setUp."""
        for name, value in self._saved_settings.items():
            setattr(self._task_loop, name, value)

    # ##########################################################################
    # Tests

    def test_post_task_simple(self):
        """Post a simple task and expect it gets dispatched."""
        self._callback_mocker._callback()

        self._mox.ReplayAll()
        self._task_loop.post_task(self._callback_mocker._callback)
        self._run_task_loop(2)
        self._mox.VerifyAll()

    def test_post_task_set_attribute(self):
        """Post a task that accesses an attribute from the context object."""
        self.flag = False
        self._task_loop.post_task(self._callback_set_attribute)
        self._run_task_loop(2)
        self.assertTrue(self.flag)

    def test_post_task_with_argument(self):
        """Post task with some argument."""
        arg = True
        self._callback_mocker._callback_with_arguments(arg)

        self._mox.ReplayAll()
        self._task_loop.post_task(
                self._callback_mocker._callback_with_arguments, arg)
        self._run_task_loop(2)
        self._mox.VerifyAll()

    def test_post_task_after_delay(self):
        """Post a task with some delay and check that the delay is respected."""
        start_time = time.time()
        # If the callback never runs, self.time stays at start_time and the
        # assertion below fails with a measured delay of zero.
        self.time = start_time
        self._task_loop.post_task_after_delay(self._callback_set_time, 3000)
        self._run_task_loop(5)
        delayed_time = self.time - start_time
        self.assertGreaterEqual(delayed_time, 3)

    def test_post_repeated_task(self):
        """Post a repeated task and check it gets dispatched multiple times."""
        self.count = 0
        self._task_loop.post_repeated_task(self._callback_increment_count,
                                           1000)
        self._run_task_loop(5)
        self.assertGreaterEqual(self.count, 3)

    def test_ignore_delays(self):
        """Post a task and test ignore_delays mode."""
        self._task_loop.ignore_delays = False

        self._task_loop.post_task_after_delay(self._callback_mocker._callback,
                                              10000)
        # Not enough time to dispatch the posted task
        self._run_task_loop(1)
        self._mox.VerifyAll()

    def test_cancel_posted_task(self):
        """Test that a cancelled task is not dispatched."""
        post_id = self._task_loop.post_task_after_delay(
                self._callback_mocker._callback,
                2000)
        # The cancellation is posted without a delay, while the task it
        # cancels is posted with a 2 second delay.
        self._task_loop.post_task(self._callback_cancel_task, post_id)
        self._run_task_loop(3)
        self._mox.VerifyAll()

    def test_multiple_cancels(self):
        """Test that successive cancels after a successful cancel fail."""
        post_id = self._task_loop.post_task_after_delay(
                self._callback_mocker._callback,
                2000)
        self._task_loop.post_task(self._callback_cancel_task, post_id)
        self._task_loop.post_task(self._callback_cancel_cancelled_task,
                                  post_id)
        self._run_task_loop(3)
        self._mox.VerifyAll()
        # Check the second cancel from the test body as well: an assertion
        # raised inside a main-loop callback may not propagate to the test
        # runner, and it proves nothing if the callback never ran.
        self.assertEqual(len(self._recancel_results), 1)
        self.assertFalse(self._recancel_results[0])

    def test_random_delays(self):
        """Test that random delays work (sort of). This test could be flaky."""
        # Warning: This test could be flaky. Add more differences?
        self.count = 0
        self.times = {}
        self._task_loop.random_delays = True
        self._task_loop.max_random_delay_ms = 1000
        self._task_loop.post_repeated_task(self._callback_record_times, 500)
        self._run_task_loop(5)
        # At least four dispatches are needed to compute three gaps below.
        self.assertGreaterEqual(self.count, 4)
        # Test that not all time gaps are almost the same
        gaps = [round(self.times[i + 1] - self.times[i], 3) for i in range(3)]
        self.assertTrue(len(set(gaps)) > 1)

    # ##########################################################################
    # Helper functions

    def _stop_task_loop(self):
        """Stop the task loop; used as a one-shot glib timeout callback."""
        print('Stopping task_loop.')
        self._task_loop.stop()
        # A false return value tells glib not to schedule this timeout again.
        return False

    def _run_task_loop(self, run_for_seconds):
        """
        Runs the task loop for |run_for_seconds| seconds. This function is
        blocking, so the main thread will return only after |run_for_seconds|.

        @param run_for_seconds: How long to let the loop run, in seconds.

        """
        # post a task to stop the task loop.
        glib.timeout_add(run_for_seconds * 1000, self._stop_task_loop)
        self._task_loop.start()
        # We will continue only when the stop task has been executed.

    # ##########################################################################
    # Callbacks for tests

    def _callback(self):
        """Real counterpart of the mocked callback; only prints a message."""
        print('Actual TaskLoopTestCase._callback called!')

    def _callback_set_attribute(self):
        """Mark that the posted task ran."""
        self.flag = True

    def _callback_set_time(self):
        """Record the wall-clock time at which the posted task ran."""
        self.time = time.time()

    def _callback_increment_count(self):
        """Count one more dispatch of the repeated task."""
        self.count += 1

    def _callback_record_times(self):
        """Record the time of this dispatch, keyed by its ordinal number."""
        self.times[self.count] = time.time()
        self.count += 1

    def _callback_with_arguments(self, arg):
        """Placeholder whose signature the mock mirrors; does nothing."""
        pass

    def _callback_cancel_task(self, post_id):
        """Cancel the posted task identified by |post_id|."""
        self._task_loop.cancel_posted_task(post_id)

    def _callback_cancel_cancelled_task(self, post_id):
        """Cancel |post_id| again and expect the cancellation to fail."""
        result = self._task_loop.cancel_posted_task(post_id)
        # Keep the outcome so the test body can assert on it after the loop
        # has stopped.
        self._recancel_results.append(result)
        self.assertFalse(result)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()