1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utilities for the gRPC Python Beta API."""
15
16import threading
17import time
18
19# implementations is referenced from specification in this module.
20from grpc.beta import implementations  # pylint: disable=unused-import
21from grpc.beta import interfaces
22from grpc.framework.foundation import callable_util
23from grpc.framework.foundation import future
24
25_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
26    'Exception calling connectivity future "done" callback!')
27
28
29class _ChannelReadyFuture(future.Future):
30
31    def __init__(self, channel):
32        self._condition = threading.Condition()
33        self._channel = channel
34
35        self._matured = False
36        self._cancelled = False
37        self._done_callbacks = []
38
39    def _block(self, timeout):
40        until = None if timeout is None else time.time() + timeout
41        with self._condition:
42            while True:
43                if self._cancelled:
44                    raise future.CancelledError()
45                elif self._matured:
46                    return
47                else:
48                    if until is None:
49                        self._condition.wait()
50                    else:
51                        remaining = until - time.time()
52                        if remaining < 0:
53                            raise future.TimeoutError()
54                        else:
55                            self._condition.wait(timeout=remaining)
56
57    def _update(self, connectivity):
58        with self._condition:
59            if (not self._cancelled and
60                    connectivity is interfaces.ChannelConnectivity.READY):
61                self._matured = True
62                self._channel.unsubscribe(self._update)
63                self._condition.notify_all()
64                done_callbacks = tuple(self._done_callbacks)
65                self._done_callbacks = None
66            else:
67                return
68
69        for done_callback in done_callbacks:
70            callable_util.call_logging_exceptions(
71                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
72
73    def cancel(self):
74        with self._condition:
75            if not self._matured:
76                self._cancelled = True
77                self._channel.unsubscribe(self._update)
78                self._condition.notify_all()
79                done_callbacks = tuple(self._done_callbacks)
80                self._done_callbacks = None
81            else:
82                return False
83
84        for done_callback in done_callbacks:
85            callable_util.call_logging_exceptions(
86                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
87
88        return True
89
90    def cancelled(self):
91        with self._condition:
92            return self._cancelled
93
94    def running(self):
95        with self._condition:
96            return not self._cancelled and not self._matured
97
98    def done(self):
99        with self._condition:
100            return self._cancelled or self._matured
101
102    def result(self, timeout=None):
103        self._block(timeout)
104        return None
105
106    def exception(self, timeout=None):
107        self._block(timeout)
108        return None
109
110    def traceback(self, timeout=None):
111        self._block(timeout)
112        return None
113
114    def add_done_callback(self, fn):
115        with self._condition:
116            if not self._cancelled and not self._matured:
117                self._done_callbacks.append(fn)
118                return
119
120        fn(self)
121
122    def start(self):
123        with self._condition:
124            self._channel.subscribe(self._update, try_to_connect=True)
125
126    def __del__(self):
127        with self._condition:
128            if not self._cancelled and not self._matured:
129                self._channel.unsubscribe(self._update)
130
131
132def channel_ready_future(channel):
133    """Creates a future.Future tracking when an implementations.Channel is ready.
134
135  Cancelling the returned future.Future does not tell the given
136  implementations.Channel to abandon attempts it may have been making to
137  connect; cancelling merely deactivates the return future.Future's
138  subscription to the given implementations.Channel's connectivity.
139
140  Args:
141    channel: An implementations.Channel.
142
143  Returns:
144    A future.Future that matures when the given Channel has connectivity
145      interfaces.ChannelConnectivity.READY.
146  """
147    ready_future = _ChannelReadyFuture(channel)
148    ready_future.start()
149    return ready_future
150