1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test times."""
15
16import collections
17import logging
18import threading
19import time as _time
20
21import grpc
22import grpc_testing
23
# Apply logging's default root configuration at import time; per the logging
# documentation this does nothing if the root logger already has handlers.
logging.basicConfig()
# Module-level logger; _call uses it to report exceptions raised by behaviors.
_LOGGER = logging.getLogger(__name__)
26
27
def _call(behaviors):
    """Invoke every behavior in order, logging instead of propagating errors.

    Args:
      behaviors: An iterable of zero-argument callables.
    """
    for callback in behaviors:
        try:
            callback()
        except Exception:  # pylint: disable=broad-except
            # One failing behavior must not keep the remaining ones from
            # running, so the failure is only recorded.
            _LOGGER.exception('Exception calling behavior "%r"!', callback)
34
35
def _call_in_thread(behaviors):
    """Run the behaviors on a separate thread and wait for them to finish.

    Args:
      behaviors: An iterable of zero-argument callables, handed to _call.
    """
    worker = threading.Thread(target=_call, args=(behaviors,))
    worker.start()
    # NOTE(nathaniel): "Strict" Time implementations rely on this function
    # not returning until every behavior has terminated, hence the join.
    worker.join()
42
43
class _State(object):
    """Pending behaviors keyed by scheduled time, plus their condition."""

    def __init__(self):
        # Maps a scheduled time to the list of behaviors due at that time;
        # missing keys start out as empty lists.
        self.times_to_behaviors = collections.defaultdict(list)
        # Used by the Time implementations and futures in this module as the
        # lock around times_to_behaviors and to signal changes to waiters.
        self.condition = threading.Condition()
49
50
class _Delta(
        collections.namedtuple(
            '_Delta',
            [
                'mature_behaviors',
                'earliest_mature_time',
                'earliest_immature_time',
            ],
        )):
    """Outcome of one processing pass over the scheduled behaviors.

    Attributes:
      mature_behaviors: Behaviors whose scheduled time has been reached.
      earliest_mature_time: The smallest scheduled time among the matured
        entries, or None if nothing matured.
      earliest_immature_time: The smallest scheduled time still pending, or
        None if nothing is pending.
    """
58
59
def _process(state, now):
    """Remove and collect every behavior scheduled at or before `now`.

    The caller is expected to hold state.condition; this function mutates
    state.times_to_behaviors without taking the lock itself.

    Args:
      state: A _State whose times_to_behaviors mapping is consumed.
      now: The time up to (and including) which behaviors are mature.

    Returns:
      A _Delta holding the matured behaviors in ascending time order, the
      earliest matured time (or None), and the earliest time still pending
      (or None).
    """
    pending = state.times_to_behaviors
    # Snapshot the due times first so the mapping can be mutated below.
    due_times = sorted(moment for moment in pending if moment <= now)
    due_behaviors = []
    for due_time in due_times:
        due_behaviors.extend(pending.pop(due_time))
    first_due_time = due_times[0] if due_times else None
    # Whatever is left is strictly later than `now`.
    next_pending_time = min(pending) if pending else None
    return _Delta(due_behaviors, first_due_time, next_pending_time)
78
79
class _Future(grpc.Future):
    """Handle for a behavior scheduled on one of this module's Time objects.

    Only cancel() and cancelled() are implemented; every other grpc.Future
    method raises NotImplementedError.
    """

    def __init__(self, state, behavior, time):
        """Initializes the future.

        Args:
          state: The _State shared with the Time that scheduled the behavior.
          behavior: The scheduled callable.
          time: The time at which the behavior was scheduled to be called.
        """
        self._state = state
        self._behavior = behavior
        self._time = time
        self._cancelled = False

    def cancel(self):
        """Attempts to unschedule the behavior.

        Returns:
          True if the behavior was removed from the schedule (now or by an
          earlier call); False if it is no longer scheduled at its time.
        """
        with self._state.condition:
            if self._cancelled:
                return True
            # .get() avoids creating an empty entry in the defaultdict.
            behaviors_at_time = self._state.times_to_behaviors.get(self._time)
            if behaviors_at_time is None:
                return False
            try:
                behaviors_at_time.remove(self._behavior)
            except ValueError:
                # The time slot exists but does not hold this behavior: it
                # was already taken off the schedule and the slot was
                # re-created by a later scheduling at the same time.
                return False
            if not behaviors_at_time:
                # Drop the now-empty slot and wake waiters so they can
                # re-evaluate the earliest pending time.
                self._state.times_to_behaviors.pop(self._time)
                self._state.condition.notify_all()
            self._cancelled = True
            return True

    def cancelled(self):
        """Returns whether cancel() has succeeded on this future."""
        with self._state.condition:
            return self._cancelled

    def running(self):
        """Not supported."""
        raise NotImplementedError()

    def done(self):
        """Not supported."""
        raise NotImplementedError()

    def result(self, timeout=None):
        """Not supported."""
        raise NotImplementedError()

    def exception(self, timeout=None):
        """Not supported."""
        raise NotImplementedError()

    def traceback(self, timeout=None):
        """Not supported."""
        raise NotImplementedError()

    def add_done_callback(self, fn):
        """Not supported."""
        raise NotImplementedError()
126
127
class StrictRealTime(grpc_testing.Time):
    """A grpc_testing.Time backed by the system clock.

    Scheduled behaviors are called from a background thread that is started
    on demand and exits once nothing remains scheduled. The sleep methods
    return only after behaviors scheduled earlier than the wake-up time have
    been called.
    """

    def __init__(self):
        self._state = _State()
        # True while a background _activity thread is alive (or starting).
        self._active = False
        # Earliest scheduled time of the batch of behaviors currently being
        # called by the background thread, or None when no batch is running.
        self._calling = None

    def _activity(self):
        """Background loop: waits for behaviors to mature and calls them."""
        while True:
            with self._state.condition:
                while True:
                    now = _time.time()
                    delta = _process(self._state, now)
                    # Let sleepers in _ensure_called_through re-check.
                    self._state.condition.notify_all()
                    if delta.mature_behaviors:
                        self._calling = delta.earliest_mature_time
                        break
                    self._calling = None
                    if delta.earliest_immature_time is None:
                        # Nothing left to do; _call_at starts a new thread
                        # the next time something is scheduled.
                        self._active = False
                        return
                    else:
                        # Sleep until the next behavior is due, or until a
                        # scheduling change notifies the condition.
                        timeout = max(0, delta.earliest_immature_time - now)
                        self._state.condition.wait(timeout=timeout)
            # Behaviors are called with the condition released.
            _call(delta.mature_behaviors)

    def _ensure_called_through(self, time):
        """Blocks until nothing scheduled before `time` is pending or running."""
        with self._state.condition:
            while ((self._state.times_to_behaviors and
                    min(self._state.times_to_behaviors) < time) or
                   (self._calling is not None and self._calling < time)):
                self._state.condition.wait()

    def _call_at(self, behavior, time):
        """Schedules `behavior` at `time`, starting the worker if needed."""
        with self._state.condition:
            self._state.times_to_behaviors[time].append(behavior)
            if self._active:
                # Wake the worker so it recomputes its timeout.
                self._state.condition.notify_all()
            else:
                activity = threading.Thread(target=self._activity)
                activity.start()
                self._active = True
            return _Future(self._state, behavior, time)

    def time(self):
        """Returns the current system time in seconds."""
        return _time.time()

    def call_in(self, behavior, delay):
        """Schedules `behavior` to be called `delay` seconds from now.

        Returns:
          A _Future that can be used to cancel the call.
        """
        return self._call_at(behavior, _time.time() + delay)

    def call_at(self, behavior, time):
        """Schedules `behavior` to be called at system time `time`.

        Returns:
          A _Future that can be used to cancel the call.
        """
        return self._call_at(behavior, time)

    def sleep_for(self, duration):
        """Sleeps for `duration` seconds, then waits for earlier behaviors.

        A negative duration is treated as zero, matching sleep_until.
        """
        time = _time.time() + duration
        # time.sleep() raises ValueError for negative values, so clamp.
        _time.sleep(max(0, duration))
        self._ensure_called_through(time)

    def sleep_until(self, time):
        """Sleeps until system time `time`, then waits for earlier behaviors."""
        _time.sleep(max(0, time - _time.time()))
        self._ensure_called_through(time)
189
190
class StrictFakeTime(grpc_testing.Time):
    """A grpc_testing.Time whose clock advances only when a caller sleeps.

    Behaviors are run through _call_in_thread, which does not return until
    they have all terminated.
    """

    # NOTE(review): call_at, sleep_for and sleep_until run behaviors while
    # holding self._state.condition. A behavior that needs that lock from its
    # worker thread (e.g. by cancelling a _Future created here) looks liable
    # to block forever -- confirm behaviors are expected never to do so.

    def __init__(self, time):
        """Initializes the clock.

        Args:
          time: The initial value reported by time().
        """
        self._state = _State()
        self._time = time

    def time(self):
        """Returns the current fake time."""
        return self._time

    def call_in(self, behavior, delay):
        """Schedules `behavior` to be called `delay` after the current time.

        A non-positive delay calls the behavior immediately, before this
        method returns.

        Returns:
          A _Future for the behavior at its target time.
        """
        run_now = delay <= 0
        with self._state.condition:
            # Bind the target time on both paths: the returned future needs
            # it even when the behavior is run immediately.
            time = self._time + delay
            if not run_now:
                self._state.times_to_behaviors[time].append(behavior)
        if run_now:
            _call_in_thread((behavior,))
        return _Future(self._state, behavior, time)

    def call_at(self, behavior, time):
        """Schedules `behavior` to be called at fake time `time`.

        If `time` is not later than the current time the behavior is called
        immediately, before this method returns.

        Returns:
          A _Future for the behavior at `time`.
        """
        with self._state.condition:
            if time <= self._time:
                _call_in_thread((behavior,))
            else:
                self._state.times_to_behaviors[time].append(behavior)
        return _Future(self._state, behavior, time)

    def sleep_for(self, duration):
        """Advances the clock by `duration` and calls behaviors now due.

        Non-positive durations are ignored.
        """
        if 0 < duration:
            with self._state.condition:
                self._time += duration
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)

    def sleep_until(self, time):
        """Advances the clock to `time` and calls behaviors now due.

        Does nothing if `time` is not later than the current time.
        """
        with self._state.condition:
            if self._time < time:
                self._time = time
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)
230