1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import abc
16import threading
17
18import grpc
19from grpc_testing import _common
20
# Sentinel stored in _Handler._code by cancel() to mark an RPC that ended
# because the client cancelled it rather than because a status was sent.
# unary_response_termination() and stream_response_termination() raise
# ValueError when they see it instead of reporting a status.
_CLIENT_INACTIVE = object()
22
23
class Handler(_common.ServerRpcHandler):
    """Interface for the operations that drive one RPC from outside.

    Adds to _common.ServerRpcHandler the methods declared below. Every method
    here is declared abstract and raises NotImplementedError when invoked
    directly; _Handler in this module provides the implementations.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Return the initial metadata of the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def add_request(self, request):
        """Supply one request message to the RPC.

        Args:
          request: The request message to make available to the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Return the next response message of the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Signal that no further requests will be added to the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancel the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def unary_response_termination(self):
        """Return the terminal state of an RPC that has a single response."""
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_response_termination(self):
        """Return the terminal state of an RPC that has streamed responses."""
        raise NotImplementedError()
53
54
class _Handler(Handler):
    """Condition-guarded record of one RPC's requests, responses and status.

    Every method acquires self._condition before touching state, and every
    state change notifies all waiters so that blocked readers re-check what
    they are waiting for.

    self._code is the single "has the RPC ended?" marker. It is None while the
    RPC is active, _CLIENT_INACTIVE after cancel(), the supplied code after
    send_termination(), and grpc.StatusCode.DEADLINE_EXCEEDED after expire().
    """

    def __init__(self, requests_closed):
        """Create the state of a new, still-active RPC.

        Args:
          requests_closed: Initial value of the "no more requests will be
            added" flag consulted by take_request().
        """
        self._condition = threading.Condition()
        self._requests = []
        self._requests_closed = requests_closed
        self._initial_metadata = None
        self._responses = []
        self._trailing_metadata = None
        self._code = None
        self._details = None
        # Caches the response popped by unary_response_termination().
        self._unary_response = None
        # Set via set_expiration_future(); cancelled once the RPC ends.
        self._expiration_future = None
        # Replaced by None once cancel() or expire() has consumed the list.
        self._termination_callbacks = []

    def send_initial_metadata(self, initial_metadata):
        """Record the initial metadata and wake threads waiting for it.

        Args:
          initial_metadata: The value later returned by initial_metadata().
        """
        with self._condition:
            self._initial_metadata = initial_metadata
            self._condition.notify_all()

    def take_request(self):
        """Block until a request, request closure or termination is available.

        Termination is checked first, so requests still queued when the RPC
        ends are not delivered.

        Returns:
          _common.TERMINATED if the RPC has ended; otherwise a
          _common.ServerRpcRead wrapping the oldest queued request; otherwise
          _common.REQUESTS_CLOSED if the request side has been closed.
        """
        with self._condition:
            while True:
                if self._code is None:
                    if self._requests:
                        request = self._requests.pop(0)
                        self._condition.notify_all()
                        # NOTE(review): the two False arguments are presumably
                        # the "requests closed" and "terminated" flags of
                        # ServerRpcRead; confirm against _common.
                        return _common.ServerRpcRead(request, False, False)
                    elif self._requests_closed:
                        return _common.REQUESTS_CLOSED
                    else:
                        self._condition.wait()
                else:
                    return _common.TERMINATED

    def is_active(self):
        """Return True while no termination code has been recorded."""
        with self._condition:
            return self._code is None

    def add_response(self, response):
        """Queue a response and wake threads waiting in take_response().

        Args:
          response: The response message to queue.
        """
        with self._condition:
            self._responses.append(response)
            self._condition.notify_all()

    def send_termination(self, trailing_metadata, code, details):
        """Record the final status of the RPC and wake all waiters.

        Also cancels the expiration future, if one was set. Registered
        termination callbacks are not invoked here; only cancel() and expire()
        run them.

        Args:
          trailing_metadata: The trailing metadata to report.
          code: The status code to report; a non-None value marks the RPC as
            ended.
          details: The status details to report.
        """
        with self._condition:
            self._trailing_metadata = trailing_metadata
            self._code = code
            self._details = details
            if self._expiration_future is not None:
                self._expiration_future.cancel()
            self._condition.notify_all()

    def add_termination_callback(self, callback):
        """Register a callable to run when the RPC is cancelled or expires.

        Args:
          callback: A callable taking no arguments.

        Returns:
          True if the callback was registered; False if the RPC has already
          ended, in which case the callback is neither stored nor called.
        """
        with self._condition:
            if self._code is None:
                self._termination_callbacks.append(callback)
                return True
            else:
                return False

    def initial_metadata(self):
        """Block until initial metadata is available and return it.

        Returns:
          The value passed to send_initial_metadata(), or the empty metadata
          installed by expire().

        Raises:
          ValueError: If the RPC has ended without any initial metadata.
        """
        with self._condition:
            while True:
                if self._initial_metadata is None:
                    if self._code is None:
                        self._condition.wait()
                    else:
                        raise ValueError(
                            'No initial metadata despite status code!')
                else:
                    return self._initial_metadata

    def add_request(self, request):
        """Queue a request and wake threads waiting in take_request().

        Args:
          request: The request message to queue.
        """
        with self._condition:
            self._requests.append(request)
            self._condition.notify_all()

    def take_response(self):
        """Block until a response is queued and return the oldest one.

        Queued responses are still returned after the RPC has ended.

        Returns:
          The oldest queued response.

        Raises:
          ValueError: If no response is queued and the RPC has ended.
        """
        with self._condition:
            while True:
                if self._responses:
                    response = self._responses.pop(0)
                    self._condition.notify_all()
                    return response
                elif self._code is None:
                    self._condition.wait()
                else:
                    raise ValueError('No more responses!')

    def requests_closed(self):
        """Mark the request side as closed and wake waiting readers."""
        with self._condition:
            self._requests_closed = True
            self._condition.notify_all()

    def cancel(self):
        """Mark the RPC as cancelled by the client and run its callbacks.

        Does nothing if the RPC has already ended. Otherwise records
        _CLIENT_INACTIVE as the code, cancels the expiration future if one was
        set, wakes all waiters and then invokes the registered termination
        callbacks outside the lock.
        """
        with self._condition:
            if self._code is not None:
                # Already terminated: there is nothing to cancel and no
                # callback list to run. Returning here avoids iterating an
                # unbound local below.
                return
            self._code = _CLIENT_INACTIVE
            termination_callbacks = self._termination_callbacks
            self._termination_callbacks = None
            if self._expiration_future is not None:
                self._expiration_future.cancel()
            self._condition.notify_all()
        # Callbacks run without the lock held so they may call back into this
        # handler.
        for termination_callback in termination_callbacks:
            termination_callback()

    def unary_response_termination(self):
        """Block until the RPC has ended and return its single-response result.

        The first call pops the oldest queued response, if any, and caches it
        so that later calls return the same response.

        Returns:
          A (response, trailing_metadata, code, details) tuple; response is
          None when no response was ever queued.

        Raises:
          ValueError: If the RPC was cancelled via cancel().
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError('Huh? Cancelled but wanting status?')
                elif self._code is None:
                    self._condition.wait()
                else:
                    if self._unary_response is None:
                        if self._responses:
                            self._unary_response = self._responses.pop(0)
                    return (
                        self._unary_response,
                        self._trailing_metadata,
                        self._code,
                        self._details,
                    )

    def stream_response_termination(self):
        """Block until the RPC has ended and return its final status.

        Returns:
          A (trailing_metadata, code, details) tuple.

        Raises:
          ValueError: If the RPC was cancelled via cancel().
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError('Huh? Cancelled but wanting status?')
                elif self._code is None:
                    self._condition.wait()
                else:
                    return (self._trailing_metadata, self._code, self._details)

    def expire(self):
        """Terminate the RPC with DEADLINE_EXCEEDED and run its callbacks.

        Does nothing if the RPC has already ended. Otherwise fills in empty
        initial metadata when none was sent, records empty trailing metadata,
        the DEADLINE_EXCEEDED code and its details, wakes all waiters and then
        invokes the registered termination callbacks outside the lock.
        """
        with self._condition:
            if self._code is not None:
                # The RPC ended before the expiry ran: leave its status
                # untouched. Returning here avoids iterating an unbound local
                # below.
                return
            if self._initial_metadata is None:
                self._initial_metadata = _common.FUSSED_EMPTY_METADATA
            self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
            self._code = grpc.StatusCode.DEADLINE_EXCEEDED
            self._details = 'Took too much time!'
            termination_callbacks = self._termination_callbacks
            self._termination_callbacks = None
            self._condition.notify_all()
        # Callbacks run without the lock held so they may call back into this
        # handler.
        for termination_callback in termination_callbacks:
            termination_callback()

    def set_expiration_future(self, expiration_future):
        """Remember the future to cancel when the RPC ends by other means.

        Args:
          expiration_future: An object with a cancel() method; it is cancelled
            by send_termination() and cancel().
        """
        with self._condition:
            self._expiration_future = expiration_future
207
208
def handler_without_deadline(requests_closed):
    """Build a handler that has no expiration scheduled.

    Args:
      requests_closed: Initial value of the handler's "no more requests will
        be added" flag.

    Returns:
      A new _Handler with no expiration future attached.
    """
    rpc_handler = _Handler(requests_closed)
    return rpc_handler
211
212
def handler_with_deadline(requests_closed, time, deadline):
    """Build a handler whose expire() is scheduled through the given clock.

    Args:
      requests_closed: Initial value of the handler's "no more requests will
        be added" flag.
      time: An object whose call_at(behavior, deadline) schedules behavior and
        returns a future for the scheduled call.
      deadline: Passed unchanged to time.call_at as the moment at which the
        handler's expire() should run.

    Returns:
      A new _Handler holding the future returned by time.call_at, so that the
      scheduled expiry can be cancelled when the RPC ends first.
    """
    rpc_handler = _Handler(requests_closed)
    # Schedule the expiry first, then hand its future to the handler.
    rpc_handler.set_expiration_future(
        time.call_at(rpc_handler.expire, deadline))
    return rpc_handler
218