1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import random
16import threading
17import time
18import unittest
19
20import grpc_testing
21
22_QUANTUM = 0.3
23_MANY = 10000
24# Tests that run in real time can either wait for the scheduler to
25# eventually run what needs to be run (and risk timing out) or declare
26# that the scheduler didn't schedule work reasonably fast enough. We
27# choose the latter for this test.
28_PATHOLOGICAL_SCHEDULING = 'pathological thread scheduling!'
29
30
31class _TimeNoter(object):
32
33    def __init__(self, time):
34        self._condition = threading.Condition()
35        self._time = time
36        self._call_times = []
37
38    def __call__(self):
39        with self._condition:
40            self._call_times.append(self._time.time())
41
42    def call_times(self):
43        with self._condition:
44            return tuple(self._call_times)
45
46
47class TimeTest(object):
48
49    def test_sleep_for(self):
50        start_time = self._time.time()
51        self._time.sleep_for(_QUANTUM)
52        end_time = self._time.time()
53
54        self.assertLessEqual(start_time + _QUANTUM, end_time)
55
56    def test_sleep_until(self):
57        start_time = self._time.time()
58        self._time.sleep_until(start_time + _QUANTUM)
59        end_time = self._time.time()
60
61        self.assertLessEqual(start_time + _QUANTUM, end_time)
62
63    def test_call_in(self):
64        time_noter = _TimeNoter(self._time)
65
66        start_time = self._time.time()
67        self._time.call_in(time_noter, _QUANTUM)
68        self._time.sleep_for(_QUANTUM * 2)
69        call_times = time_noter.call_times()
70
71        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
72        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
73
74    def test_call_at(self):
75        time_noter = _TimeNoter(self._time)
76
77        start_time = self._time.time()
78        self._time.call_at(time_noter, self._time.time() + _QUANTUM)
79        self._time.sleep_for(_QUANTUM * 2)
80        call_times = time_noter.call_times()
81
82        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
83        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
84
85    def test_cancel(self):
86        time_noter = _TimeNoter(self._time)
87
88        future = self._time.call_in(time_noter, _QUANTUM * 2)
89        self._time.sleep_for(_QUANTUM)
90        cancelled = future.cancel()
91        self._time.sleep_for(_QUANTUM * 2)
92        call_times = time_noter.call_times()
93
94        self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING)
95        self.assertTrue(cancelled)
96        self.assertTrue(future.cancelled())
97
98    def test_many(self):
99        test_events = tuple(threading.Event() for _ in range(_MANY))
100        possibly_cancelled_futures = {}
101        background_noise_futures = []
102
103        for test_event in test_events:
104            possibly_cancelled_futures[test_event] = self._time.call_in(
105                test_event.set, _QUANTUM * (2 + random.random()))
106        for _ in range(_MANY):
107            background_noise_futures.append(
108                self._time.call_in(threading.Event().set,
109                                   _QUANTUM * 1000 * random.random()))
110        self._time.sleep_for(_QUANTUM)
111        cancelled = set()
112        for test_event, test_future in possibly_cancelled_futures.items():
113            if bool(random.randint(0, 1)) and test_future.cancel():
114                cancelled.add(test_event)
115        self._time.sleep_for(_QUANTUM * 3)
116
117        for test_event in test_events:
118            (self.assertFalse if test_event in cancelled else
119             self.assertTrue)(test_event.is_set())
120        for background_noise_future in background_noise_futures:
121            background_noise_future.cancel()
122
123    def test_same_behavior_used_several_times(self):
124        time_noter = _TimeNoter(self._time)
125
126        start_time = self._time.time()
127        first_future_at_one = self._time.call_in(time_noter, _QUANTUM)
128        second_future_at_one = self._time.call_in(time_noter, _QUANTUM)
129        first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
130        second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
131        self._time.sleep_for(_QUANTUM * 2)
132        first_future_at_one_cancelled = first_future_at_one.cancel()
133        second_future_at_one_cancelled = second_future_at_one.cancel()
134        first_future_at_three_cancelled = first_future_at_three.cancel()
135        self._time.sleep_for(_QUANTUM * 2)
136        second_future_at_three_cancelled = second_future_at_three.cancel()
137        first_future_at_three_cancelled_again = first_future_at_three.cancel()
138        call_times = time_noter.call_times()
139
140        self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING)
141        self.assertFalse(first_future_at_one_cancelled)
142        self.assertFalse(second_future_at_one_cancelled)
143        self.assertTrue(first_future_at_three_cancelled)
144        self.assertFalse(second_future_at_three_cancelled)
145        self.assertTrue(first_future_at_three_cancelled_again)
146        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
147        self.assertLessEqual(start_time + _QUANTUM, call_times[1])
148        self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2])
149
150
151class StrictRealTimeTest(TimeTest, unittest.TestCase):
152
153    def setUp(self):
154        self._time = grpc_testing.strict_real_time()
155
156
157class StrictFakeTimeTest(TimeTest, unittest.TestCase):
158
159    def setUp(self):
160        self._time = grpc_testing.strict_fake_time(
161            random.randint(0, int(time.time())))
162
163
164if __name__ == '__main__':
165    unittest.main(verbosity=2)
166