# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc_testing
from grpc_testing._channel import _channel_rpc
from grpc_testing._channel import _multi_callable


# All serializer and deserializer parameters are not (yet) used by this
# test infrastructure.
# pylint: disable=unused-argument
class TestingChannel(grpc_testing.Channel):
    """A ``grpc_testing.Channel`` whose methods all share one state object.

    The four ``unary_*``/``stream_*`` factory methods pass ``self._state`` to
    the matching ``_multi_callable`` class, and the four ``take_*`` methods
    pass the same object to the matching ``_channel_rpc`` function.
    """

    def __init__(self, time, state):
        """Store the collaborators handed in by the caller.

        Args:
            time: Kept on the instance as ``_time``; not read by any method
                of this class.
            state: Kept on the instance as ``_state`` and forwarded to every
                multi-callable and ``take_*`` helper.
        """
        self._time = time
        self._state = state

    def subscribe(self, callback, try_to_connect=False):
        """Unsupported by this channel; always raises NotImplementedError."""
        raise NotImplementedError()

    def unsubscribe(self, callback):
        """Unsupported by this channel; always raises NotImplementedError."""
        raise NotImplementedError()

    def unary_unary(
        self, method, request_serializer=None, response_deserializer=None
    ):
        """Return a ``_multi_callable.UnaryUnary`` for ``method``.

        The serializer and deserializer arguments are accepted but ignored.
        """
        return _multi_callable.UnaryUnary(method, self._state)

    def unary_stream(
        self, method, request_serializer=None, response_deserializer=None
    ):
        """Return a ``_multi_callable.UnaryStream`` for ``method``.

        The serializer and deserializer arguments are accepted but ignored.
        """
        return _multi_callable.UnaryStream(method, self._state)

    def stream_unary(
        self, method, request_serializer=None, response_deserializer=None
    ):
        """Return a ``_multi_callable.StreamUnary`` for ``method``.

        The serializer and deserializer arguments are accepted but ignored.
        """
        return _multi_callable.StreamUnary(method, self._state)

    def stream_stream(
        self, method, request_serializer=None, response_deserializer=None
    ):
        """Return a ``_multi_callable.StreamStream`` for ``method``.

        The serializer and deserializer arguments are accepted but ignored.
        """
        return _multi_callable.StreamStream(method, self._state)

    def _close(self):
        """Shared close hook for ``close`` and ``__exit__``; currently a no-op."""
        # TODO(https://github.com/grpc/grpc/issues/12531): Decide what
        # action to take here, if any?

    def __enter__(self):
        """Enter a ``with`` block, yielding this channel itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run the close hook on leaving a ``with`` block.

        Returns False so an exception raised inside the block propagates.
        """
        self._close()
        return False

    def close(self):
        """Run the close hook (which presently does nothing)."""
        self._close()

    def take_unary_unary(self, method_descriptor):
        """Delegate to ``_channel_rpc.unary_unary`` with this channel's state."""
        return _channel_rpc.unary_unary(self._state, method_descriptor)

    def take_unary_stream(self, method_descriptor):
        """Delegate to ``_channel_rpc.unary_stream`` with this channel's state."""
        return _channel_rpc.unary_stream(self._state, method_descriptor)

    def take_stream_unary(self, method_descriptor):
        """Delegate to ``_channel_rpc.stream_unary`` with this channel's state."""
        return _channel_rpc.stream_unary(self._state, method_descriptor)

    def take_stream_stream(self, method_descriptor):
        """Delegate to ``_channel_rpc.stream_stream`` with this channel's state."""
        return _channel_rpc.stream_stream(self._state, method_descriptor)


# pylint: enable=unused-argument