1#!/usr/bin/env python
2
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import task_loop
8
9import glib
10import logging
11import mox
12import time
13import unittest
14
class TaskLoopTestCase(unittest.TestCase):
    """
    Test fixture for TaskLoop class.

    These unit-tests have a trade-off between speed and stability. The whole
    suite takes about half a minute to run, and probably could be sped up a
    little bit. But, making the numbers too small might make the tests flaky.

    Every test posts work to the task loop obtained from
    task_loop.get_instance() and then drives that loop for a fixed number of
    wall-clock seconds via _run_task_loop(). Checks are made after the loop
    has stopped, so that a failure is raised in the test body itself.
    """

    def setUp(self):
        """Create the mock callback target and fetch the task loop."""
        self._mox = mox.Mox()
        # Mock of this very class, so that the _callback* methods defined
        # below can be handed to the task loop and tracked by mox.
        self._callback_mocker = self._mox.CreateMock(TaskLoopTestCase)
        self._task_loop = task_loop.get_instance()
        # Return values collected by _callback_cancel_cancelled_task. They are
        # checked by the test after the loop has stopped.
        self._repeated_cancel_results = []

    # ##########################################################################
    # Tests

    def test_post_task_simple(self):
        """Post a simple task and expect it gets dispatched."""
        self._callback_mocker._callback()

        self._mox.ReplayAll()
        self._task_loop.post_task(self._callback_mocker._callback)
        self._run_task_loop(2)
        self._mox.VerifyAll()


    def test_post_task_set_attribute(self):
        """Post a task that accesses an attribute from the context object."""
        self.flag = False
        self._task_loop.post_task(self._callback_set_attribute)
        self._run_task_loop(2)
        self.assertTrue(self.flag)


    def test_post_task_with_argument(self):
        """Post task with some argument."""
        arg = True
        self._callback_mocker._callback_with_arguments(arg)

        self._mox.ReplayAll()
        self._task_loop.post_task(
                self._callback_mocker._callback_with_arguments, arg)
        self._run_task_loop(2)
        self._mox.VerifyAll()


    def test_post_task_after_delay(self):
        """Post a task with some delay and check that the delay is respected."""
        start_time = time.time()
        # If the callback never runs, self.time stays at start_time and the
        # assertion below fails with a measured delay of zero.
        self.time = start_time
        self._task_loop.post_task_after_delay(self._callback_set_time, 3000)
        self._run_task_loop(5)
        delayed_time = self.time - start_time
        self.assertGreaterEqual(delayed_time, 3)


    def test_post_repeated_task(self):
        """Post a repeated task and check it gets dispatched multiple times."""
        self.count = 0
        # NOTE(review): the repeated task is not cancelled by this test.
        self._task_loop.post_repeated_task(self._callback_increment_count, 1000)
        self._run_task_loop(5)
        self.assertGreaterEqual(self.count, 3)


    def test_ignore_delays(self):
        """With ignore_delays off, a long-delayed task is not dispatched."""
        self._set_task_loop_attribute('ignore_delays', False)

        self._task_loop.post_task_after_delay(self._callback_mocker._callback,
                                              10000)
        # Not enough time to dispatch the posted task
        self._run_task_loop(1)
        # NOTE(review): ReplayAll() is not called in this test; the check
        # relies on VerifyAll() complaining if the mock callback was invoked.
        # Confirm against mox semantics before restructuring.
        self._mox.VerifyAll()


    def test_cancel_posted_task(self):
        """Test that a cancelled task is not dispatched."""
        post_id = self._task_loop.post_task_after_delay(
                self._callback_mocker._callback,
                2000)
        self._task_loop.post_task(self._callback_cancel_task, post_id)
        self._run_task_loop(3)
        # NOTE(review): as in test_ignore_delays, the mock is verified without
        # a prior ReplayAll().
        self._mox.VerifyAll()


    def test_multiple_cancels(self):
        """Test that successive cancels after a successful cancel fail."""
        post_id = self._task_loop.post_task_after_delay(
                self._callback_mocker._callback,
                2000)
        self._task_loop.post_task(self._callback_cancel_task, post_id)
        self._task_loop.post_task(self._callback_cancel_cancelled_task, post_id)
        self._run_task_loop(3)
        self._mox.VerifyAll()
        # The second cancel is checked here rather than inside the callback:
        # an assertion raised while the main loop is dispatching a callback
        # may not propagate to the test runner, and a callback that never ran
        # would otherwise go unnoticed.
        self.assertEqual(len(self._repeated_cancel_results), 1)
        self.assertFalse(self._repeated_cancel_results[0])


    def test_random_delays(self):
        """Test that random delays works (sort of). This test could be flaky."""
        # Warning: This test could be flaky. Add more differences?
        self.count = 0
        self.times = {}
        # The previous values are restored at cleanup so that these settings
        # do not leak into other tests through the shared task loop.
        self._set_task_loop_attribute('random_delays', True)
        self._set_task_loop_attribute('max_random_delay_ms', 1000)
        self._task_loop.post_repeated_task(self._callback_record_times, 500)
        self._run_task_loop(5)
        # At least four dispatches are needed to compute the three gaps below.
        self.assertGreaterEqual(self.count, 4)
        # Test that not all time gaps are almost the same
        diff1 = round(self.times[1] - self.times[0], 3)
        diff2 = round(self.times[2] - self.times[1], 3)
        diff3 = round(self.times[3] - self.times[2], 3)
        self.assertTrue(diff1 != diff2 or diff2 != diff3 or diff3 != diff1)

    # ##########################################################################
    # Helper functions

    def _set_task_loop_attribute(self, name, value):
        """
        Sets attribute |name| of the task loop to |value| for this test only.

        The value the attribute had before the call is restored when the test
        finishes, whether it passes or fails.
        """
        previous_value = getattr(self._task_loop, name)
        self.addCleanup(setattr, self._task_loop, name, previous_value)
        setattr(self._task_loop, name, value)

    def _stop_task_loop(self):
        """Stops the task loop. Used as the glib timeout callback."""
        print('Stopping task_loop.')
        self._task_loop.stop()

    def _run_task_loop(self, run_for_seconds):
        """
        Runs the task loop for |run_for_seconds| seconds. This function is
        blocking, so the main thread will return only after |run_for_seconds|.
        """
        # post a task to stop the task loop.
        glib.timeout_add(run_for_seconds*1000, self._stop_task_loop)
        self._task_loop.start()
        # We will continue only when the stop task has been executed.

    # ##########################################################################
    # Callbacks for tests

    def _callback(self):
        """Placeholder that is mocked out; the real one should never run."""
        print('Actual TaskLoopTestCase._callback called!')


    def _callback_set_attribute(self):
        """Marks that the task was dispatched."""
        self.flag = True


    def _callback_set_time(self):
        """Records the wall-clock time at which the task was dispatched."""
        self.time = time.time()


    def _callback_increment_count(self):
        """Counts one more dispatch."""
        self.count = self.count + 1


    def _callback_record_times(self):
        """Records the dispatch time under the current count, then counts."""
        self.times[self.count] = time.time()
        self.count = self.count + 1


    def _callback_with_arguments(self, arg):
        """Placeholder with one argument; only its mocked version is used."""
        pass


    def _callback_cancel_task(self, post_id):
        """Cancels the posted task identified by |post_id|."""
        self._task_loop.cancel_posted_task(post_id)


    def _callback_cancel_cancelled_task(self, post_id):
        """
        Cancels |post_id| again and records what cancel_posted_task returned.

        The result is appended to self._repeated_cancel_results; the test
        asserts on it once the task loop has stopped.
        """
        self._repeated_cancel_results.append(
                self._task_loop.cancel_posted_task(post_id))
180
if __name__ == '__main__':
    # Show DEBUG-level log output while the suite runs as a script.
    log_level = logging.DEBUG
    logging.basicConfig(level=log_level)
    unittest.main()
184