# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import threading

import grpc
from grpc_testing import _common

# Sentinel stored in ``_Handler._code`` once the RPC has been cancelled; it is
# compared by identity and distinguishes cancellation from a real status code.
_CLIENT_INACTIVE = object()


class Handler(_common.ServerRpcHandler):
    """Abstract interface for driving one server-side RPC.

    Extends ``_common.ServerRpcHandler`` with the operations declared below;
    ``_Handler`` in this module is the concrete implementation.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Return the initial metadata of the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def add_request(self, request):
        """Add a request message to the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Return the next response message of the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Mark the request side of the RPC as closed."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancel the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def unary_response_termination(self):
        """Return the unary response together with the RPC's final status."""
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_response_termination(self):
        """Return the final status of a response-streaming RPC."""
        raise NotImplementedError()


class _Handler(Handler):
    """Condition-guarded state holder for a single RPC.

    All state is read and written while holding ``self._condition``; every
    mutation calls ``notify_all`` so that methods blocked in ``wait`` re-check
    their predicate. ``self._code`` is ``None`` while the RPC is live,
    ``_CLIENT_INACTIVE`` after ``cancel``, and otherwise whatever code was
    given to ``send_termination`` or set by ``expire``.
    """

    def __init__(self, requests_closed):
        """Initialize the handler.

        Args:
          requests_closed: Initial value of the "no more requests" flag read
            by ``take_request``.
        """
        self._condition = threading.Condition()
        self._requests = []
        self._requests_closed = requests_closed
        self._initial_metadata = None
        self._responses = []
        self._trailing_metadata = None
        self._code = None
        self._details = None
        self._unary_response = None
        self._expiration_future = None
        self._termination_callbacks = []

    def send_initial_metadata(self, initial_metadata):
        """Record the initial metadata and wake any waiters."""
        with self._condition:
            self._initial_metadata = initial_metadata
            self._condition.notify_all()

    def take_request(self):
        """Block until a request, request-closure, or termination is available.

        Returns:
          ``_common.TERMINATED`` if a code has been set; otherwise a
          ``_common.ServerRpcRead`` wrapping the oldest queued request, or
          ``_common.REQUESTS_CLOSED`` when the queue is empty and the
          requests-closed flag is set.
        """
        with self._condition:
            while True:
                if self._code is None:
                    if self._requests:
                        request = self._requests.pop(0)
                        self._condition.notify_all()
                        return _common.ServerRpcRead(request, False, False)
                    elif self._requests_closed:
                        return _common.REQUESTS_CLOSED
                    else:
                        self._condition.wait()
                else:
                    return _common.TERMINATED

    def is_active(self):
        """Return True while no code has been set on the RPC."""
        with self._condition:
            return self._code is None

    def add_response(self, response):
        """Queue a response message and wake any waiters."""
        with self._condition:
            self._responses.append(response)
            self._condition.notify_all()

    def send_termination(self, trailing_metadata, code, details):
        """Record the final status, cancel any expiration future, and notify.

        Args:
          trailing_metadata: Stored as the RPC's trailing metadata.
          code: Stored as the RPC's code.
          details: Stored as the RPC's details.
        """
        with self._condition:
            self._trailing_metadata = trailing_metadata
            self._code = code
            self._details = details
            if self._expiration_future is not None:
                self._expiration_future.cancel()
            self._condition.notify_all()

    def add_termination_callback(self, callback):
        """Register a callback to be run by ``cancel`` or ``expire``.

        Args:
          callback: A no-argument callable.

        Returns:
          True if the callback was registered; False if a code has already
          been set, in which case the callback is not stored.
        """
        with self._condition:
            if self._code is None:
                self._termination_callbacks.append(callback)
                return True
            else:
                return False

    def initial_metadata(self):
        """Block until initial metadata is available and return it.

        Raises:
          ValueError: If a code is set while no initial metadata was recorded.
        """
        with self._condition:
            while True:
                if self._initial_metadata is None:
                    if self._code is None:
                        self._condition.wait()
                    else:
                        raise ValueError(
                            'No initial metadata despite status code!')
                else:
                    return self._initial_metadata

    def add_request(self, request):
        """Queue a request message and wake any waiters."""
        with self._condition:
            self._requests.append(request)
            self._condition.notify_all()

    def take_response(self):
        """Block until a response is queued and return the oldest one.

        Raises:
          ValueError: If the response queue is empty and a code has been set.
        """
        with self._condition:
            while True:
                if self._responses:
                    response = self._responses.pop(0)
                    self._condition.notify_all()
                    return response
                elif self._code is None:
                    self._condition.wait()
                else:
                    raise ValueError('No more responses!')

    def requests_closed(self):
        """Set the requests-closed flag and wake any waiters."""
        with self._condition:
            self._requests_closed = True
            self._condition.notify_all()

    def cancel(self):
        """Cancel the RPC if it is still live and run termination callbacks.

        Does nothing if a code has already been set.
        """
        # Bound up front so the loop below is a no-op, rather than an
        # UnboundLocalError, when the RPC already has a code.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                self._code = _CLIENT_INACTIVE
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                if self._expiration_future is not None:
                    self._expiration_future.cancel()
                self._condition.notify_all()
        # Callbacks are invoked after the condition has been released.
        for termination_callback in termination_callbacks:
            termination_callback()

    def unary_response_termination(self):
        """Block until a code is set, then return the response and status.

        Returns:
          A ``(response, trailing_metadata, code, details)`` tuple. The
          response is the first queued response if any, else ``None``; it is
          cached so repeated calls return the same value.

        Raises:
          ValueError: If the RPC was cancelled via ``cancel``.
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError('Huh? Cancelled but wanting status?')
                elif self._code is None:
                    self._condition.wait()
                else:
                    if self._unary_response is None:
                        if self._responses:
                            self._unary_response = self._responses.pop(0)
                    return (
                        self._unary_response,
                        self._trailing_metadata,
                        self._code,
                        self._details,
                    )

    def stream_response_termination(self):
        """Block until a code is set, then return the final status.

        Returns:
          A ``(trailing_metadata, code, details)`` tuple.

        Raises:
          ValueError: If the RPC was cancelled via ``cancel``.
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError('Huh? Cancelled but wanting status?')
                elif self._code is None:
                    self._condition.wait()
                else:
                    return self._trailing_metadata, self._code, self._details,

    def expire(self):
        """Terminate a still-live RPC with ``DEADLINE_EXCEEDED``.

        Does nothing if a code has already been set. Otherwise fills in any
        missing initial metadata, sets the trailing metadata, code and
        details, and runs the registered termination callbacks.
        """
        # Bound up front so the loop below is a no-op, rather than an
        # UnboundLocalError, when the RPC already has a code.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                if self._initial_metadata is None:
                    self._initial_metadata = _common.FUSSED_EMPTY_METADATA
                self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
                self._code = grpc.StatusCode.DEADLINE_EXCEEDED
                self._details = 'Took too much time!'
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                self._condition.notify_all()
        # Callbacks are invoked after the condition has been released.
        for termination_callback in termination_callbacks:
            termination_callback()

    def set_expiration_future(self, expiration_future):
        """Store the future that ``send_termination``/``cancel`` will cancel."""
        with self._condition:
            self._expiration_future = expiration_future


def handler_without_deadline(requests_closed):
    """Create a handler that has no expiration scheduled.

    Args:
      requests_closed: Initial value of the handler's requests-closed flag.

    Returns:
      A new ``_Handler``.
    """
    return _Handler(requests_closed)


def handler_with_deadline(requests_closed, time, deadline):
    """Create a handler whose ``expire`` is scheduled via ``time.call_at``.

    Args:
      requests_closed: Initial value of the handler's requests-closed flag.
      time: Object providing ``call_at(behavior, time)`` whose return value
        has a ``cancel`` method (presumably a ``grpc_testing.Time`` — confirm
        against callers).
      deadline: Passed to ``time.call_at`` as the moment to call ``expire``.

    Returns:
      A new ``_Handler`` with its expiration future set.
    """
    handler = _Handler(requests_closed)
    expiration_future = time.call_at(handler.expire, deadline)
    handler.set_expiration_future(expiration_future)
    return handler