# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

import grpc_testing
from grpc_testing import _common
from grpc_testing._server import _handler
from grpc_testing._server import _rpc
from grpc_testing._server import _server_rpc
from grpc_testing._server import _service
from grpc_testing._server import _servicer_context


def _implementation(descriptors_to_servicers, method_descriptor):
    """Return the servicer attribute that implements method_descriptor.

    The servicer is looked up in descriptors_to_servicers under the
    method's containing service; the implementation is that servicer's
    attribute named after the method.
    """
    service_descriptor = method_descriptor.containing_service
    servicer = descriptors_to_servicers[service_descriptor]
    return getattr(servicer, method_descriptor.name)


def _unary_unary_service(request):
    """Build a service behavior that runs a unary-unary RPC on request."""

    def service(implementation, rpc, servicer_context):
        _service.unary_unary(implementation, rpc, request, servicer_context)

    return service


def _unary_stream_service(request):
    """Build a service behavior that runs a unary-stream RPC on request."""

    def service(implementation, rpc, servicer_context):
        _service.unary_stream(implementation, rpc, request, servicer_context)

    return service


def _stream_unary_service(handler):
    """Build a service behavior that runs a stream-unary RPC over handler."""

    def service(implementation, rpc, servicer_context):
        _service.stream_unary(implementation, rpc, handler, servicer_context)

    return service


def _stream_stream_service(handler):
    """Build a service behavior that runs a stream-stream RPC over handler."""

    def service(implementation, rpc, servicer_context):
        _service.stream_stream(implementation, rpc, handler,
                               servicer_context)

    return service


class _Serverish(_common.Serverish):
    """Dispatches invocations to servicer implementations on new threads."""

    def __init__(self, descriptors_to_servicers, time):
        self._descriptors_to_servicers = descriptors_to_servicers
        self._time = time

    def _invoke(self, service_behavior, method_descriptor, handler,
                invocation_metadata, deadline):
        """Start service_behavior on its own thread for one RPC.

        The implementation is resolved first, then an Rpc is created
        around the handler. The service thread is only started when
        registering the Rpc's extrinsic_abort as a termination callback
        on the handler returns a truthy value; otherwise nothing more is
        done (presumably the RPC has already terminated -- confirm
        against the handler implementation).
        """
        implementation = _implementation(self._descriptors_to_servicers,
                                         method_descriptor)
        rpc = _rpc.Rpc(handler, invocation_metadata)
        if not handler.add_termination_callback(rpc.extrinsic_abort):
            return
        servicer_context = _servicer_context.ServicerContext(
            rpc, self._time, deadline)
        behavior_arguments = (implementation, rpc, servicer_context)
        service_thread = threading.Thread(
            target=service_behavior, args=behavior_arguments)
        service_thread.start()

    def invoke_unary_unary(self, method_descriptor, handler,
                           invocation_metadata, request, deadline):
        behavior = _unary_unary_service(request)
        self._invoke(behavior, method_descriptor, handler,
                     invocation_metadata, deadline)

    def invoke_unary_stream(self, method_descriptor, handler,
                            invocation_metadata, request, deadline):
        behavior = _unary_stream_service(request)
        self._invoke(behavior, method_descriptor, handler,
                     invocation_metadata, deadline)

    def invoke_stream_unary(self, method_descriptor, handler,
                            invocation_metadata, deadline):
        behavior = _stream_unary_service(handler)
        self._invoke(behavior, method_descriptor, handler,
                     invocation_metadata, deadline)

    def invoke_stream_stream(self, method_descriptor, handler,
                             invocation_metadata, deadline):
        behavior = _stream_stream_service(handler)
        self._invoke(behavior, method_descriptor, handler,
                     invocation_metadata, deadline)


def _deadline_and_handler(requests_closed, time, timeout):
    """Return a (deadline, handler) pair for a new invocation.

    With no timeout the deadline is None and a deadline-less handler is
    created. Otherwise the deadline is time.time() + timeout and the
    handler is created with that deadline.
    """
    if timeout is None:
        return None, _handler.handler_without_deadline(requests_closed)
    deadline = time.time() + timeout
    handler = _handler.handler_with_deadline(requests_closed, time, deadline)
    return deadline, handler


class _Server(grpc_testing.Server):
    """grpc_testing.Server that forwards invocations to a serverish.

    Unary-request methods create their handler with requests_closed set
    to True; stream-request methods create it with False.
    """

    def __init__(self, serverish, time):
        self._serverish = serverish
        self._time = time

    def invoke_unary_unary(self, method_descriptor, invocation_metadata,
                           request, timeout):
        deadline, rpc_handler = _deadline_and_handler(
            True, self._time, timeout)
        self._serverish.invoke_unary_unary(
            method_descriptor, rpc_handler, invocation_metadata, request,
            deadline)
        return _server_rpc.UnaryUnaryServerRpc(rpc_handler)

    def invoke_unary_stream(self, method_descriptor, invocation_metadata,
                            request, timeout):
        deadline, rpc_handler = _deadline_and_handler(
            True, self._time, timeout)
        self._serverish.invoke_unary_stream(
            method_descriptor, rpc_handler, invocation_metadata, request,
            deadline)
        return _server_rpc.UnaryStreamServerRpc(rpc_handler)

    def invoke_stream_unary(self, method_descriptor, invocation_metadata,
                            timeout):
        deadline, rpc_handler = _deadline_and_handler(
            False, self._time, timeout)
        self._serverish.invoke_stream_unary(
            method_descriptor, rpc_handler, invocation_metadata, deadline)
        return _server_rpc.StreamUnaryServerRpc(rpc_handler)

    def invoke_stream_stream(self, method_descriptor, invocation_metadata,
                             timeout):
        deadline, rpc_handler = _deadline_and_handler(
            False, self._time, timeout)
        self._serverish.invoke_stream_stream(
            method_descriptor, rpc_handler, invocation_metadata, deadline)
        return _server_rpc.StreamStreamServerRpc(rpc_handler)


def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
    """Create a _Server backed by a _Serverish over the given servicers.

    The same time object is handed to both the _Serverish and the
    _Server.
    """
    serverish = _Serverish(descriptors_to_servicers, time)
    return _Server(serverish, time)