1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import threading
16
17import grpc_testing
18from grpc_testing import _common
19from grpc_testing._server import _handler
20from grpc_testing._server import _rpc
21from grpc_testing._server import _server_rpc
22from grpc_testing._server import _service
23from grpc_testing._server import _servicer_context
24
25
26def _implementation(descriptors_to_servicers, method_descriptor):
27    servicer = descriptors_to_servicers[method_descriptor.containing_service]
28    return getattr(servicer, method_descriptor.name)
29
30
31def _unary_unary_service(request):
32
33    def service(implementation, rpc, servicer_context):
34        _service.unary_unary(implementation, rpc, request, servicer_context)
35
36    return service
37
38
39def _unary_stream_service(request):
40
41    def service(implementation, rpc, servicer_context):
42        _service.unary_stream(implementation, rpc, request, servicer_context)
43
44    return service
45
46
47def _stream_unary_service(handler):
48
49    def service(implementation, rpc, servicer_context):
50        _service.stream_unary(implementation, rpc, handler, servicer_context)
51
52    return service
53
54
55def _stream_stream_service(handler):
56
57    def service(implementation, rpc, servicer_context):
58        _service.stream_stream(implementation, rpc, handler, servicer_context)
59
60    return service
61
62
63class _Serverish(_common.Serverish):
64
65    def __init__(self, descriptors_to_servicers, time):
66        self._descriptors_to_servicers = descriptors_to_servicers
67        self._time = time
68
69    def _invoke(self, service_behavior, method_descriptor, handler,
70                invocation_metadata, deadline):
71        implementation = _implementation(self._descriptors_to_servicers,
72                                         method_descriptor)
73        rpc = _rpc.Rpc(handler, invocation_metadata)
74        if handler.add_termination_callback(rpc.extrinsic_abort):
75            servicer_context = _servicer_context.ServicerContext(
76                rpc, self._time, deadline)
77            service_thread = threading.Thread(
78                target=service_behavior,
79                args=(
80                    implementation,
81                    rpc,
82                    servicer_context,
83                ))
84            service_thread.start()
85
86    def invoke_unary_unary(self, method_descriptor, handler,
87                           invocation_metadata, request, deadline):
88        self._invoke(
89            _unary_unary_service(request), method_descriptor, handler,
90            invocation_metadata, deadline)
91
92    def invoke_unary_stream(self, method_descriptor, handler,
93                            invocation_metadata, request, deadline):
94        self._invoke(
95            _unary_stream_service(request), method_descriptor, handler,
96            invocation_metadata, deadline)
97
98    def invoke_stream_unary(self, method_descriptor, handler,
99                            invocation_metadata, deadline):
100        self._invoke(
101            _stream_unary_service(handler), method_descriptor, handler,
102            invocation_metadata, deadline)
103
104    def invoke_stream_stream(self, method_descriptor, handler,
105                             invocation_metadata, deadline):
106        self._invoke(
107            _stream_stream_service(handler), method_descriptor, handler,
108            invocation_metadata, deadline)
109
110
111def _deadline_and_handler(requests_closed, time, timeout):
112    if timeout is None:
113        return None, _handler.handler_without_deadline(requests_closed)
114    else:
115        deadline = time.time() + timeout
116        handler = _handler.handler_with_deadline(requests_closed, time,
117                                                 deadline)
118        return deadline, handler
119
120
121class _Server(grpc_testing.Server):
122
123    def __init__(self, serverish, time):
124        self._serverish = serverish
125        self._time = time
126
127    def invoke_unary_unary(self, method_descriptor, invocation_metadata,
128                           request, timeout):
129        deadline, handler = _deadline_and_handler(True, self._time, timeout)
130        self._serverish.invoke_unary_unary(
131            method_descriptor, handler, invocation_metadata, request, deadline)
132        return _server_rpc.UnaryUnaryServerRpc(handler)
133
134    def invoke_unary_stream(self, method_descriptor, invocation_metadata,
135                            request, timeout):
136        deadline, handler = _deadline_and_handler(True, self._time, timeout)
137        self._serverish.invoke_unary_stream(
138            method_descriptor, handler, invocation_metadata, request, deadline)
139        return _server_rpc.UnaryStreamServerRpc(handler)
140
141    def invoke_stream_unary(self, method_descriptor, invocation_metadata,
142                            timeout):
143        deadline, handler = _deadline_and_handler(False, self._time, timeout)
144        self._serverish.invoke_stream_unary(method_descriptor, handler,
145                                            invocation_metadata, deadline)
146        return _server_rpc.StreamUnaryServerRpc(handler)
147
148    def invoke_stream_stream(self, method_descriptor, invocation_metadata,
149                             timeout):
150        deadline, handler = _deadline_and_handler(False, self._time, timeout)
151        self._serverish.invoke_stream_stream(method_descriptor, handler,
152                                             invocation_metadata, deadline)
153        return _server_rpc.StreamStreamServerRpc(handler)
154
155
156def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
157    return _Server(_Serverish(descriptors_to_servicers, time), time)
158