1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import threading
17
18import grpc
19
20_NOT_YET_OBSERVED = object()
21logging.basicConfig()
22_LOGGER = logging.getLogger(__name__)
23
24
25def _cancel(handler):
26    return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
27
28
29def _is_active(handler):
30    return handler.is_active()
31
32
33def _time_remaining(unused_handler):
34    raise NotImplementedError()
35
36
37def _add_callback(handler, callback):
38    return handler.add_callback(callback)
39
40
41def _initial_metadata(handler):
42    return handler.initial_metadata()
43
44
45def _trailing_metadata(handler):
46    trailing_metadata, unused_code, unused_details = handler.termination()
47    return trailing_metadata
48
49
50def _code(handler):
51    unused_trailing_metadata, code, unused_details = handler.termination()
52    return code
53
54
55def _details(handler):
56    unused_trailing_metadata, unused_code, details = handler.termination()
57    return details
58
59
60class _Call(grpc.Call):
61
62    def __init__(self, handler):
63        self._handler = handler
64
65    def cancel(self):
66        _cancel(self._handler)
67
68    def is_active(self):
69        return _is_active(self._handler)
70
71    def time_remaining(self):
72        return _time_remaining(self._handler)
73
74    def add_callback(self, callback):
75        return _add_callback(self._handler, callback)
76
77    def initial_metadata(self):
78        return _initial_metadata(self._handler)
79
80    def trailing_metadata(self):
81        return _trailing_metadata(self._handler)
82
83    def code(self):
84        return _code(self._handler)
85
86    def details(self):
87        return _details(self._handler)
88
89
90class _RpcErrorCall(grpc.RpcError, grpc.Call):
91
92    def __init__(self, handler):
93        self._handler = handler
94
95    def cancel(self):
96        _cancel(self._handler)
97
98    def is_active(self):
99        return _is_active(self._handler)
100
101    def time_remaining(self):
102        return _time_remaining(self._handler)
103
104    def add_callback(self, callback):
105        return _add_callback(self._handler, callback)
106
107    def initial_metadata(self):
108        return _initial_metadata(self._handler)
109
110    def trailing_metadata(self):
111        return _trailing_metadata(self._handler)
112
113    def code(self):
114        return _code(self._handler)
115
116    def details(self):
117        return _details(self._handler)
118
119
120def _next(handler):
121    read = handler.take_response()
122    if read.code is None:
123        return read.response
124    elif read.code is grpc.StatusCode.OK:
125        raise StopIteration()
126    else:
127        raise _RpcErrorCall(handler)
128
129
130class _HandlerExtras(object):
131
132    def __init__(self):
133        self.condition = threading.Condition()
134        self.unary_response = _NOT_YET_OBSERVED
135        self.cancelled = False
136
137
138def _with_extras_cancel(handler, extras):
139    with extras.condition:
140        if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
141            extras.cancelled = True
142            return True
143        else:
144            return False
145
146
147def _extras_without_cancelled(extras):
148    with extras.condition:
149        return extras.cancelled
150
151
152def _running(handler):
153    return handler.is_active()
154
155
156def _done(handler):
157    return not handler.is_active()
158
159
160def _with_extras_unary_response(handler, extras):
161    with extras.condition:
162        if extras.unary_response is _NOT_YET_OBSERVED:
163            read = handler.take_response()
164            if read.code is None:
165                extras.unary_response = read.response
166                return read.response
167            else:
168                raise _RpcErrorCall(handler)
169        else:
170            return extras.unary_response
171
172
173def _exception(unused_handler):
174    raise NotImplementedError('TODO!')
175
176
177def _traceback(unused_handler):
178    raise NotImplementedError('TODO!')
179
180
181def _add_done_callback(handler, callback, future):
182    adapted_callback = lambda: callback(future)
183    if not handler.add_callback(adapted_callback):
184        callback(future)
185
186
187class _FutureCall(grpc.Future, grpc.Call):
188
189    def __init__(self, handler, extras):
190        self._handler = handler
191        self._extras = extras
192
193    def cancel(self):
194        return _with_extras_cancel(self._handler, self._extras)
195
196    def cancelled(self):
197        return _extras_without_cancelled(self._extras)
198
199    def running(self):
200        return _running(self._handler)
201
202    def done(self):
203        return _done(self._handler)
204
205    def result(self):
206        return _with_extras_unary_response(self._handler, self._extras)
207
208    def exception(self):
209        return _exception(self._handler)
210
211    def traceback(self):
212        return _traceback(self._handler)
213
214    def add_done_callback(self, fn):
215        _add_done_callback(self._handler, fn, self)
216
217    def is_active(self):
218        return _is_active(self._handler)
219
220    def time_remaining(self):
221        return _time_remaining(self._handler)
222
223    def add_callback(self, callback):
224        return _add_callback(self._handler, callback)
225
226    def initial_metadata(self):
227        return _initial_metadata(self._handler)
228
229    def trailing_metadata(self):
230        return _trailing_metadata(self._handler)
231
232    def code(self):
233        return _code(self._handler)
234
235    def details(self):
236        return _details(self._handler)
237
238
239def consume_requests(request_iterator, handler):
240
241    def _consume():
242        while True:
243            try:
244                request = next(request_iterator)
245                added = handler.add_request(request)
246                if not added:
247                    break
248            except StopIteration:
249                handler.close_requests()
250                break
251            except Exception:  # pylint: disable=broad-except
252                details = 'Exception iterating requests!'
253                _LOGGER.exception(details)
254                handler.cancel(grpc.StatusCode.UNKNOWN, details)
255
256    consumption = threading.Thread(target=_consume)
257    consumption.start()
258
259
260def blocking_unary_response(handler):
261    read = handler.take_response()
262    if read.code is None:
263        unused_trailing_metadata, code, unused_details = handler.termination()
264        if code is grpc.StatusCode.OK:
265            return read.response
266        else:
267            raise _RpcErrorCall(handler)
268    else:
269        raise _RpcErrorCall(handler)
270
271
272def blocking_unary_response_with_call(handler):
273    read = handler.take_response()
274    if read.code is None:
275        unused_trailing_metadata, code, unused_details = handler.termination()
276        if code is grpc.StatusCode.OK:
277            return read.response, _Call(handler)
278        else:
279            raise _RpcErrorCall(handler)
280    else:
281        raise _RpcErrorCall(handler)
282
283
284def future_call(handler):
285    return _FutureCall(handler, _HandlerExtras())
286
287
288class ResponseIteratorCall(grpc.Call):
289
290    def __init__(self, handler):
291        self._handler = handler
292
293    def __iter__(self):
294        return self
295
296    def __next__(self):
297        return _next(self._handler)
298
299    def next(self):
300        return _next(self._handler)
301
302    def cancel(self):
303        _cancel(self._handler)
304
305    def is_active(self):
306        return _is_active(self._handler)
307
308    def time_remaining(self):
309        return _time_remaining(self._handler)
310
311    def add_callback(self, callback):
312        return _add_callback(self._handler, callback)
313
314    def initial_metadata(self):
315        return _initial_metadata(self._handler)
316
317    def trailing_metadata(self):
318        return _trailing_metadata(self._handler)
319
320    def code(self):
321        return _code(self._handler)
322
323    def details(self):
324        return _details(self._handler)
325